diff --git a/Cargo.lock b/Cargo.lock index 031ee07f..0b8b121e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,8 +1476,10 @@ dependencies = [ "async-trait", "bincode", "ctrlc", + "dora-arrow-convert", "dora-core", "dora-download", + "dora-node-api", "dora-tracing", "eyre", "flume", diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 83745b9c..f4712e92 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -171,7 +171,7 @@ mod tests { let size = required_data_size(arrow_array); let mut sample: AVec> = AVec::__from_elem(128, 0, size); - let info = copy_array_into_sample(&mut sample, arrow_array)?; + let info = copy_array_into_sample(&mut sample, arrow_array); let serialized_deserialized_arrow_array = RawData::Vec(sample) .into_arrow_array(&info) diff --git a/apis/rust/node/src/node/arrow_utils.rs b/apis/rust/node/src/node/arrow_utils.rs index da14489a..8deb3ca2 100644 --- a/apis/rust/node/src/node/arrow_utils.rs +++ b/apis/rust/node/src/node/arrow_utils.rs @@ -20,10 +20,7 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) { } } -pub fn copy_array_into_sample( - target_buffer: &mut [u8], - arrow_array: &ArrayData, -) -> eyre::Result { +pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo { let mut next_offset = 0; copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array) } @@ -32,7 +29,7 @@ fn copy_array_into_sample_inner( target_buffer: &mut [u8], next_offset: &mut usize, arrow_array: &ArrayData, -) -> eyre::Result { +) -> ArrowTypeInfo { let mut buffer_offsets = Vec::new(); let layout = arrow::array::layout(arrow_array.data_type()); for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) { @@ -58,11 +55,11 @@ fn copy_array_into_sample_inner( let mut child_data = Vec::new(); for child in arrow_array.child_data() { - let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?; + let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child); child_data.push(child_type_info); } - Ok(ArrowTypeInfo { + ArrowTypeInfo { data_type: arrow_array.data_type().clone(), len: arrow_array.len(), null_count: arrow_array.null_count(), @@ -70,5 +67,5 @@ fn copy_array_into_sample_inner( offset: arrow_array.offset(), buffer_offsets, child_data, - }) + } } diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 31725c89..d8e159b2 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -153,7 +153,7 @@ impl DoraNode { let total_len = required_data_size(&arrow_array); let mut sample = self.allocate_data_sample(total_len)?; - let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); self.send_output_sample(output_id, type_info, parameters, Some(sample)) .wrap_err("failed to send output")?; diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 3577ed7d..2d1690a8 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -27,6 +27,8 @@ dora-core = { workspace = true } flume = "0.10.14" dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true } +dora-arrow-convert = { workspace = true } +dora-node-api = { workspace = true } serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } futures = "0.3.25" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1515d343..a66ec83c 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -4,7 +4,7 @@ use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; use dora_core::message::uhlc::{self, HLC}; -use dora_core::message::{ArrowTypeInfo, MetadataParameters}; +use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -14,6 +14,7 @@ use dora_core::{ }, descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; + use eyre::{bail, eyre, Context, ContextCompat}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; @@ -971,6 +972,53 @@ impl Daemon { dataflow.subscribe_channels.remove(id); } } + DoraEvent::Logs { + dataflow_id, + output_id, + message, + metadata, + } => { + let Some(dataflow) = self.running.get_mut(&dataflow_id) else { + tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`"); + return Ok(RunStatus::Continue); + }; + + let Some(subscribers) = dataflow.mappings.get(&output_id) else { + tracing::warn!( + "No subscribers found for {:?} in {:?}", + output_id, + dataflow.mappings + ); + return Ok(RunStatus::Continue); + }; + + let mut closed = Vec::new(); + for (receiver_id, input_id) in subscribers { + let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { + tracing::warn!("No subscriber channel found for {:?}", output_id); + continue; + }; + + let send_result = send_with_timestamp( + channel, + daemon_messages::NodeEvent::Input { + id: input_id.clone(), + metadata: metadata.clone(), + data: Some(message.clone()), + }, + &self.clock, + ); + match send_result { + Ok(()) => {} + Err(_) => { + closed.push(receiver_id); + } + } + } + for id in closed { + dataflow.subscribe_channels.remove(id); + } + } DoraEvent::SpawnedNodeResult { dataflow_id, node_id, @@ -1397,7 +1445,7 @@ impl RunningDataflow { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct OutputId(NodeId, DataId); +pub struct OutputId(NodeId, DataId); type InputId = (NodeId, DataId); struct DropTokenInformation { @@ -1465,6 +1513,12 @@ pub enum DoraEvent { interval: Duration, metadata: dora_core::message::Metadata, }, + Logs { + dataflow_id: DataflowId, + output_id: OutputId, + message: DataMessage, + metadata: Metadata, + }, SpawnedNodeResult { dataflow_id: DataflowId, node_id: NodeId, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index ccefdbed..7a204bd4 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,10 +1,12 @@ use crate::{ log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, - runtime_node_outputs, DoraEvent, Event, NodeExitStatus, + runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, }; +use aligned_vec::{AVec, ConstAlign}; +use dora_arrow_convert::IntoArrow; use dora_core::{ - config::NodeRunConfig, - daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped}, + config::{DataId, NodeRunConfig}, + daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, }, @@ -12,6 +14,11 @@ use dora_core::{ message::uhlc::HLC, }; use dora_download::download_file; +use dora_node_api::{ + arrow::array::ArrayData, + arrow_utils::{copy_array_into_sample, required_data_size}, + Metadata, +}; use eyre::WrapErr; use std::{ env::{consts::EXE_EXTENSION, temp_dir}, @@ -51,6 +58,9 @@ pub async fn spawn_node( clock.clone(), ) .await?; + let send_stdout_to = node + .send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -265,6 +275,8 @@ pub async fn spawn_node( // Stderr listener stream let stderr_tx = tx.clone(); let node_id = node.id.clone(); + let uhlc = clock.clone(); + let daemon_tx_log = daemon_tx.clone(); tokio::spawn(async move { let mut buffer = String::new(); let mut finished = false; @@ -317,9 +329,40 @@ pub async fn spawn_node( let _ = daemon_tx.send(event).await; }); + let node_id = node.id.clone(); // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { + // If log is an output, we're sending the logs to the dataflow + if let Some(stdout_output_name) = &send_stdout_to { + // Convert logs to DataMessage + let array = message.into_arrow(); + + let array: ArrayData = array.into(); + let total_len = required_data_size(&array); + let mut sample: AVec> = AVec::__from_elem(128, 0, total_len); + + let type_info = copy_array_into_sample(&mut sample, &array); + + let metadata = Metadata::new(uhlc.new_timestamp(), type_info); + let output_id = OutputId( + node_id.clone(), + DataId::from(stdout_output_name.to_string()), + ); + let event = DoraEvent::Logs { + dataflow_id, + output_id, + metadata, + message: DataMessage::Vec(sample), + } + .into(); + let event = Timestamped { + inner: event, + timestamp: uhlc.new_timestamp(), + }; + let _ = daemon_tx_log.send(event).await; + } + let _ = file .write_all(message.as_bytes()) .await diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 5b2b12df..07a47c42 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -362,7 +362,7 @@ mod callback_impl { let total_len = required_data_size(&arrow_array); let mut sample = allocate_sample(total_len)?; - let type_info = copy_array_into_sample(&mut sample, &arrow_array)?; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); (sample, type_info) } else { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 495c1b6c..811c3cd0 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -132,10 +132,7 @@ impl<'lib> SharedLibraryOperator<'lib> { let total_len = required_data_size(&arrow_array); let mut sample: AVec> = AVec::__from_elem(128, 0, total_len); - let type_info = match copy_array_into_sample(&mut sample, &arrow_array) { - Ok(t) => t, - Err(err) => return DoraResult::from_error(err.to_string()), - }; + let type_info = copy_array_into_sample(&mut sample, &arrow_array); let event = OperatorEvent::Output { output_id: DataId::from(String::from(output_id)), diff --git a/examples/python-operator-dataflow/dataflow.yml b/examples/python-operator-dataflow/dataflow.yml index 92bf5f2b..ca069f7d 100644 --- a/examples/python-operator-dataflow/dataflow.yml +++ b/examples/python-operator-dataflow/dataflow.yml @@ -10,10 +10,12 @@ nodes: - id: object_detection operator: python: object_detection.py + send_stdout_as: stdout inputs: image: webcam/image outputs: - bbox + - stdout - id: plot operator: @@ -21,3 +23,4 @@ nodes: inputs: image: webcam/image bbox: object_detection/bbox + object_detection_stdout: object_detection/stdout diff --git a/examples/python-operator-dataflow/plot.py b/examples/python-operator-dataflow/plot.py index cd7a7200..f7f2ddd0 100755 --- a/examples/python-operator-dataflow/plot.py +++ b/examples/python-operator-dataflow/plot.py @@ -29,6 +29,7 @@ class Operator: self.bboxs = [] self.bounding_box_messages = 0 self.image_messages = 0 + self.object_detection_stdout = [] def on_event( self, @@ -69,12 +70,22 @@ class Operator: self.image_messages += 1 print("received " + str(self.image_messages) + " images") + elif dora_input["id"] == "object_detection_stdout": + stdout = dora_input["value"][0].as_py() + self.object_detection_stdout += [stdout] + ## Only keep last 10 stdout + self.object_detection_stdout = self.object_detection_stdout[-10:] + return DoraStatus.CONTINUE + elif dora_input["id"] == "bbox" and len(self.image) != 0: bboxs = dora_input["value"].to_numpy() self.bboxs = np.reshape(bboxs, (-1, 6)) self.bounding_box_messages += 1 print("received " + str(self.bounding_box_messages) + " bounding boxes") + return DoraStatus.CONTINUE + else: + return DoraStatus.CONTINUE for bbox in self.bboxs: [ @@ -104,6 +115,18 @@ class Operator: 1, ) + for i, log in enumerate(self.object_detection_stdout): + cv2.putText( + self.image, + log, + (10, 10 + 20 * i), + font, + 0.5, + (0, 255, 0), + 2, + 1, + ) + if CI != "true": cv2.imshow("frame", self.image) if cv2.waitKey(1) & 0xFF == ord("q"): diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 57ced26a..90ad74e6 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,7 +1,7 @@ use crate::config::{ CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId, }; -use eyre::{bail, Context, Result}; +use eyre::{bail, eyre, Context, Result}; use serde::{Deserialize, Serialize}; use serde_with_expand_env::with_expand_envs; use std::{ @@ -10,6 +10,7 @@ use std::{ fmt, path::{Path, PathBuf}, }; +use tracing::warn; pub use visualize::collect_dora_timers; mod validate; @@ -164,6 +165,33 @@ pub struct ResolvedNode { pub kind: CoreNodeKind, } +impl ResolvedNode { + pub fn send_stdout_as(&self) -> Result> { + match &self.kind { + // TODO: Split stdout between operators + CoreNodeKind::Runtime(n) => { + let count = n + .operators + .iter() + .filter(|op| op.config.send_stdout_as.is_some()) + .count(); + if count == 1 && n.operators.len() > 1 { + warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.") + } else if count > 1 { + return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime.")); + } + Ok(n.operators.iter().find_map(|op| { + op.config + .send_stdout_as + .clone() + .map(|stdout| format!("{}/{}", op.id, stdout)) + })) + } + CoreNodeKind::Custom(n) => Ok(n.send_stdout_as.clone()), + } + } +} + #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ResolvedDeploy { pub machine: String, @@ -224,6 +252,8 @@ pub struct OperatorConfig { #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -275,6 +305,8 @@ pub struct CustomNode { pub envs: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] pub build: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub send_stdout_as: Option, #[serde(flatten)] pub run_config: NodeRunConfig, diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index 93063de9..316d3c58 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -93,6 +93,12 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result }; } + // Check that nodes can resolve `send_stdout_as` + for node in &nodes { + node.send_stdout_as() + .context("Could not resolve `send_stdout_as` configuration")?; + } + if has_python_operator { check_python_runtime()?; }