From 91bd7daf506d511d7a525e48215b5bea02a9c331 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 1 Dec 2023 19:30:34 +0100 Subject: [PATCH] Adding log event --- Cargo.lock | 2 ++ binaries/daemon/Cargo.toml | 2 ++ binaries/daemon/src/lib.rs | 64 ++++++++++++++++++++++++++++++++++-- binaries/daemon/src/spawn.rs | 48 ++++++++++++++++++++++++--- 4 files changed, 109 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a2f623d0..fdc266dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1476,8 +1476,10 @@ dependencies = [ "async-trait", "bincode", "ctrlc", + "dora-arrow-convert", "dora-core", "dora-download", + "dora-node-api", "dora-tracing", "eyre", "flume", diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index 3577ed7d..2d1690a8 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -27,6 +27,8 @@ dora-core = { workspace = true } flume = "0.10.14" dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true } +dora-arrow-convert = { workspace = true } +dora-node-api = { workspace = true } serde_yaml = "0.8.23" uuid = { version = "1.1.2", features = ["v4"] } futures = "0.3.25" diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 1515d343..10c3c5f8 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -1,10 +1,11 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; +use dora_arrow_convert::{ArrowData, IntoArrow}; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; use dora_core::message::uhlc::{self, HLC}; -use dora_core::message::{ArrowTypeInfo, MetadataParameters}; +use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -14,6 +15,7 @@ use dora_core::{ }, descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; + use eyre::{bail, eyre, Context, ContextCompat}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; @@ -40,7 +42,7 @@ use tokio::sync::oneshot::Sender; use tokio::sync::{mpsc, oneshot}; use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; use tracing::error; -use uuid::Uuid; +use uuid::{Timestamp, Uuid}; mod coordinator; mod inter_daemon; @@ -971,6 +973,49 @@ impl Daemon { dataflow.subscribe_channels.remove(id); } } + DoraEvent::Logs { + dataflow_id, + output_id, + message, + metadata, + } => { + let Some(dataflow) = self.running.get_mut(&dataflow_id) else { + tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`"); + return Ok(RunStatus::Continue); + }; + + let Some(subscribers) = dataflow.mappings.get(&output_id) else { + tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings); + return Ok(RunStatus::Continue); + }; + + let mut closed = Vec::new(); + for (receiver_id, input_id) in subscribers { + let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { + tracing::warn!("No subscriber channel found for {:?}", output_id); + continue; + }; + + let send_result = send_with_timestamp( + channel, + daemon_messages::NodeEvent::Input { + id: input_id.clone(), + metadata: metadata.clone(), + data: Some(message.clone()), + }, + &self.clock, + ); + match send_result { + Ok(()) => {} + Err(_) => { + closed.push(receiver_id); + } + } + } + for id in closed { + dataflow.subscribe_channels.remove(id); + } + } DoraEvent::SpawnedNodeResult { dataflow_id, node_id, @@ -1153,6 +1198,13 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap { } } +fn node_outputs(node: &ResolvedNode) -> BTreeSet { + match &node.kind { + CoreNodeKind::Custom(n) => n.run_config.outputs.clone(), + CoreNodeKind::Runtime(n) => runtime_node_outputs(n), + } +} + fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap { n.operators .iter() @@ -1397,7 +1449,7 @@ impl RunningDataflow { } #[derive(Debug, Clone, PartialEq, Eq, Hash)] -struct OutputId(NodeId, DataId); +pub struct OutputId(NodeId, DataId); type InputId = (NodeId, DataId); struct DropTokenInformation { @@ -1465,6 +1517,12 @@ pub enum DoraEvent { interval: Duration, metadata: dora_core::message::Metadata, }, + Logs { + dataflow_id: DataflowId, + output_id: OutputId, + message: DataMessage, + metadata: Metadata, + }, SpawnedNodeResult { dataflow_id: DataflowId, node_id: NodeId, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index ccefdbed..95411637 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,10 +1,12 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, - runtime_node_outputs, DoraEvent, Event, NodeExitStatus, + log, node_communication::spawn_listener_loop, node_inputs, node_outputs, runtime_node_inputs, + runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, }; +use aligned_vec::{AVec, ConstAlign}; +use dora_arrow_convert::IntoArrow; use dora_core::{ - config::NodeRunConfig, - daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped}, + config::{DataId, NodeRunConfig}, + daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, }, @@ -12,6 +14,11 @@ use dora_core::{ message::uhlc::HLC, }; use dora_download::download_file; +use dora_node_api::{ + arrow::array::ArrayData, + arrow_utils::{copy_array_into_sample, required_data_size}, + Metadata, +}; use eyre::WrapErr; use std::{ env::{consts::EXE_EXTENSION, temp_dir}, @@ -51,6 +58,8 @@ pub async fn spawn_node( clock.clone(), ) .await?; + let outputs = node_outputs(&node); + let log_output = outputs.contains(&DataId::from("op/logs".to_string())); let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { @@ -265,6 +274,8 @@ pub async fn spawn_node( // Stderr listener stream let stderr_tx = tx.clone(); let node_id = node.id.clone(); + let uhlc = clock.clone(); + let daemon_tx_log = daemon_tx.clone(); tokio::spawn(async move { let mut buffer = String::new(); let mut finished = false; @@ -317,9 +328,38 @@ pub async fn spawn_node( let _ = daemon_tx.send(event).await; }); + let node_id = node.id.clone(); // Log to file stream. tokio::spawn(async move { while let Some(message) = rx.recv().await { + // If log is an output, we're sending the logs to the dataflow + if log_output { + // Convert logs to DataMessage + let array = message.into_arrow(); + + let array: ArrayData = array.into(); + let total_len = required_data_size(&array); + let mut sample: AVec> = + AVec::__from_elem(128, 0, total_len as usize); + + let type_info = copy_array_into_sample(&mut sample, &array); + + let metadata = Metadata::new(uhlc.new_timestamp(), type_info); + let output_id = OutputId(node_id.clone(), DataId::from("op/logs".to_string())); + let event = DoraEvent::Logs { + dataflow_id, + output_id, + metadata, + message: DataMessage::Vec(sample), + } + .into(); + let event = Timestamped { + inner: event, + timestamp: uhlc.new_timestamp(), + }; + let _ = daemon_tx_log.send(event).await; + } + let _ = file .write_all(message.as_bytes()) .await