| @@ -1476,8 +1476,10 @@ dependencies = [ | |||
| "async-trait", | |||
| "bincode", | |||
| "ctrlc", | |||
| "dora-arrow-convert", | |||
| "dora-core", | |||
| "dora-download", | |||
| "dora-node-api", | |||
| "dora-tracing", | |||
| "eyre", | |||
| "flume", | |||
| @@ -27,6 +27,8 @@ dora-core = { workspace = true } | |||
| flume = "0.10.14" | |||
| dora-download = { workspace = true } | |||
| dora-tracing = { workspace = true, optional = true } | |||
| dora-arrow-convert = { workspace = true } | |||
| dora-node-api = { workspace = true } | |||
| serde_yaml = "0.8.23" | |||
| uuid = { version = "1.1.2", features = ["v4"] } | |||
| futures = "0.3.25" | |||
| @@ -1,10 +1,11 @@ | |||
| use aligned_vec::{AVec, ConstAlign}; | |||
| use coordinator::CoordinatorEvent; | |||
| use dora_arrow_convert::{ArrowData, IntoArrow}; | |||
| use dora_core::config::{Input, OperatorId}; | |||
| use dora_core::coordinator_messages::CoordinatorRequest; | |||
| use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; | |||
| use dora_core::message::uhlc::{self, HLC}; | |||
| use dora_core::message::{ArrowTypeInfo, MetadataParameters}; | |||
| use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; | |||
| use dora_core::{ | |||
| config::{DataId, InputMapping, NodeId}, | |||
| coordinator_messages::DaemonEvent, | |||
| @@ -14,6 +15,7 @@ use dora_core::{ | |||
| }, | |||
| descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, | |||
| }; | |||
| use eyre::{bail, eyre, Context, ContextCompat}; | |||
| use futures::{future, stream, FutureExt, TryFutureExt}; | |||
| use futures_concurrency::stream::Merge; | |||
| @@ -40,7 +42,7 @@ use tokio::sync::oneshot::Sender; | |||
| use tokio::sync::{mpsc, oneshot}; | |||
| use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; | |||
| use tracing::error; | |||
| use uuid::Uuid; | |||
| use uuid::{Timestamp, Uuid}; | |||
| mod coordinator; | |||
| mod inter_daemon; | |||
| @@ -971,6 +973,49 @@ impl Daemon { | |||
| dataflow.subscribe_channels.remove(id); | |||
| } | |||
| } | |||
| DoraEvent::Logs { | |||
| dataflow_id, | |||
| output_id, | |||
| message, | |||
| metadata, | |||
| } => { | |||
| let Some(dataflow) = self.running.get_mut(&dataflow_id) else { | |||
| tracing::warn!("Logs event for unknown dataflow `{dataflow_id}`"); | |||
| return Ok(RunStatus::Continue); | |||
| }; | |||
| let Some(subscribers) = dataflow.mappings.get(&output_id) else { | |||
| tracing::warn!("No subscribers found for {:?} in {:?}", output_id, dataflow.mappings); | |||
| return Ok(RunStatus::Continue); | |||
| }; | |||
| let mut closed = Vec::new(); | |||
| for (receiver_id, input_id) in subscribers { | |||
| let Some(channel) = dataflow.subscribe_channels.get(receiver_id) else { | |||
| tracing::warn!("No subscriber channel found for {:?}", output_id); | |||
| continue; | |||
| }; | |||
| let send_result = send_with_timestamp( | |||
| channel, | |||
| daemon_messages::NodeEvent::Input { | |||
| id: input_id.clone(), | |||
| metadata: metadata.clone(), | |||
| data: Some(message.clone()), | |||
| }, | |||
| &self.clock, | |||
| ); | |||
| match send_result { | |||
| Ok(()) => {} | |||
| Err(_) => { | |||
| closed.push(receiver_id); | |||
| } | |||
| } | |||
| } | |||
| for id in closed { | |||
| dataflow.subscribe_channels.remove(id); | |||
| } | |||
| } | |||
| DoraEvent::SpawnedNodeResult { | |||
| dataflow_id, | |||
| node_id, | |||
| @@ -1153,6 +1198,13 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> { | |||
| } | |||
| } | |||
| fn node_outputs(node: &ResolvedNode) -> BTreeSet<DataId> { | |||
| match &node.kind { | |||
| CoreNodeKind::Custom(n) => n.run_config.outputs.clone(), | |||
| CoreNodeKind::Runtime(n) => runtime_node_outputs(n), | |||
| } | |||
| } | |||
| fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap<DataId, Input> { | |||
| n.operators | |||
| .iter() | |||
| @@ -1397,7 +1449,7 @@ impl RunningDataflow { | |||
| } | |||
| #[derive(Debug, Clone, PartialEq, Eq, Hash)] | |||
| struct OutputId(NodeId, DataId); | |||
| pub struct OutputId(NodeId, DataId); | |||
| type InputId = (NodeId, DataId); | |||
| struct DropTokenInformation { | |||
| @@ -1465,6 +1517,12 @@ pub enum DoraEvent { | |||
| interval: Duration, | |||
| metadata: dora_core::message::Metadata, | |||
| }, | |||
| Logs { | |||
| dataflow_id: DataflowId, | |||
| output_id: OutputId, | |||
| message: DataMessage, | |||
| metadata: Metadata, | |||
| }, | |||
| SpawnedNodeResult { | |||
| dataflow_id: DataflowId, | |||
| node_id: NodeId, | |||
| @@ -1,10 +1,12 @@ | |||
| use crate::{ | |||
| log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, | |||
| runtime_node_outputs, DoraEvent, Event, NodeExitStatus, | |||
| log, node_communication::spawn_listener_loop, node_inputs, node_outputs, runtime_node_inputs, | |||
| runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, | |||
| }; | |||
| use aligned_vec::{AVec, ConstAlign}; | |||
| use dora_arrow_convert::IntoArrow; | |||
| use dora_core::{ | |||
| config::NodeRunConfig, | |||
| daemon_messages::{DataflowId, NodeConfig, RuntimeConfig, Timestamped}, | |||
| config::{DataId, NodeRunConfig}, | |||
| daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, | |||
| descriptor::{ | |||
| resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE, | |||
| }, | |||
| @@ -12,6 +14,11 @@ use dora_core::{ | |||
| message::uhlc::HLC, | |||
| }; | |||
| use dora_download::download_file; | |||
| use dora_node_api::{ | |||
| arrow::array::ArrayData, | |||
| arrow_utils::{copy_array_into_sample, required_data_size}, | |||
| Metadata, | |||
| }; | |||
| use eyre::WrapErr; | |||
| use std::{ | |||
| env::{consts::EXE_EXTENSION, temp_dir}, | |||
| @@ -51,6 +58,8 @@ pub async fn spawn_node( | |||
| clock.clone(), | |||
| ) | |||
| .await?; | |||
| let outputs = node_outputs(&node); | |||
| let log_output = outputs.contains(&DataId::from("op/logs".to_string())); | |||
| let mut child = match node.kind { | |||
| dora_core::descriptor::CoreNodeKind::Custom(n) => { | |||
| @@ -265,6 +274,8 @@ pub async fn spawn_node( | |||
| // Stderr listener stream | |||
| let stderr_tx = tx.clone(); | |||
| let node_id = node.id.clone(); | |||
| let uhlc = clock.clone(); | |||
| let daemon_tx_log = daemon_tx.clone(); | |||
| tokio::spawn(async move { | |||
| let mut buffer = String::new(); | |||
| let mut finished = false; | |||
| @@ -317,9 +328,38 @@ pub async fn spawn_node( | |||
| let _ = daemon_tx.send(event).await; | |||
| }); | |||
| let node_id = node.id.clone(); | |||
| // Log to file stream. | |||
| tokio::spawn(async move { | |||
| while let Some(message) = rx.recv().await { | |||
| // If log is an output, we're sending the logs to the dataflow | |||
| if log_output { | |||
| // Convert logs to DataMessage | |||
| let array = message.into_arrow(); | |||
| let array: ArrayData = array.into(); | |||
| let total_len = required_data_size(&array); | |||
| let mut sample: AVec<u8, ConstAlign<128>> = | |||
| AVec::__from_elem(128, 0, total_len as usize); | |||
| let type_info = copy_array_into_sample(&mut sample, &array); | |||
| let metadata = Metadata::new(uhlc.new_timestamp(), type_info); | |||
| let output_id = OutputId(node_id.clone(), DataId::from("op/logs".to_string())); | |||
| let event = DoraEvent::Logs { | |||
| dataflow_id, | |||
| output_id, | |||
| metadata, | |||
| message: DataMessage::Vec(sample), | |||
| } | |||
| .into(); | |||
| let event = Timestamped { | |||
| inner: event, | |||
| timestamp: uhlc.new_timestamp(), | |||
| }; | |||
| let _ = daemon_tx_log.send(event).await; | |||
| } | |||
| let _ = file | |||
| .write_all(message.as_bytes()) | |||
| .await | |||