You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

webcam.py 1.9 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import os
  4. import time
  5. import cv2
  6. import numpy as np
  7. import pyarrow as pa
  8. from dora import DoraStatus
  9. CAMERA_WIDTH = 640
  10. CAMERA_HEIGHT = 480
  11. CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
  12. font = cv2.FONT_HERSHEY_SIMPLEX
  13. class Operator:
  14. """
  15. Sending image from webcam to the dataflow
  16. """
  17. def __init__(self):
  18. self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
  19. self.start_time = time.time()
  20. self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
  21. self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
  22. def on_event(
  23. self,
  24. dora_event: str,
  25. send_output,
  26. ) -> DoraStatus:
  27. event_type = dora_event["type"]
  28. if event_type == "INPUT":
  29. ret, frame = self.video_capture.read()
  30. if ret:
  31. frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
  32. ## Push an error image in case the camera is not available.
  33. else:
  34. frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
  35. cv2.putText(
  36. frame,
  37. "No Webcam was found at index %d" % (CAMERA_INDEX),
  38. (int(30), int(30)),
  39. font,
  40. 0.75,
  41. (255, 255, 255),
  42. 2,
  43. 1,
  44. )
  45. send_output(
  46. "image",
  47. pa.array(frame.ravel()),
  48. dora_event["metadata"],
  49. )
  50. elif event_type == "INPUT":
  51. print("received stop")
  52. else:
  53. print("received unexpected event:", event_type)
  54. if time.time() - self.start_time < 20:
  55. return DoraStatus.CONTINUE
  56. else:
  57. return DoraStatus.STOP
  58. def __del__(self):
  59. self.video_capture.release()

DORA (Dataflow-Oriented Robotic Architecture) is middleware designed to streamline and simplify the creation of AI-based robotic applications. It offers low latency, composable, and distributed datafl