Browse Source

Runtime tracing (#95)

* add opentelemetry feature to python op

* modify `python-dataflow` to test opentelemetry

* change `timer` topic to `tick`

* adding tracing to `shared_libraries`

* example install refactoring

* Rename `input` to `dora_input` for python ex
tags/v0.0.0-test-pr-120
Xavier Tao GitHub 3 years ago
parent
commit
c2bb4c1f67
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 117 additions and 35 deletions
  1. +2
    -0
      Cargo.lock
  2. +8
    -0
      binaries/runtime/Cargo.toml
  3. +24
    -2
      binaries/runtime/src/operator/mod.rs
  4. +34
    -10
      binaries/runtime/src/operator/python.rs
  5. +29
    -3
      binaries/runtime/src/operator/shared_lib.rs
  6. +1
    -1
      examples/python-dataflow/dataflow.yml
  7. +1
    -1
      examples/python-dataflow/dataflow_without_webcam.yml
  8. +1
    -1
      examples/python-dataflow/no_webcam.py
  9. +4
    -6
      examples/python-dataflow/object_detection.py
  10. +7
    -8
      examples/python-dataflow/plot.py
  11. +3
    -1
      examples/python-dataflow/requirements.txt
  12. +2
    -1
      examples/python-dataflow/run.sh
  13. +1
    -1
      examples/python-dataflow/webcam.py

+ 2
- 0
Cargo.lock View File

@@ -1038,12 +1038,14 @@ dependencies = [
"dora-message",
"dora-node-api",
"dora-operator-api-types",
"dora-tracing",
"eyre",
"fern",
"flume",
"futures",
"futures-concurrency",
"libloading",
"opentelemetry",
"pyo3",
"serde_yaml 0.8.23",
"tokio",


+ 8
- 0
binaries/runtime/Cargo.toml View File

@@ -14,6 +14,11 @@ dora-node-api = { path = "../../apis/rust/node", default-features = false, featu
] }
dora-operator-api-types = { path = "../../apis/rust/operator/types" }
dora-core = { version = "0.1.0", path = "../../libraries/core" }
dora-tracing = { path = "../../libraries/extensions/telemetry/tracing", optional = true }
opentelemetry = { version = "0.17", features = [
"rt-tokio",
"metrics",
], optional = true }
eyre = "0.6.8"
futures = "0.3.21"
futures-concurrency = "2.0.3"
@@ -29,3 +34,6 @@ flume = "0.10.14"
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"

[features]
tracing = ["opentelemetry", "dora-tracing"]

+ 24
- 2
binaries/runtime/src/operator/mod.rs View File

@@ -9,6 +9,8 @@ use tokio::sync::mpsc::Sender;

mod python;
mod shared_lib;
#[cfg(feature = "tracing")]
use dora_tracing::init_tracing;

#[tracing::instrument(skip(communication))]
pub fn spawn_operator(
@@ -42,9 +44,21 @@ pub fn spawn_operator(
})
.collect::<Result<_, _>>()?;

#[cfg(feature = "tracing")]
let tracer = init_tracing(format!("{node_id}/{}", operator_definition.id).as_str())
.wrap_err("could not initiate tracing for operator")?;

match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| {
shared_lib::spawn(
path,
events_tx,
inputs,
publishers,
#[cfg(feature = "tracing")]
tracer,
)
.wrap_err_with(|| {
format!(
"failed to spawn shared library operator for {}",
operator_definition.id
@@ -52,7 +66,15 @@ pub fn spawn_operator(
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, inputs, publishers).wrap_err_with(|| {
python::spawn(
path,
events_tx,
inputs,
publishers,
#[cfg(feature = "tracing")]
tracer,
)
.wrap_err_with(|| {
format!(
"failed to spawn Python operator for {}",
operator_definition.id


+ 34
- 10
binaries/runtime/src/operator/python.rs View File

@@ -2,8 +2,22 @@

use super::OperatorEvent;
use dora_node_api::{communication::Publisher, config::DataId};
#[cfg(feature = "tracing")]
use dora_tracing::{deserialize_context, serialize_context};
use eyre::{bail, eyre, Context};
use pyo3::{pyclass, types::IntoPyDict, types::PyBytes, Py, Python};
#[cfg(feature = "tracing")]
use opentelemetry::{
sdk::trace,
trace::{TraceContextExt, Tracer},
Context as OtelContext,
};

use pyo3::{
pyclass,
types::IntoPyDict,
types::{PyBytes, PyDict},
Py, Python,
};
use std::{
collections::HashMap,
panic::{catch_unwind, AssertUnwindSafe},
@@ -30,6 +44,7 @@ pub fn spawn(
events_tx: Sender<OperatorEvent>,
inputs: flume::Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
#[cfg(feature = "tracing")] tracer: trace::Tracer,
) -> eyre::Result<()> {
if !path.exists() {
bail!("No python file exists at {}", path.display());
@@ -82,17 +97,26 @@ pub fn spawn(
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;

while let Ok(input) = inputs.recv() {
#[cfg(feature = "tracing")]
let span = tracer.start_with_context(
format!("{}", input.id),
&deserialize_context(&input.metadata.open_telemetry_context.to_string()),
);
#[cfg(feature = "tracing")]
let cx = serialize_context(&OtelContext::current_with_span(span));
#[cfg(not(feature = "tracing"))]
let cx = "";
let status_enum = Python::with_gil(|py| {
let metadata = PyDict::new(py);
metadata.set_item("open_telemetry_context", &cx)?;
let input_dict = PyDict::new(py);

input_dict.set_item("id", input.id.as_str())?;
input_dict.set_item("data", PyBytes::new(py, &input.data()))?;
input_dict.set_item("metadata", metadata)?;

operator
.call_method1(
py,
"on_input",
(
input.id.to_string(),
PyBytes::new(py, &input.data()),
send_output.clone(),
),
)
.call_method1(py, "on_input", (input_dict, send_output.clone()))
.map_err(traceback)
})?;
let status_val = Python::with_gil(|py| status_enum.getattr(py, "value"))


+ 29
- 3
binaries/runtime/src/operator/shared_lib.rs View File

@@ -5,9 +5,17 @@ use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput,
};
#[cfg(feature = "tracing")]
use dora_tracing::{deserialize_context, serialize_context};
use eyre::{bail, eyre, Context};
use flume::Receiver;
use libloading::Symbol;
#[cfg(feature = "tracing")]
use opentelemetry::{
sdk::trace,
trace::{TraceContextExt, Tracer},
Context as OtelContext,
};
use std::{
collections::HashMap,
ffi::c_void,
@@ -24,6 +32,7 @@ pub fn spawn(
events_tx: Sender<OperatorEvent>,
inputs: Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
#[cfg(feature = "tracing")] tracer: trace::Tracer,
) -> eyre::Result<()> {
let path = adjust_shared_library_path(path)?;

@@ -38,7 +47,11 @@ pub fn spawn(

let operator = SharedLibraryOperator { inputs, bindings };

operator.run(publishers)
operator.run(
publishers,
#[cfg(feature = "tracing")]
tracer,
)
});
match catch_unwind(closure) {
Ok(Ok(())) => {
@@ -63,7 +76,11 @@ struct SharedLibraryOperator<'lib> {
}

impl<'lib> SharedLibraryOperator<'lib> {
fn run(self, publishers: HashMap<DataId, Box<dyn Publisher>>) -> eyre::Result<()> {
fn run(
self,
publishers: HashMap<DataId, Box<dyn Publisher>>,
#[cfg(feature = "tracing")] tracer: trace::Tracer,
) -> eyre::Result<()> {
let operator_context = {
let DoraInitResult {
result,
@@ -118,11 +135,20 @@ impl<'lib> SharedLibraryOperator<'lib> {
});

while let Ok(input) = self.inputs.recv() {
#[cfg(feature = "tracing")]
let span = tracer.start_with_context(
format!("{}", input.id),
&deserialize_context(&input.metadata.open_telemetry_context.to_string()),
);
#[cfg(feature = "tracing")]
let cx = serialize_context(&OtelContext::current_with_span(span));
#[cfg(not(feature = "tracing"))]
let cx = "";
let operator_input = dora_operator_api_types::Input {
data: input.data().into_owned().into(),
id: String::from(input.id).into(),
metadata: Metadata {
open_telemetry_context: String::new().into(),
open_telemetry_context: cx.to_string().into(),
},
};



+ 1
- 1
examples/python-dataflow/dataflow.yml View File

@@ -7,7 +7,7 @@ nodes:
custom:
run: ./webcam.py
inputs:
timer: dora/timer/millis/100
tick: dora/timer/millis/100
outputs:
- image


+ 1
- 1
examples/python-dataflow/dataflow_without_webcam.yml View File

@@ -7,7 +7,7 @@ nodes:
custom:
run: ./no_webcam.py
inputs:
timer: dora/timer/millis/100
tick: dora/timer/millis/100
outputs:
- image


+ 1
- 1
examples/python-dataflow/no_webcam.py View File

@@ -16,7 +16,7 @@ node = Node()
start = time.time()

while time.time() - start < 20:
# Wait next input
# Wait next dora_input
node.next()
node.send_output("image", arr.tobytes())



+ 4
- 6
examples/python-dataflow/object_detection.py View File

@@ -21,23 +21,21 @@ class Operator:

def on_input(
self,
input_id: str,
value: bytes,
dora_input: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
"""Handle image

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
dora_input (dict): Dict containing the "id", "data", and "metadata"
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""

frame = np.frombuffer(value, dtype="uint8")
frame = np.frombuffer(dora_input["data"], dtype="uint8")
frame = cv2.imdecode(frame, -1)
frame = frame[:, :, ::-1] # OpenCV image (BGR to RGB)

results = self.model(frame) # includes NMS
arrays = np.array(results.xyxy[0].cpu()).tobytes()
send_output("bbox", arrays)
send_output("bbox", arrays, dora_input["metadata"])
return DoraStatus.CONTINUE

+ 7
- 8
examples/python-dataflow/plot.py View File

@@ -27,25 +27,24 @@ class Operator:

def on_input(
self,
input_id: str,
value: bytes,
dora_input: dict,
send_output: Callable[[str, bytes], None],
) -> DoraStatus:
"""
Put image and bounding box on cv2 window.

Args:
input_id (str): Id of the input declared in the yaml configuration
value (bytes): Bytes message of the input
dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
dora_input["data"] (bytes): Bytes message of the dora_input
send_output (Callable[[str, bytes]]): Function enabling sending output back to dora.
"""
if input_id == "image":
frame = np.frombuffer(value, dtype="uint8")
if dora_input["id"] == "image":
frame = np.frombuffer(dora_input["data"], dtype="uint8")
frame = cv2.imdecode(frame, -1)
self.image = frame

elif input_id == "bbox" and len(self.image) != 0:
bboxs = np.frombuffer(value, dtype="float32")
elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = np.frombuffer(dora_input["data"], dtype="float32")
bboxs = np.reshape(bboxs, (-1, 6))
for bbox in bboxs:
[


+ 3
- 1
examples/python-dataflow/requirements.txt View File

@@ -40,4 +40,6 @@ psutil # system utilization
thop>=0.1.1 # FLOPs computation
# albumentations>=1.0.3
# pycocotools>=2.0 # COCO mAP
# roboflow
# roboflow

opencv-python>=4.1.1

+ 2
- 1
examples/python-dataflow/run.sh View File

@@ -7,6 +7,7 @@ maturin develop
cd ../../../examples/python-dataflow

# Dependencies
pip install --upgrade pip
pip install -r requirements.txt

cargo run -p dora-coordinator --release -- run dataflow_without_webcam.yml
cargo run -p dora-coordinator --release -- run dataflow_without_webcam.yml

+ 1
- 1
examples/python-dataflow/webcam.py View File

@@ -14,7 +14,7 @@ start = time.time()

# Run for 20 seconds
while time.time() - start < 20:
# Wait next input
# Wait next dora_input
node.next()
ret, frame = video_capture.read()
if ret:


Loading…
Cancel
Save