#!/usr/bin/env python3 # -*- coding: utf-8 -*- import time from typing import Callable import cv2 from dora import DoraStatus class Operator: """ Sending image from webcam to the dataflow """ def __init__(self): self.video_capture = cv2.VideoCapture(0) self.start_time = time.time() def on_event( self, dora_event: dict, send_output: Callable[[str, bytes], None], ) -> DoraStatus: match dora_event["type"]: case "INPUT": ret, frame = self.video_capture.read() if ret: send_output( "image", cv2.imencode(".jpg", frame)[1].tobytes(), dora_event["metadata"], ) case "STOP": print("received stop") case other: print("received unexpected event:", other) if time.time() - self.start_time < 20: return DoraStatus.CONTINUE else: return DoraStatus.STOP def __del__(self): self.video_capture.release()