基于MTCNN与insightface的人脸打卡系统
注:篇幅较长,持续更新状态
2019.4.10
阶段一:基于MTCNN的人脸检测
前期已有实现,遂不再重复。github代码持续更新,现更新到version 0.2,博客由于代码更新幅度较大且仅提供入门参考遂不再更新,如有更新那就是我有时间和节操了~。
version 0.1博客地址:人脸检测与识别:MTCNN人脸检测
github地址:https://github.com/friedhelm739/MTCNN-tensorflow
阶段二:基于insightface的人脸识别
项目环境及配置:ubuntu16.04+2*GTX 1080ti+Python3.6+Anaconda5.2.0+Tensorflow1.7-gpu
本阶段是对《ArcFace: Additive Angular Margin Loss for Deep Face Recognition》论文的复现,网上解读文章很多,大家可以择优选读,关于代码解读有一系列比较好的解读,对入门理解源码有一定的帮助。
博客地址:人脸检测与识别:基于MTCNN与insightface的人脸打卡系统
github地址:https://github.com/friedhelm739/Insightface-tensorflow
本阶段代码参考:
- https://github.com/deepinsight/insightface
- https://github.com/luckycallor/InsightFace-tensorflow (非常感谢)
- https://github.com/auroua/InsightFace_TF
- https://github.com/tensorflow/models
在此对其表示衷心的感谢。
1、数据获取与处理
本文数据可以很轻松的从源代码的Dataset Zoo内获取,本文使用CASIA数据集。
下载后解压成如图1形式:
其中训练用的是train.idx和train.rec,其他的bin文件都是验证用的。
解压后需要使用/data/gen_tfrecord_mxdata.py将原MXNet训练数据格式转换为tensorflow的TFRecord格式,代码直接抄袭,如下所示:
# -*- coding: utf-8 -*-
"""
Convert the MXNet RecordIO training set (train.idx / train.rec) into a
single TensorFlow TFRecord file.

@author: friedhelm
"""
import tensorflow as tf
import mxnet as mx
import os
import io
import numpy as np
import cv2
import time
from scipy import misc
import argparse
from core import config


def arg_parse():
    """Parse the command line.

    Returns:
        argparse.Namespace with ``read_dir`` (directory holding train.idx and
        train.rec) and ``save_dir`` (path of the TFRecord file to write).
        Both default to values taken from ``core.config``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--read_dir", default=config.mxdata_dir, type=str,
                        help='directory to read data')
    parser.add_argument("--save_dir", default=config.tfrecord_dir, type=str,
                        help='path to save TFRecord file')
    # Return the parsed namespace, not the parser: the entry point reads
    # .save_dir / .read_dir, which only exist on the parsed result.
    return parser.parse_args()


def main():
    """Read every image record from the MXNet dataset and write it as a
    ``tf.train.Example`` with features ``img`` (raw bytes) and ``label``.

    Reads the module-level ``read_dir``, ``save_dir`` and ``begin`` that the
    ``__main__`` section sets before calling this function.
    """
    with tf.python_io.TFRecordWriter(save_dir) as writer:
        idx_path = os.path.join(read_dir, 'train.idx')
        bin_path = os.path.join(read_dir, 'train.rec')
        imgrec = mx.recordio.MXIndexedRecordIO(idx_path, bin_path, 'r')

        # Record 0 is a header; its label[0] is used as the exclusive upper
        # bound of the image record indices, which start at 1.
        s = imgrec.read_idx(0)
        header, _ = mx.recordio.unpack(s)
        imgidx = range(1, int(header.label[0]))

        for i in imgidx:
            img_info = imgrec.read_idx(i)
            header, img = mx.recordio.unpack(img_info)
            label = int(header.label)

            # Decode the stored image bytes, then swap the channel order
            # before serialising the raw pixel buffer.
            img = io.BytesIO(img)
            img = misc.imread(img).astype(np.uint8)
            img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            img_raw = img.tobytes()

            example = tf.train.Example(features=tf.train.Features(feature={
                "img": tf.train.Feature(bytes_list=tf.train.BytesList(value=[img_raw])),
                "label": tf.train.Feature(int64_list=tf.train.Int64List(value=[label])),
            }))
            writer.write(example.SerializeToString())

            if i % 10000 == 0:
                print('%d pics processed' % i, "time: ", time.time() - begin)


if __name__ == "__main__":
    args = arg_parse()
    save_dir = args.save_dir
    read_dir = args.read_dir
    begin = time.time()
    main()
测试数据直接使用,如想转换请参考上述代码。测试数据参照源码,是使用MTCNN检测人脸得到的,下面以lfw为例,代码路径:/data/gen_lfw_data.py。代码基本为搬运源码,由于数据集的特性,MTCNN会检测出多张未标注人脸,或者同一人脸多检测框的情况,这时源码使用以图片中心为准的思路:综合考虑人脸框面积与人脸框中心到图片中心的距离(得分为"人脸框面积 − 2×中心偏移距离的平方"),只取得分最高的人脸框。
# -*- coding: utf-8 -*-
"""
Detect, align and crop the LFW images with MTCNN and write an index file
(``lfw_list``) describing the generated face crops.

@author: friedhelm
"""
import sys
sys.path.append("../")
from core.MTCNN.mtcnn_detector import MTCNN_Detector
from core.MTCNN.MTCNN_model import Pnet_model,Rnet_model,Onet_model
import numpy as np
import os
from collections import namedtuple
from easydict import EasyDict as edict
from scipy import misc
import cv2
from core import config
import argparse
from core.tool import preprocess


def arg_parse():
    """Parse the command line.

    Returns:
        argparse.Namespace with ``input_dir``, ``output_dir`` and
        ``image_size`` (a "rows,cols" string, default "112,112").
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_dir", default=config.lfw_dir, type=str,
                        help='directory to read lfw data')
    parser.add_argument("--output_dir", default=config.lfw_save_dir, type=str,
                        help='path to save lfw_face data')
    parser.add_argument("--image_size", default="112,112", type=str,
                        help='image size')
    # main() reads attributes of the parsed namespace, so parse here instead
    # of handing back the parser object.
    return parser.parse_args()


def get_DataSet(input_dir, min_images=1):
    """List every image found under ``input_dir/<person_name>/``.

    Person directories are visited in sorted order; each directory that
    contributes at least ``min_images`` images gets the next integer label.

    Returns:
        list of EasyDict records with fields ``id``, ``classname`` (label as
        a string), ``image_path``, ``bbox`` and ``landmark`` (both None).
    """
    ret = []
    label = 0
    for person_name in sorted(os.listdir(input_dir)):
        _subdir = os.path.join(input_dir, person_name)
        if not os.path.isdir(_subdir):
            continue
        _ret = []
        for img in os.listdir(_subdir):
            fimage = edict()
            fimage.id = os.path.join(person_name, img)
            fimage.classname = str(label)
            fimage.image_path = os.path.join(_subdir, img)
            fimage.bbox = None
            fimage.landmark = None
            _ret.append(fimage)
        if len(_ret) >= min_images:
            ret += _ret
            label += 1
    return ret


def main(args):
    """Run MTCNN over the dataset and save one aligned crop per image.

    Reads the module-level detector settings (``model``, ``model_name``,
    ``model_path``, ``batch_size``, ``factor``, ``min_face_size``,
    ``threshold``) that the ``__main__`` section defines.
    """
    dataset = get_DataSet(args.input_dir)
    print('dataset size', 'lfw', len(dataset))

    print('Creating networks and loading parameters')
    # Enable the cascade stages up to and including ``model_name``.
    if model_name in ["Pnet", "Rnet", "Onet"]:
        model[0] = Pnet_model
    if model_name in ["Rnet", "Onet"]:
        model[1] = Rnet_model
    if model_name == "Onet":
        model[2] = Onet_model
    detector = MTCNN_Detector(model, model_path, batch_size, factor, min_face_size, threshold)

    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)
    output_filename = os.path.join(args.output_dir, 'lfw_list')

    print('begin to generate')
    with open(output_filename, "w") as text_file:
        nrof_images_total = 0
        # nrof[0]: images with at least one detection, nrof[1]: images without.
        nrof = np.zeros((2,), dtype=np.int32)
        for fimage in dataset:
            if nrof_images_total % 100 == 0:
                print("Processing %d, (%s)" % (nrof_images_total, nrof))
            nrof_images_total += 1

            image_path = fimage.image_path
            if not os.path.exists(image_path):
                print('image not found (%s)' % image_path)
                continue
            try:
                img = cv2.imread(image_path)
            except (IOError, ValueError, IndexError) as e:
                print('{}: {}'.format(image_path, e))
                continue
            if img is None:
                # cv2.imread reports an unreadable or corrupt file by
                # returning None rather than raising.
                print('image unreadable (%s)' % image_path)
                continue

            # Mirror the <person>/<file> layout below the output directory.
            person_dir = os.path.basename(os.path.dirname(image_path))
            file_name = os.path.basename(image_path)
            target_dir = os.path.join(args.output_dir, person_dir)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            target_file = os.path.join(target_dir, file_name)

            _bbox = None
            _landmark = None
            bounding_boxes, points = detector.detect_single_face(img, False)
            nrof_faces = np.shape(bounding_boxes)[0]
            if nrof_faces > 0:
                det = bounding_boxes[:, 0:4]
                img_size = np.asarray(img.shape)[0:2]
                bindex = 0
                if nrof_faces > 1:
                    # Several detections: prefer large boxes close to the
                    # image centre (score = area - 2 * squared centre offset).
                    bounding_box_size = (det[:, 2] - det[:, 0]) * (det[:, 3] - det[:, 1])
                    img_center = img_size / 2
                    offsets = np.vstack([
                        (det[:, 0] + det[:, 2]) / 2 - img_center[1],
                        (det[:, 1] + det[:, 3]) / 2 - img_center[0],
                    ])
                    offset_dist_squared = np.sum(np.power(offsets, 2.0), 0)
                    bindex = np.argmax(bounding_box_size - offset_dist_squared * 2.0)
                _bbox = bounding_boxes[bindex, 0:4]
                # NOTE(review): preprocess() fits the landmarks against a
                # (5, 2) reference array - confirm detect_single_face returns
                # them in that layout.
                _landmark = points[bindex, :]
                nrof[0] += 1
            else:
                nrof[1] += 1

            # With no detection both bbox and landmark stay None and the
            # call below is made with neither.
            warped = preprocess(img, bbox=_bbox, landmark=_landmark, image_size=args.image_size)
            cv2.imwrite(target_file, warped)
            oline = '%d\t%s\t%d\n' % (1, target_file, int(fimage.classname))
            text_file.write(oline)


if __name__ == "__main__":
    model = [None, None, None]
    # parameters from the original paper
    factor = 0.79
    threshold = [0.8, 0.8, 0.6]
    min_face_size = 20
    # parameters from the original paper
    batch_size = 1
    model_name = "Onet"
    base_dir = "."

    model_path = [os.path.join(base_dir, "model/MTCNN_model/Pnet_model/Pnet_model.ckpt-20000"),
                  os.path.join(base_dir, "model/MTCNN_model/Rnet_model/Rnet_model.ckpt-40000"),
                  os.path.join(base_dir, "model/MTCNN_model/Onet_model/Onet_model.ckpt-40000")]

    args = arg_parse()

    # Alternative for interactive use without a command line:
    # User = namedtuple('User', ['input_dir', 'output_dir', 'image_size'])
    # args = User(input_dir='./data/lfw', output_dir='./data/lfw_face', image_size="112,112")

    main(args)
在检测出人脸框后会进行人脸对齐操作,这一步使用的是preprocess函数,采用skimage的SimilarityTransform()相似变换,针对人脸关键点进行人脸对齐;cv2的仿射变换函数我也使用过,效果不如skimage的好。/core/preprocessing.py代码如下所示:
# -*- coding: utf-8 -*-
"""
Face alignment / cropping helper.

@author: friedhelm
"""
import numpy as np
from skimage import transform as trans
import cv2


def preprocess(img, bbox=None, landmark=None, **kwargs):
    """Align a face with its landmarks, or fall back to a margin crop.

    Args:
        img: image array indexed as ``img[row, col, channel]``.
        bbox: optional box ``[x1, y1, x2, y2]`` used for the crop fallback.
        landmark: optional landmark array. It is fitted against the (5, 2)
            reference points below without reshaping, so it is expected to
            have that layout.
        **kwargs:
            image_size: "rows,cols" or "size" string. Rows must be 112 and
                cols must be 112 or 96. Required when aligning.
            align: set to a falsy value to skip landmark alignment
                (default True).
            margin: total margin in pixels added around the box on the crop
                path (default 44).

    Returns:
        The warped face when landmarks are used; otherwise the (optionally
        resized) crop around ``bbox``, or around the central region of the
        image when ``bbox`` is None.
    """
    M = None
    image_size = []
    str_image_size = kwargs.get('image_size', '')
    if len(str_image_size) > 0:
        image_size = [int(x) for x in str_image_size.split(',')]
        if len(image_size) == 1:
            image_size = [image_size[0], image_size[0]]
        assert len(image_size) == 2
        assert image_size[0] == 112
        # The second check must look at the width; testing image_size[0]
        # again would accept any width.
        assert image_size[1] == 112 or image_size[1] == 96

    if landmark is not None and kwargs.get('align', True):
        assert len(image_size) == 2
        # Desired landmark positions in the output image.
        src = np.array([
            [30.2946, 51.6963],
            [65.5318, 51.5014],
            [48.0252, 71.7366],
            [33.5493, 92.3655],
            [62.7299, 92.2041]], dtype=np.float32)
        if image_size[1] == 112:
            # The reference x coordinates are shifted by 8 px for 112-wide
            # output.
            src[:, 0] += 8.0
        dst = landmark.astype(np.float32)

        # Similarity transform fitted with skimage. The original author
        # noted that cv2.getAffineTransform on the first three points
        # aligned worse.
        tform = trans.SimilarityTransform()
        tform.estimate(dst, src)
        M = tform.params[0:2, :]

    if M is None:
        if bbox is None:
            # Use a centre crop that drops 6.25% on every side.
            det = np.zeros(4, dtype=np.int32)
            det[0] = int(img.shape[1] * 0.0625)
            det[1] = int(img.shape[0] * 0.0625)
            det[2] = img.shape[1] - det[0]
            det[3] = img.shape[0] - det[1]
        else:
            det = bbox
        margin = kwargs.get('margin', 44)
        # Grow the box by half the margin per side, clamped to the image.
        bb = np.zeros(4, dtype=np.int32)
        bb[0] = np.maximum(det[0] - margin / 2, 0)
        bb[1] = np.maximum(det[1] - margin / 2, 0)
        bb[2] = np.minimum(det[2] + margin / 2, img.shape[1])
        bb[3] = np.minimum(det[3] + margin / 2, img.shape[0])
        ret = img[bb[1]:bb[3], bb[0]:bb[2], :]
        if len(image_size) > 0:
            ret = cv2.resize(ret, (image_size[1], image_size[0]))
        return ret

    # Align using the landmark transform; cv2 takes the size as
    # (width, height), hence the swapped indices.
    warped = cv2.warpAffine(img, M, (image_size[1], image_size[0]), borderValue=0.0)
    return warped
在人脸检测与对齐完成后,制作lfw_pair文件,代码依旧抄袭源码,根据lfw官网下载的pairs.txt文件进行测试集数据制作。pairs.txt共包含6000对样本,其中3000对为同一人的正样本,3000对为不同人的负样本;这里对人脸识别模型的验证实际上就是人脸验证(判断两张人脸是否属于同一人)。/data/gen_eval_pickle_data.py如下:
# -*- coding: utf-8 -*-
"""
Pack the aligned LFW image pairs listed in pairs.txt into one pickle file
holding ``(list_of_image_bytes, issame_list)``.

@author: friedhelm
"""
import argparse
import pickle
import os
import numpy as np
from collections import namedtuple
from core import config


def get_paths(lfw_dir, pairs, file_ext):
    """Turn parsed pairs.txt rows into image paths and same-person flags.

    A 3-field row ``name n1 n2`` is a same-person pair; a 4-field row
    ``name1 n1 name2 n2`` is a different-person pair. Rows with any other
    field count, and pairs whose files are missing, are skipped and counted.

    Returns:
        (path_list, issame_list): ``path_list`` is flat, two consecutive
        entries per kept pair; ``issame_list`` has one bool per kept pair.
    """
    nrof_skipped_pairs = 0
    path_list = []
    issame_list = []
    for pair in pairs:
        if len(pair) == 3:
            path0 = os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[1]) + '.' + file_ext)
            path1 = os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[2]) + '.' + file_ext)
            issame = True
        elif len(pair) == 4:
            path0 = os.path.join(lfw_dir, pair[0], pair[0] + '_' + '%04d' % int(pair[1]) + '.' + file_ext)
            path1 = os.path.join(lfw_dir, pair[2], pair[2] + '_' + '%04d' % int(pair[3]) + '.' + file_ext)
            issame = False
        else:
            # Without this branch path0/path1 would be undefined or left
            # over from the previous row.
            print('malformed pair', list(pair))
            nrof_skipped_pairs += 1
            continue
        if os.path.exists(path0) and os.path.exists(path1):
            # Only add the pair if both paths exist.
            path_list += (path0, path1)
            issame_list.append(issame)
        else:
            print('not exists', path0, path1)
            nrof_skipped_pairs += 1
    if nrof_skipped_pairs > 0:
        print('Skipped %d image pairs' % nrof_skipped_pairs)
    return path_list, issame_list


def read_pairs(pairs_filename):
    """Read pairs.txt, skipping its first (header) line.

    Returns:
        numpy object array with one whitespace-split row per line.
    """
    pairs = []
    with open(pairs_filename, 'r') as f:
        for line in f.readlines()[1:]:
            pairs.append(line.strip().split())
    # Rows have 3 or 4 fields; a ragged array needs an explicit object
    # dtype on current NumPy releases.
    return np.array(pairs, dtype=object)


def arg_parse():
    """Parse the command line.

    Returns:
        argparse.Namespace with ``input_dir`` and ``output_dir``.
        ``output_dir`` is opened as the pickle file to write.
    """
    parser = argparse.ArgumentParser(description='Package LFW images')
    parser.add_argument('--input_dir', default=config.mxdata_dir, help='path to load')
    parser.add_argument('--output_dir', default=config.eval_dir, help='path to save.')
    # The entry point reads attributes of the parsed namespace.
    return parser.parse_args()


if __name__ == "__main__":
    args = arg_parse()

    # Alternative for interactive use without a command line:
    # User = namedtuple('User', ['input_dir', 'output_dir'])
    # args = User(input_dir='./data', output_dir='./data/lfw_face.db')

    lfw_dir = args.input_dir
    lfw_pairs = read_pairs(os.path.join(lfw_dir, 'pairs.txt'))
    # The aligned crops are read from the lfw_face sub-directory.
    lfw_dir = os.path.join(lfw_dir, 'lfw_face')
    lfw_paths, issame_list = get_paths(lfw_dir, lfw_pairs, 'jpg')

    lfw_bins = []
    for i, path in enumerate(lfw_paths, start=1):
        with open(path, 'rb') as fin:
            lfw_bins.append(fin.read())
        if i % 1000 == 0:
            print('loading lfw', i)

    with open(args.output_dir, 'wb') as f:
        pickle.dump((lfw_bins, issame_list), f, protocol=pickle.HIGHEST_PROTOCOL)
至此数据制作完成。
2、人脸识别模型与损失函数
人脸识别的重点在于其损失函数而不在于模型,insightface的论文(下统称论文)中仅对Resnet模型进行了小修改,模型代码参考官方slim_model与参考代码2。论文修改如下所示:
- 将残差模块的stride=2的卷积模块提前
- 激活函数改用prelu
- 取消第一层max_pool与最后的全局average_pool
修改后模型代码在/core目录下,请自行参阅,博客不再附代码。
损失函数为Arcloss,具体原理请参考其他博客。代码如下所示:
# -*- coding: utf-8 -*-
"""
ArcFace (additive angular margin) logits.

@author: friedhelm
"""
import sys
sys.path.append("../")
import tensorflow as tf
import tensorflow.contrib.slim as slim
import math
from core import config


def arcface_loss(inputs, labels, s, m):
    """Build the ArcFace logits for a batch of embeddings.

    Despite its name this returns logits, not a scalar loss.

    Args:
        inputs: 2-D embedding tensor; it is L2-normalised along axis 1.
        labels: integer class ids, one-hot encoded to ``config.class_num``.
        s: scale applied to every logit.
        m: angular margin in radians added to the target-class angle.

    Returns:
        Tensor of shape ``[batch, config.class_num]``. The target column is
        ``s * cos(theta + m)`` when ``cos(theta) > cos(pi - m)`` and
        ``s * (cos(theta) - m * sin(m))`` otherwise. Every other column is
        ``s * cos(theta)``.
    """
    with tf.name_scope("arcface_loss"):
        # One weight column per class. The variable name is kept verbatim
        # because it is what gets stored in checkpoints.
        weight = tf.get_variable(
            "loss_wight",
            [inputs.get_shape().as_list()[-1], config.class_num],
            initializer=tf.contrib.layers.xavier_initializer(),
            regularizer=slim.l2_regularizer(config.model_params["weight_decay"]))
        inputs = tf.nn.l2_normalize(inputs, axis=1)
        weight = tf.nn.l2_normalize(weight, axis=0)

        sin_m = math.sin(m)
        cos_m = math.cos(m)
        mm = sin_m * m
        threshold = math.cos(math.pi - m)

        # Both operands are unit-normalised, so the product is cos(theta).
        cos_theta = tf.matmul(inputs, weight, name="cos_theta")
        # Clamp before the sqrt: rounding can push cos^2 slightly above 1,
        # which would make the sqrt return NaN.
        sin_theta = tf.sqrt(tf.clip_by_value(1. - tf.square(cos_theta), 0., 1.))
        # cos(theta + m) = cos(theta)cos(m) - sin(theta)sin(m)
        cos_theta_m = s * tf.subtract(tf.multiply(cos_theta, cos_m),
                                      tf.multiply(sin_theta, sin_m))

        # Replacement used where cos(theta) does not exceed the threshold.
        keep_val = s * (cos_theta - mm)
        cond_v = cos_theta - threshold
        # relu(...) is non-zero exactly where cos(theta) > threshold.
        cond = tf.cast(tf.nn.relu(cond_v), dtype=tf.bool)
        cos_theta_m_keep = tf.where(cond, cos_theta_m, keep_val)

        # Apply the margin to the target class only.
        mask = tf.one_hot(labels, config.class_num)
        inv_mask = tf.subtract(1., mask)
        output = tf.add(tf.multiply(mask, cos_theta_m_keep),
                        tf.multiply(inv_mask, s * cos_theta),
                        name="arcface_loss")

    return output
代码中其他的都好理解,就是阈值这块有点费劲:源码中设定了阈值 $\cos(\pi - m)$,一旦 $\cos\theta$ 越界(即 $\cos\theta \le \cos(\pi - m)$,也就是 $\theta + m \ge \pi$),就将 $\cos(\theta + m)$ 替换为 $\cos\theta - m\sin m$。
3、模型训练
本文代码选用双GPU运行,单gpu版可在/train/train.py中找到。/train/train_multi_gpus.py代码如下所示:
import tensorflow as tf
from train.train_tool import arcface_loss,read_single_tfrecord,average_gradients
from core import Arcface_model,config
import time
import os
from evaluate.evaluate import evaluation,load_bin


def train(image, label, train_phase_dropout, train_phase_bn,
          images_batch, images_f_batch, issame_list_batch):
    """Build the multi-GPU training graph and run the training loop.

    Args:
        image, label: input placeholders for one full batch; they are split
            evenly across ``config.gpu_num`` towers.
        train_phase_dropout, train_phase_bn: boolean placeholders passed to
            the model and fed True while training.
        images_batch, images_f_batch, issame_list_batch: per-dataset
            evaluation data, index-aligned with the module-level
            ``eval_datasets``.

    Also reads the module-level ``batch_size``, ``img_size``, ``addr``,
    ``model_name``, ``model_path``, ``train_step``, ``eval_datasets``,
    ``begin`` and the open log file ``f`` set up in ``__main__``. ``f`` is
    closed when the loop ends.
    """
    train_images_split = tf.split(image, config.gpu_num)
    train_labels_split = tf.split(label, config.gpu_num)

    global_step = tf.Variable(name='global_step', initial_value=0, trainable=False)
    inc_op = tf.assign_add(global_step, 1, name='increment_global_step')

    # The schedule in config is expressed relative to batch size 512:
    # smaller batches stretch the step boundaries and shrink the rates.
    # Clamp to 1 so batches above 512 do not divide by zero.
    scale = max(1, int(512.0 / batch_size))
    lr_steps = [scale * s for s in config.lr_steps]
    lr_values = [v / scale for v in config.lr_values]
    lr = tf.train.piecewise_constant(global_step, boundaries=lr_steps,
                                     values=lr_values, name='lr_schedule')
    opt = tf.train.MomentumOptimizer(learning_rate=lr, momentum=config.momentum)

    embds = []
    logits = []
    inference_loss = []
    wd_loss = []
    total_train_loss = []
    pred = []
    tower_grads = []

    for i in range(config.gpu_num):
        sub_train_images = train_images_split[i]
        sub_train_labels = train_labels_split[i]

        with tf.device("/gpu:%d" % (i)):
            # Towers after the first reuse the variables of the first.
            with tf.variable_scope(tf.get_variable_scope(), reuse=(i > 0)):
                net, end_points = Arcface_model.get_embd(
                    sub_train_images, train_phase_dropout, train_phase_bn,
                    config.model_params)
                logit = arcface_loss(net, sub_train_labels, config.s, config.m)

                arc_loss = tf.reduce_mean(
                    tf.nn.sparse_softmax_cross_entropy_with_logits(
                        logits=logit, labels=sub_train_labels))
                L2_loss = tf.reduce_sum(tf.get_collection(tf.GraphKeys.REGULARIZATION_LOSSES))
                train_loss = arc_loss + L2_loss

                pred.append(tf.to_int32(tf.argmax(tf.nn.softmax(logit), axis=1)))
                tower_grads.append(opt.compute_gradients(train_loss))

                embds.append(net)
                logits.append(logit)
                inference_loss.append(arc_loss)
                wd_loss.append(L2_loss)
                total_train_loss.append(train_loss)

    # UPDATE_OPS is a graph-wide collection, so reading it once after all
    # towers exist gives a flat list with every tower's ops.
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)

    embds = tf.concat(embds, axis=0)
    logits = tf.concat(logits, axis=0)
    pred = tf.concat(pred, axis=0)
    wd_loss = tf.add_n(wd_loss) / config.gpu_num
    inference_loss = tf.add_n(inference_loss) / config.gpu_num

    train_ops = [opt.apply_gradients(average_gradients(tower_grads))]
    train_ops.extend(update_ops)
    train_op = tf.group(*train_ops)

    with tf.name_scope('loss'):
        train_loss = tf.add_n(total_train_loss) / config.gpu_num
        tf.summary.scalar('train_loss', train_loss)

    with tf.name_scope('accuracy'):
        train_accuracy = tf.reduce_mean(tf.cast(tf.equal(pred, label), tf.float32))
        tf.summary.scalar('train_accuracy', train_accuracy)

    saver = tf.train.Saver(max_to_keep=20)
    merged = tf.summary.merge_all()

    train_images, train_labels = read_single_tfrecord(addr, batch_size, img_size)

    tf_config = tf.ConfigProto(allow_soft_placement=True)
    tf_config.gpu_options.allow_growth = True

    with tf.Session(config=tf_config) as sess:
        sess.run((tf.global_variables_initializer(),
                  tf.local_variables_initializer()))
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        writer_train = tf.summary.FileWriter(model_path, sess.graph)
        print("start")
        try:
            for i in range(1, train_step):
                image_batch, label_batch = sess.run([train_images, train_labels])
                feed = {image: image_batch, label: label_batch,
                        train_phase_dropout: True, train_phase_bn: True}

                # The optimizer is not given global_step, so it is advanced
                # explicitly alongside the training op.
                sess.run([train_op, inc_op], feed_dict=feed)

                if i % 100 == 0:
                    summary = sess.run(merged, feed_dict=feed)
                    writer_train.add_summary(summary, i)

                if i % 1000 == 0:
                    print('times: ', i)
                    print('time: ', time.time() - begin)

                if i % 5000 == 0:
                    f.write("itrations: %d" % (i) + '\n')
                    for idx, dataset_path in enumerate(eval_datasets):
                        tpr, fpr, accuracy, best_thresholds = evaluation(
                            sess, images_batch[idx], images_f_batch[idx],
                            issame_list_batch[idx], batch_size, img_size,
                            dropout_flag=config.eval_dropout_flag,
                            bn_flag=config.eval_bn_flag,
                            embd=embds, image=image,
                            train_phase_dropout=train_phase_dropout,
                            train_phase_bn=train_phase_bn)
                        dataset_name = dataset_path.split("/")[-1].split(".")[0]
                        print("%s datasets get %.3f acc" % (dataset_name, accuracy))
                        f.write("\t %s \t %.3f \t \t " % (dataset_name, accuracy)
                                + str(best_thresholds) + '\n')
                    f.write('\n')

                # Checkpoints are only written after step 150000.
                if i > 150000 and i % config.model_save_gap == 0:
                    saver.save(sess, os.path.join(model_path, model_name), global_step=i)
        except tf.errors.OutOfRangeError:
            print("finished")
        finally:
            coord.request_stop()
            writer_train.close()
            coord.join(threads)
            f.close()


def main():
    """Create the input placeholders, load the evaluation sets and train."""
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [batch_size, img_size, img_size, 3], name='image')
        label = tf.placeholder(tf.int32, [batch_size], name='label')
        train_phase_dropout = tf.placeholder(dtype=tf.bool, shape=None, name='train_phase_dropout')
        train_phase_bn = tf.placeholder(dtype=tf.bool, shape=None, name='train_phase_bn')

    images_batch = []
    images_f_batch = []
    issame_list_batch = []
    for dataset_path in eval_datasets:
        images, images_f, issame_list = load_bin(dataset_path, img_size)
        images_batch.append(images)
        images_f_batch.append(images_f)
        issame_list_batch.append(issame_list)

    train(image, label, train_phase_dropout, train_phase_bn,
          images_batch, images_f_batch, issame_list_batch)


if __name__ == "__main__":
    img_size = config.img_size
    batch_size = config.batch_size
    # NOTE(review): ``addrt`` / ``model_patht`` are read as-is from config;
    # confirm these are the intended attribute names.
    addr = config.addrt
    model_name = config.model_name
    train_step = config.train_step
    model_path = config.model_patht
    eval_datasets = config.eval_datasets

    begin = time.time()

    f = open("./eval_record.txt", 'w')
    f.write("\t dataset \t accuracy \t best_thresholds \t" + '\n')
    main()

# tensorboard --logdir=/home/dell/Desktop/insightface/model/Arcface_model/
4、结果
训练过程曲线就不贴了,github里有相应的event文件。验证结果如图2所示,其中lfw_face为自己生成的lfw验证集:
5、总结
- 原文中使用prelu进行训练,而我使用prelu激活函数验证的结果并不好,我又使用参考代码2的leaky_relu模型验证,验证结果比prelu的高出很多。
- 人脸识别受到人脸检测和对齐的极大影响。
阶段三:基于MTCNN与insightface的人脸打卡系统
项目环境及配置:ubuntu16.04+2*GTX 1080ti+Python3.6+Anaconda5.2.0+Tensorflow1.7-gpu+Mysql5.7.25
博客地址:人脸检测与识别:基于MTCNN与insightface的人脸打卡系统
github地址:https://github.com/friedhelm739/Insightface-tensorflow