from api.model.model import PicoDet import time import cv2 import json import datetime import os import sys from utils.utils import send_notifycation import collections from utils.utils import doNotMove,mouseClick def getFile(ruleFile): if getattr(sys, 'frozen', False): absPath = os.path.dirname(os.path.abspath(sys.executable)) elif __file__: absPath = os.path.dirname(os.path.abspath(__file__)) else: absPath = '' return os.path.join(absPath,ruleFile) def get_settings_status_name(data, settings,setting_name): # 访问指定 "setting" 的信息 for setting in data[settings]: if setting["name"] == setting_name: # 使用 filter() 和 map() 函数提取信息 names = list(map(lambda x: x["name"], filter(lambda x: x["status"], setting["status"]))) return names[0] def get_settings_status_root(data, settings): # 访问指定 "setting" 的信息 names = list(map(lambda x: x["name"], filter(lambda x: x["status"], data[settings]))) return names[0] def get_setting_status(data, settings,setting_name): # 使用 filter() 和 map() 函数提取信息 status = list(map(lambda x: x["status"], filter(lambda x: x["name"] == setting_name, data[settings]))) return status[0] def get_photo(cap): # cap = cv2.VideoCapture(0) # 开启摄像头 f, frame = cap.read() # 将摄像头中的一帧图片数据保存 return frame # print('开始咯!') # while cv2.waitKey(1)==-1: # #计时 def get_photo_detect(cap,net): ''' 输入{图像,网络模型},输出[预测时间,预测结果] ''' result = net.detect_img(get_photo(cap),show_result=False) return result def get_photo_detect_img(cap,net): ''' 输入{图像,网络模型},输出[预测时间,预测图片] ''' start = time.time() [result,img] = net.detect_img(get_photo(cap),show_result=True) # cv2.imshow('Video Cam', result) # # print(result) end = time.time() # print('time:',end-start) # cap.release() # 关闭摄像头 # # cv2.destroyAllWindows() return [end-start,result,img] def get_photo_base64(cap): import base64 # import numpy as np img = get_photo(cap) image = cv2.imencode('.jpg',img)[1] image_code = str(base64.b64encode(image))[2:-1] return image_code def mat2base64(frame): import base64 # import numpy as np image = cv2.imencode('.jpg',frame)[1] image_code = str(base64.b64encode(image))[2:-1] return image_code def getModelSetting(args): import requests url = 'https://gitee.com/JiangNoah/emc/raw/master/model.json' r = requests.get(url) model_dict = (r.json())["models"] #将get_settings_status_name(args,"ModelSetting","模型版本设置")在model_dict的list中对应的status设置为true for model in model_dict: if model["name"] == get_settings_status_name(args,"ModelSetting","模型版本设置"): model["status"] = True else: model["status"] = False args["ModelSetting"][0]["status"] = model_dict return args def checkAction(queue,type,action,tip): # 输入一个长度为3的队列,根据情况判断是否执行动作 if(queue[0]["status"]==0 and queue[1]["status"]==1 and queue[2]["status"]==0 and type=="张开持续1S" and (queue[2]["time"]-queue[0]["time"])>1): if(action=="悬停鼠标"): doNotMove() if(tip): send_notifycation("触发悬停鼠标","EMC") elif(action=="单击按钮"): mouseClick() if(tip): send_notifycation("触发单击按钮","EMC") return True elif(queue[0]["status"]==1 and queue[1]["status"]==0 and queue[2]["status"]==1 and type=="闭上持续1S" and (queue[2]["time"]-queue[0]["time"])>1): if(action=="悬停鼠标"): doNotMove() if(tip): send_notifycation("触发悬停鼠标","EMC") elif(action=="单击按钮"): mouseClick() if(tip): send_notifycation("触发单击按钮","EMC") return True return False class API: '''本地API,供前端JS调用''' window = None net = None cap = None args = None # 状态队列 queue = None def __init__(self): with open(getFile("config.json"),'r',encoding='utf8')as fp: self.args = json.load(fp) self.args = getModelSetting(self.args) self.net = PicoDet( get_settings_status_name(self.args,"ModelSetting","模型版本设置"), self.args['classfile'], prob_threshold=float(get_setting_status(self.args,"ModelSetting",'confThreshold')), iou_threshold=float(get_setting_status(self.args,"ModelSetting",'nmsThreshold'))) # net.detect_folder(args['img_fold'], args['result_fold']) # 调用摄像头拍摄照片 self.cap = cv2.VideoCapture(0) # 开启摄像头 # cv2.namedWindow('Video Cam', cv2.WINDOW_NORMAL) if(self.args['toggle'] and self.args['tip']): send_notifycation("EMC程序已启动","EMC") self.queue = collections.deque(maxlen=3) def detectThreading(self): if self.args['toggle']==False: pass result = get_photo_detect(self.cap,self.net) eye = -1 mouth = -1 for i in result: if i["classid"] == "closed_eye": eye = 0 elif i["classid"] == "open_eye": eye = 1 elif i["classid"] == "closed_mouth": mouth = 0 elif i["classid"] == "open_mouth": mouth = 1 status = -1 if get_settings_status_root(self.args,"control") == "嘴控": if(mouth==0): status = 0 elif(mouth==1): status = 1 elif get_settings_status_root(self.args,"control") == "眼控": if(eye==0): status = 0 elif(eye==1): status = 1 elif get_settings_status_root(self.args,"control") == "嘴/眼控": if(mouth==0 or eye==0): status = 0 elif(mouth==1 or eye==1): status = 1 if(status!=-1): if(len(self.queue)<3 or status!=self.queue[-1]["status"]): self.queue.append({"status":status,"time":time.time()}) if(len(self.queue)==3): if(checkAction(self.queue,get_settings_status_name(self.args,"ControlSetting","控制设置"),get_settings_status_name(self.args,"ControlSetting","控制功能"),self.args['tip'])): self.queue.clear()#执行动作则清空队列 else: self.queue[-1]["time"]=time.time() def getPrimaryImg(self,type="不裁剪"): # 三种处理图片的方式:不裁剪、居中裁剪成224*224、居中裁剪成320*320。 if(type=="不裁剪"): return get_photo_base64(self.cap) elif(type=="居中裁剪成224*224"): img = get_photo(self.cap) height=len(img) width=len(img[0]) if(height>224 and width>224): y0 = height//2 x0 = width//2 x1 = x0-112 y1 = y0-112 x2 = x0+112 y2 = y0+112 img = img[y1:y2, x1:x2] return mat2base64(img) elif(type=="居中裁剪成320*320"): img = get_photo(self.cap) height=len(img) width=len(img[0]) if(height>320 and width>320): y0 = height//2 x0 = width//2 x1 = x0-160 y1 = y0-160 x2 = x0+160 y2 = y0+160 img = img[y1:y2, x1:x2] return mat2base64(img) def getDetectImg(self): [time,result,img] = get_photo_detect_img(self.cap,self.net) return [time,result,mat2base64(img)]#AttributeError: 'InferenceSession' object has no attribute 'detect' def getIndexSetting(self): toggle = self.args['toggle'] tip = self.args['tip'] control = self.args['control'] return [toggle,tip,control] def getAdvanceSetting(self): return [self.args['ControlSetting'], self.args['ModelSetting'], self.args['otherSetting']] def changeSetting(self,data): self.args.update(data) with open(getFile("config.json"),'w',encoding='utf8')as fp: json.dump(self.args,fp,ensure_ascii=False,indent=4) def getPersonalizationPrimaryImg(self): #判断是否存在"primary_img"文件夹 if(os.path.exists(getFile("primary_img"))==False): #不存在则创建 os.mkdir(getFile("primary_img")) #返回该文件夹下jpg文件的数量和列表 return [len(os.listdir(getFile("primary_img"))),os.listdir(getFile("primary_img"))] def setPersonalizationPrimaryImg(self,type="不裁剪"): #判断是否存在"primary_img"文件夹 if(os.path.exists(getFile("primary_img"))==False): #不存在则创建 os.mkdir(getFile("primary_img")) #保存图片 # 图片名字是当前时间 # 三种处理图片的方式:不裁剪、居中裁剪成224*224、居中裁剪成320*320。 if(type=="不裁剪"): cv2.imwrite(getFile("primary_img")+"/"+time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())+".jpg",get_photo(self.cap)) elif(type=="居中裁剪成224*224"): img = get_photo(self.cap) height=len(img) width=len(img[0]) if(height>224 and width>224): y0 = height//2 x0 = width//2 x1 = x0-112 y1 = y0-112 x2 = x0+112 y2 = y0+112 img = img[y1:y2, x1:x2] cv2.imwrite(getFile("primary_img")+"/"+time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())+".jpg",img) elif(type=="居中裁剪成320*320"): img = get_photo(self.cap) height=len(img) width=len(img[0]) if(height>320 and width>320): y0 = height//2 x0 = width//2 x1 = x0-160 y1 = y0-160 x2 = x0+160 y2 = y0+160 img = img[y1:y2, x1:x2] cv2.imwrite(getFile("primary_img")+"/"+time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())+".jpg",img) #返回该文件夹下jpg文件的数量和列表 return [len(os.listdir(getFile("primary_img"))),os.listdir(getFile("primary_img"))] def openPrimaryImgFiles(self): #判断是否存在"primary_img"文件夹 if(os.path.exists(getFile("primary_img"))==False): #不存在则创建 os.mkdir(getFile("primary_img")) #打开文件夹 os.startfile(getFile("primary_img")) def getPrimaryListImg(self,img_name): #判断是否存在"primary_img"文件夹 if(os.path.exists(getFile("primary_img"))==False): #不存在则创建 os.mkdir(getFile("primary_img")) #读取图片 img = cv2.imread(getFile("primary_img")+"/"+img_name) #返回图片 return mat2base64(img) def saveAnnotationData(self,data,dataset): #判断是否存在"annotation_data"文件夹 if(os.path.exists(getFile("annotation_data"))==False): #不存在则创建 os.mkdir(getFile("annotation_data")) #保存COCO数据集 with open(getFile("annotation_data")+"/"+"dataset.json",'w',encoding='utf8')as fp: json.dump(dataset,fp,ensure_ascii=False,indent=4) #保存标注数据 with open(getFile("annotation_data")+"/"+"data.json",'w',encoding='utf8')as fp: json.dump(data,fp,ensure_ascii=False,indent=4) def getAnnotationData(self): #判断是否存在"annotation_data"文件夹 if(os.path.exists(getFile("annotation_data"))==False): #不存在则创建 os.mkdir(getFile("annotation_data")) #判断是否存在"dataset.json"文件 if(os.path.exists(getFile("annotation_data")+"/"+"dataset.json")): #存在则读取 with open(getFile("annotation_data")+"/"+"dataset.json",'r',encoding='utf8')as fp: dataset = json.load(fp) else: #不存在则创建 return {} #判断是否存在"data.json"文件 if(os.path.exists(getFile("annotation_data")+"/"+"data.json")): #存在则读取 with open(getFile("annotation_data")+"/"+"data.json",'r',encoding='utf8')as fp: data = json.load(fp) else: #不存在则创建 return {} #返回数据 return {"mergeData":dataset,"annotationData":data}