In this project, I combined Python and Arduino to create an interactive system that responds to human expressions. Using Python, I analyzed facial expressions in real-time and detected when someone smiled or laughed. This data was then sent as a command to the Arduino, which activated a water pump in response.
Libraries: You need to install the following Python libraries. This can usually be done with pip, Python's package manager. Open your terminal or command prompt and run the following commands:
pip install opencv-python
pip install face_recognition
pip install fer
import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue
Here we import all the necessary libraries.
# Registry of the faces seen so far; the two lists are kept index-aligned.
known_face_encodings, known_face_ids = [], []
# Time (time.time() seconds) at which each face ID was last seen.
last_seen = {}
# Counter used to label newly discovered faces ("Person 1", "Person 2", ...).
next_id = 1

# Forget a face after 10 minutes without a sighting (value in seconds).
timeout = 10 * 60
# Face recognition runs every 2nd frame, emotion detection every frame.
face_detection_interval, emotion_detection_interval = 2, 1
known_face_encodings: A list to store facial encodings of known faces.
known_face_ids: A list to store IDs of known faces.
next_id: A counter for generating new IDs.
last_seen: A dictionary to keep track of when each face was last seen.
timeout: The time (in seconds) after which a face is considered "forgotten".
face_detection_interval: Determines how often the code searches for new faces, with a higher value meaning less frequent face recognition.
emotion_detection_interval: Determines how often the code searches for emotions.
# Emotion recogniser; mtcnn=True selects MTCNN for its face detection step.
emotion_detector = FER(mtcnn=True)

# Open the default webcam (device 0) and request a 640x480 picture.
video_capture = cv2.VideoCapture(0)
for capture_property, pixels in (
    (cv2.CAP_PROP_FRAME_WIDTH, 640),
    (cv2.CAP_PROP_FRAME_HEIGHT, 480),
):
    video_capture.set(capture_property, pixels)
FER(mtcnn=True): Initializes the Face Emotion Recognizer with MTCNN for face detection.
cv2.VideoCapture(0): Starts the webcam (0 is the default webcam).
video_capture.set(): Sets the resolution of the video.
# Tell the user the program is running and which key ends it.
print("Program started. Press 'q' to stop.")
Prints a message indicating that the program has started
# Number of frames read so far; drives the "every N frames" checks.
frame_count = 0

# Bounded hand-off queues from the worker threads to the main loop.
face_queue, emotion_queue = Queue(5), Queue(5)
frame_count: Counts the frames read so far; it is used to run face and emotion detection only once every N frames.
face_queue: A queue used to pass detected face locations and encodings from the worker thread to the main thread.
emotion_queue: A queue to pass detected emotions to the main thread.
def process_faces(frame):
    """Locate and encode every face in *frame* and queue the result.

    Intended to run on a worker thread. The frame is converted from BGR to
    RGB before it is handed to face_recognition, and a single
    ``(face_locations, face_encodings)`` tuple is put on ``face_queue`` for
    the main loop to pick up.
    """
    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    locations = face_recognition.face_locations(rgb, model="hog")
    result = (locations, face_recognition.face_encodings(rgb, locations))
    face_queue.put(result)
The function is executed on a separate thread and detects faces in a frame.
The results (locations and encodings) are placed in face_queue.
def process_emotions(frame, face_location):
    """Run emotion detection on one face region and queue the result.

    ``face_location`` is a ``(top, right, bottom, left)`` box. The matching
    region is cropped out of *frame*, passed to the shared
    ``emotion_detector``, and ``(face_location, emotions)`` is put on
    ``emotion_queue``.
    """
    top, right, bottom, left = face_location
    crop = frame[top:bottom, left:right]
    emotion_queue.put((face_location, emotion_detector.detect_emotions(crop)))
The function, also executed on a separate thread, detects emotions in a face area.
The results (location and emotions) are placed in emotion_queue.
# Main loop: read frames until 'q' is pressed (or Ctrl+C interrupts the try).
try:
while True:
# Grab the next frame; on failure wait one second and try again.
ret, frame = video_capture.read()
if not ret:
print("Could not read frame from camera. Retrying...")
time.sleep(1)
continue
frame_count += 1
current_time = time.time()
# Every face_detection_interval frames, analyse a copy of the frame on a worker thread.
if frame_count % face_detection_interval == 0:
Thread(target=process_faces, args=(frame.copy(),)).start()
# Pick up one finished face-detection result, if any is waiting.
if not face_queue.empty():
face_locations, face_encodings = face_queue.get()
for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
# Compare against every known encoding; reuse the first matching ID, otherwise register a new person.
matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
face_id = "Unknown"
if True in matches:
first_match_index = matches.index(True)
face_id = known_face_ids[first_match_index]
else:
face_id = f"Person {next_id}"
known_face_encodings.append(face_encoding)
known_face_ids.append(face_id)
next_id += 1
# Record the sighting and draw the box and ID label on the frame.
last_seen[face_id] = current_time
cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
# Every emotion_detection_interval frames, analyse this face's emotions on a worker thread.
if frame_count % emotion_detection_interval == 0:
Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()
# Pick up one finished emotion result, if any, and label the face with its highest-scoring emotion.
if not emotion_queue.empty():
face_location, emotions = emotion_queue.get()
if emotions:
top, right, bottom, left = face_location
dominant_emotion = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
# Forget faces that have not been seen for more than `timeout` seconds.
# Iterating over a list copy allows deleting from last_seen inside the loop.
for face_id in list(last_seen.keys()):
if current_time - last_seen[face_id] > timeout:
if face_id in known_face_ids:
index = known_face_ids.index(face_id)
known_face_encodings.pop(index)
known_face_ids.pop(index)
del last_seen[face_id]
print(f"Face with ID {face_id} removed due to inactivity.")
# Show the annotated frame and leave the loop when 'q' is pressed.
cv2.imshow('Video', frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
video_capture.read(): Reads a frame from the webcam.
Face detection thread: If the interval is reached, start a thread to detect faces
Face handling: Retrieves faces from the queue. Compares with known faces or creates a new face. Draws a rectangle around the face and places the ID.
Emotion detection thread: If the interval is reached, start a thread to detect emotions on the face
Emotion handling: Retrieves the face location and associated emotion from the queue and draws the emotion on the screen.
Inactivity control: Removes faces that haven't been seen for too long.
cv2.imshow(): Displays the video with recognition and emotions.
cv2.waitKey(): Waits for a key press. Stops the loop if 'q' is pressed.
# Ctrl+C ends the loop with a message instead of a traceback.
except KeyboardInterrupt:
print("Program interrupted by user.")
# Runs on every exit path: release the webcam and close the OpenCV windows.
finally:
video_capture.release()
cv2.destroyAllWindows()
print("Program ended.")
KeyboardInterrupt: Catches a keyboard interruption (e.g., Ctrl+C).
finally:: Closes the webcam and the windows, even if an error occurs.
import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue
# --- Face bookkeeping ---
known_face_encodings, known_face_ids = [], []
next_id = 1
last_seen = {}

# --- Tuning ---
timeout = 10 * 60  # seconds without a sighting before a face is forgotten
face_detection_interval = 2
# How often (in frames) emotions are checked; a lower interval is more
# CPU-intensive.
emotion_detection_interval = 1

# --- Emotion detector ---
emotion_detector = FER(mtcnn=True)

# --- Video capture ---
# 0 is the internal camera; switch to 1 for the first external camera.
# If the external camera does not work, try cv2.VideoCapture(0, cv2.CAP_DSHOW).
video_capture = cv2.VideoCapture(0)
for capture_property, pixels in (
    (cv2.CAP_PROP_FRAME_WIDTH, 640),
    (cv2.CAP_PROP_FRAME_HEIGHT, 480),
):
    video_capture.set(capture_property, pixels)  # requested picture size

print("Program started. Press 'q' to stop.")

frame_count = 0
face_queue, emotion_queue = Queue(5), Queue(5)
def process_faces(frame):
    """Detect the faces in *frame* and compute their encodings.

    Runs on a worker thread; the main loop compares the encodings with the
    known ones to decide who each face belongs to. The result is put on
    ``face_queue`` as ``(face_locations, face_encodings)``.
    """
    rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    boxes = face_recognition.face_locations(rgb_image, model="hog")
    face_queue.put((boxes, face_recognition.face_encodings(rgb_image, boxes)))
def process_emotions(frame, face_location):
    """Check which emotions are shown on the face at *face_location*.

    The ``(top, right, bottom, left)`` box is cut out of *frame* and given to
    ``emotion_detector``; the detector's output is put on ``emotion_queue``
    together with the box it belongs to.
    """
    top, right, bottom, left = face_location
    detected = emotion_detector.detect_emotions(frame[top:bottom, left:right])
    emotion_queue.put((face_location, detected))
try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            # No frame available: wait a moment and try again.
            print("Could not read frame from camera. Retrying...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        # Hand a copy of every Nth frame to a face-recognition worker.
        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        # Handle one finished face-recognition result, if there is one.
        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                if True not in matches:
                    # Unknown face: register it under a fresh ID.
                    face_id = f"Person {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1
                else:
                    # Known face: reuse the ID of the first match.
                    face_id = known_face_ids[matches.index(True)]

                last_seen[face_id] = current_time
                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                # Hand this face to an emotion worker every Nth frame.
                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        # Handle one finished emotion result, if there is one.
        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                scores = emotions[0]['emotions']
                dominant_emotion = max(scores, key=scores.get)
                cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        # Forget faces that have not been seen for longer than `timeout`.
        # A snapshot is iterated so entries can be deleted along the way.
        for face_id, seen_at in list(last_seen.items()):
            if current_time - seen_at <= timeout:
                continue
            if face_id in known_face_ids:
                index = known_face_ids.index(face_id)
                known_face_encodings.pop(index)
                known_face_ids.pop(index)
            del last_seen[face_id]
            print(f"Face with ID {face_id} removed due to inactivity.")

        cv2.imshow('Video', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
except KeyboardInterrupt:
    print("Program interrupted by user.")
finally:
    # Release the camera and close the window on every exit path.
    video_capture.release()
    cv2.destroyAllWindows()
    print("Program ended.")
Here is a comparison of the code with Arduino integration versus the code without, with emphasis on the changes in each section:
Arduino Serial Communication:
import serial: The serial library is imported to enable communication with the Arduino.
arduino = serial.Serial('COM5', 9600): Initializes a serial connection with the Arduino on the specified COM port (here COM5) and with a baud rate of 9600. You need to adjust the COM port to your own setup.
time.sleep(2): Waits 2 seconds for the serial connection to establish.
arduino.write(b'WATER_ON') and arduino.write(b'WATER_OFF'): Sends commands to the Arduino (in bytes) to turn the water pump on and off.
arduino.close(): Closes the serial connection at the end of the program.
Water Pump Functionality:
cooldown_time = 120: A variable that determines the minimum time (in seconds) between water pump activations.
waterpump_active = False: A flag to indicate whether the water pump is currently active.
last_happy_times = {}: A dictionary to keep track of the last time the water pump was activated for a particular face.
activate_waterpump(person_id) function: Activates the water pump for 8 seconds after a 'happy' detection for a person. It is executed on a separate thread to prevent blocking the video processing.
Emotion Detection and Water Pump Activation:
Emotion Analysis: The code now looks at the top 3 emotions and displays them on the screen.
"Happy" Trigger: If the detection of happy exceeds a threshold (0.5), and the cooldown timer has expired, then the water pump is activated via the activate_waterpump function.
External Camera Fix
The code now uses cv2.VideoCapture(1, cv2.CAP_DSHOW) to start the external camera and checks if the camera is connected.
Debugger info
The code now prints the detected emotions to the console.
Explanation of changes in code per section:
import serial
New: This library is added to enable serial communication with the Arduino.
# Open the serial link to the Arduino; change 'COM5' to your own port.
arduino = serial.Serial(port='COM5', baudrate=9600)
# Wait two seconds for the connection to be established.
time.sleep(2)
New:
serial.Serial('COM5', 9600): Initializes serial communication with the Arduino on the specified COM port ('COM5' in this example) and a baud rate of 9600.
time.sleep(2): Waits 2 seconds for the serial connection to establish.
# Pump bookkeeping: when each person last triggered the pump, the minimum
# number of seconds between activations, and whether the pump is running.
last_happy_times = {}
cooldown_time = 2 * 60
waterpump_active = False

# Emotion detection now runs on every 3rd frame.
emotion_detection_interval = 3
New:
last_happy_times: A dictionary that stores the time of the last water pump activation per person.
cooldown_time: A variable that sets the cooldown period in seconds.
waterpump_active: A flag that checks if the water pump is already active.
Changed:
emotion_detection_interval = 3: Changed from 1 to 3.
# Open camera index 1 through the DirectShow backend (the external camera).
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)
if not video_capture.isOpened():
    print("Could not open external camera. Check if the camera is connected.")
    # exit() is a helper the site module adds for interactive sessions and it
    # reports success; raise SystemExit with a non-zero status instead so the
    # failure is visible to whatever launched the script.
    raise SystemExit(1)
Changed:
cv2.VideoCapture(1, cv2.CAP_DSHOW): Selects the external camera.
The code now checks if the camera is connected and exits the code if it's not.
def activate_waterpump(person_id):
    """Run the water pump for 8 seconds on behalf of *person_id*.

    Meant to be the target of a worker thread so that the 8-second wait does
    not stall the video loop. The global ``waterpump_active`` flag is True
    for the duration of the cycle.

    Each command ends with a newline because the Arduino sketch reads its
    commands up to a newline character; without the terminator the sketch
    only reacts once its serial read times out.
    """
    global waterpump_active
    waterpump_active = True
    try:
        print(f"Happy detected for {person_id}, water pump activated for 8 seconds.")
        arduino.write(b'WATER_ON\n')
        try:
            time.sleep(8)
        finally:
            # Always try to stop the pump, even if the wait is interrupted.
            arduino.write(b'WATER_OFF\n')
        print(f"Water pump turned off for {person_id}.")
    finally:
        # Never leave the flag stuck on True: after a serial error the pump
        # could otherwise not be triggered again.
        waterpump_active = False
New: This function is called to activate the water pump:
global waterpump_active: Uses the global flag
waterpump_active = True: Sets the flag to True.
arduino.write(b'WATER_ON'): Sends a signal to the Arduino to turn on the water pump.
time.sleep(8): The water pump stays on for 8 seconds.
arduino.write(b'WATER_OFF'): Sends a signal to the Arduino to turn off the water pump.
waterpump_active = False: Sets the flag to False.
# Pick up one finished emotion result, if any is waiting.
if not emotion_queue.empty():
    face_location, emotions = emotion_queue.get()
    if emotions:
        top, right, bottom, left = face_location
        sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)
        # Show the top 3 emotions
        emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
        cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        # Debug information
        print(f"Gedetecteerde emoties: {sorted_emotions}")
        # Trigger on 'happy' as the strongest emotion with a score above 0.5
        top_emotion, top_score = sorted_emotions[0]
        if top_emotion == 'happy' and top_score > 0.5:
            # Emotion results arrive asynchronously, so requiring
            # last_seen[...] == current_time only matched when a face result
            # happened to be processed in this very loop pass. Attribute the
            # result to the most recently seen face instead.
            face_id = max(last_seen, key=last_seen.get, default=None)
            if face_id is not None:
                last_happy_time = last_happy_times.get(face_id, 0)
                cooldown_over = (current_time - last_happy_time) > cooldown_time
                if cooldown_over and not waterpump_active:
                    last_happy_times[face_id] = current_time
                    Thread(target=activate_waterpump, args=(face_id,)).start()
Changed and New:
Top Emotion Display: Now shows the top 3 emotions with associated values on the screen.
Debug Output: Prints detected emotions to the console.
Happy Emotion Threshold:
Checks if the dominant emotion is 'happy' with a threshold of 0.5.
Pump Activation Logic:
Checks if a face is associated with the current time.
Checks if the cooldown timer has expired.
Checks that the water pump is not already active.
If all of these checks pass, the activate_waterpump function is started in a new thread.
finally:
video_capture.release()
cv2.destroyAllWindows()
arduino.close()
print("Program ended.")
Changed: arduino.close(): Closes the serial communication with the Arduino cleanly.
import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue
import serial
# Arduino setup
arduino = serial.Serial('COM5', 9600)  # adjust the COM port if necessary
time.sleep(2)  # wait until the connection has been established

# Initialize variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
last_happy_times = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 3
cooldown_time = 120  # cooldown time for the water pump, in seconds
waterpump_active = False

# Initialize emotion detector
emotion_detector = FER(mtcnn=True)

# Video capture for the external camera (DirectShow backend). Device index 1
# is opened here; which index belongs to the external camera depends on the
# machine, so try 0 if this one cannot be opened.
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)
if not video_capture.isOpened():
    print("Could not open external camera. Check if the camera is connected.")
    # Release the serial port that was opened above, then stop with a
    # non-zero status. exit() is only an interactive helper from the site
    # module and would report success.
    arduino.close()
    raise SystemExit(1)
# Only configure the picture size once the camera is known to be open.
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

print("Program started. Press 'q' to stop.")

frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)
def process_faces(frame):
    """Find all faces in *frame* and queue their locations and encodings.

    Worker-thread target. Works on an RGB conversion of the BGR frame and
    puts one ``(face_locations, face_encodings)`` tuple on ``face_queue``.
    """
    converted = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    found = face_recognition.face_locations(converted, model="hog")
    encoded = face_recognition.face_encodings(converted, found)
    face_queue.put((found, encoded))
def process_emotions(frame, face_location):
    """Detect the emotions on one face and queue them with its location.

    Worker-thread target. *face_location* is ``(top, right, bottom, left)``;
    that region of *frame* is analysed by ``emotion_detector`` and the pair
    ``(face_location, emotions)`` is put on ``emotion_queue``.
    """
    top, right, bottom, left = face_location
    face_region = frame[top:bottom, left:right]
    result = emotion_detector.detect_emotions(face_region)
    emotion_queue.put((face_location, result))
def activate_waterpump(person_id):
    """Run the water pump for 8 seconds on behalf of *person_id*.

    Meant to be the target of a worker thread so that the 8-second wait does
    not stall the video loop. The global ``waterpump_active`` flag is True
    for the duration of the cycle.

    Each command ends with a newline because the Arduino sketch reads its
    commands up to a newline character; without the terminator the sketch
    only reacts once its serial read times out.
    """
    global waterpump_active
    waterpump_active = True
    try:
        print(f"Happy detected for {person_id}, water pump activated for 8 seconds.")
        arduino.write(b'WATER_ON\n')
        try:
            time.sleep(8)
        finally:
            # Always try to stop the pump, even if the wait is interrupted.
            arduino.write(b'WATER_OFF\n')
        print(f"Water pump turned off for {person_id}.")
    finally:
        # Never leave the flag stuck on True: after a serial error the pump
        # could otherwise not be triggered again.
        waterpump_active = False
# Maps the face box handed to an emotion worker to the ID recognised for that
# face, so a result coming back on emotion_queue can be attributed to the
# right person. Capped so boxes whose result never arrives cannot pile up.
emotion_face_ids = {}
MAX_TRACKED_BOXES = 64

try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Could not read frame from camera. Retrying...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        # Hand a copy of every Nth frame to a face-recognition worker.
        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        # Handle one finished face-recognition result, if there is one.
        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                if True in matches:
                    # Known face: reuse the ID of the first match.
                    face_id = known_face_ids[matches.index(True)]
                else:
                    # Unknown face: register it under a fresh ID.
                    face_id = f"Person {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time
                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    face_box = (top, right, bottom, left)
                    # Remember whose face this box is. Re-inserting keeps the
                    # newest box last so the oldest one is evicted first.
                    emotion_face_ids.pop(face_box, None)
                    emotion_face_ids[face_box] = face_id
                    if len(emotion_face_ids) > MAX_TRACKED_BOXES:
                        del emotion_face_ids[next(iter(emotion_face_ids))]
                    Thread(target=process_emotions, args=(frame.copy(), face_box)).start()

        # Handle one finished emotion result, if there is one.
        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)

                # Show the top 3 emotions
                emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
                cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

                # Debug information
                print(f"Gedetecteerde emoties: {sorted_emotions}")

                # Trigger on 'happy' as the strongest emotion with a score above 0.5
                top_emotion, top_score = sorted_emotions[0]
                if top_emotion == 'happy' and top_score > 0.5:
                    # Look the person up by the box this result belongs to.
                    # Matching on last_seen == current_time only worked when a
                    # face result was processed in this very loop pass, and it
                    # could pick a different face than the one analysed.
                    face_id = emotion_face_ids.get(face_location)
                    if face_id is not None:
                        last_happy_time = last_happy_times.get(face_id, 0)
                        cooldown_over = (current_time - last_happy_time) > cooldown_time
                        if cooldown_over and not waterpump_active:
                            last_happy_times[face_id] = current_time
                            Thread(target=activate_waterpump, args=(face_id,)).start()

        # Forget faces that have not been seen for longer than `timeout`.
        # A snapshot is iterated so entries can be deleted along the way.
        for face_id in list(last_seen):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Face with ID {face_id} removed due to inactivity.")

        cv2.imshow('Video', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
except KeyboardInterrupt:
    print("Program interrupted by user.")
finally:
    video_capture.release()
    cv2.destroyAllWindows()
    try:
        # A pump cycle may still be running on its worker thread. Switch the
        # pump off before the port closes so it cannot be left running.
        arduino.write(b'WATER_OFF\n')
        arduino.flush()
    except serial.SerialException as error:
        print(f"Could not send WATER_OFF to the Arduino: {error}")
    finally:
        arduino.close()
    print("Program ended.")
const int relayPin = 5; // Pin the relay is connected to
// Configure the relay pin as an output and open the serial port at 9600 baud.
void setup() {
pinMode(relayPin, OUTPUT);
Serial.begin(9600); // Start serial communication
}
// Poll the serial port for a command and switch the pump relay accordingly.
// "WATER_ON" drives relayPin HIGH, "WATER_OFF" drives it LOW; any other
// input is ignored.
void loop() {
  if (Serial.available() > 0) {
    // Read the command up to the next newline (or until the read times out).
    String command = Serial.readStringUntil('\n');
    // Drop a trailing '\r' and stray whitespace so that senders using
    // "\r\n" line endings are still recognised.
    command.trim();
    if (command == "WATER_ON") {
      digitalWrite(relayPin, HIGH); // Switch the pump on
    } else if (command == "WATER_OFF") {
      digitalWrite(relayPin, LOW); // Switch the pump off
    }
  }
}