Browse Source

Add Python example (#67)

tags/v0.0.0-test.4
Xavier Tao GitHub 3 years ago
parent
commit
6242b6e3a0
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 481 additions and 126 deletions
  1. +35
    -0
      .github/workflows/ci-python.yml
  2. +4
    -0
      Cargo.toml
  3. +0
    -29
      binaries/coordinator/examples/graphs/python_test.yml
  4. +0
    -8
      binaries/coordinator/examples/nodes/python/printer.py
  5. +0
    -11
      binaries/coordinator/examples/nodes/python/static_string.py
  6. +7
    -3
      binaries/runtime/src/operator/mod.rs
  7. +1
    -0
      examples/python-dataflow/.gitignore
  8. +33
    -0
      examples/python-dataflow/REAMDE.md
  9. +27
    -0
      examples/python-dataflow/dataflow.yml
  10. +27
    -0
      examples/python-dataflow/dataflow_without_webcam.yml
  11. +23
    -0
      examples/python-dataflow/no_webcam.py
  12. +43
    -0
      examples/python-dataflow/object_detection.py
  13. +86
    -0
      examples/python-dataflow/plot.py
  14. +43
    -0
      examples/python-dataflow/requirements.txt
  15. +35
    -0
      examples/python-dataflow/run.rs
  16. +12
    -0
      examples/python-dataflow/run.sh
  17. +82
    -0
      examples/python-dataflow/utils.py
  18. +23
    -0
      examples/python-dataflow/webcam.py
  19. +0
    -36
      examples/python-operator/op.py
  20. +0
    -39
      examples/python-operator/op2.py

+ 35
- 0
.github/workflows/ci-python.yml View File

@@ -0,0 +1,35 @@

name: CI-python

# Filter CI as this job will take time.
on:
push:
paths:
- apis/python/**
- binaries/runtime/**
pull_request:
branches:
- main
paths:
- apis/python/**
- binaries/runtime/**
jobs:

examples:
name: "Python Examples"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Install Cap'n Proto
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y capnproto libcapnp-dev
- uses: actions/setup-python@v2
with:
python-version: 3.8.10
- name: "Python Dataflow example"
uses: actions-rs/cargo@v1
with:
command: run
args: --example python-dataflow

+ 4
- 0
Cargo.toml View File

@@ -36,3 +36,7 @@ path = "examples/rust-dataflow/run.rs"
[[example]]
name = "cxx-dataflow"
path = "examples/c++-dataflow/run.rs"

[[example]]
name = "python-dataflow"
path = "examples/python-dataflow/run.rs"

+ 0
- 29
binaries/coordinator/examples/graphs/python_test.yml View File

@@ -1,29 +0,0 @@
communication:
zenoh:
prefix: /foo

nodes:
- id: static-string
custom:
run: python examples/nodes/python/static_string.py
outputs:
- string

- id: python-printer
custom:
run: python examples/nodes/python/printer.py
inputs:
string: static-string/string
time2: rust-timer/time
- id: rust-timer
custom:
run: cargo run --example source_timer
outputs:
- time

- id: rust-logger
custom:
run: cargo run --example sink_logger
inputs:
time: static-string/string

+ 0
- 8
binaries/coordinator/examples/nodes/python/printer.py View File

@@ -1,8 +0,0 @@
from dora import Node

node = Node()

for id, value in node:
print(f"From Python, id: {id}, value: {value}") if value is not [] else None

print("printer finished")

+ 0
- 11
binaries/coordinator/examples/nodes/python/static_string.py View File

@@ -1,11 +0,0 @@
import time

from dora import Node

node = Node()

for i in range(100):
node.send_output("string", b"Hello World")
time.sleep(0.1)

print("static string finished")

+ 7
- 3
binaries/runtime/src/operator/mod.rs View File

@@ -1,6 +1,7 @@
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use log::warn;
use std::any::Any;
use tokio::sync::mpsc::{self, Sender};

@@ -56,9 +57,12 @@ impl Operator {
)
})?
.try_send(OperatorInput { id, value })
.map_err(|err| match err {
tokio::sync::mpsc::error::TrySendError::Closed(_) => eyre!("operator crashed"),
tokio::sync::mpsc::error::TrySendError::Full(_) => eyre!("operator queue full"),
.or_else(|err| match err {
tokio::sync::mpsc::error::TrySendError::Closed(_) => Err(eyre!("operator crashed")),
tokio::sync::mpsc::error::TrySendError::Full(_) => {
warn!("operator queue full");
Ok(())
}
})
}



+ 1
- 0
examples/python-dataflow/.gitignore View File

@@ -0,0 +1 @@
*.pt

+ 33
- 0
examples/python-dataflow/REAMDE.md View File

@@ -0,0 +1,33 @@
# Python Dataflow Example

This examples shows how to create and connect dora operators and custom nodes in Python.

## Overview

The [`dataflow.yml`](./dataflow.yml) defines a simple dataflow graph with the following three nodes:

- a webcam node, that connects to your webcam and feed the dataflow with webcam frame as jpeg compressed bytearray.
- an object detection node, that apply Yolo v5 on the webcam image. The model is imported from Pytorch Hub. The output is the bouding box of each object detected, the confidence and the class. You can have more info here: https://pytorch.org/hub/ultralytics_yolov5/
- a window plotting node, that will retrieve the webcam image and the Yolov5 bounding box and join the two together.

## Getting started

```bash
cargo run --example python-dataflow
```

## Installation

To install, you should run the `install.sh` script.

```bash
install.sh
```

## Run the dataflow as a standalone

- Start the `dora-coordinator`, passing the paths to the dataflow file and the `dora-runtime` as arguments:

```
../../target/release/dora-coordinator run dataflow.yml ../../target/release/dora-runtime
```

+ 27
- 0
examples/python-dataflow/dataflow.yml View File

@@ -0,0 +1,27 @@
communication:
zenoh:
prefix: /example-python-dataflow

nodes:
- id: webcam
custom:
run: ./webcam.py
inputs:
timer: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py
inputs:
image: webcam/image
outputs:
- bbox

- id: plot
operator:
python: plot.py
inputs:
image: webcam/image
bbox: object_detection/bbox

+ 27
- 0
examples/python-dataflow/dataflow_without_webcam.yml View File

@@ -0,0 +1,27 @@
communication:
zenoh:
prefix: /example-python-no-webcam-dataflow

nodes:
- id: no_webcam
custom:
run: ./no_webcam.py
inputs:
timer: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py
inputs:
image: no_webcam/image
outputs:
- bbox

- id: plot
operator:
python: plot.py
inputs:
image: no_webcam/image
bbox: object_detection/bbox

+ 23
- 0
examples/python-dataflow/no_webcam.py View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import urllib.request

import cv2
import numpy as np
from dora import Node

req = urllib.request.urlopen("https://ultralytics.com/images/zidane.jpg")

arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
node = Node()

start = time.time()

while time.time() - start < 20:
# Wait next input
node.next()
node.send_output("image", arr.tobytes())

time.sleep(1)

+ 43
- 0
examples/python-dataflow/object_detection.py View File

@@ -0,0 +1,43 @@
from enum import Enum
from typing import Callable

import cv2
import numpy as np
import torch


class DoraStatus(Enum):
CONTINUE = 0
STOP = 1


class Operator:
"""
Infering object from images
"""

def __init__(self):
self.model = torch.hub.load("ultralytics/yolov5", "yolov5n")

def on_input(
self,
input_id: str,
value: bytes,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
"""Handle image

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""

frame = np.frombuffer(value, dtype="uint8")
frame = cv2.imdecode(frame, -1)
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)

results = self.model(frame) # includes NMS
arrays = np.array(results.xyxy[0].cpu()).tobytes()
send_output("bbox", arrays)
return DoraStatus.CONTINUE

+ 86
- 0
examples/python-dataflow/plot.py View File

@@ -0,0 +1,86 @@
import os
from enum import Enum
from typing import Callable

import cv2
import numpy as np

from utils import LABELS

CI = os.environ.get("CI")

font = cv2.FONT_HERSHEY_SIMPLEX


class DoraStatus(Enum):
CONTINUE = 0
STOP = 1


class Operator:
"""
Plot image and bounding box
"""

def __init__(self):
self.image = []

def on_input(
self,
input_id: str,
value: bytes,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
"""
Put image and bounding box on cv2 window.

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""
if input_id == "image":
frame = np.frombuffer(value, dtype="uint8")
frame = cv2.imdecode(frame, -1)
self.image = frame

elif input_id == "bbox" and len(self.image) != 0:
bboxs = np.frombuffer(value, dtype="float32")
bboxs = np.reshape(bboxs, (-1, 6))
for bbox in bboxs:
[
min_x,
min_y,
max_x,
max_y,
confidence,
label,
] = bbox
cv2.rectangle(
self.image,
(int(min_x), int(min_y)),
(int(max_x), int(max_y)),
(0, 255, 0),
2,
)

cv2.putText(
self.image,
LABELS[int(label)] + f", {confidence:0.2f}",
(int(max_x), int(max_y)),
font,
0.75,
(0, 255, 0),
2,
1,
)

if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):
return DoraStatus.STOP

return DoraStatus.CONTINUE

def drop_operator(self):
cv2.destroyAllWindows()

+ 43
- 0
examples/python-dataflow/requirements.txt View File

@@ -0,0 +1,43 @@
# YOLOv5 requirements
# Usage: pip install -r requirements.txt

# Base ----------------------------------------
matplotlib>=3.2.2
numpy>=1.18.5
opencv-python>=4.1.1
Pillow>=7.1.2
PyYAML>=5.3.1
requests>=2.23.0
scipy>=1.4.1
torch>=1.7.0
torchvision>=0.8.1
tqdm>=4.64.0
protobuf<=3.20.1 # https://github.com/ultralytics/yolov5/issues/8012

# Logging -------------------------------------
tensorboard>=2.4.1
# wandb
# clearml

# Plotting ------------------------------------
pandas>=1.1.4
seaborn>=0.11.0

# Export --------------------------------------
# coremltools>=5.2 # CoreML export
# onnx>=1.9.0 # ONNX export
# onnx-simplifier>=0.4.1 # ONNX simplifier
# nvidia-pyindex # TensorRT export
# nvidia-tensorrt # TensorRT export
# scikit-learn==0.19.2 # CoreML quantization
# tensorflow>=2.4.1 # TFLite export (or tensorflow-cpu, tensorflow-aarch64)
# tensorflowjs>=3.9.0 # TF.js export
# openvino-dev # OpenVINO export

# Extras --------------------------------------
ipython # interactive notebook
psutil # system utilization
thop>=0.1.1 # FLOPs computation
# albumentations>=1.0.3
# pycocotools>=2.0 # COCO mAP
# roboflow

+ 35
- 0
examples/python-dataflow/run.rs View File

@@ -0,0 +1,35 @@
use eyre::{bail, Context};
use std::{env, path::Path};

#[tokio::main]
async fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("dora-runtime").await?;

run(root).await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build").arg("--release");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

async fn run(_root: &Path) -> eyre::Result<()> {
let mut run = tokio::process::Command::new("sh");
run.arg("./run.sh");
if !run.status().await?.success() {
bail!("failed to run python example.");
};
Ok(())
}

+ 12
- 0
examples/python-dataflow/run.sh View File

@@ -0,0 +1,12 @@
python3 -m venv .env
. $(pwd)/.env/bin/activate
# Dev dependencies
pip install maturin
cd ../../apis/python/node
maturin develop
cd ../../../examples/python-dataflow

# Dependencies
pip install -r requirements.txt

cargo run -p dora-coordinator --release -- run dataflow_without_webcam.yml

+ 82
- 0
examples/python-dataflow/utils.py View File

@@ -0,0 +1,82 @@
LABELS = [
"ABC",
"bicycle",
"car",
"motorcycle",
"airplane",
"bus",
"train",
"truck",
"boat",
"traffic light",
"fire hydrant",
"stop sign",
"parking meter",
"bench",
"bird",
"cat",
"dog",
"horse",
"sheep",
"cow",
"elephant",
"bear",
"zebra",
"giraffe",
"backpack",
"umbrella",
"handbag",
"tie",
"suitcase",
"frisbee",
"skis",
"snowboard",
"sports ball",
"kite",
"baseball bat",
"baseball glove",
"skateboard",
"surfboard",
"tennis racket",
"bottle",
"wine glass",
"cup",
"fork",
"knife",
"spoon",
"bowl",
"banana",
"apple",
"sandwich",
"orange",
"broccoli",
"carrot",
"hot dog",
"pizza",
"donut",
"cake",
"chair",
"couch",
"potted plant",
"bed",
"dining table",
"toilet",
"tv",
"laptop",
"mouse",
"remote",
"keyboard",
"cell phone",
"microwave",
"oven",
"toaster",
"sink",
"refrigerator",
"book",
"clock",
"vase",
"scissors",
"teddy bear",
"hair drier",
"toothbrush",
]

+ 23
- 0
examples/python-dataflow/webcam.py View File

@@ -0,0 +1,23 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time

import cv2
from dora import Node

node = Node()

video_capture = cv2.VideoCapture(0)

start = time.time()

# Run for 20 seconds
while time.time() - start < 20:
# Wait next input
node.next()
ret, frame = video_capture.read()
if ret:
node.send_output("image", cv2.imencode(".jpg", frame)[1].tobytes())

video_capture.release()

+ 0
- 36
examples/python-operator/op.py View File

@@ -1,36 +0,0 @@
from typing import Callable
from enum import Enum

class DoraStatus(Enum):
CONTINUE = 0
STOP = 1

class Operator:
"""
Example operator incrementing a counter every times its been called.

The current value of the counter is sent back to dora on `counter`.
"""

def __init__(self, counter=0):
self.counter = counter

def on_input(
self,
input_id: str,
value: bytes,
send_output: Callable[[str, bytes], None],
):
"""Handle input by incrementing count by one.

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""
val_len = len(value)
print(f"PYTHON received input {input_id}; value length: {val_len}")
send_output("counter", (self.counter % 256).to_bytes(1, "little"))
self.counter = self.counter + 1

return DoraStatus.OK

+ 0
- 39
examples/python-operator/op2.py View File

@@ -1,39 +0,0 @@
from typing import Callable
from enum import Enum

class DoraStatus(Enum):
OK = 0
STOP = 1

class Operator:
"""
Example operator incrementing a counter every times its been called.

The current value of the counter is sent back to dora on `counter`.
"""

def __init__(self, counter=0):
self.counter = counter

def on_input(
self,
input_id: str,
value: bytes,
send_output: Callable[[str, bytes], None],
):
"""Handle input by incrementing count by one.

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""
val_len = len(value)
print(f"PYTHON received input {input_id}; value length: {val_len}")
send_output("counter", (self.counter % 256).to_bytes(1, "little"))
self.counter = self.counter + 1

if self.counter > 500:
return DoraStatus.STOP
else:
return DoraStatus.OK

Loading…
Cancel
Save