You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

webcam.py 2.1 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import time
  4. from typing import Callable, Optional
  5. import os
  6. import cv2
  7. import numpy as np
  8. import pyarrow as pa
  9. from dora import DoraStatus
  10. CAMERA_WIDTH = 640
  11. CAMERA_HEIGHT = 480
  12. CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
  13. font = cv2.FONT_HERSHEY_SIMPLEX
  14. class Operator:
  15. """
  16. Sending image from webcam to the dataflow
  17. """
  18. def __init__(self):
  19. self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
  20. self.start_time = time.time()
  21. self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
  22. self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
  23. def on_event(
  24. self,
  25. dora_event: str,
  26. send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
  27. ) -> DoraStatus:
  28. match dora_event["type"]:
  29. case "INPUT":
  30. ret, frame = self.video_capture.read()
  31. if ret:
  32. frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
  33. ## Push an error image in case the camera is not available.
  34. else:
  35. frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
  36. cv2.putText(
  37. frame,
  38. "No Webcam was found at index %d" % (CAMERA_INDEX),
  39. (int(30), int(30)),
  40. font,
  41. 0.75,
  42. (255, 255, 255),
  43. 2,
  44. 1,
  45. )
  46. send_output(
  47. "image",
  48. pa.array(frame.ravel()),
  49. dora_event["metadata"],
  50. )
  51. case "STOP":
  52. print("received stop")
  53. case other:
  54. print("received unexpected event:", other)
  55. if time.time() - self.start_time < 20:
  56. return DoraStatus.CONTINUE
  57. else:
  58. return DoraStatus.STOP
  59. def __del__(self):
  60. self.video_capture.release()

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed datafl