| @@ -0,0 +1,73 @@ | |||
| import os | |||
| from dataclasses import dataclass | |||
| import cv2 | |||
| import numpy as np | |||
| from dora import Node | |||
| CI = os.environ.get("CI") | |||
| IMAGE_WIDTH = int(os.getenv("IMAGE_WIDTH", "640")) | |||
| IMAGE_HEIGHT = int(os.getenv("IMAGE_HEIGHT", "480")) | |||
| FONT = cv2.FONT_HERSHEY_SIMPLEX | |||
| @dataclass | |||
| class Plotter: | |||
| frame: np.array = np.array([]) | |||
| bboxes: np.array = np.array([[]]) | |||
| conf: np.array = np.array([]) | |||
| label: np.array = np.array([]) | |||
| if __name__ == "__main__": | |||
| plotter = Plotter() | |||
| node = Node() | |||
| for event in node: | |||
| event_type = event["type"] | |||
| if event_type == "INPUT": | |||
| if event["id"] == "image": | |||
| frame = event["value"].to_numpy() | |||
| frame = frame.reshape((IMAGE_HEIGHT, IMAGE_WIDTH, 3)).copy() | |||
| plotter.frame = frame | |||
| elif event["id"] == "bbox": | |||
| bboxes = event["value"][0]["bbox"].values.to_numpy() | |||
| conf = event["value"][0]["conf"].values.to_numpy() | |||
| label = event["value"][0]["names"].values.to_pylist() | |||
| plotter.bboxes = np.reshape(bboxes, (-1, 4)) | |||
| plotter.conf = conf | |||
| plotter.label = label | |||
| continue | |||
| for bbox in zip(plotter.bboxes, plotter.conf, plotter.label): | |||
| [ | |||
| [min_x, min_y, max_x, max_y], | |||
| confidence, | |||
| label, | |||
| ] = bbox | |||
| cv2.rectangle( | |||
| plotter.frame, | |||
| (int(min_x), int(min_y)), | |||
| (int(max_x), int(max_y)), | |||
| (0, 255, 0), | |||
| 2, | |||
| ) | |||
| cv2.putText( | |||
| plotter.frame, | |||
| f"{label}, {confidence:0.2f}", | |||
| (int(max_x) - 120, int(max_y) - 10), | |||
| FONT, | |||
| 0.5, | |||
| (0, 255, 0), | |||
| 2, | |||
| 1, | |||
| ) | |||
| if CI != "true": | |||
| cv2.imshow("frame", plotter.frame) | |||
| if cv2.waitKey(1) & 0xFF == ord("q"): | |||
| break | |||
| @@ -0,0 +1,2 @@ | |||
| numpy<2.0.0 | |||
| opencv-python | |||
| @@ -0,0 +1,3 @@ | |||
| numpy<2.0.0 | |||
| pyarrow | |||
| opencv-python | |||
| @@ -0,0 +1,52 @@ | |||
| import os | |||
| import time | |||
| import cv2 | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| from dora import Node | |||
| CAM_INDEX = int(os.getenv("CAM_INDEX", "0")) | |||
| IMAGE_WIDTH = int(os.getenv("IMAGE_WIDTH", "640")) | |||
| IMAGE_HEIGHT = int(os.getenv("IMAGE_HEIGHT", "480")) | |||
| MAX_DURATION = int(os.getenv("DURATION", "20")) | |||
| FONT = cv2.FONT_HERSHEY_SIMPLEX | |||
| start = time.time() | |||
| if __name__ == "__main__": | |||
| video_capture = cv2.VideoCapture(CAM_INDEX) | |||
| node = Node() | |||
| while time.time() - start < MAX_DURATION: | |||
| event = node.next() | |||
| if event is None: | |||
| break | |||
| if event is not None: | |||
| event_type = event["type"] | |||
| if event_type == "INPUT": | |||
| ret, frame = video_capture.read() | |||
| # Fail to read camera | |||
| if not ret: | |||
| frame = np.zeros((IMAGE_HEIGHT, IMAGE_WIDTH, 3), dtype=np.uint8) | |||
| cv2.putText( | |||
| frame, | |||
| "No Webcam was found at index %d" % (CAM_INDEX), | |||
| (int(30), int(30)), | |||
| FONT, | |||
| 0.75, | |||
| (255, 255, 255), | |||
| 2, | |||
| 1, | |||
| ) | |||
| node.send_output( | |||
| "image", | |||
| pa.array(frame.ravel()), | |||
| event["metadata"], | |||
| ) | |||
| @@ -0,0 +1,3 @@ | |||
| numpy<2.0.0 | |||
| pyarrow | |||
| ultralytics | |||
| @@ -0,0 +1,50 @@ | |||
| ## Imports | |||
| import os | |||
| import numpy as np | |||
| import pyarrow as pa | |||
| from ultralytics import YOLO | |||
| from dora import Node | |||
| ## OS Environment variable | |||
| IMAGE_WIDTH = int(os.getenv("IMAGE_WIDTH", "640")) | |||
| IMAGE_HEIGHT = int(os.getenv("IMAGE_HEIGHT", "480")) | |||
| MODEL = os.getenv("MODEL", "yolov8n.pt") | |||
| if __name__ == "__main__": | |||
| model = YOLO(MODEL) | |||
| node = Node("object_detection") | |||
| for event in node: | |||
| event_type = event["type"] | |||
| if event_type == "INPUT": | |||
| event_id = event["id"] | |||
| if event_id == "image": | |||
| frame = ( | |||
| event["value"].to_numpy().reshape((IMAGE_HEIGHT, IMAGE_WIDTH, 3)) | |||
| ) | |||
| frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB) | |||
| results = model(frame, verbose=False) # includes NMS | |||
| # Process results | |||
| bboxes = np.array(results[0].boxes.xyxy.cpu()) | |||
| conf = np.array(results[0].boxes.conf.cpu()) | |||
| labels = np.array(results[0].boxes.cls.cpu()) | |||
| names = [model.names.get(label) for label in labels] | |||
| node.send_output( | |||
| "bbox", | |||
| pa.array( | |||
| [ | |||
| { | |||
| "bbox": bboxes.ravel(), | |||
| "conf": conf, | |||
| "labels": labels, | |||
| "names": names, | |||
| } | |||
| ] | |||
| ), | |||
| event["metadata"], | |||
| ) | |||