Browse Source

Merge pull request #388 from dora-rs/output-logs

Add option to send `stdout` as node/operator output
tags/v0.3.3-rc1
Haixuan Xavier Tao GitHub 1 year ago
parent
commit
2615b04d2e
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
13 changed files with 180 additions and 21 deletions
  1. +2
    -0
      Cargo.lock
  2. +1
    -1
      apis/python/operator/src/lib.rs
  3. +5
    -8
      apis/rust/node/src/node/arrow_utils.rs
  4. +1
    -1
      apis/rust/node/src/node/mod.rs
  5. +2
    -0
      binaries/daemon/Cargo.toml
  6. +56
    -2
      binaries/daemon/src/lib.rs
  7. +46
    -3
      binaries/daemon/src/spawn.rs
  8. +1
    -1
      binaries/runtime/src/operator/python.rs
  9. +1
    -4
      binaries/runtime/src/operator/shared_lib.rs
  10. +3
    -0
      examples/python-operator-dataflow/dataflow.yml
  11. +23
    -0
      examples/python-operator-dataflow/plot.py
  12. +33
    -1
      libraries/core/src/descriptor/mod.rs
  13. +6
    -0
      libraries/core/src/descriptor/validate.rs

+ 2
- 0
Cargo.lock View File

@@ -1476,8 +1476,10 @@ dependencies = [
"async-trait",
"bincode",
"ctrlc",
"dora-arrow-convert",
"dora-core",
"dora-download",
"dora-node-api",
"dora-tracing",
"eyre",
"flume",


+ 1
- 1
apis/python/operator/src/lib.rs View File

@@ -171,7 +171,7 @@ mod tests {
let size = required_data_size(arrow_array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, size);

let info = copy_array_into_sample(&mut sample, arrow_array)?;
let info = copy_array_into_sample(&mut sample, arrow_array);

let serialized_deserialized_arrow_array = RawData::Vec(sample)
.into_arrow_array(&info)


+ 5
- 8
apis/rust/node/src/node/arrow_utils.rs View File

@@ -20,10 +20,7 @@ fn required_data_size_inner(array: &ArrayData, next_offset: &mut usize) {
}
}

pub fn copy_array_into_sample(
target_buffer: &mut [u8],
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
pub fn copy_array_into_sample(target_buffer: &mut [u8], arrow_array: &ArrayData) -> ArrowTypeInfo {
let mut next_offset = 0;
copy_array_into_sample_inner(target_buffer, &mut next_offset, arrow_array)
}
@@ -32,7 +29,7 @@ fn copy_array_into_sample_inner(
target_buffer: &mut [u8],
next_offset: &mut usize,
arrow_array: &ArrayData,
) -> eyre::Result<ArrowTypeInfo> {
) -> ArrowTypeInfo {
let mut buffer_offsets = Vec::new();
let layout = arrow::array::layout(arrow_array.data_type());
for (buffer, spec) in arrow_array.buffers().iter().zip(&layout.buffers) {
@@ -58,11 +55,11 @@ fn copy_array_into_sample_inner(

let mut child_data = Vec::new();
for child in arrow_array.child_data() {
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child)?;
let child_type_info = copy_array_into_sample_inner(target_buffer, next_offset, child);
child_data.push(child_type_info);
}

Ok(ArrowTypeInfo {
ArrowTypeInfo {
data_type: arrow_array.data_type().clone(),
len: arrow_array.len(),
null_count: arrow_array.null_count(),
@@ -70,5 +67,5 @@ fn copy_array_into_sample_inner(
offset: arrow_array.offset(),
buffer_offsets,
child_data,
})
}
}

+ 1
- 1
apis/rust/node/src/node/mod.rs View File

@@ -153,7 +153,7 @@ impl DoraNode {
let total_len = required_data_size(&arrow_array);

let mut sample = self.allocate_data_sample(total_len)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

self.send_output_sample(output_id, type_info, parameters, Some(sample))
.wrap_err("failed to send output")?;


+ 2
- 0
binaries/daemon/Cargo.toml View File

@@ -27,6 +27,8 @@ dora-core = { workspace = true }
flume = "0.10.14"
dora-download = { workspace = true }
dora-tracing = { workspace = true, optional = true }
dora-arrow-convert = { workspace = true }
dora-node-api = { workspace = true }
serde_yaml = "0.8.23"
uuid = { version = "1.1.2", features = ["v4"] }
futures = "0.3.25"


+ 56
- 2
binaries/daemon/src/lib.rs View File

@@ -4,7 +4,7 @@ use dora_core::config::{Input, OperatorId};
use dora_core::coordinator_messages::CoordinatorRequest;
use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped};
use dora_core::message::uhlc::{self, HLC};
use dora_core::message::{ArrowTypeInfo, MetadataParameters};
use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters};
use dora_core::{
config::{DataId, InputMapping, NodeId},
coordinator_messages::DaemonEvent,
@@ -14,6 +14,7 @@ use dora_core::{
},
descriptor::{CoreNodeKind, Descriptor, ResolvedNode},
};

use eyre::{bail, eyre, Context, ContextCompat};
use futures::{future, stream, FutureExt, TryFutureExt};
use futures_concurrency::stream::Merge;
@@ -971,6 +972,53 @@ impl Daemon {
dataflow.subscribe_channels.remove(id);
}
}
DoraEvent::Logs {
dataflow_id,
output_id,
message,
metadata,
} => {
let Some(dataflow) = self.running.get_mut(&dataflow_id) else {
tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`");
return Ok(RunStatus::Continue);
};

let Some(subscribers) = dataflow.mappings.get(&output_id) else {
tracing::warn!(
"No subscribers found for {:?} in {:?}",
output_id,
dataflow.mappings
);
return Ok(RunStatus::Continue);
};

let mut closed = Vec::new();
for (receiver_id, input_id) in subscribers {
let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else {
tracing::warn!("No subscriber channel found for {:?}", output_id);
continue;
};

let send_result = send_with_timestamp(
channel,
daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: Some(message.clone()),
},
&self.clock,
);
match send_result {
Ok(()) => {}
Err(_) => {
closed.push(receiver_id);
}
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}
}
DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
@@ -1397,7 +1445,7 @@ impl RunningDataflow {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct OutputId(NodeId, DataId);
pub struct OutputId(NodeId, DataId);
type InputId = (NodeId, DataId);

struct DropTokenInformation {
@@ -1465,6 +1513,12 @@ pub enum DoraEvent {
interval: Duration,
metadata: dora_core::message::Metadata,
},
Logs {
dataflow_id: DataflowId,
output_id: OutputId,
message: DataMessage,
metadata: Metadata,
},
SpawnedNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,


+ 46
- 3
binaries/daemon/src/spawn.rs View File

@@ -1,10 +1,12 @@
use crate::{
log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs,
runtime_node_outputs, DoraEvent, Event, NodeExitStatus,
runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId,
};
use aligned_vec::{AVec, ConstAlign};
use dora_arrow_convert::IntoArrow;
use dora_core::{
config::NodeRunConfig,
daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped},
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
},
@@ -12,6 +14,11 @@ use dora_core::{
message::uhlc::HLC,
};
use dora_download::download_file;
use dora_node_api::{
arrow::array::ArrayData,
arrow_utils::{copy_array_into_sample, required_data_size},
Metadata,
};
use eyre::WrapErr;
use std::{
env::{consts::EXE_EXTENSION, temp_dir},
@@ -51,6 +58,9 @@ pub async fn spawn_node(
clock.clone(),
)
.await?;
let send_stdout_to = node
.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;

let mut child = match node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
@@ -265,6 +275,8 @@ pub async fn spawn_node(
// Stderr listener stream
let stderr_tx = tx.clone();
let node_id = node.id.clone();
let uhlc = clock.clone();
let daemon_tx_log = daemon_tx.clone();
tokio::spawn(async move {
let mut buffer = String::new();
let mut finished = false;
@@ -317,9 +329,40 @@ pub async fn spawn_node(
let _ = daemon_tx.send(event).await;
});

let node_id = node.id.clone();
// Log to file stream.
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
// If log is an output, we're sending the logs to the dataflow
if let Some(stdout_output_name) = &send_stdout_to {
// Convert logs to DataMessage
let array = message.into_arrow();

let array: ArrayData = array.into();
let total_len = required_data_size(&array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

let type_info = copy_array_into_sample(&mut sample, &array);

let metadata = Metadata::new(uhlc.new_timestamp(), type_info);
let output_id = OutputId(
node_id.clone(),
DataId::from(stdout_output_name.to_string()),
);
let event = DoraEvent::Logs {
dataflow_id,
output_id,
metadata,
message: DataMessage::Vec(sample),
}
.into();
let event = Timestamped {
inner: event,
timestamp: uhlc.new_timestamp(),
};
let _ = daemon_tx_log.send(event).await;
}

let _ = file
.write_all(message.as_bytes())
.await


+ 1
- 1
binaries/runtime/src/operator/python.rs View File

@@ -362,7 +362,7 @@ mod callback_impl {
let total_len = required_data_size(&arrow_array);
let mut sample = allocate_sample(total_len)?;

let type_info = copy_array_into_sample(&mut sample, &arrow_array)?;
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

(sample, type_info)
} else {


+ 1
- 4
binaries/runtime/src/operator/shared_lib.rs View File

@@ -132,10 +132,7 @@ impl<'lib> SharedLibraryOperator<'lib> {
let total_len = required_data_size(&arrow_array);
let mut sample: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, total_len);

let type_info = match copy_array_into_sample(&mut sample, &arrow_array) {
Ok(t) => t,
Err(err) => return DoraResult::from_error(err.to_string()),
};
let type_info = copy_array_into_sample(&mut sample, &arrow_array);

let event = OperatorEvent::Output {
output_id: DataId::from(String::from(output_id)),


+ 3
- 0
examples/python-operator-dataflow/dataflow.yml View File

@@ -10,10 +10,12 @@ nodes:
- id: object_detection
operator:
python: object_detection.py
send_stdout_as: stdout
inputs:
image: webcam/image
outputs:
- bbox
- stdout

- id: plot
operator:
@@ -21,3 +23,4 @@ nodes:
inputs:
image: webcam/image
bbox: object_detection/bbox
object_detection_stdout: object_detection/stdout

+ 23
- 0
examples/python-operator-dataflow/plot.py View File

@@ -29,6 +29,7 @@ class Operator:
self.bboxs = []
self.bounding_box_messages = 0
self.image_messages = 0
self.object_detection_stdout = []

def on_event(
self,
@@ -69,12 +70,22 @@ class Operator:
self.image_messages += 1
print("received " + str(self.image_messages) + " images")

elif dora_input["id"] == "object_detection_stdout":
stdout = dora_input["value"][0].as_py()
self.object_detection_stdout += [stdout]
## Only keep last 10 stdout
self.object_detection_stdout = self.object_detection_stdout[-10:]
return DoraStatus.CONTINUE

elif dora_input["id"] == "bbox" and len(self.image) != 0:
bboxs = dora_input["value"].to_numpy()
self.bboxs = np.reshape(bboxs, (-1, 6))

self.bounding_box_messages += 1
print("received " + str(self.bounding_box_messages) + " bounding boxes")
return DoraStatus.CONTINUE
else:
return DoraStatus.CONTINUE

for bbox in self.bboxs:
[
@@ -104,6 +115,18 @@ class Operator:
1,
)

for i, log in enumerate(self.object_detection_stdout):
cv2.putText(
self.image,
log,
(10, 10 + 20 * i),
font,
0.5,
(0, 255, 0),
2,
1,
)

if CI != "true":
cv2.imshow("frame", self.image)
if cv2.waitKey(1) & 0xFF == ord("q"):


+ 33
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -1,7 +1,7 @@
use crate::config::{
CommunicationConfig, DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use eyre::{bail, Context, Result};
use eyre::{bail, eyre, Context, Result};
use serde::{Deserialize, Serialize};
use serde_with_expand_env::with_expand_envs;
use std::{
@@ -10,6 +10,7 @@ use std::{
fmt,
path::{Path, PathBuf},
};
use tracing::warn;
pub use visualize::collect_dora_timers;

mod validate;
@@ -164,6 +165,33 @@ pub struct ResolvedNode {
pub kind: CoreNodeKind,
}

impl ResolvedNode {
    /// Resolves the `send_stdout_as` setting for this node.
    ///
    /// For a custom node, this is simply the node's own `send_stdout_as` value.
    /// For a runtime node, at most one operator may set `send_stdout_as`; the
    /// resolved output id is `"<operator_id>/<output_name>"`.
    ///
    /// # Errors
    ///
    /// Returns an error if more than one operator of a runtime node sets
    /// `send_stdout_as`.
    pub fn send_stdout_as(&self) -> Result<Option<String>> {
        match &self.kind {
            // TODO: Split stdout between operators
            CoreNodeKind::Runtime(runtime) => {
                // Operators that opted into stdout forwarding.
                let configured: Vec<_> = runtime
                    .operators
                    .iter()
                    .filter(|op| op.config.send_stdout_as.is_some())
                    .collect();
                if configured.len() > 1 {
                    return Err(eyre!("More than one `send_stdout_as` entries for a runtime node. Please only use one `send_stdout_as` per runtime."));
                }
                // A single opted-in operator still receives stdout of ALL
                // operators in this runtime (see TODO above), so warn about it.
                if configured.len() == 1 && runtime.operators.len() > 1 {
                    warn!("All stdout from all operators of a runtime are going to be sent in the selected `send_stdout_as` operator.")
                }
                let resolved = configured.first().and_then(|op| {
                    op.config
                        .send_stdout_as
                        .as_ref()
                        .map(|stdout| format!("{}/{}", op.id, stdout))
                });
                Ok(resolved)
            }
            CoreNodeKind::Custom(custom) => Ok(custom.send_stdout_as.clone()),
        }
    }
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResolvedDeploy {
pub machine: String,
@@ -224,6 +252,8 @@ pub struct OperatorConfig {

#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -275,6 +305,8 @@ pub struct CustomNode {
pub envs: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub build: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub send_stdout_as: Option<String>,

#[serde(flatten)]
pub run_config: NodeRunConfig,


+ 6
- 0
libraries/core/src/descriptor/validate.rs View File

@@ -93,6 +93,12 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result
};
}

// Check that nodes can resolve `send_stdout_as`
for node in &nodes {
node.send_stdout_as()
.context("Could not resolve `send_stdout_as` configuration")?;
}

if has_python_operator {
check_python_runtime()?;
}


Loading…
Cancel
Save