Back

Emotion Detection

2024

Project

In this project, I combined Python and Arduino to create an interactive system that responds to human expressions. Using Python, I analyzed facial expressions in real-time and detected when someone smiled or laughed. This data was then sent as a command to the Arduino, which activated a water pump in response.


Code Explanation:

Libraries: You need to install the following Python libraries. This can usually be done with pip, Python's package manager. Open your terminal or command prompt and run the following commands:

pip install opencv-python
pip install face_recognition
pip install fer
  1. Importing libraries
import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue

Here we import all the necessary libraries.

  2. Initializing variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 1

known_face_encodings: A list to store facial encodings of known faces.

known_face_ids: A list to store IDs of known faces.

next_id: A counter for generating new IDs.

last_seen: A dictionary to keep track of when each face was last seen.

timeout: The time (in seconds) after which a face is considered "forgotten".

face_detection_interval: Face detection runs on every Nth frame; a higher value means less frequent (and less CPU-intensive) face recognition.

emotion_detection_interval: Emotion detection likewise runs on every Nth frame.
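
As a quick illustration of how these intervals gate the work (a minimal sketch, not part of the project code): with face_detection_interval = 2, face detection only runs on every second frame.

# Sketch: interval gating with the modulo operator (illustrative only)
face_detection_interval = 2
for frame_count in range(1, 7):
    if frame_count % face_detection_interval == 0:
        print(f"Frame {frame_count}: run face detection")
    else:
        print(f"Frame {frame_count}: skip")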

  3. Initializing the emotion detector and video capture
emotion_detector = FER(mtcnn=True)
video_capture = cv2.VideoCapture(0)
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

FER(mtcnn=True): Initializes the Face Emotion Recognizer with MTCNN for face detection.

cv2.VideoCapture(0): Starts the webcam (0 is the default webcam).

video_capture.set(): Sets the resolution of the video.
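
Note that some cameras and drivers silently ignore a requested resolution. A small optional check (not in the original code) reads back what the camera actually delivers:

# Optional sanity check: set() is only a request, so read the values back
actual_w = int(video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
actual_h = int(video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(f"Actual capture resolution: {actual_w}x{actual_h}")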

  4. Printing the start message
print("Program started. Press 'q' to stop.")

Prints a message indicating that the program has started.

  5. Queue initialization and frame count
frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

frame_count: Counts processed frames so that the intervals above can gate work to every Nth frame.

face_queue: A queue used to pass detected face locations and encodings from a worker thread to the main thread.

emotion_queue: A queue used to pass detected emotions from a worker thread to the main thread.
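
If the combination of threads and queues is new to you, here is a minimal, self-contained sketch of the pattern this project uses: a worker thread puts its result on the queue, and the main thread polls it with empty() and get().

from threading import Thread
from queue import Queue

results = Queue(maxsize=5)

def worker(x):
    results.put(x * x)  # hand the result back to the main thread

Thread(target=worker, args=(7,)).start()

while results.empty():
    pass  # in the real program, the video loop does other work here
print(results.get())  # -> 49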

  6. The process_faces function
def process_faces(frame):
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

The function is executed on a separate thread and detects faces in a frame.

The results (locations and encodings) are placed in face_queue.
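
For reference, face_recognition reports each face as a (top, right, bottom, left) tuple of pixel coordinates, which is why the code later unpacks the locations in exactly that order. A standalone sketch (the image path is a hypothetical placeholder):

import face_recognition

image = face_recognition.load_image_file("test.jpg")  # hypothetical image
locations = face_recognition.face_locations(image, model="hog")
for top, right, bottom, left in locations:
    print(f"Face at top={top}, right={right}, bottom={bottom}, left={left}")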

  7. The process_emotions function
def process_emotions(frame, face_location):
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions))

The function, also executed on a separate thread, detects emotions in a face area.

The results (location and emotions) are placed in emotion_queue.
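
detect_emotions() returns a list with one dictionary per detected face, each containing a bounding 'box' and an 'emotions' dictionary of scores; that structure is why the main loop later indexes emotions[0]['emotions']. A sketch of the shape of the result (the scores are invented example values):

# Example of the FER result structure, with invented scores
emotions = [{
    'box': [64, 50, 128, 128],
    'emotions': {'angry': 0.01, 'disgust': 0.00, 'fear': 0.02,
                 'happy': 0.85, 'sad': 0.01, 'surprise': 0.05,
                 'neutral': 0.06}
}]
dominant = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
print(dominant)  # -> happy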

  8. Main loop
try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Could not read frame from camera. Retrying...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Unknown"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Person {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                dominant_emotion = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
                cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                
        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Face with ID {face_id} removed due to inactivity.")
                
        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

video_capture.read(): Reads a frame from the webcam.

Face detection thread: If the interval is reached, starts a thread to detect faces.

Face handling: Retrieves faces from the queue, compares them with known faces or registers a new one, draws a rectangle around each face, and labels it with its ID (see the tolerance sketch after this list).

Emotion detection thread: If the interval is reached, starts a thread to detect emotions on the face.

Emotion handling: Retrieves the face location and associated emotion from the queue and draws the emotion on the screen.

Inactivity control: Removes faces that haven't been seen for too long.

cv2.imshow(): Displays the video with recognition and emotions.

cv2.waitKey(): Waits for a key press. Stops the loop if 'q' is pressed.
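
compare_faces also accepts an optional tolerance parameter (0.6 by default; lower is stricter). If the system confuses two people, tightening it is a reasonable first experiment, sketched here as a variant of the call used above:

# Stricter matching: a lower tolerance produces fewer false matches
matches = face_recognition.compare_faces(
    known_face_encodings, face_encoding, tolerance=0.5
)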

  9. Exit procedures
except KeyboardInterrupt:
    print("Program interrupted by user.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    print("Program ended.")

KeyboardInterrupt: Catches a keyboard interruption (e.g., Ctrl+C).

finally:: Closes the webcam and the windows, even if an error occurs.


Complete code:

import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue

# Initialize variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 1 # How often emotions are checked; a lower interval is more CPU-intensive

# Initialize emotion detector
emotion_detector = FER(mtcnn=True)

# Video capture
video_capture = cv2.VideoCapture(0) # 0 is the built-in camera; use 1 for the first external camera. If an external camera does not work, try cv2.VideoCapture(0, cv2.CAP_DSHOW).
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) # Set the frame size

print("Program started. Press 'q' to stop.")

frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

def process_faces(frame): # Establishes the identity of the person
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

def process_emotions(frame, face_location): # Detects the emotions shown on the face
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions))

try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Could not read frame from camera. Retrying...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Unknown"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Person {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                dominant_emotion = max(emotions[0]['emotions'], key=emotions[0]['emotions'].get)
                cv2.putText(frame, f"Emotion: {dominant_emotion}", (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Face with ID {face_id} removed due to inactivity.")

        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Program interrupted by user.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    print("Program ended.")


Code with Arduino:

Here is an overview of how the code with Arduino integration differs from the version without it, with emphasis on the changes in each section:

  1. Arduino Serial Communication:

    import serial: The serial library is imported to enable communication with the Arduino.

    arduino = serial.Serial('COM5', 9600): Initializes a serial connection with the Arduino on the specified COM port (here COM5) and with a baud rate of 9600. You need to adjust the COM port to your own setup.

    time.sleep(2): Waits 2 seconds for the serial connection to establish.

    arduino.write(b'WATER_ON\n') and arduino.write(b'WATER_OFF\n'): Sends commands to the Arduino as bytes, terminated with a newline so the Arduino knows where each command ends, to turn the water pump on and off (a minimal sketch of this exchange follows this list).

    arduino.close(): Closes the serial connection at the end of the program.

  2. Water Pump Functionality:

    cooldown_time = 120: A variable that determines the minimum time (in seconds) between water pump activations.

    waterpump_active = False: A flag to indicate whether the water pump is currently active.

    last_happy_times = {}: A dictionary that keeps track of the last time the water pump was activated for a particular face.

    activate_waterpump(person_id) function: Activates the water pump for 8 seconds after a 'happy' detection for a person. It runs on a separate thread so it does not block the video processing.

  3. Emotion Detection and Water Pump Activation:

    Emotion Analysis: The code now looks at the top 3 emotions and displays them on the screen.

    "Happy" Trigger: If the 'happy' score exceeds the threshold (0.5) and the cooldown timer has expired, the water pump is activated via the activate_waterpump function.

  4. External Camera Fix

    The code now uses cv2.VideoCapture(1, cv2.CAP_DSHOW) to start the external camera and checks if the camera is connected.

  5. Debug output

    The code now prints the detected emotions to the terminal.
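
The serial protocol itself is deliberately simple: Python writes a short command string, and the Arduino reads it up to the newline. A minimal end-to-end sketch of the Python side, stripped of all vision code (the COM port is an assumption; adjust it to your setup):

import serial
import time

arduino = serial.Serial('COM5', 9600)  # assumed port; e.g. '/dev/ttyUSB0' on Linux
time.sleep(2)                          # give the board time to reset after connecting
arduino.write(b'WATER_ON\n')           # the newline marks the end of the command
time.sleep(8)
arduino.write(b'WATER_OFF\n')
arduino.close()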

Explanation of changes in code per section:

  1. Import the serial library:
import serial

New: This library is added to enable serial communication with the Arduino.

  2. Arduino Setup:
arduino = serial.Serial('COM5', 9600)
time.sleep(2)

New:

serial.Serial('COM5', 9600): Initializes serial communication with the Arduino on the specified COM port ('COM5' in this example) and a baud rate of 9600.

time.sleep(2): Waits 2 seconds for the serial connection to establish.
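
If you are unsure which COM port the Arduino is on, pyserial can list the ports the OS currently exposes (a small helper, not part of the original code):

from serial.tools import list_ports

# Print every serial port the OS currently exposes
for port in list_ports.comports():
    print(port.device, '-', port.description)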

  3. Changed variable initialization:
last_happy_times = {}
cooldown_time = 120
waterpump_active = False
emotion_detection_interval = 3

New:

last_happy_times: A dictionary that stores the time of the last water pump activation per person.

cooldown_time: A variable that sets the cooldown period in seconds.

waterpump_active: A flag that checks if the water pump is already active.

Changed:

emotion_detection_interval = 3: Changed from 1 to 3.

  4. Video Capture - External Camera:
     
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)
if not video_capture.isOpened():
    print("Could not open external camera. Check if the camera is connected.")
    exit()

Changed:

cv2.VideoCapture(1, cv2.CAP_DSHOW): Selects the external camera.

The code now checks if the camera is connected and exits the code if it's not.
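
If index 1 does not open on your machine, a small probe loop (a convenience sketch, not part of the project code) shows which indices are actually available:

import cv2

# Try the first few camera indices and report which ones open
for index in range(3):
    cap = cv2.VideoCapture(index, cv2.CAP_DSHOW)
    state = "available" if cap.isOpened() else "not available"
    print(f"Camera {index}: {state}")
    cap.release()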

  5. activate_waterpump function:
      
def activate_waterpump(person_id):
    global waterpump_active
    waterpump_active = True
    print(f"Happy detected for {person_id}, water pump activated for 8 seconds.")
    arduino.write(b'WATER_ON\n')
    time.sleep(8)
    arduino.write(b'WATER_OFF\n')
    print(f"Water pump turned off for {person_id}.")
    waterpump_active = False
   

New: This function is called to activate the water pump:

global waterpump_active: Declares that the function modifies the global flag.

waterpump_active = True: Sets the flag to True.

arduino.write(b'WATER_ON\n'): Sends a signal to the Arduino to turn on the water pump (the trailing newline marks the end of the command).

time.sleep(8): The water pump stays on for 8 seconds.

arduino.write(b'WATER_OFF\n'): Sends a signal to the Arduino to turn off the water pump.

waterpump_active = False: Sets the flag to False.
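
One caveat: waterpump_active is checked in the main loop but only set inside the worker thread, so two near-simultaneous 'happy' detections could both pass the check. A variant that closes this gap with a non-blocking lock (an alternative sketch, not the project's implementation; it reuses the arduino connection and time module from the program above):

from threading import Lock

pump_lock = Lock()

def activate_waterpump(person_id):
    # acquire(blocking=False) returns False immediately if the pump is busy
    if not pump_lock.acquire(blocking=False):
        return
    try:
        print(f"Happy detected for {person_id}, water pump activated for 8 seconds.")
        arduino.write(b'WATER_ON\n')
        time.sleep(8)
        arduino.write(b'WATER_OFF\n')
    finally:
        pump_lock.release()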

  6. Changes in the Main Loop (Emotion Handling and Pump Activation):
      
if not emotion_queue.empty():
    face_location, emotions = emotion_queue.get()
    if emotions:
        top, right, bottom, left = face_location
        sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)

        # Show the top 3 emotions
        emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
        cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        
        # Debug information
        print(f"Detected emotions: {sorted_emotions}")
        
        # Check for the 'happy' emotion with a lowered threshold
        if sorted_emotions[0][0] == 'happy' and sorted_emotions[0][1] > 0.5:
            # Match the face whose last_seen timestamp equals this frame's time
            face_id = next((id for id, loc in last_seen.items() if loc == current_time), None)
            if face_id:
                last_happy_time = last_happy_times.get(face_id, 0)
                if (current_time - last_happy_time) > cooldown_time:
                    if not waterpump_active:
                        last_happy_times[face_id] = current_time
                        Thread(target=activate_waterpump, args=(face_id,)).start()

Changed and New:

Top Emotion Display: Now shows the top 3 emotions with their scores on the screen (see the worked example after this list).

Debug Output: Prints detected emotions to the console.

Happy Emotion Threshold:

Checks if the dominant emotion is 'happy' with a threshold of 0.5.

Pump Activation Logic:

Looks up which face was last seen at the current frame's timestamp.

Checks if the cooldown timer has expired.

Checks that the water pump is not already active.

If this is the case, the activate_waterpump function is started in a new thread.
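
For clarity, sorted() with reverse=True orders the (emotion, score) pairs from highest to lowest, so sorted_emotions[0] is always the dominant emotion. A worked example with made-up scores:

scores = {'happy': 0.72, 'neutral': 0.20, 'sad': 0.08}
sorted_emotions = sorted(scores.items(), key=lambda x: x[1], reverse=True)
print(sorted_emotions[:3])   # -> [('happy', 0.72), ('neutral', 0.2), ('sad', 0.08)]
print(sorted_emotions[0][0] == 'happy' and sorted_emotions[0][1] > 0.5)  # -> True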

  7. Exit procedure (finally block):
      
finally:
    video_capture.release()
    cv2.destroyAllWindows()
    arduino.close()
    print("Program ended.")

Changed: arduino.close(): Closes the serial communication with the Arduino cleanly.


Complete code:


import cv2
import face_recognition
from fer import FER
import time
from threading import Thread
from queue import Queue
import serial

# Arduino setup
arduino = serial.Serial('COM5', 9600)  # Adjust the COM port if needed
time.sleep(2)  # Wait for the connection to be established

# Initialize variables
known_face_encodings = []
known_face_ids = []
next_id = 1
last_seen = {}
last_happy_times = {}
timeout = 600  # 10 minutes in seconds
face_detection_interval = 2
emotion_detection_interval = 3
cooldown_time = 120  # Cooldown time for the water pump, in seconds
waterpump_active = False

# Initialize emotion detector
emotion_detector = FER(mtcnn=True)

# Video capture for the external camera
video_capture = cv2.VideoCapture(1, cv2.CAP_DSHOW)  # 1 selects the first external camera; try another index if it does not work
video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

if not video_capture.isOpened():
    print("Could not open external camera. Check if the camera is connected.")
    exit()

print("Program started. Press 'q' to stop.")

frame_count = 0
face_queue = Queue(maxsize=5)
emotion_queue = Queue(maxsize=5)

def process_faces(frame):
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    face_locations = face_recognition.face_locations(rgb_frame, model="hog")
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
    face_queue.put((face_locations, face_encodings))

def process_emotions(frame, face_location):
    top, right, bottom, left = face_location
    face_image = frame[top:bottom, left:right]
    emotions = emotion_detector.detect_emotions(face_image)
    emotion_queue.put((face_location, emotions))

def activate_waterpump(person_id):
    global waterpump_active
    waterpump_active = True
    print(f"Happy detected for {person_id}, water pump activated for 8 seconds.")
    arduino.write(b'WATER_ON\n')
    time.sleep(8)
    arduino.write(b'WATER_OFF\n')
    print(f"Water pump turned off for {person_id}.")
    waterpump_active = False

try:
    while True:
        ret, frame = video_capture.read()
        if not ret:
            print("Could not read frame from camera. Retrying...")
            time.sleep(1)
            continue

        frame_count += 1
        current_time = time.time()

        if frame_count % face_detection_interval == 0:
            Thread(target=process_faces, args=(frame.copy(),)).start()

        if not face_queue.empty():
            face_locations, face_encodings = face_queue.get()
            for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
                matches = face_recognition.compare_faces(known_face_encodings, face_encoding)
                face_id = "Unknown"

                if True in matches:
                    first_match_index = matches.index(True)
                    face_id = known_face_ids[first_match_index]
                else:
                    face_id = f"Person {next_id}"
                    known_face_encodings.append(face_encoding)
                    known_face_ids.append(face_id)
                    next_id += 1

                last_seen[face_id] = current_time

                cv2.rectangle(frame, (left, top), (right, bottom), (0, 255, 0), 2)
                cv2.putText(frame, face_id, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

                if frame_count % emotion_detection_interval == 0:
                    Thread(target=process_emotions, args=(frame.copy(), (top, right, bottom, left))).start()

        if not emotion_queue.empty():
            face_location, emotions = emotion_queue.get()
            if emotions:
                top, right, bottom, left = face_location
                sorted_emotions = sorted(emotions[0]['emotions'].items(), key=lambda x: x[1], reverse=True)
                
                # Show the top 3 emotions
                emotion_text = " | ".join([f"{emotion}: {conf:.2f}" for emotion, conf in sorted_emotions[:3]])
                cv2.putText(frame, emotion_text, (left, bottom + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
                
                # Debug information
                print(f"Detected emotions: {sorted_emotions}")
                
                # Check for the 'happy' emotion with a lowered threshold
                if sorted_emotions[0][0] == 'happy' and sorted_emotions[0][1] > 0.5:
                    # Match the face whose last_seen timestamp equals this frame's time
                    face_id = next((id for id, loc in last_seen.items() if loc == current_time), None)
                    if face_id:
                        last_happy_time = last_happy_times.get(face_id, 0)
                        if (current_time - last_happy_time) > cooldown_time:
                            if not waterpump_active:
                                last_happy_times[face_id] = current_time
                                Thread(target=activate_waterpump, args=(face_id,)).start()

        for face_id in list(last_seen.keys()):
            if current_time - last_seen[face_id] > timeout:
                if face_id in known_face_ids:
                    index = known_face_ids.index(face_id)
                    known_face_encodings.pop(index)
                    known_face_ids.pop(index)
                del last_seen[face_id]
                print(f"Face with ID {face_id} removed due to inactivity.")

        cv2.imshow('Video', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

except KeyboardInterrupt:
    print("Program interrupted by user.")

finally:
    video_capture.release()
    cv2.destroyAllWindows()
    arduino.close()
    print("Program ended.")

Arduino code:

const int relayPin = 5;  // Pin the relay is connected to

void setup() {
  pinMode(relayPin, OUTPUT);
  Serial.begin(9600);  // Start serial communication
}

void loop() {
  if (Serial.available() > 0) {
    String command = Serial.readStringUntil('\n');  // Read the command from the serial port

    if (command == "WATER_ON") {
      digitalWrite(relayPin, HIGH);  // Turn the pump on
    }
    else if (command == "WATER_OFF") {
      digitalWrite(relayPin, LOW);   // Turn the pump off
    }
  }
}
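
To test the wiring independently of the Python pipeline, you can open the Arduino IDE's Serial Monitor at 9600 baud, set the line ending to "Newline", and type WATER_ON or WATER_OFF by hand; the relay should switch accordingly.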