This program loads a video, detects and tracks faces in it, and estimates the age and gender of each person. The objective is to collect data on the number of unique people in a video and their attributes.
Overall code structure
The video is processed frame by frame.
The main data structure is people_info, a list of dictionaries where each dictionary holds the attributes of one unique person: the Track IDs that belong to that person, a face embedding, and the estimated age and gender categories.
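For illustration, an entry looks roughly like this (the embedding placeholder stands for the 128-dimensional vector returned by face_recognition, and the list index doubles as the Person ID):

people_info = [
    {
        "track_ids": [1, 5],          # DeepSORT Track IDs belonging to this person
        "age_category_name": "GenX",  # name of one of the AgeCategories
        "gender": "Man",
        "face_embedding": embedding,  # 128-d vector from face_recognition.face_encodings
    },
    # ... one dict per unique person
]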
Each frame:
Detect all faces using a YOLOv3 object detector trained on faces.
Use the DeepSORT tracking algorithm to assign Track IDs. A Track ID identifies one face trajectory in the video; the same person can have multiple Track IDs if they appear in the video more than once.
If a new Track ID is present, check whether the face in this frame is of high enough quality to be processed by the face recognition, age and gender detectors (the check is sketched in code after this list).
- High quality: use face recognition to compare against people seen previously. If the person is new, estimate their age and gender with the Caffe models. Add this track to people_info, either as a new entry or appended to an existing one.
- Low quality: keep checking every frame until a high-quality image of this face is obtained.
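The quality check is essentially the following (a simplified paraphrase of is_face_image_good further down; looks_good is just an illustrative name):

import face_recognition

def looks_good(face_crop, min_size=224, use_landmarks=False):
    # face_crop is an RGB numpy array; it must be square and larger than min_size pixels
    h, w = face_crop.shape[:2]
    if w <= min_size or w != h:
        return False
    if use_landmarks:
        # side-facing faces usually produce no landmarks
        return len(face_recognition.face_landmarks(face_crop)) > 0
    return True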
Output an annotated video showing:
- Bounding boxes around each face, tracking a person
- The 'Track ID' that identifies a single appearance of a person (frame sequence where a particular person is present)
- The estimated 'Person ID' that identifies a unique person. If the same person appears more than once, they should be assigned the same Person ID (for example, someone who leaves the frame and comes back might get Track IDs 3 and 7 but keep Person ID 1).
- Estimated age and gender. These are recalculated every n frames (currently every 10) and often change between updates because the person looks slightly different in each frame.
All models are open source and pretrained.
The code
The code I'm posting for review was originally copied from theAIGuysCode/yolov4-deepsort and refers to some additional modules you can find there. I have modified it to add face recognition and age/gender detection. The script is called from the command line like this:
python object_tracker.py --weights ./checkpoints/yolov3-widerface --model yolov3 --video ./data/video/interview.mp4 --output ./outputs/interview.avi --dont_show --face --age_gender
My additions are the --face and --age_gender command line options.
Question
I would like your opinion on how I should refactor my code before doing more development on this project. I'm most interested in the parts that are my own additions: the functions at the beginning of the file and the places they are called in the main function (anywhere inside an if FLAGS.age_gender block). I would also like to know how best to integrate my additions with the existing code. At the moment I find the structure confusing: face crops are resized at several different points, bounding boxes appear in different formats, and there are nested if statements everywhere. In what structure should I store information about unique people and Track IDs?
I want to develop the code further to improve the age/gender and recognition accuracy. The plan is to store information from multiple frames of a particular person so I can average the estimated age and gender, and to store multiple face embeddings for the same person from different frames so I can implement 'voting' to improve the face recognition matches. Something along the lines of the sketch below is what I have in mind:
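(The field names, vote_match and consensus are only illustrative; nothing like this exists in the current code yet.)

import face_recognition

# hypothetical per-person record supporting multi-frame averaging and voting
person = {
    "track_ids": [3, 7],
    "face_embeddings": [],   # one 128-d embedding per accepted frame
    "age_estimates": [],     # per-frame age category names
    "gender_estimates": [],  # per-frame gender predictions
}

def vote_match(new_embedding, person, tolerance=0.6):
    """Match only if the new embedding agrees with a majority of the stored embeddings."""
    if not person["face_embeddings"]:
        return False
    votes = face_recognition.compare_faces(
        person["face_embeddings"], new_embedding, tolerance=tolerance
    )
    return sum(votes) > len(votes) / 2

def consensus(estimates):
    """Most common estimate seen so far, e.g. for the age category or gender."""
    return max(set(estimates), key=estimates.count) if estimates else "unknown"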
Any recommendations for this code are welcome.
import os
# comment out below line to enable tensorflow logging outputs
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
import time
import tensorflow as tf
physical_devices = tf.config.experimental.list_physical_devices("GPU")
if len(physical_devices) > 0:
tf.config.experimental.set_memory_growth(physical_devices[0], True)
from absl import app, flags, logging
from absl.flags import FLAGS
import core.utils as utils
from core.yolov4 import filter_boxes
from tensorflow.python.saved_model import tag_constants
from core.config import cfg
import cv2
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.compat.v1 import ConfigProto
from tensorflow.compat.v1 import InteractiveSession
# deep sort imports
from deep_sort import preprocessing, nn_matching
from deep_sort.detection import Detection
from deep_sort.tracker import Tracker
from tools import generate_detections as gdet
from enum import Enum
from imutils import paths
import face_recognition
import pickle
from PIL import Image, ImageDraw
import dlib
flags.DEFINE_string("framework", "tf", "(tf, tflite, trt")
flags.DEFINE_string("weights", "./checkpoints/yolov4-416", "path to weights file")
flags.DEFINE_integer("size", 416, "resize images to")
flags.DEFINE_boolean("tiny", False, "yolo or yolo-tiny")
flags.DEFINE_string("model", "yolov4", "yolov3 or yolov4")
flags.DEFINE_string(
"video", "./data/video/test.mp4", "path to input video or set to 0 for webcam"
)
flags.DEFINE_string("output", None, "path to output video")
flags.DEFINE_string(
"output_format", "XVID", "codec used in VideoWriter when saving video to file"
)
flags.DEFINE_float("iou", 0.45, "iou threshold")
flags.DEFINE_float("score", 0.95, "score threshold")
flags.DEFINE_boolean("dont_show", False, "dont show video output")
flags.DEFINE_boolean("info", False, "show detailed info of tracked objects")
flags.DEFINE_boolean("count", False, "count objects being tracked on screen")
flags.DEFINE_boolean("face", False, "using yoloface")
flags.DEFINE_boolean("age_gender", False, "detecting age and gender")
# age and gender models accept a face crop image that is 224x224 pixels.
MIN_FACE_SIZE: int = 224
# the size of the bounding box needs to be expanded to be input to age/gender models
BBOX_SCALING = 1.9
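# each category is a slice of the age model's output distribution (one probability per year of age); get_age_gender sums the slice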
class AgeCategories(Enum):
Child = slice(None, 13)
GenZ = slice(13, 23)
Millennial = slice(23, 30)
GenX = slice(30, 55)
Boomer = slice(55, None)
AgeCategoryFromIndex = {i: cat for i, cat in enumerate(AgeCategories)}
def restrict_bbox(bbox, w, h):
"""for tlbr, edits bbox so it is within the bounds of the frame. w and h are frame width and height"""
    if len(bbox) == 4:
return [
min(max(bbox[0], 0), w),
min(max(bbox[1], 0), h),
min(max(bbox[2], 0), w),
min(max(bbox[3], 0), h),
]
else:
        logging.info("bbox is not valid")
def expand_bbox_square(bbox, frame_width, frame_height):
"""for tlbr, expands the bbox so it is a square, also scales to cover the full face. From trial and error I found that scaling of 1.9 works best."""
height = bbox[3] - bbox[1]
width = bbox[2] - bbox[0]
# making bbox a square for processing
square_bbox_width = max(width, height) * BBOX_SCALING
x_centre = (bbox[0] + bbox[2]) / 2
y_centre = (bbox[1] + bbox[3]) / 2
y_1_square = y_centre - (square_bbox_width / 2)
x_1_square = x_centre - (square_bbox_width / 2)
y_2_square = y_centre + (square_bbox_width / 2)
x_2_square = x_centre + (square_bbox_width / 2)
square_bbox = restrict_bbox(
[x_1_square, y_1_square, x_2_square, y_2_square], frame_width, frame_height
)
centre = [x_centre, y_centre]
return square_bbox, centre
def get_age_gender(face_crop, age_model, gender_model):
"""detects age and gender of a face crop image. The provided image must be square and > 224 pixels wide."""
assert face_crop.shape[0] == face_crop.shape[1], "face crop is not square"
    assert (
        face_crop.shape[0] >= MIN_FACE_SIZE
    ), f"Face crop too small - expected at least {MIN_FACE_SIZE}; got {face_crop.shape[0]}"
detected_face = cv2.resize(
face_crop, (MIN_FACE_SIZE, MIN_FACE_SIZE), interpolation=cv2.INTER_LINEAR
) # (224, 224, 3) now
img_blob = cv2.dnn.blobFromImage(
detected_face
) # img_blob shape is (1, 3, 224, 224)
gender_model.setInput(img_blob)
gender_class = gender_model.forward()[0]
gender = "Woman " if np.argmax(gender_class) == 0 else "Man"
age_model.setInput(img_blob)
age_dist = age_model.forward()[0]
slot_ages = [sum(age_dist[cat.value]) for cat in AgeCategories]
age_category_name = AgeCategoryFromIndex[np.argmax(slot_ages)].name
return age_category_name, gender
def is_face_image_good(face_crop, track, save_faces=True, use_landmarks=False):
"""checks if the face crop image is high quality so it can be processed by
recognition/age/gender. Checks face is large enough (so high resolution) and
square (age/gender models accept 224x224 face images). Checks if the face isn't
side facing by trying to generate face landmarks. Side facing faces cannot generate landmarks."""
    # note: shape[0] is the number of rows (height) and shape[1] is the width
    height = face_crop.shape[0]
    width = face_crop.shape[1]
if (width > MIN_FACE_SIZE) and (width == height):
# check whether face is side facing by seeing if landmarks can be generated
        # face_recognition accepts rgb ordering
if use_landmarks:
face_landmarks_list = face_recognition.face_landmarks(face_crop)
if len(face_landmarks_list) != 0:
if save_faces:
                    # face_crop is rgb; convert to bgr ordering for cv2.imwrite
                    cv2.imwrite(
                        "outputs/face_images/original_" + str(track.track_id) + ".jpg",
                        cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR),
                    )
# pillow uses rgb ordering
pil_image = Image.fromarray(face_crop)
d = ImageDraw.Draw(pil_image)
for face_landmarks in face_landmarks_list:
# Let's trace out each facial feature in the image with a line!
for facial_feature in face_landmarks.keys():
d.line(face_landmarks[facial_feature], width=5)
pil_image.save(
"outputs/face_images/"
+ "annotated_"
+ str(track.track_id)
+ ".jpg"
)
return True
else:
print("Face is the right size but landmarks could not be generated.")
return False
else:
            # face_crop is rgb; convert to bgr ordering for cv2.imwrite
            cv2.imwrite(
                "outputs/face_images/original_" + str(track.track_id) + ".jpg",
                cv2.cvtColor(face_crop, cv2.COLOR_RGB2BGR),
            )
return True
else:
print("Face not the right size: " + str(face_crop.shape))
return False
def match_new_face(face_crop, people_info, track, age_model, gender_model):
"""Find out whether we have seen this person before by comparing against people_info. We generate an embedding for the new
face and see if there are any matches with earlier recorded people."""
# resizing face to be smaller to try and avoid memory error - a face that was approx 700x700 was causing a memory allocation error.
face_crop = cv2.resize(
face_crop, (MIN_FACE_SIZE, MIN_FACE_SIZE), interpolation=cv2.INTER_LINEAR
)
face_embeddings = face_recognition.face_encodings(face_crop)
if len(face_embeddings) > 0:
face_embedding = face_embeddings[0]
# compare the new face with all other people seen before
for i, person in enumerate(people_info):
past_face_embedding = person["face_embedding"]
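            # compare_faces thresholds the euclidean distance between the embeddings (default tolerance is 0.6)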
isSame = face_recognition.compare_faces(
[past_face_embedding], face_embedding
)[0]
if isSame:
print(
"person in track "
+ str(track.track_id)
+ " has been seen in a previous tracklet group: "
+ str(person["track_ids"])
+ " and the person ID is: "
+ str(i)
)
person_id = i
                # also refresh this person's age/gender estimate using the new face crop
age_category_name, gender = get_age_gender(
face_crop, age_model, gender_model
)
people_info[i]["track_ids"].append(track.track_id)
people_info[i]["gender"] = gender
people_info[i]["age_category_name"] = age_category_name
break
        # for/else: this branch runs only if the loop finished without a break,
        # i.e. no match was found, so create a new person ID
        else:
person_id = len(people_info)
print(
"person in track "
+ str(track.track_id)
+ " has never been seen before - assign new person ID: "
+ str(person_id)
)
# this is a totally new person, so we want to calc their age and gender
age_category_name, gender = get_age_gender(
face_crop, age_model, gender_model
)
people_info.append(
{
"track_ids": [track.track_id],
"age_category_name": age_category_name,
"gender": gender,
"face_embedding": face_embedding,
}
)
else:
person_id = "unknown"
age_category_name = "unknown"
gender = "unknown"
print("no face detected for recognition - discard")
return age_category_name, gender, person_id
def main(_argv):
# Definition of the parameters
max_cosine_distance = 0.4
nn_budget = None
nms_max_overlap = 1.0
# initialize deep sort
model_filename = "model_data/mars-small128.pb"
encoder = gdet.create_box_encoder(model_filename, batch_size=1)
# calculate cosine distance metric
metric = nn_matching.NearestNeighborDistanceMetric(
"cosine", max_cosine_distance, nn_budget
)
# initialize tracker
tracker = Tracker(metric)
# load configuration for object detector
config = ConfigProto()
config.gpu_options.allow_growth = True
session = InteractiveSession(config=config)
STRIDES, ANCHORS, NUM_CLASS, XYSCALE = utils.load_config(FLAGS)
input_size = FLAGS.size
video_path = FLAGS.video
# load tflite model if flag is set
if FLAGS.framework == "tflite":
interpreter = tf.lite.Interpreter(model_path=FLAGS.weights)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
print(input_details)
print(output_details)
# otherwise load standard tensorflow saved model
else:
saved_model_loaded = tf.saved_model.load(
FLAGS.weights, tags=[tag_constants.SERVING]
)
infer = saved_model_loaded.signatures["serving_default"]
if FLAGS.age_gender:
gender_model = cv2.dnn.readNetFromCaffe(
"model_data/gender.prototxt", "model_data/gender.caffemodel"
)
age_model = cv2.dnn.readNetFromCaffe(
"model_data/age.prototxt", "model_data/dex_chalearn_iccv2015.caffemodel"
)
    # list that will hold age/gender predictions and face embeddings.
    # each element represents a unique person (not a tracklet): a dict with fields
    # holding age, gender, embedding and track IDs
people_info = []
# begin video capture
try:
vid = cv2.VideoCapture(int(video_path))
except:
vid = cv2.VideoCapture(video_path)
out = None
frame_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
# get video ready to save locally if flag is set
if FLAGS.output:
# by default VideoCapture returns float instead of int
fps = int(vid.get(cv2.CAP_PROP_FPS))
codec = cv2.VideoWriter_fourcc(*FLAGS.output_format)
out = cv2.VideoWriter(FLAGS.output, codec, fps, (frame_width, frame_height))
frame_num = 0
# while video is running
while True:
return_value, frame = vid.read()
if return_value:
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(frame)
else:
print("Video has ended or failed, try a different video format!")
break
frame_num += 1
# print('Frame #: ', frame_num)
frame_size = frame.shape[:2]
image_data = cv2.resize(frame, (input_size, input_size))
image_data = image_data / 255.0
image_data = image_data[np.newaxis, ...].astype(np.float32)
start_time = time.time()
# run detections on tflite if flag is set
if FLAGS.framework == "tflite":
interpreter.set_tensor(input_details[0]["index"], image_data)
interpreter.invoke()
pred = [
interpreter.get_tensor(output_details[i]["index"])
for i in range(len(output_details))
]
# run detections using yolov3 if flag is set
if FLAGS.model == "yolov3" and FLAGS.tiny == True:
boxes, pred_conf = filter_boxes(
pred[1],
pred[0],
score_threshold=0.25,
input_shape=tf.constant([input_size, input_size]),
)
else:
boxes, pred_conf = filter_boxes(
pred[0],
pred[1],
score_threshold=0.25,
input_shape=tf.constant([input_size, input_size]),
)
else:
batch_data = tf.constant(image_data)
pred_bbox = infer(batch_data)
for key, value in pred_bbox.items():
boxes = value[:, :, 0:4]
pred_conf = value[:, :, 4:]
(
boxes,
scores,
classes,
valid_detections,
) = tf.image.combined_non_max_suppression(
boxes=tf.reshape(boxes, (tf.shape(boxes)[0], -1, 1, 4)),
scores=tf.reshape(
pred_conf, (tf.shape(pred_conf)[0], -1, tf.shape(pred_conf)[-1])
),
max_output_size_per_class=50,
max_total_size=50,
iou_threshold=FLAGS.iou,
score_threshold=FLAGS.score,
)
# convert data to numpy arrays and slice out unused elements
num_objects = valid_detections.numpy()[0]
bboxes = boxes.numpy()[0]
bboxes = bboxes[0 : int(num_objects)]
scores = scores.numpy()[0]
scores = scores[0 : int(num_objects)]
classes = classes.numpy()[0]
classes = classes[0 : int(num_objects)]
# format bounding boxes from normalized ymin, xmin, ymax, xmax ---> xmin, ymin, width, height
original_h, original_w, _ = frame.shape
bboxes = utils.format_boxes(bboxes, original_h, original_w)
# store all predictions in one parameter for simplicity when calling functions
pred_bbox = [bboxes, scores, classes, num_objects]
# read in all class names from config
class_names = utils.read_class_names(cfg.YOLO.CLASSES)
# by default allow all classes in .names file
allowed_classes = list(class_names.values())
# loop through objects and use class index to get class name, allow only classes in allowed_classes list
names = []
deleted_indx = []
for i in range(num_objects):
class_indx = int(classes[i])
class_name = class_names[class_indx]
if class_name not in allowed_classes:
deleted_indx.append(i)
else:
names.append(class_name)
names = np.array(names)
count = len(names)
if FLAGS.count:
cv2.putText(
frame,
"Objects being tracked: {}".format(count),
(5, 35),
cv2.FONT_HERSHEY_COMPLEX_SMALL,
2,
(0, 255, 0),
2,
)
print("Objects being tracked: {}".format(count))
# delete detections that are not in allowed_classes
bboxes = np.delete(bboxes, deleted_indx, axis=0)
scores = np.delete(scores, deleted_indx, axis=0)
# encode yolo detections and feed to tracker
features = encoder(frame, bboxes)
detections = [
Detection(bbox, score, class_name, feature)
for bbox, score, class_name, feature in zip(bboxes, scores, names, features)
]
# initialize color map
cmap = plt.get_cmap("tab20b")
colors = [cmap(i)[:3] for i in np.linspace(0, 1, 20)]
# run non-maxima supression
boxs = np.array([d.tlwh for d in detections])
scores = np.array([d.confidence for d in detections])
classes = np.array([d.class_name for d in detections])
indices = preprocessing.non_max_suppression(
boxs, classes, nms_max_overlap, scores
)
detections = [detections[i] for i in indices]
# Call the tracker
tracker.predict()
tracker.update(detections)
# update tracks
for track in tracker.tracks:
if not track.is_confirmed() or track.time_since_update > 1:
continue
bbox = track.to_tlbr()
class_name = track.get_class()
if FLAGS.age_gender:
# we want the bbox to be larger and square (if not on the edge of the frame) so it is in the right format for age/gender detection.
bbox, bbox_centre = expand_bbox_square(bbox, frame_width, frame_height)
# face_crop is in rgb ordering
face_crop = frame[
int(bbox[1]) : int(bbox[3]), int(bbox[0]) : int(bbox[2])
]
if any(track.track_id in person["track_ids"] for person in people_info):
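                    # np.argmax over a boolean list gives the index of the first True,
                    # i.e. this track's person in people_info (the Person ID)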
person_id = np.argmax(
[
track.track_id in person["track_ids"]
for person in people_info
]
)
                    # update the age and gender prediction periodically - every 10 frames, attempt an update
if frame_num % 10 == 0:
if is_face_image_good(face_crop, track):
age_category_name, gender = get_age_gender(
face_crop, age_model, gender_model
)
people_info[person_id][
"age_category_name"
] = age_category_name
people_info[person_id]["gender"] = gender
print("updated age and gender")
age_category_name = people_info[person_id]["age_category_name"]
gender = people_info[person_id]["gender"]
print(
"this track ID "
+ str(track.track_id)
+ " has been recorded before - person ID: "
+ str(person_id)
)
else:
print(
"new tracklet ID "
+ str(track.track_id)
+ " - checking whether face image is high quality"
)
# this is a new tracklet
if is_face_image_good(face_crop, track):
print("face image is high quality")
# need to check if we've seen this person before with face recognition
age_category_name, gender, person_id = match_new_face(
face_crop, people_info, track, age_model, gender_model
)
else:
# discard
person_id = "unknown"
age_category_name = "unknown"
gender = "unknown"
print(
"face image is of low quality - discard and keep checking for a high quality face image"
)
# draw bbox on screen
color = colors[int(track.track_id) % len(colors)]
color = [i * 255 for i in color]
cv2.rectangle(
frame,
(int(bbox[0]), int(bbox[1])),
(int(bbox[2]), int(bbox[3])),
color,
2,
)
cv2.rectangle(
frame,
(int(bbox[0]), int(bbox[1] - 30)),
(
int(bbox[0]) + (len(class_name) + len(str(track.track_id))) * 17,
int(bbox[1]),
),
color,
-1,
)
cv2.putText(
frame,
class_name + " Tracker ID - " + str(track.track_id),
(int(bbox[0]), int(bbox[1] + 20)),
0,
0.75,
(255, 255, 255),
2,
)
if FLAGS.age_gender:
cv2.putText(
frame,
"Person ID - "
+ str(person_id)
+ ", "
+ age_category_name
+ ", "
+ gender,
(int(bbox[0]), int(bbox[3] - 10)),
0,
0.75,
(255, 255, 255),
2,
)
# if enable info flag then print details about each track
if FLAGS.info:
print(
"Tracker ID: {}, Class: {}, BBox Coords (xmin, ymin, xmax, ymax): {}".format(
str(track.track_id),
class_name,
(int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])),
)
)
# calculate frames per second of running detections
fps = 1.0 / (time.time() - start_time)
print("FPS: %.2f" % fps)
result = np.asarray(frame)
result = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
if not FLAGS.dont_show:
cv2.imshow("Output Video", result)
# if output flag is set, save video file
if FLAGS.output:
out.write(result)
if cv2.waitKey(1) & 0xFF == ord("q"):
break
cv2.destroyAllWindows()
if __name__ == "__main__":
try:
app.run(main)
except SystemExit:
pass
