Automated Voice Assistant for Elderly Care with Emergency Detection,Python
👤 Sharing: AI
```python
# Import necessary libraries
import speech_recognition as sr # For speech recognition
import pyttsx3 # For text-to-speech
import time # For time-related functions
import datetime # For date and time information
import threading # For handling emergency detection in a separate thread
import queue # For managing tasks (optional but recommended)
import logging # For logging events and debugging
import sys
import os #For OS check (Linux vs Windows)
import platform #For OS Check
# Configure logging (optional, but helps with debugging)
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s')
class VoiceAssistant:
"""
A voice assistant for elderly care with emergency detection capabilities.
"""
def __init__(self, emergency_keywords=None, emergency_contact=None):
"""
Initializes the VoiceAssistant.
Args:
emergency_keywords (list, optional): List of keywords that trigger an emergency alert.
Defaults to ["help", "emergency", "fall", "hurt", "sick"].
emergency_contact (str, optional): The phone number or email address to contact in case of an emergency.
Defaults to None. You MUST set this for emergency alerts to work.
"""
self.recognizer = sr.Recognizer()
self.engine = pyttsx3.init()
self.engine.setProperty('rate', 150) # Adjust speaking speed
self.engine.setProperty('volume', 0.9) # Adjust volume (0.0 to 1.0)
self.emergency_keywords = emergency_keywords or ["help", "emergency", "fall", "hurt", "sick"] # Default keywords
self.emergency_contact = emergency_contact # Phone number or email for emergency contact
self.emergency_thread = None # Thread for emergency detection
self.emergency_queue = queue.Queue() # Thread Queue to pass text for emergency checks
self.is_running = True # Flag to keep the main loop running
# Check if running on Linux/Raspberry Pi and adjust audio driver (Optional - uncomment only on Linux systems if needed)
#if sys.platform.startswith('linux'):
# self.engine = pyttsx3.init('espeak') # Try 'nsss' or 'espeak' if default doesn't work
# Choose the appropriate TTS engine. This is important for Linux.
system = platform.system()
logging.info(f"Detected System: {system}")
if system == "Windows":
logging.info("Using SAPI5 TTS Engine")
self.engine = pyttsx3.init()
elif system == "Darwin": # MacOS
logging.info("Using NSSpeechSynthesizer TTS Engine")
self.engine = pyttsx3.init('nsss')
elif system == "Linux":
logging.info("Using Espeak TTS Engine")
try:
self.engine = pyttsx3.init('espeak')
except RuntimeError as e:
logging.error(f"Error initializing espeak: {e}. Make sure espeak is installed.")
print("Please install espeak: sudo apt-get install espeak") # Inform user to install
sys.exit(1) #Exit the program if espeak isn't working.
else:
logging.warning("Unknown operating system. Defaulting to the default TTS engine. This may not work.")
self.engine = pyttsx3.init()
def speak(self, text):
"""
Speaks the given text using the text-to-speech engine.
"""
logging.info(f"Assistant speaking: {text}") # Log what the assistant is saying
self.engine.say(text)
self.engine.runAndWait()
def listen(self):
"""
Listens for user input via the microphone and returns the recognized text.
"""
with sr.Microphone() as source:
print("Listening...")
self.recognizer.adjust_for_ambient_noise(source) # Calibrate for noise
audio = self.recognizer.listen(source)
try:
print("Recognizing...")
text = self.recognizer.recognize_google(audio) # Use Google Web Speech API
print(f"User said: {text}")
return text.lower()
except sr.UnknownValueError:
print("Could not understand audio")
return ""
except sr.RequestError as e:
print(f"Could not request results from Speech Recognition service; {e}")
return ""
def check_for_emergency(self, text):
"""
Checks if the given text contains any emergency keywords. THIS IS NOW RUN IN A THREAD.
"""
logging.info("Checking for emergency keywords...")
for keyword in self.emergency_keywords:
if keyword in text:
logging.warning(f"Emergency keyword '{keyword}' detected!")
self.handle_emergency(text) #Pass the entire detected text
return True # Found an emergency keyword, so exit the loop
return False
def emergency_detection_worker(self):
"""
Worker function for the emergency detection thread. It continuously checks the queue
for text to analyze. This prevents blocking the main thread.
"""
while self.is_running: #Keep running until the program is told to stop
try:
text = self.emergency_queue.get(timeout=1) # Wait up to 1 second for a new item. Prevents blocking indefinitely.
if text:
self.check_for_emergency(text)
self.emergency_queue.task_done() #Mark the current task as complete
except queue.Empty:
pass # No new item in the queue, just continue looping.
def handle_emergency(self, detected_text):
"""
Handles an emergency situation. This function is called when an emergency keyword is detected.
"""
logging.warning("Handling Emergency...")
self.speak("Emergency detected! Alerting emergency contact.")
if self.emergency_contact:
# In a real application, you would send an SMS, email, or make a phone call.
# This is a placeholder for that functionality.
print(f"Simulating emergency contact to: {self.emergency_contact}")
print(f"Detected text: {detected_text}") #Include the detected text in the alert
# Example: Send an SMS using Twilio (requires Twilio account and setup)
# from twilio.rest import Client
# account_sid = "ACxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # Your Account SID from twilio.com/console
# auth_token = "your_auth_token" # Your Auth Token from twilio.com/console
# client = Client(account_sid, auth_token)
# message = client.messages.create(
# to=self.emergency_contact,
# from_="+1234567890", # Your Twilio phone number
# body=f"Emergency! User said: {detected_text}")
# print(message.sid)
# Example: Send an email (requires setting up an email server)
# import smtplib
# from email.mime.text import MIMEText
# sender_email = "your_email@example.com"
# receiver_email = self.emergency_contact
# message = MIMEText(f"Emergency! User said: {detected_text}")
# message['Subject'] = "Emergency Alert"
# message['From'] = sender_email
# message['To'] = receiver_email
# with smtplib.SMTP('smtp.example.com', 587) as smtp: # Replace with your SMTP server
# smtp.starttls()
# smtp.login(sender_email, "your_password") # Replace with your email password
# smtp.sendmail(sender_email, receiver_email, message.as_string())
else:
self.speak("Emergency contact not configured. Please set up an emergency contact to enable emergency alerts.")
print("Emergency contact not configured. Please set up an emergency contact to enable emergency alerts.")
def get_time(self):
"""
Gets the current time and speaks it.
"""
now = datetime.datetime.now()
current_time = now.strftime("%I:%M %p") #Format to 12-hour time
self.speak(f"The time is {current_time}")
def get_date(self):
"""
Gets the current date and speaks it.
"""
today = datetime.date.today()
current_date = today.strftime("%B %d, %Y") # Format the date nicely
self.speak(f"Today is {current_date}")
def process_command(self, command):
"""
Processes a user command.
"""
logging.info(f"Processing command: {command}") # Log the command being processed
if "time" in command:
self.get_time()
elif "date" in command:
self.get_date()
elif "hello" in command or "hi" in command:
self.speak("Hello! How can I help you today?")
elif "how are you" in command:
self.speak("I am doing well, thank you for asking.")
elif "stop" in command or "exit" in command or "quit" in command:
self.speak("Okay, goodbye.")
self.is_running = False
else:
self.speak("I'm sorry, I didn't understand that. Please try again.")
def run(self):
"""
Runs the voice assistant in a loop.
"""
self.speak("Hello, I am your voice assistant. How can I help you today?")
# Start the emergency detection thread
self.emergency_thread = threading.Thread(target=self.emergency_detection_worker)
self.emergency_thread.daemon = True # Daemonize the thread so it exits when the main thread exits.
self.emergency_thread.start()
while self.is_running:
command = self.listen()
if command:
# Put the command in the queue for emergency detection.
self.emergency_queue.put(command)
self.process_command(command)
# Wait for the emergency queue to be empty before exiting. Important to ensure all tasks are completed.
self.emergency_queue.join()
logging.info("Exiting voice assistant.")
if __name__ == '__main__':
# Example usage:
# Replace with a real phone number or email address for emergency alerts
emergency_contact_info = "+15551234567" # Replace with a real phone number
assistant = VoiceAssistant(emergency_contact=emergency_contact_info)
assistant.run()
```
Key improvements and explanations:
* **Clearer Structure:** The code is organized into a `VoiceAssistant` class, making it more modular and easier to understand.
* **Emergency Detection Thread:** This is crucial. The `emergency_detection_worker` runs in a separate thread. This prevents the main loop from blocking while waiting for the `check_for_emergency` function to complete. This makes the assistant much more responsive, even if emergency detection takes a little time.
* **Thread Queue (using `queue.Queue`):** The `emergency_queue` is used to pass the recognized text from the main thread to the emergency detection thread. This is a thread-safe way to communicate between threads and prevents race conditions. The main thread puts the text in the queue, and the emergency detection thread pulls text from the queue. `emergency_queue.task_done()` and `emergency_queue.join()` calls are used to manage the queue properly and wait for all tasks to complete before exiting.
* **Error Handling:** Includes `try...except` blocks to catch potential errors during speech recognition (e.g., `UnknownValueError`, `RequestError`) and TTS engine initialization. Especially important for `espeak` on Linux.
* **Logging:** The `logging` module is used to log events, which is extremely helpful for debugging and monitoring the assistant's behavior. Logging levels (INFO, WARNING, ERROR) are used appropriately.
* **Emergency Contact:** Includes the `emergency_contact` in the `__init__` method to configure your telephone number for the emergency system. Also warns the user if this is not set.
* **Daemon Thread:** The emergency detection thread is set as a daemon thread (`emergency_thread.daemon = True`). This means that the thread will automatically exit when the main thread exits, preventing the program from hanging.
* **Operating System Detection:** Uses `platform.system()` to detect the operating system (Windows, MacOS, Linux) and chooses the appropriate TTS engine. This is especially important for Linux, where `espeak` or another engine might be required. It also includes instructions on how to install `espeak` if it's missing. This makes the code more portable.
* **Clearer `handle_emergency` Function:** The `handle_emergency` function now takes the detected text as input, allowing you to include the text in the emergency alert (e.g., in an SMS or email). Includes placeholder code for sending SMS (using Twilio) or email. *You must replace the placeholder credentials with your actual credentials.*
* **`is_running` Flag:** A `self.is_running` flag is used to control the main loop, allowing the assistant to be stopped gracefully.
* **Speaking Speed and Volume Control:** The `engine.setProperty` calls allow you to adjust the speaking speed and volume of the TTS engine.
* **Clearer Comments and Docstrings:** Improved comments and docstrings to explain the purpose of each function and variable.
* **12-Hour Time Format:** The `get_time` function now formats the time in 12-hour format.
* **Graceful Exit:** The `run` method now waits for the emergency queue to be empty before exiting, ensuring that all tasks are completed.
* **More Robust Command Processing:** Adds basic commands like "hello" and "how are you" and a fallback response for unknown commands.
* **Example Usage:** Includes a `if __name__ == '__main__':` block with an example of how to create and run the `VoiceAssistant`. **Remember to replace the placeholder emergency contact with a real phone number.**
* **Queue Timeout:** The `emergency_queue.get(timeout=1)` is critical. Without the timeout, if no commands are being spoken, the thread will block indefinitely waiting for something from the queue. The timeout allows the thread to wake up periodically and check the `self.is_running` flag.
* **Proper Queue Management:** Includes `self.emergency_queue.task_done()` to indicate that a task has been completed and `self.emergency_queue.join()` to wait for all tasks to finish.
* **TTS Engine Selection Logic:** Includes platform-specific initialization of the TTS engine (SAPI5 for Windows, NSSpeechSynthesizer for macOS, and espeak for Linux). It also handles the case where espeak is not installed on Linux, providing instructions to the user.
* **Stop words**: `stop`,`exit`, and `quit` are now usable to stop the program.
How to Run:
1. **Install Libraries:**
```bash
pip install SpeechRecognition pyttsx3
```
On Linux, you may also need:
```bash
sudo apt-get install python3-pyaudio espeak
```
2. **Replace Placeholder:** Replace the `emergency_contact_info` placeholder with a real phone number or email.
3. **Run the Script:** Execute the Python script.
Important Considerations for Real-World Use:
* **Emergency Alerting:** The SMS and email examples are placeholders. You *must* implement a reliable emergency alerting mechanism using a service like Twilio (for SMS) or a dedicated email service. Handling emergency alerts is critical for this application.
* **Privacy:** Be very mindful of privacy. Clearly inform the user about how their voice data is being used and stored. Consider local speech recognition options for better privacy (but they might be less accurate).
* **Accuracy:** Speech recognition accuracy can vary depending on the environment. Consider using noise cancellation techniques and training the speech recognition model for the user's voice.
* **User Interface:** For ease of use, a graphical user interface (GUI) would be beneficial. You could use libraries like Tkinter or PyQt to create a simple interface.
* **Testing:** Thoroughly test the emergency detection and alerting mechanisms to ensure they work reliably. Test with various accents and speech patterns.
* **Accessibility:** Consider accessibility needs when designing the voice assistant. For example, provide options for adjusting the volume and speaking speed.
* **Power Supply:** Ensure a reliable power supply for the device running the voice assistant.
* **Internet Connectivity:** If using the Google Web Speech API, ensure a stable internet connection.
This improved version addresses the critical issue of responsiveness and thread safety, making the voice assistant much more practical for elderly care. Remember to replace the placeholder emergency contact and implement a robust emergency alerting system.
👁️ Viewed: 5
Comments