[Feat] Update Face Recognition & Save timeline #54

Merged
merged 1 commit into from Jun 5, 2022
26 changes: 17 additions & 9 deletions serving/backend/app/api/face.py
@@ -80,20 +80,28 @@ async def get_timeline_face(info: dict):
face_timelines (dict) : provides the timelines for the selected people as lists, e.g. face_timelines : {"person_00" : [[]], "person_03" : [[]]}
"""

result_path = os.path.join(FILE_DIR, info['id'])
video = os.path.join(result_path, 'original.mp4')
video_path = os.path.join(FILE_DIR, info['id'])
video = os.path.join(video_path, 'original.mp4')

timelines = {}
# recognition
target_people = info['face']
result_path = os.path.join(FILE_DIR, info['id'], 'result', 'result.npy')

timelines = FaceRecognition(video, target_people, result_path)

save_path = os.path.join(FILE_DIR, info['id'], 'face_timelines.npy')
np.save(save_path, timelines)
# timelines = {}

for face in info['face']:
image_file = os.listdir(os.path.join(result_path, 'result', face))[0]
# for face in info['face']:
# image_file = os.listdir(os.path.join(result_path, 'result', face))[0]

image = os.path.join(result_path, 'result', face, image_file)
# image = os.path.join(result_path, 'result', face, image_file)

timeline = FaceRecognition(video, [image])
timelines[face] = timeline
# timeline = FaceRecognition(video, [image])
# timelines[face] = timeline
# face recognition will be run on the people selected in the FE
return {"id" : info['id'], "face": timelines}
return {"id" : info['id']}


# TODO: /show-people (face clustering 결과 보여주기)
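Note on the new flow: the endpoint now runs FaceRecognition for the people selected on the front end and persists the resulting timelines to face_timelines.npy, so the /highlight endpoint can pick them up later instead of receiving them in the request body. A minimal sketch of that save/load round trip for a plain dict, using hypothetical values shaped like the docstring example:

import numpy as np

# Hypothetical timelines shaped like the docstring example above.
timelines = {"person_00": [(12.5, 18.0)], "person_03": [(40.2, 44.7)]}

# np.save wraps the dict in a 0-d object array; allow_pickle=True plus .item()
# recovers the original dict on load.
np.save("face_timelines.npy", timelines)
loaded = np.load("face_timelines.npy", allow_pickle=True).item()
assert loaded == timelines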
7 changes: 5 additions & 2 deletions serving/backend/app/api/highlight.py
@@ -3,6 +3,7 @@
from ml.face_functions import FinalTimeline

import os
import numpy as np

router = APIRouter(tags=["highlight"])

@@ -28,10 +29,12 @@ async def read_highlight(timelines: dict):
people_img (dict) : provides the image paths of the selected people as a dictionary, e.g. "people_img" : {"person_00" : "people/person_00.png", "person_03" : "people/person_03.png"}
"""
print(timelines)
face_timeline = timelines['face']
laugh_timeline = timelines['laugh']
id = timelines['id']

face_timelines_dir = os.path.join(FILE_DIR, id, 'face_timelines.npy')
face_timeline = np.load(face_timelines_dir, allow_pickle=True).item()

shorts = FinalTimeline(laugh_timeline, face_timeline, id)

return {"id" : id, "shorts": shorts, "people_img" : {}}
return {"id" : id, "shorts": shorts, "people_img" : timelines['people_img']}
17 changes: 8 additions & 9 deletions serving/backend/app/ml/face_functions.py
@@ -40,20 +40,19 @@ def FaceClustering(video_path: str = "", save_dir:str = ""):


########## Face Recognition ############
def FaceRecognition(video_path: str="", target_path: str=""):
# Load config
cfg = load_json('./ml/config.json')
def FaceRecognition(video_path: str="", target_people: list=[], result_path: str=""):

# Initialize Face Recognizer
recognizer = FaceRecognizer(video_path=video_path,
target_path=target_path,
model_cfg=cfg['face_recognition'])
result_data = np.load(result_path, allow_pickle=True).item()
target_encoding = [result_data[person]['repr_encoding'] for person in target_people]

# Initialize Face Recognizer
recognizer = FaceRecognizer(video_path, target_encoding=target_encoding)

# save frame numbers from video
output_frames = recognizer.recognize_faces()
timelines, output_frames = recognizer.recognize_faces()

# make timeline from output frames per each person
people_timeline = recognizer.make_people_timeline(output_frames)
people_timeline = recognizer.make_people_timeline(timelines, output_frames, target_people)

return people_timeline

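FaceRecognition now looks up each target person's representative encoding in result.npy instead of re-encoding target images from disk. A small sketch of the structure that lookup assumes, so it can be reproduced for testing; the key names come from the diff, the vector length is hypothetical:

import numpy as np

# Hypothetical result.npy contents: a dict keyed by person id, each entry
# carrying a 'repr_encoding' vector (the length here is made up).
result_data = {
    "person_00": {"repr_encoding": np.random.rand(128)},
    "person_03": {"repr_encoding": np.random.rand(128)},
}
np.save("result.npy", result_data)

loaded = np.load("result.npy", allow_pickle=True).item()
target_people = ["person_00", "person_03"]
target_encoding = [loaded[person]["repr_encoding"] for person in target_people]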
218 changes: 162 additions & 56 deletions serving/backend/app/ml/face_recognizer/face_recog.py
@@ -1,14 +1,19 @@
import dlib
import face_recognition
import face_recognition_models
import torch
import cv2
import numpy as np
from PIL import Image
import sys
sys.path.append('../')
import ml.imagecluster.calc as calc

class FaceRecognizer:
def __init__(self,video_path,target_path,model_cfg):
def __init__(self,video_path,target_encoding,batch_size=16):
self.video_path = video_path
self.target_path = target_path
self.target_count = len(self.target_path)
self.model_cfg = model_cfg
self.target_encoding = target_encoding
self.target_count = len(target_encoding)
self.batch_size = batch_size

self.src = cv2.VideoCapture(self.video_path)
self.src_info = {
@@ -20,85 +25,186 @@ def __init__(self,video_path,target_path,model_cfg):
}


def recognize_faces(self):
# load target images
target_image = [face_recognition.load_image_file(x) for x in self.target_path]
target_loc = [face_recognition.face_locations(x, model="cnn") for x in target_image]
target_face_encoding = [face_recognition.face_encodings(img,loc)[0] for img,loc in zip(target_image,target_loc)]
known_faces = target_face_encoding
def initialize_gpu(self):
test = np.array(np.random.rand(10,10,3),dtype='uint8')
face_recognition.face_locations(test,model='cnn')


def get_face_and_cloth_image(self, frame, boxes):
padded_faces = []
padded_clothes = []

img_height, img_width = frame.shape[:2]
for box in boxes:
(box_top, box_right, box_bottom, box_left) = box # tight face bounding box
box_width = box_right - box_left
box_height = box_bottom - box_top
# padding
crop_top = max(box_top - box_height, 0)
pad_top = -min(box_top - box_height, 0)
crop_bottom = min(box_bottom + box_height, img_height - 1)
pad_bottom = max(box_bottom + box_height - img_height, 0)
crop_left = max(box_left - box_width, 0)
pad_left = -min(box_left - box_width, 0)
crop_right = min(box_right + box_width, img_width - 1)
pad_right = max(box_right + box_width - img_width, 0)
# cropping
face_image = frame[crop_top:crop_bottom, crop_left:crop_right]
cloth_image = frame[box_bottom+int(box_height*0.2):crop_bottom, crop_left:crop_right]
# return
if (pad_top == 0 and pad_bottom == 0):
if (pad_left == 0 and pad_right == 0):
padded_faces.append(face_image)
padded_clothes.append(cloth_image)
continue
padded_face = cv2.copyMakeBorder(face_image, pad_top, pad_bottom,
pad_left, pad_right, cv2.BORDER_CONSTANT)
padded_cloth = cv2.copyMakeBorder(cloth_image, pad_top, pad_bottom,
pad_left, pad_right, cv2.BORDER_CONSTANT)

padded_faces.append(padded_face)
padded_clothes.append(padded_cloth)
return padded_faces, padded_clothes


def preprocess(self, images, size):
try:
imgs = []
for image in images:
img = Image.fromarray(image).convert('RGB').resize(size, resample=3)
imgs.append(img)
return imgs
except OSError as ex:
print(f"skipping file...: {ex}")
return None

def recognize_faces(self):
frames = []
frames_real_time = []
output_frame = [[] for _ in range(self.target_count)]
frame_count = 0
frame_num = 0
cloth_encoding_model = calc.get_model()

last_frame = None
start_frame_num = 0
min_scene_frames = 15
timelines = []
total_target_frames = 100
down_scale_factor = 10
transition_threshold = 100

self.initialize_gpu()

while self.src.isOpened():
ret, frame = self.src.read()
if not ret:
break

# scene detect
cur_frame = frame[::down_scale_factor, ::down_scale_factor, :]

if last_frame is None:
last_frame = cur_frame
start_original_frame = frame
last_original_frame = frame
start_frame_num = frame_num
frame_num += 1
continue

# BGR->RGB & Crop
frame = frame[:, :, ::-1]
cropped = frame[int(frame.shape[0]*0.2):int(frame.shape[0]*0.8), int(frame.shape[1]*0.2):int(frame.shape[1]*0.8)]
frame = cropped
num_pixels = cur_frame.shape[0] * cur_frame.shape[1]
rgb_distance = np.abs(cur_frame - last_frame) / float(num_pixels)
rgb_distance = rgb_distance.sum() / 3.0
last_frame = cur_frame
start_original_frame = frame

if rgb_distance > transition_threshold and frame_num - start_frame_num > min_scene_frames:
timelines.append((start_frame_num, frame_num - 1))
start_frame_num = frame_num

last_original_frame = last_original_frame[:, :, ::-1]
height, width = last_original_frame.shape[:2]
last_original_frame = last_original_frame[int(height*0.2):, int(width*0.2):int(width*0.8)]
if height > 600:
last_original_frame = cv2.resize(last_original_frame, None, fx=0.6, fy=0.6)
frames.append(last_original_frame)
frames_real_time.append(frame_num-1)

# add a frame to the batch every CHECK_FRAME frames
if frame_count % self.model_cfg['check_frame'] == 0:
frames.append(frame)
frames_real_time.append(frame_count)
start_original_frame = start_original_frame[:, :, ::-1]
height, width = start_original_frame.shape[:2]
start_original_frame = start_original_frame[int(height*0.2):, int(width*0.2):int(width*0.8)]
if height > 600:
start_original_frame = cv2.resize(start_original_frame, None, fx=0.6, fy=0.6)
frames.append(start_original_frame)
frames_real_time.append(frame_num)

# run recognition once BATCH_SIZE frames have been collected
if len(frames) == self.model_cfg['batch_size']:
if len(frames) == self.batch_size:
batch_of_face_locations = face_recognition.batch_face_locations(frames, number_of_times_to_upsample=0)
for frame_number_in_batch, face_locations in enumerate(batch_of_face_locations):
face_encodings = face_recognition.face_encodings(frames[frame_number_in_batch], face_locations)
for face_encoding in face_encodings:
match = face_recognition.compare_faces(known_faces, face_encoding, tolerance=0.40)
for i in range(len(match)):
if match[i]:
output_frame[i].append(frames_real_time[frame_number_in_batch])
face_encodings = []
for face_location in face_locations:
top, right, bottom, left = face_location
resized_frame = cv2.resize(frames[frame_number_in_batch][top:bottom,left:right], dsize=(224,224))
resized_encodings = face_recognition.face_encodings(resized_frame,[(0,223,223,0)], model='small')[0] # list of numpy arrays, one per person
face_encodings.append(resized_encodings)
if len(face_locations) > 0:
upper_body_images, cloth_images = self.get_face_and_cloth_image(frames[frame_number_in_batch], face_locations)
preprocessed_cloth_images = self.preprocess(cloth_images, (224, 224))
cloth_encodings = calc.fingerprint(preprocessed_cloth_images, cloth_encoding_model,device = torch.device(device='cuda'))
for i in range(len(face_encodings)):
normalized_face_encoding = face_encodings[i] / np.linalg.norm(face_encodings[i])
normalized_cloth_encoding = cloth_encodings[i] / np.linalg.norm(cloth_encodings[i])
encoding = np.concatenate((normalized_face_encoding*1, normalized_cloth_encoding*0.7), axis=0)
match = face_recognition.compare_faces(self.target_encoding, encoding, tolerance=0.40)
for i in range(len(match)):
if match[i]:
output_frame[i].append(frames_real_time[frame_number_in_batch])

frames = []
frames_real_time = []

frame_count += 1

last_original_frame = frame
frame_num += 1

# process the last batch
if len(frames) > 0:
batch_of_face_locations = face_recognition.batch_face_locations(frames, number_of_times_to_upsample=0)
for frame_number_in_batch, face_locations in enumerate(batch_of_face_locations):
face_encodings = face_recognition.face_encodings(frames[frame_number_in_batch], face_locations)

for face_encoding in face_encodings:
match = face_recognition.compare_faces(known_faces, face_encoding, tolerance=0.40)
for i in range(len(match)):
if match[i]:
output_frame[i].append(frames_real_time[frame_number_in_batch])
if len(face_locations) > 0:
upper_body_images, cloth_images = self.get_face_and_cloth_image(frames[frame_number_in_batch], face_locations)
preprocessed_cloth_images = self.preprocess(cloth_images, (224, 224))
cloth_encodings = calc.fingerprint(preprocessed_cloth_images, cloth_encoding_model,device = torch.device(device='cuda'))
for i in range(len(face_encodings)):
normalized_face_encoding = face_encodings[i] / np.linalg.norm(face_encodings[i])
normalized_cloth_encoding = cloth_encodings[i] / np.linalg.norm(cloth_encodings[i])
encoding = np.concatenate((normalized_face_encoding*1, normalized_cloth_encoding*0.7), axis=0)
match = face_recognition.compare_faces(self.target_encoding, encoding, tolerance=0.40)
for i in range(len(match)):
if match[i]:
output_frame[i].append(frames_real_time[frame_number_in_batch])

self.src.release()

return output_frame
return timelines, output_frame
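The loop above also performs a lightweight scene-cut detection: each frame is downscaled, the mean absolute RGB difference to the previous frame is compared against a threshold, and a new scene is closed once the difference is large enough and the current scene is long enough. A standalone sketch of just that part, with the thresholds copied from the diff (the int32 cast is added here to avoid uint8 wrap-around):

import numpy as np

def detect_scene_cuts(frames, down_scale_factor=10, transition_threshold=100, min_scene_frames=15):
    # frames: list of BGR frames as returned by cv2.VideoCapture.read()
    timelines = []
    last_small = None
    start_frame_num = 0
    for frame_num, frame in enumerate(frames):
        small = frame[::down_scale_factor, ::down_scale_factor, :].astype(np.int32)
        if last_small is None:
            last_small = small
            continue
        num_pixels = small.shape[0] * small.shape[1]
        rgb_distance = np.abs(small - last_small).sum() / (3.0 * float(num_pixels))
        last_small = small
        if rgb_distance > transition_threshold and frame_num - start_frame_num > min_scene_frames:
            timelines.append((start_frame_num, frame_num - 1))
            start_frame_num = frame_num
    return timelines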


def make_people_timeline(self,frames):
def make_people_timeline(self,scene_frame,people_frame, target_people):
fps = self.src_info['fps']
# people_timeline = []
for frame in frames:
person_timeline=[]
if len(frame)==0:
# people_timeline.append([])
person_timeline.append([])
people_timeline = {}
for idx, person_frame in enumerate(people_frame):
if len(person_frame)==0:
people_timeline[target_people[idx]] = []
continue
# person_timeline=[]
start=frame[0]
end=frame[0]
for f in frame:
if f-end>33:
person_timeline.append((round((start-8)/fps,2),round((end+8)/fps,2)))
start,end=f,f
else:
end = f
person_timeline.append((round((start-8)/fps,2),round(end/fps,2)))
# people_timeline.append(person_timeline)
# return people_timeline
return person_timeline
person_timeline=[]
scene_index = 0
for frame in person_frame:
for i in range(scene_index, len(scene_frame)):
start, end = scene_frame[i]
if start<=frame<=end:
person_timeline.append((round(start/fps,2), round(end/fps,2)))
scene_index = i+1
break
people_timeline[target_people[idx]] = person_timeline

return people_timeline
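make_people_timeline now maps every frame where a target person was matched onto the scene it falls into and reports that scene's (start, end) in seconds, keyed by person id. A standalone re-run of that mapping with made-up numbers:

fps = 30.0
scene_frame = [(0, 149), (150, 329), (330, 500)]   # scene boundaries as frame numbers
people_frame = [[40, 310], []]                      # matched frame numbers per target person
target_people = ["person_00", "person_03"]

people_timeline = {}
for idx, person_frame in enumerate(people_frame):
    person_timeline = []
    scene_index = 0
    for frame in person_frame:
        for i in range(scene_index, len(scene_frame)):
            start, end = scene_frame[i]
            if start <= frame <= end:
                person_timeline.append((round(start / fps, 2), round(end / fps, 2)))
                scene_index = i + 1
                break
    people_timeline[target_people[idx]] = person_timeline

print(people_timeline)   # {'person_00': [(0.0, 4.97), (5.0, 10.97)], 'person_03': []}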