From 629a218ddb297a09d477abd5dd85479ab6253b53 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 1 Dec 2023 19:07:10 +0100 Subject: [PATCH 1/8] `copy_array_into_sample` do not need to return a result --- apis/python/operator/src/lib.rs | 2 +- apis/rust/node/src/node/arrow_utils.rs | 13 +++++-------- apis/rust/node/src/node/mod.rs | 2 +- binaries/runtime/src/operator/python.rs | 2 +- binaries/runtime/src/operator/shared_lib.rs | 5 +---- 5 files changed, 9 insertions(+), 15 deletions(-) diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 83745b9c..f4712e92 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -171,7 +171,7 @@ mod tests { let size = required_data_size(arrow_array); let mut sample: AVec> = AVec::__from_elem(128, 0, size); - let info = copy_array_into_sample(&mut sample, arrow_array)?; + let info = copy_array_into_sample(&mut sample, arrow_array); let serialized_deserialized_arrow_array = RawData::Vec(sample) .into_arrow_array(&info) diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs index da14489a..8deb3ca2 100644 --- a/apis/rust/node/src/node/arrow_utils.rs +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -20,10 +20,7 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { } } -pub fn copy_array_into_sample( - target_buffer: &mut [u8], - arrow_array: &ArrayData, -) -> eyre::Result { +pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo { let mut next_offset = 0; copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) } @@ -32,7 +29,7 @@ fn copy_array_into_sample_inner( target_buffer: &mut [u8], next_offset: &mut usize, arrow_array: &ArrayData, -) -> eyre::Result { +) -> ArrowTypeInfo { let mut buffer_offsets = Vec::new(); let layout = arrow::array::layout(arrow_array.data_type()); for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) { @@ -58,11 +55,11 @@ fn copy_array_into_sample_inner( let mut child_data = Vec::new(); for child in arrow_array.child_data() { - let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; + let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child); child_data.push(child_type_info); } - Ok(ArrowTypeInfo { + ArrowTypeInfo { data_type: arrow_array.data_type().clone(), len: arrow_array.len(), null_count: arrow_array.null_count(), @@ -70,5 +67,5 @@ fn copy_array_into_sample_inner( offset: arrow_array.offset(), buffer_offsets, child_data, - }) + } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 31725c89..d8e159b2 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -153,7 +153,7 @@ impl DoraNode { let total_len = required_data_size(&arrow_array); let mut sample = self.allocate_data_sample(total_len)?; - let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); self.send_output_sample(output_id, type_info, parameters, Some(sample)) .wrap_err("failed to send output")?; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 5b2b12df..07a47c42 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -362,7 +362,7 @@ mod callback_impl { let total_len = required_data_size(&arrow_array); let mut sample = allocate_sample(total_len)?; - let type_info = 
copy_array_into_sample(&mut sample, &arrow_array)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); (sample, type_info) } else { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 495c1b6c..811c3cd0 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -132,10 +132,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let total_len = required_data_size(&arrow_array); let mut sample: AVec> = AVec::__from_elem(128, 0, total_len); - let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { - Ok(t) => t, - Err(err) => return DoraResult::from_error(err.to_string()), - }; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); let event = OperatorEvent::Output { output_id: DataId::from(String::from(output_id)), From 91bd7daf506d511d7a525e48215b5bea02a9c331 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 1 Dec 2023 19:30:34 +0100 Subject: [PATCH 2/8] Adding log event --- Cargo.lock | 2 ++ binaries/daemon/Cargo.toml | 2 ++ binaries/daemon/src/lib.rs | 64 ++++++++++++++++++++++++++++++++++-- binaries/daemon/src/spawn.rs | 48 ++++++++++++++++++++++++--- 4 files changed, 109 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2f623d0..fdc266dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,8 +1476,10 @@ dependencies = [ "async-trait", "bincode", "ctrlc", + "dora-arrow-convert", "dora-core", "dora-download", + "dora-node-api", "dora-tracing", "eyre", "flume", diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 3577ed7d..2d1690a8 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -27,6 +27,8 @@ dora-core = { workspace = true } flume = "0.10.14" dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true } +dora-arrow-convert = { workspace = true } +dora-node-api = { workspace = true } serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } futures = "0.3.25" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1515d343..10c3c5f8 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,10 +1,11 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; +use dora_arrow_convert::{ArrowData, IntoArrow}; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; use dora_core::message::uhlc::{self, HLC}; -use dora_core::message::{ArrowTypeInfo, MetadataParameters}; +use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -14,6 +15,7 @@ use dora_core::{ }, descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; + use eyre::{bail, eyre, Context, ContextCompat}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; @@ -40,7 +42,7 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::error; -use uuid::Uuid; +use uuid::{Timestamp, Uuid}; mod coordinator; mod inter_daemon; @@ -971,6 +973,49 @@ impl Daemon { dataflow.subscribe_channels.remove(id); } } + DoraEvent::Logs { + dataflow_id, + output_id, + message, + metadata, + } => { + let Some(dataflow) = self.running.get_mut(&dataflow_id) else { + tracing::warn!("Logs event 
for unknown dataflow `{dataflow_id}`"); + return Ok(RunStatus::Continue); + }; + + let Some(subscribers) = dataflow.mappings.get(&output_id) else { + tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings); + return Ok(RunStatus::Continue); + }; + + let mut closed = Vec::new(); + for (receiver_id, input_id) in subscribers { + let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { + tracing::warn!("No subscriber channel found for {:?}", output_id); + continue; + }; + + let send_result = send_with_timestamp( + channel, + daemon_messages::NodeEvent::Input { + id: input_id.clone(), + metadata: metadata.clone(), + data: Some(message.clone()), + }, + &self.clock, + ); + match send_result { + Ok(()) => {} + Err(_) => { + closed.push(receiver_id); + } + } + } + for id in closed { + dataflow.subscribe_channels.remove(id); + } + } DoraEvent::SpawnedNodeResult { dataflow_id, node_id, @@ -1153,6 +1198,13 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap { } } +fn node_outputs(node: &ResolvedNode) -> BTreeSet { + match &node.kind { + CoreNodeKind::Custom(n) => n.run_config.outputs.clone(), + CoreNodeKind::Runtime(n) => runtime_node_outputs(n), + } +} + fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap { n.operators .iter() @@ -1397,7 +1449,7 @@ impl RunningDataflow { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct OutputId(NodeId, DataId); +pub struct OutputId(NodeId, DataId); type InputId = (NodeId, DataId); struct DropTokenInformation { @@ -1465,6 +1517,12 @@ pub enum DoraEvent { interval: Duration, metadata: dora_core::message::Metadata, }, + Logs { + dataflow_id: DataflowId, + output_id: OutputId, + message: DataMessage, + metadata: Metadata, + }, SpawnedNodeResult { dataflow_id: DataflowId, node_id: NodeId, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index ccefdbed..95411637 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,10 +1,12 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, - runtime_node_outputs, DoraEvent, Event, NodeExitStatus, + log, node_communication::spawn_listener_loop, node_inputs, node_outputs, runtime_node_inputs, + runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, }; +use aligned_vec::{AVec, ConstAlign}; +use dora_arrow_convert::IntoArrow; use dora_core::{ - config::NodeRunConfig, - daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped}, + config::{DataId, NodeRunConfig}, + daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, }, @@ -12,6 +14,11 @@ use dora_core::{ message::uhlc::HLC, }; use dora_download::download_file; +use dora_node_api::{ + arrow::array::ArrayData, + arrow_utils::{copy_array_into_sample, required_data_size}, + Metadata, +}; use eyre::WrapErr; use std::{ env::{consts::EXE_EXTENSION, temp_dir}, @@ -51,6 +58,8 @@ pub async fn spawn_node( clock.clone(), ) .await?; + let outputs = node_outputs(&node); + let log_output = outputs.contains(&DataId::from("op/logs".to_string())); let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -265,6 +274,8 @@ pub async fn spawn_node( // Stderr listener stream let stderr_tx = tx.clone(); let node_id = node.id.clone(); + let uhlc = clock.clone(); + let daemon_tx_log = daemon_tx.clone(); tokio::spawn(async move { let mut buffer = String::new(); let mut 
finished = false; @@ -317,9 +328,38 @@ pub async fn spawn_node( let _ = daemon_tx.send(event).await; }); + let node_id = node.id.clone(); // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { + // If log is an output, we're sending the logs to the dataflow + if log_output { + // Convert logs to DataMessage + let array = message.into_arrow(); + + let array: ArrayData = array.into(); + let total_len = required_data_size(&array); + let mut sample: AVec> = + AVec::__from_elem(128, 0, total_len as usize); + + let type_info = copy_array_into_sample(&mut sample, &array); + + let metadata = Metadata::new(uhlc.new_timestamp(), type_info); + let output_id = OutputId(node_id.clone(), DataId::from("op/logs".to_string())); + let event = DoraEvent::Logs { + dataflow_id, + output_id, + metadata, + message: DataMessage::Vec(sample), + } + .into(); + let event = Timestamped { + inner: event, + timestamp: uhlc.new_timestamp(), + }; + let _ = daemon_tx_log.send(event).await; + } + let _ = file .write_all(message.as_bytes()) .await From 61bdd4b2fef33c37e06acb84fba09c8d3924ec27 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 1 Dec 2023 19:31:37 +0100 Subject: [PATCH 3/8] Adding log example --- .../python-operator-dataflow/dataflow.yml | 2 ++ examples/python-operator-dataflow/plot.py | 23 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 92bf5f2b..27d27426 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -14,6 +14,7 @@ nodes: image: webcam/image outputs: - bbox + - logs - id: plot operator: @@ -21,3 +22,4 @@ nodes: inputs: image: webcam/image bbox: object_detection/bbox + object_detection_logs: object_detection/logs diff --git a/examples/python-operator-dataflow/plot.py b/examples/python-operator-dataflow/plot.py index cd7a7200..923ade0e 100755 --- a/examples/python-operator-dataflow/plot.py +++ b/examples/python-operator-dataflow/plot.py @@ -29,6 +29,7 @@ class Operator: self.bboxs = [] self.bounding_box_messages = 0 self.image_messages = 0 + self.object_detection_logs = [] def on_event( self, @@ -69,12 +70,22 @@ class Operator: self.image_messages += 1 print("received " + str(self.image_messages) + " images") + elif dora_input["id"] == "object_detection_logs": + logs = dora_input["value"][0].as_py() + self.object_detection_logs += [logs] + ## Only keep last 10 logs + self.object_detection_logs = self.object_detection_logs[-10:] + return DoraStatus.CONTINUE + elif dora_input["id"] == "bbox" and len(self.image) != 0: bboxs = dora_input["value"].to_numpy() self.bboxs = np.reshape(bboxs, (-1, 6)) self.bounding_box_messages += 1 print("received " + str(self.bounding_box_messages) + " bounding boxes") + return DoraStatus.CONTINUE + else: + return DoraStatus.CONTINUE for bbox in self.bboxs: [ @@ -104,6 +115,18 @@ class Operator: 1, ) + for i, log in enumerate(self.object_detection_logs): + cv2.putText( + self.image, + log, + (10, 10 + 20 * i), + font, + 0.5, + (0, 255, 0), + 2, + 1, + ) + if CI != "true": cv2.imshow("frame", self.image) if cv2.waitKey(1) & 0xFF == ord("q"): From daa694ad986a798dda2b4e1a480a688731f1f4cb Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 13 Dec 2023 13:11:11 +0100 Subject: [PATCH 4/8] Add new `send_stdout_as` key for capturing stdout of custom nodes --- binaries/daemon/src/spawn.rs | 13 +++++++------ libraries/core/src/descriptor/mod.rs | 11 
+++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 95411637..765da3a4 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -58,8 +58,7 @@ pub async fn spawn_node( clock.clone(), ) .await?; - let outputs = node_outputs(&node); - let log_output = outputs.contains(&DataId::from("op/logs".to_string())); + let send_stdout_to = node.send_stdout_as().map(ToOwned::to_owned); let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -333,19 +332,21 @@ pub async fn spawn_node( tokio::spawn(async move { while let Some(message) = rx.recv().await { // If log is an output, we're sending the logs to the dataflow - if log_output { + if let Some(stdout_output_name) = &send_stdout_to { // Convert logs to DataMessage let array = message.into_arrow(); let array: ArrayData = array.into(); let total_len = required_data_size(&array); - let mut sample: AVec> = - AVec::__from_elem(128, 0, total_len as usize); + let mut sample: AVec> = AVec::__from_elem(128, 0, total_len); let type_info = copy_array_into_sample(&mut sample, &array); let metadata = Metadata::new(uhlc.new_timestamp(), type_info); - let output_id = OutputId(node_id.clone(), DataId::from("op/logs".to_string())); + let output_id = OutputId( + node_id.clone(), + DataId::from(stdout_output_name.to_string()), + ); let event = DoraEvent::Logs { dataflow_id, output_id, diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 57ced26a..9fae4885 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -164,6 +164,15 @@ pub struct ResolvedNode { pub kind: CoreNodeKind, } +impl ResolvedNode { + pub fn send_stdout_as(&self) -> Option<&str> { + match &self.kind { + CoreNodeKind::Runtime(_) => None, // todo: add support for operator-level stdout capture + CoreNodeKind::Custom(n) => n.send_stdout_as.as_deref(), + } + } +} + #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ResolvedDeploy { pub machine: String, @@ -275,6 +284,8 @@ pub struct CustomNode { pub envs: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, #[serde(flatten)] pub run_config: NodeRunConfig, From aa81da014227e3aecbb026aab48f9bd373f17641 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 28 Feb 2024 12:33:21 +0100 Subject: [PATCH 5/8] Add possibility to send stdout for operators and add warnings when there is multiple operators --- binaries/daemon/src/lib.rs | 6 ++++- binaries/daemon/src/spawn.rs | 2 +- .../python-operator-dataflow/dataflow.yml | 3 ++- libraries/core/src/descriptor/mod.rs | 27 ++++++++++++++++--- 4 files changed, 32 insertions(+), 6 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 10c3c5f8..702373ba 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -985,7 +985,11 @@ impl Daemon { }; let Some(subscribers) = dataflow.mappings.get(&output_id) else { - tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings); + tracing::warn!( + "No subscribers found for {:?} in {:?}", + output_id, + dataflow.mappings + ); return Ok(RunStatus::Continue); }; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 765da3a4..9d736b5a 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -58,7 +58,7 @@ pub 
async fn spawn_node( clock.clone(), ) .await?; - let send_stdout_to = node.send_stdout_as().map(ToOwned::to_owned); + let send_stdout_to = node.send_stdout_as(); let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 27d27426..c7500944 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -10,11 +10,12 @@ nodes: - id: object_detection operator: python: object_detection.py + send_stdout_as: stdout inputs: image: webcam/image outputs: - bbox - - logs + - stdout - id: plot operator: diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 9fae4885..7f114e59 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -10,6 +10,7 @@ use std::{ fmt, path::{Path, PathBuf}, }; +use tracing::warn; pub use visualize::collect_dora_timers; mod validate; @@ -165,10 +166,28 @@ pub struct ResolvedNode { } impl ResolvedNode { - pub fn send_stdout_as(&self) -> Option<&str> { + pub fn send_stdout_as(&self) -> Option { match &self.kind { - CoreNodeKind::Runtime(_) => None, // todo: add support for operator-level stdout capture - CoreNodeKind::Custom(n) => n.send_stdout_as.as_deref(), + // TODO: Split stdout between operators + CoreNodeKind::Runtime(n) => { + let count = n + .operators + .iter() + .filter(|op| op.config.send_stdout_as.is_some()) + .count(); + if count == 1 && n.operators.len() > 1 { + warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") + } else if count > 1 { + warn!("More than one `send_stdout_as` operators for a runtime node. 
Selecting the first stdout operator.") + } + n.operators.iter().find_map(|op| { + op.config + .send_stdout_as + .clone() + .map(|stdout| format!("{}/{}", op.id, stdout)) + }) + } + CoreNodeKind::Custom(n) => n.send_stdout_as.clone(), } } } @@ -233,6 +252,8 @@ pub struct OperatorConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, } #[derive(Debug, Serialize, Deserialize, Clone)] From 1e659c6c11723d3ee18b3399360acd12c3a6e3c7 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 28 Feb 2024 14:43:28 +0100 Subject: [PATCH 6/8] Replace `logs` with `stdout` --- examples/python-operator-dataflow/dataflow.yml | 2 +- examples/python-operator-dataflow/plot.py | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index c7500944..ca069f7d 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -23,4 +23,4 @@ nodes: inputs: image: webcam/image bbox: object_detection/bbox - object_detection_logs: object_detection/logs + object_detection_stdout: object_detection/stdout diff --git a/examples/python-operator-dataflow/plot.py b/examples/python-operator-dataflow/plot.py index 923ade0e..f7f2ddd0 100755 --- a/examples/python-operator-dataflow/plot.py +++ b/examples/python-operator-dataflow/plot.py @@ -29,7 +29,7 @@ class Operator: self.bboxs = [] self.bounding_box_messages = 0 self.image_messages = 0 - self.object_detection_logs = [] + self.object_detection_stdout = [] def on_event( self, @@ -70,11 +70,11 @@ class Operator: self.image_messages += 1 print("received " + str(self.image_messages) + " images") - elif dora_input["id"] == "object_detection_logs": - logs = dora_input["value"][0].as_py() - self.object_detection_logs += [logs] - ## Only keep last 10 logs - self.object_detection_logs = self.object_detection_logs[-10:] + elif dora_input["id"] == "object_detection_stdout": + stdout = dora_input["value"][0].as_py() + self.object_detection_stdout += [stdout] + ## Only keep last 10 stdout + self.object_detection_stdout = self.object_detection_stdout[-10:] return DoraStatus.CONTINUE elif dora_input["id"] == "bbox" and len(self.image) != 0: @@ -115,7 +115,7 @@ class Operator: 1, ) - for i, log in enumerate(self.object_detection_logs): + for i, log in enumerate(self.object_detection_stdout): cv2.putText( self.image, log, From 12af6a100796c377f67ebe052a596834a1dc7988 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 29 Feb 2024 14:21:31 +0100 Subject: [PATCH 7/8] Removing unused imports --- binaries/daemon/src/lib.rs | 10 +--------- binaries/daemon/src/spawn.rs | 2 +- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 702373ba..a66ec83c 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,6 +1,5 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; -use dora_arrow_convert::{ArrowData, IntoArrow}; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; @@ -42,7 +41,7 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::error; -use uuid::{Timestamp, Uuid}; +use uuid::Uuid; mod 
coordinator; mod inter_daemon; @@ -1202,13 +1201,6 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap { } } -fn node_outputs(node: &ResolvedNode) -> BTreeSet { - match &node.kind { - CoreNodeKind::Custom(n) => n.run_config.outputs.clone(), - CoreNodeKind::Runtime(n) => runtime_node_outputs(n), - } -} - fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap { n.operators .iter() diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 9d736b5a..d64db99f 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,5 +1,5 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, node_outputs, runtime_node_inputs, + log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, }; use aligned_vec::{AVec, ConstAlign}; From b32a7e49240080a96505c3c14a20672f31ffb0f1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 29 Feb 2024 14:30:56 +0100 Subject: [PATCH 8/8] Make `send_stdout_as` fail if there is more than one entry for a runtime node --- binaries/daemon/src/spawn.rs | 4 +++- libraries/core/src/descriptor/mod.rs | 12 ++++++------ libraries/core/src/descriptor/validate.rs | 6 ++++++ 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index d64db99f..7a204bd4 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -58,7 +58,9 @@ pub async fn spawn_node( clock.clone(), ) .await?; - let send_stdout_to = node.send_stdout_as(); + let send_stdout_to = node + .send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 7f114e59..90ad74e6 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,7 +1,7 @@ use crate::config::{ CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId, }; -use eyre::{bail, Context, Result}; +use eyre::{bail, eyre, Context, Result}; use serde::{Deserialize, Serialize}; use serde_with_expand_env::with_expand_envs; use std::{ @@ -166,7 +166,7 @@ pub struct ResolvedNode { } impl ResolvedNode { - pub fn send_stdout_as(&self) -> Option { + pub fn send_stdout_as(&self) -> Result> { match &self.kind { // TODO: Split stdout between operators CoreNodeKind::Runtime(n) => { @@ -178,16 +178,16 @@ impl ResolvedNode { if count == 1 && n.operators.len() > 1 { warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") } else if count > 1 { - warn!("More than one `send_stdout_as` operators for a runtime node. Selecting the first stdout operator.") + return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. 
Please only use one `send_stdout_as` per runtime.")); } - n.operators.iter().find_map(|op| { + Ok(n.operators.iter().find_map(|op| { op.config .send_stdout_as .clone() .map(|stdout| format!("{}/{}", op.id, stdout)) - }) + })) } - CoreNodeKind::Custom(n) => n.send_stdout_as.clone(), + CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), } } } diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 93063de9..316d3c58 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -93,6 +93,12 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result }; } + // Check that nodes can resolve `send_stdout_as` + for node in &nodes { + node.send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; + } + if has_python_operator { check_python_runtime()?; }
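
Usage note for the `send_stdout_as` key introduced above: the example dataflow only exercises the operator form (patch 5), but patch 4 adds the same field to `CustomNode`, so a plain custom node can forward its captured stdout as an ordinary output as well. The snippet below is an illustrative sketch of that case, assuming the `custom:`/`source:` descriptor layout used elsewhere in the repository; the node ids, source paths, and the `logger` consumer are hypothetical, and the stdout output is listed under `outputs` in the same way as `stdout` is for the `object_detection` operator.

nodes:
  - id: my-node
    custom:
      source: ./my_node            # hypothetical executable
      send_stdout_as: stdout       # forward each captured stdout line as the output `stdout`
      outputs:
        - stdout

  - id: logger
    custom:
      source: ./logger             # hypothetical consumer
      inputs:
        my_node_stdout: my-node/stdout

At runtime the daemon wraps each captured line into an Arrow array, copies it into an aligned sample via `copy_array_into_sample` (infallible since patch 1), and dispatches it as a `DoraEvent::Logs` to every subscriber of `my-node/stdout`, so the consumer receives it like any other input. For runtime nodes the resolved output id is `<operator-id>/<name>`, which is why the plot operator in the example subscribes to `object_detection/stdout`.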