You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

webcam.py 2.0 kB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import time
  4. from typing import Callable
  5. import os
  6. import cv2
  7. import numpy as np
  8. import pyarrow as pa
  9. from dora import DoraStatus
  10. CAMERA_WIDTH = 640
  11. CAMERA_HEIGHT = 480
  12. CAMERA_INDEX = os.getenv("CAMERA_INDEX", 0)
  13. font = cv2.FONT_HERSHEY_SIMPLEX
  14. class Operator:
  15. """
  16. Sending image from webcam to the dataflow
  17. """
  18. def __init__(self):
  19. self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
  20. self.start_time = time.time()
  21. self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
  22. self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
  23. def on_event(
  24. self,
  25. dora_event: str,
  26. send_output: Callable[[str, bytes], None],
  27. ) -> DoraStatus:
  28. match dora_event["type"]:
  29. case "INPUT":
  30. ret, frame = self.video_capture.read()
  31. if ret:
  32. frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
  33. ## Push an error image in case the camera is not available.
  34. else:
  35. frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
  36. cv2.putText(
  37. frame,
  38. "No Webcam was found at index %d" % (CAMERA_INDEX),
  39. (int(30), int(30)),
  40. font,
  41. 0.75,
  42. (255, 255, 255),
  43. 2,
  44. 1,
  45. )
  46. send_output(
  47. "image",
  48. pa.array(frame.ravel()),
  49. dora_event["metadata"],
  50. )
  51. case "STOP":
  52. print("received stop")
  53. case other:
  54. print("received unexpected event:", other)
  55. if time.time() - self.start_time < 20:
  56. return DoraStatus.CONTINUE
  57. else:
  58. return DoraStatus.STOP
  59. def __del__(self):
  60. self.video_capture.release()

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed datafl