#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ /** * Copyright 2020 Zhejiang Lab. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============================================================= */ """ import json import os import time from datetime import datetime import cv2 import numpy as np from track_only.mot_track_kc import KCTracker from track_only.util import draw_bboxes_conf_cls #将box四点坐标转换成左上角坐标和宽和高,并过滤低置信度的框 def bbox_to_xywh_cls_conf(bbox_xyxyc, conf_thresh=0.5): if any(bbox_xyxyc[:, 4] >= conf_thresh): bbox = bbox_xyxyc[bbox_xyxyc[:, 4] >= conf_thresh, :] bbox[:, 2] = bbox[:, 2] - bbox[:, 0] # bbox[:, 3] = bbox[:, 3] - bbox[:, 1] # return bbox else: return [] #opencv显示目标框和置信度 def showResult(result_xyxyc, ori_im, save_pth, color=(0, 255, 0), conf_th=0.4): ori_im = ori_im.copy() result_xyxyc = result_xyxyc.copy() result_xyxyc = result_xyxyc[result_xyxyc[:, 4] > conf_th] for d in result_xyxyc: x, y, x2, y2, conf = d cv2.putText(ori_im, str(conf), (int(x), int(y + 15)), cv2.FONT_HERSHEY_PLAIN, 1, color, 1) cv2.rectangle(ori_im, (int(x), int(y)), (int(x2), int(y2)), (0, 255, 0), 1) cv2.imwrite(save_pth, ori_im) def makeFile(file_pth): file_dir, file_name = os.path.split(file_pth) os.makedirs(file_dir, exist_ok=True) if os.path.exists(file_pth): os.remove(file_pth) #跟踪处理类 class Detector(object): def __init__( self, vid_path, min_confidence=0.4, max_cosine_distance=0.2, max_iou_distance=0.7, max_age=30, out_dir='res/'): self.vdo = cv2.VideoCapture() self.out_dir = out_dir if not os.path.exists(out_dir): os.makedirs(out_dir) self.class_ = 80 self.kc_tracker = [] if self.class_ > 0: for cls_id in range(self.class_): kc_tracker = KCTracker( confidence_l=0.01, confidence_h=0.02, use_filter=True, max_cosine_distance=max_cosine_distance, max_iou_distance=max_iou_distance, max_age=max_age, cls_=cls_id) self.kc_tracker.append(kc_tracker) else: print("class_ is error!") return None _, filename = os.path.split(vid_path) self.mot_txt = os.path.join(self.out_dir, filename[:-4] + '.txt') self.mot_txt_filter = os.path.join( self.out_dir, filename[:-4] + '_filter.txt') self.mot_txt_bk = os.path.join(self.out_dir, filename[:-4] + '_bk.txt') self.det_txt = os.path.join(self.out_dir, filename[:-4] + '_det.txt') self.video_name = os.path.join( self.out_dir, filename[:-4] + '_res.avi') self.features_npy = os.path.join( self.out_dir, filename[:-4] + '_det.npy') self.save_feature = False self.all_features = [] self.write_det_txt = False self.write_video = False self.use_tracker = True self.person_id = 1 self.write_img = True self.write_json = True self.read_json = True self.write_bk = False self.temp_dir = filename[:-4] if self.write_img: self.img_dir = os.path.join( self.out_dir + '/' + self.temp_dir, 'imgs') os.makedirs(self.img_dir, exist_ok=True) if self.write_json or self.read_json: self.json_dir = os.path.join( self.out_dir + '/' + self.temp_dir, 'json') if self.write_json: os.makedirs(self.json_dir, exist_ok=True) print("Track Detector init sucessed!\n") #打开视频,并创建结果视频 def open(self, video_path): assert os.path.isfile(video_path), "Error: path error" print("video_path %s \n" % video_path) self.vdo.open(video_path) self.im_width = int(self.vdo.get(cv2.CAP_PROP_FRAME_WIDTH)) self.im_height = int(self.vdo.get(cv2.CAP_PROP_FRAME_HEIGHT)) self.fps = self.vdo.get(cv2.CAP_PROP_FPS) self.area = 0, 0, self.im_width, self.im_height if self.write_video: fourcc = cv2.VideoWriter_fourcc(*'XVID') self.output = cv2.VideoWriter( self.video_name, fourcc, self.fps, (self.im_width, self.im_height)) if self.im_width > 0 and self.im_height > 0: print("open video sucessed!\n") #结果保存成json文件 def save_file(self, path, item): item = json.dumps((item)) try: if not os.path.exists(path): with open(path, "w", encoding='utf-8') as f: f.write(item + "\n") else: with open(path, "w", encoding='utf-8') as f: f.write(item + "\n") except Exception as e: print("write error==>", e) #从coco json文件中解析目标框 def de_coco_format(self, ann_json): json_data = [] # annotations = ann_json['annotation'] if ann_json: for i, annotation in enumerate(ann_json): conf_ = float(annotation['score']) cls_ = int(annotation['category_id'] - 1 - 80) x1 = float(annotation['bbox'][0]) x2 = float(annotation['bbox'][2] + x1) y1 = float(annotation['bbox'][1]) y2 = float(annotation['bbox'][3] + y1) object_ = [x1, y1, x2, y2, conf_, cls_] json_data.append(np.array(object_)) return np.array(json_data) #将目标框等信息保存成coco json文件 def coco_format(self, type_, id_name, im, result): # annotations = [] temp = [] height, width, _ = im.shape if result.shape[0] == 0: return temp else: for j in range(result.shape[0]): cls_id = int(result[j][6]) + 1 + 80 x1 = int(result[j][0]) x2 = int(result[j][2]) y1 = int(result[j][1]) y2 = int(result[j][3]) track_id = int(result[j][5]) score = float(result[j][4]) width = max(0, x2 - x1) height = max(0, y2 - y1) temp.append({ 'category_id': cls_id, 'area': width * height, 'iscrowd': 0, 'bbox': [x1, y1, width, height], 'segmentation': [[x1, y1, x2, y1, x2, y2, x1, y2]], 'score': score, 'track_id': track_id }) return temp #跟踪处理,这里用于调试 def detect(self): xmin, ymin, xmax, ymax = self.area frame_no = 0 avg_fps = 0.0 # for path, img, ori_im, vid_cap in self.dataset: while True: ret, ori_im = self.vdo.read() if ori_im is None: break frame_no += 1 start = time.time() im = ori_im[ymin:ymax, xmin:xmax] t1 = time.time() # results = self.yolo_detect.run(img, ori_im) results = [] if self.write_json: jsonname = self.json_dir + '/' + str(frame_no) + '.json' if len(results) > 0: ann = self.coco_format(1, 1, ori_im, results) self.save_file(jsonname, ann) if self.read_json: jsonname = self.json_dir + '/' + str(frame_no) + '.json' # print("jsonname %s" %(jsonname)) with open(jsonname, 'r', encoding='utf8')as fp: ann_json = json.load(fp) results = self.de_coco_format(ann_json) t2 = time.time() bbox_xywhcs = [] if len(results) > 0: for cls_id in range(self.class_): results_cls = np.array(results) results_cls = results_cls[results_cls[:, 5] == cls_id] results_cls_ = results_cls[:, [0, 1, 2, 3, 4]] bbox_xywhcs = bbox_to_xywh_cls_conf( results_cls_, conf_thresh=0.05) if len(bbox_xywhcs) > 0: outputs = [] features = [] feature_type = 0 if cls_id != 0: feature_type = 1 if self.use_tracker: outputs, features_ = self.kc_tracker[cls_id].update( frame_no, bbox_xywhcs, ori_im, type=feature_type) features.append(features_) if self.save_feature: if self.all_features is None: self.all_features = features else: self.all_features = np.vstack( (self.all_features, features)) if len(outputs) > 0: bbox_xyxy = outputs[:, 1:5] identities = outputs[:, 0] confs = outputs[:, -1] ori_im = draw_bboxes_conf_cls( ori_im, bbox_xyxy, confs, identities, offset=( xmin, ymin), cls_id_=cls_id) else: for cls_id in range(self.class_): self.kc_tracker[cls_id].update(frame_no, bbox_xywhcs, im) end = time.time() fps = 1 / (end - start) avg_fps += fps if frame_no % 100 == 0: print( "detect cost time: {}s, fps: {}, frame_no : {} track cost:{}".format( end - start, fps, frame_no, end - t2)) if self.write_video: self.output.write(ori_im) if self.write_img: cv2.imwrite( os.path.join( self.img_dir, '{:06d}.jpg'.format(frame_no)), ori_im) self.vdo.release() if self.save_feature: self.saveFeature(self.features_npy, self.all_features) #跟踪处理,用于自动标定 def run_track(self, image_list, label_list): frame_no = 0 avg_fps = 0.0 # self.json_dir = label_json_path for image_id, image in enumerate(image_list): label_name = label_list[image_id] if not os.path.exists(image): return ('image_path_error:' + image) if not os.path.exists(label_name): return ('label_path_error:' + label_name) ori_im = cv2.imread(image) if ori_im is None: return ('image error:' + image) frame_no += 1 print(frame_no) start = time.time() t1 = time.time() results = [] if self.read_json: jsonname = str(label_name) with open(jsonname, 'r', encoding='utf8')as fp: ann_json = json.load(fp) results = self.de_coco_format(ann_json) t2 = time.time() bbox_xywhcs = [] if len(results) > 0: outputs = [] for cls_id in range(self.class_): results_cls = np.array(results) results_cls = results_cls[results_cls[:, 5] == cls_id] results_cls_ = results_cls[:, [0, 1, 2, 3, 4]] bbox_xywhcs = bbox_to_xywh_cls_conf( results_cls_, conf_thresh=0.05) if len(bbox_xywhcs) > 0: output = [] features = [] feature_type = 1 if self.use_tracker: output, features_ = self.kc_tracker[cls_id].update( frame_no, bbox_xywhcs, ori_im, type=feature_type) features.append(features_) if self.save_feature: if self.all_features is None: self.all_features = features else: self.all_features = np.vstack( (self.all_features, features)) if len(output) > 0: bbox_xyxy = output[:, 0:4] identities = output[:, 5] confs = output[:, 4] ori_im = draw_bboxes_conf_cls( ori_im, bbox_xyxy, confs, identities, offset=( 0, 0), cls_id_=cls_id) output = np.insert( output, 6, values=cls_id, axis=1) if len(outputs) == 0: outputs = output else: outputs = np.vstack((outputs, output)) if self.write_json: jsonname = str(label_name) if len(outputs) > 0: ann = self.coco_format(1, 1, ori_im, outputs) self.save_file(jsonname, ann) else: for cls_id in range(self.class_): self.kc_tracker[cls_id].update( frame_no, bbox_xywhcs, ori_im) end = time.time() fps = 1 / (end - start) avg_fps += fps if self.write_video: self.output.write(ori_im) if self.write_img: cv2.imwrite( os.path.join( self.img_dir, '{:06d}.jpg'.format(frame_no)), ori_im) self.vdo.release() if self.save_feature: self.saveFeature(self.features_npy, self.all_features) return 'OK' if __name__ == "__main__": os.environ['CUDA_VISIBLE_DEVICES'] = '0' start_time = datetime.now() print('start time:', start_time) vid_path = 'demo.avi' filename = 'demo.avi' det = Detector( filename, min_confidence=0.35, max_cosine_distance=0.2, max_iou_distance=0.7, max_age=30, out_dir='results/xxx') det.save_feature = False det.write_det_txt = False det.use_tracker = True det.write_video = False det.write_bk = False image_path = 'test/' label_json_path = 'test-json/' image_list = os.listdir(image_path) image_list.sort(reverse=False) label_list = os.listdir(label_json_path) label_list.sort(reverse=False) image_list_ = [] label_list_ = [] for label_ in label_list: label_list_.append((label_json_path + label_)) for image_ in image_list: image_list_.append((image_path + image_)) det.run_track(image_list_, label_list_) end_time = datetime.now() print(' cost hour:', (end_time - start_time) / 3600)