Browse Source

Add record3d

tags/v0.3.9-rc1
haixuantao 1 year ago
parent
commit
8d6e4ccb66
6 changed files with 264 additions and 0 deletions
  1. +37
    -0
      node-hub/dora-record3d/README.md
  2. +11
    -0
      node-hub/dora-record3d/dora_record3d/__init__.py
  3. +5
    -0
      node-hub/dora-record3d/dora_record3d/__main__.py
  4. +109
    -0
      node-hub/dora-record3d/dora_record3d/main.py
  5. +16
    -0
      node-hub/dora-record3d/pyproject.toml
  6. +86
    -0
      node-hub/dora-record3d/tests/test_demo.py

+ 37
- 0
node-hub/dora-record3d/README.md View File

@@ -0,0 +1,37 @@
# dora-record3d

## Getting started

- Install it with pip:

```bash
pip install -e .
```

## Contribution Guide

- Format with [ruff](https://docs.astral.sh/ruff/):

```bash
ruff check . --fix
```

- Lint with ruff:

```bash
ruff check .
```

- Test with [pytest](https://github.com/pytest-dev/pytest)

```bash
pytest . # Test
```

## YAML Specification

## Examples

## License

dora-record3d's code are released under the MIT License

+ 11
- 0
node-hub/dora-record3d/dora_record3d/__init__.py View File

@@ -0,0 +1,11 @@
import os

# Define the path to the README file relative to the package directory
readme_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), "README.md")

# Read the content of the README file
try:
with open(readme_path, "r", encoding="utf-8") as f:
__doc__ = f.read()
except FileNotFoundError:
__doc__ = "README file not found."

+ 5
- 0
node-hub/dora-record3d/dora_record3d/__main__.py View File

@@ -0,0 +1,5 @@
from .main import main


if __name__ == "__main__":
main()

+ 109
- 0
node-hub/dora-record3d/dora_record3d/main.py View File

@@ -0,0 +1,109 @@
from threading import Event

import cv2
import numpy as np
import pyarrow as pa
from dora import Node
from record3d import Record3DStream


class DemoApp:
def __init__(self):
self.event = Event()
self.session = None
self.DEVICE_TYPE__TRUEDEPTH = 0
self.DEVICE_TYPE__LIDAR = 1
self.stop = False

def on_new_frame(self):
"""
This method is called from non-main thread, therefore cannot be used for presenting UI.
"""
self.event.set() # Notify the main thread to stop waiting and process new frame.

def on_stream_stopped(self):
self.stop = True
print("Stream stopped")

def connect_to_device(self, dev_idx):
print("Searching for devices")
devs = Record3DStream.get_connected_devices()
print("{} device(s) found".format(len(devs)))
for dev in devs:
print("\tID: {}\n\tUDID: {}\n".format(dev.product_id, dev.udid))

if len(devs) <= dev_idx:
raise RuntimeError(
"Cannot connect to device #{}, try different index.".format(dev_idx)
)

dev = devs[dev_idx]
self.session = Record3DStream()
self.session.on_new_frame = self.on_new_frame
self.session.on_stream_stopped = self.on_stream_stopped
self.session.connect(dev) # Initiate connection and start capturing

def get_intrinsic_mat_from_coeffs(self, coeffs):
return np.array(
[[coeffs.fx, 0, coeffs.tx], [0, coeffs.fy, coeffs.ty], [0, 0, 1]]
)

def start_processing_stream(self):
node = Node()

for event in node:
if self.stop:
break
if event["type"] == "INPUT":
if event["id"] == "TICK":
self.event.wait() # Wait for new frame to arrive

# Copy the newly arrived RGBD frame
depth = self.session.get_depth_frame()
rgb = self.session.get_rgb_frame()
intrinsic_mat = self.get_intrinsic_mat_from_coeffs(
self.session.get_intrinsic_mat()
)

if depth.shape != rgb.shape:
rgb = cv2.resize(rgb, (depth.shape[1], depth.shape[0]))

node.send_output(
"image",
pa.array(rgb.ravel()),
metadata={
"encoding": "rgb8",
"width": rgb.shape[1],
"height": rgb.shape[0],
},
)

node.send_output(
"depth",
pa.array(depth.ravel().astype(np.float64())),
metadata={
"width": depth.shape[1],
"height": depth.shape[0],
"encoding": "CV_64F",
"focal": [
int(intrinsic_mat[0, 0]),
int(intrinsic_mat[1, 1]),
],
"resolution": [
int(intrinsic_mat[0, 2]),
int(intrinsic_mat[1, 2]),
],
},
)

self.event.clear()


def main():
app = DemoApp()
app.connect_to_device(dev_idx=0)
app.start_processing_stream()


if __name__ == "__main__":
main()

+ 16
- 0
node-hub/dora-record3d/pyproject.toml View File

@@ -0,0 +1,16 @@
[project]
name = "dora-record3d"
version = "0.0.0"
authors = [{ name = "Your Name", email = "email@email.com" }]
description = "dora-record3d"
license = { text = "MIT" }
readme = "README.md"
requires-python = ">=3.9"

dependencies = ["dora-rs >= 0.3.6", "record3d>=1.4"]

[dependency-groups]
dev = ["pytest >=8.1.1", "ruff >=0.9.1"]

[project.scripts]
dora-record3d = "dora_record3d.main:main"

+ 86
- 0
node-hub/dora-record3d/tests/test_demo.py View File

@@ -0,0 +1,86 @@
from threading import Event

import cv2
import numpy as np
from record3d import Record3DStream


class DemoApp:
def __init__(self):
self.event = Event()
self.session = None
self.DEVICE_TYPE__TRUEDEPTH = 0
self.DEVICE_TYPE__LIDAR = 1

def on_new_frame(self):
"""
This method is called from non-main thread, therefore cannot be used for presenting UI.
"""
self.event.set() # Notify the main thread to stop waiting and process new frame.

def on_stream_stopped(self):
print("Stream stopped")

def connect_to_device(self, dev_idx):
print("Searching for devices")
devs = Record3DStream.get_connected_devices()
print("{} device(s) found".format(len(devs)))
for dev in devs:
print("\tID: {}\n\tUDID: {}\n".format(dev.product_id, dev.udid))

if len(devs) <= dev_idx:
raise RuntimeError(
"Cannot connect to device #{}, try different index.".format(dev_idx)
)

dev = devs[dev_idx]
self.session = Record3DStream()
self.session.on_new_frame = self.on_new_frame
self.session.on_stream_stopped = self.on_stream_stopped
self.session.connect(dev) # Initiate connection and start capturing

def get_intrinsic_mat_from_coeffs(self, coeffs):
return np.array(
[[coeffs.fx, 0, coeffs.tx], [0, coeffs.fy, coeffs.ty], [0, 0, 1]]
)

def start_processing_stream(self):
while True:
self.event.wait() # Wait for new frame to arrive

# Copy the newly arrived RGBD frame
depth = self.session.get_depth_frame()
rgb = self.session.get_rgb_frame()
confidence = self.session.get_confidence_frame()
intrinsic_mat = self.get_intrinsic_mat_from_coeffs(
self.session.get_intrinsic_mat()
)
camera_pose = self.session.get_camera_pose() # Quaternion + world position (accessible via camera_pose.[qx|qy|qz|qw|tx|ty|tz])

print(intrinsic_mat)

# You can now e.g. create point cloud by projecting the depth map using the intrinsic matrix.

# Postprocess it
if self.session.get_device_type() == self.DEVICE_TYPE__TRUEDEPTH:
depth = cv2.flip(depth, 1)
rgb = cv2.flip(rgb, 1)

rgb = cv2.cvtColor(rgb, cv2.COLOR_RGB2BGR)

# Show the RGBD Stream
cv2.imshow("RGB", rgb)
cv2.imshow("Depth", depth)

if confidence.shape[0] > 0 and confidence.shape[1] > 0:
cv2.imshow("Confidence", confidence * 100)

cv2.waitKey(1)

self.event.clear()


if __name__ == "__main__":
app = DemoApp()
app.connect_to_device(dev_idx=0)
app.start_processing_stream()

Loading…
Cancel
Save