You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

webcam.py 2.2 kB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
1 year ago
10 months ago
10 months ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. """TODO: Add docstring."""
  2. import os
  3. import time
  4. import cv2
  5. import numpy as np
  6. import pyarrow as pa
  7. from dora import DoraStatus
  8. CAMERA_WIDTH = 640
  9. CAMERA_HEIGHT = 480
  10. CAMERA_INDEX = int(os.getenv("CAMERA_INDEX", 0))
  11. CI = os.environ.get("CI")
  12. font = cv2.FONT_HERSHEY_SIMPLEX
  13. class Operator:
  14. """Sending image from webcam to the dataflow."""
  15. def __init__(self):
  16. """TODO: Add docstring."""
  17. self.video_capture = cv2.VideoCapture(CAMERA_INDEX)
  18. self.start_time = time.time()
  19. self.video_capture.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
  20. self.video_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
  21. self.failure_count = 0
  22. def on_event(
  23. self,
  24. dora_event: str,
  25. send_output,
  26. ) -> DoraStatus:
  27. """TODO: Add docstring."""
  28. event_type = dora_event["type"]
  29. if event_type == "INPUT":
  30. ret, frame = self.video_capture.read()
  31. if ret:
  32. frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
  33. self.failure_count = 0
  34. ## Push an error image in case the camera is not available.
  35. elif self.failure_count > 10:
  36. frame = np.zeros((CAMERA_HEIGHT, CAMERA_WIDTH, 3), dtype=np.uint8)
  37. cv2.putText(
  38. frame,
  39. f"No Webcam was found at index {CAMERA_INDEX}",
  40. (30, 30),
  41. font,
  42. 0.75,
  43. (255, 255, 255),
  44. 2,
  45. 1,
  46. )
  47. else:
  48. self.failure_count += 1
  49. return DoraStatus.CONTINUE
  50. send_output(
  51. "image",
  52. pa.array(frame.ravel()),
  53. dora_event["metadata"],
  54. )
  55. elif event_type == "STOP":
  56. print("received stop")
  57. else:
  58. print("received unexpected event:", event_type)
  59. if time.time() - self.start_time < 20 or CI != "true":
  60. return DoraStatus.CONTINUE
  61. return DoraStatus.STOP
  62. def __del__(self):
  63. """TODO: Add docstring."""
  64. self.video_capture.release()