From 3ad402ce452eca99b92a2a37fe7fc113b0374060 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 8 Jun 2024 16:53:36 +0200 Subject: [PATCH 01/16] Add dynamic node event loop and dynamic node connection --- .github/workflows/ci.yml | 11 +- Cargo.lock | 9 ++ Cargo.toml | 1 + apis/python/node/src/lib.rs | 9 +- apis/rust/node/Cargo.toml | 1 + apis/rust/node/src/daemon_connection/tcp.rs | 28 +++- apis/rust/node/src/node/mod.rs | 79 +++++++-- binaries/daemon/src/dynamic_node.rs | 138 ++++++++++++++++ binaries/daemon/src/lib.rs | 150 +++++++++++++----- binaries/daemon/src/node_communication/mod.rs | 11 +- binaries/daemon/src/spawn.rs | 52 +++--- examples/python-dataflow/dataflow_dynamic.yml | 25 +++ examples/python-dataflow/plot_dynamic.py | 97 +++++++++++ examples/rust-dataflow/dataflow_dynamic.yml | 31 ++++ .../rust-dataflow/sink-dynamic/Cargo.toml | 10 ++ .../rust-dataflow/sink-dynamic/src/main.rs | 39 +++++ libraries/core/src/daemon_messages.rs | 35 +++- libraries/core/src/descriptor/mod.rs | 47 ++++++ libraries/core/src/descriptor/validate.rs | 3 +- libraries/core/src/topics.rs | 10 +- tool_nodes/dora-record/README.md | 56 +++++++ 21 files changed, 742 insertions(+), 100 deletions(-) create mode 100644 binaries/daemon/src/dynamic_node.rs create mode 100644 examples/python-dataflow/dataflow_dynamic.yml create mode 100755 examples/python-dataflow/plot_dynamic.py create mode 100644 examples/rust-dataflow/dataflow_dynamic.yml create mode 100644 examples/rust-dataflow/sink-dynamic/Cargo.toml create mode 100644 examples/rust-dataflow/sink-dynamic/src/main.rs create mode 100644 tool_nodes/dora-record/README.md diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9aee930..b164270b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -289,8 +289,13 @@ jobs: dora start dataflow.yml --name ci-rust-test sleep 10 dora stop --name ci-rust-test --grace-duration 5s + dora build ../examples/rust-dataflow/dataflow_dynamic.yml + dora start 
../examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + cargo run -p rust-dataflow-example-sink-dynamic + sleep 5 + dora stop --name ci-rust-dynamic --grace-duration 5s dora destroy - + - name: "Test CLI (Python)" timeout-minutes: 30 # fail-fast by using bash shell explictly @@ -312,6 +317,10 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s + dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + python ../examples/python-dataflow/plot_dynamic.py + sleep 5 + dora stop --name ci-python-dynamic --grace-duration 5s dora destroy clippy: diff --git a/Cargo.lock b/Cargo.lock index aa104495..7bee35aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2395,6 +2395,7 @@ dependencies = [ "futures", "futures-concurrency", "futures-timer", + "serde_json", "serde_yaml 0.8.26", "shared-memory-server", "shared_memory_extended", @@ -8038,6 +8039,14 @@ dependencies = [ "eyre", ] +[[package]] +name = "rust-dataflow-example-sink-dynamic" +version = "0.3.4" +dependencies = [ + "dora-node-api", + "eyre", +] + [[package]] name = "rust-dataflow-example-status-node" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index ddabbf53..57300902 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "examples/rust-dataflow/node", "examples/rust-dataflow/status-node", "examples/rust-dataflow/sink", + "examples/rust-dataflow/sink-dynamic", "examples/rust-ros2-dataflow/node", "examples/benchmark/node", "examples/benchmark/sink", diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 95fd29c7..b48ec762 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -3,6 +3,7 @@ use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; +use dora_node_api::dora_core::config::NodeId; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DoraNode, EventStream}; use 
dora_operator_api_python::{pydict_to_metadata, PyEvent}; @@ -32,8 +33,12 @@ pub struct Node { #[pymethods] impl Node { #[new] - pub fn new() -> eyre::Result { - let (node, events) = DoraNode::init_from_env()?; + pub fn new(node_id: Option) -> eyre::Result { + let (node, events) = if let Some(node_id) = node_id { + DoraNode::init_from_node_id(NodeId::from(node_id))? + } else { + DoraNode::init_from_env()? + }; Ok(Node { events: Events::Dora(events), diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index 015ad9ad..41d7fe6b 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -26,6 +26,7 @@ futures-concurrency = "7.3.0" futures-timer = "3.0.2" dora-arrow-convert = { workspace = true } aligned-vec = "0.5.0" +serde_json = "1.0.86" [dev-dependencies] tokio = { version = "1.24.2", features = ["rt"] } diff --git a/apis/rust/node/src/daemon_connection/tcp.rs b/apis/rust/node/src/daemon_connection/tcp.rs index 62794d0a..15fdf19d 100644 --- a/apis/rust/node/src/daemon_connection/tcp.rs +++ b/apis/rust/node/src/daemon_connection/tcp.rs @@ -5,13 +5,21 @@ use std::{ net::TcpStream, }; +enum Serializer { + Bincode, + SerdeJson, +} pub fn request( connection: &mut TcpStream, request: &Timestamped, ) -> eyre::Result { send_message(connection, request)?; - if request.inner.expects_tcp_reply() { - receive_reply(connection) + if request.inner.expects_tcp_bincode_reply() { + receive_reply(connection, Serializer::Bincode) + .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) + // Use serde json for message with variable length + } else if request.inner.expects_tcp_json_reply() { + receive_reply(connection, Serializer::SerdeJson) .and_then(|reply| reply.ok_or_else(|| eyre!("server disconnected unexpectedly"))) } else { Ok(DaemonReply::Empty) @@ -27,7 +35,10 @@ fn send_message( Ok(()) } -fn receive_reply(connection: &mut TcpStream) -> eyre::Result> { +fn receive_reply( + connection: &mut TcpStream, + serializer: 
Serializer, +) -> eyre::Result> { let raw = match tcp_receive(connection) { Ok(raw) => raw, Err(err) => match err.kind() { @@ -43,9 +54,14 @@ fn receive_reply(connection: &mut TcpStream) -> eyre::Result } }, }; - bincode::deserialize(&raw) - .wrap_err("failed to deserialize DaemonReply") - .map(Some) + match serializer { + Serializer::Bincode => bincode::deserialize(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + Serializer::SerdeJson => serde_json::from_slice(&raw) + .wrap_err("failed to deserialize DaemonReply") + .map(Some), + } } fn tcp_send(connection: &mut (impl Write + Unpin), message: &[u8]) -> std::io::Result<()> { diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 2b3e4e17..31fdd8a6 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -1,4 +1,4 @@ -use crate::EventStream; +use crate::{daemon_connection::DaemonChannel, EventStream}; use self::{ arrow_utils::{copy_array_into_sample, required_data_size}, @@ -9,10 +9,12 @@ use aligned_vec::{AVec, ConstAlign}; use arrow::array::Array; use dora_core::{ config::{DataId, NodeId, NodeRunConfig}, - daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig}, + daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, descriptor::Descriptor, message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, + topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}, }; + use eyre::{bail, WrapErr}; use shared_memory_extended::{Shmem, ShmemConf}; use std::{ @@ -43,6 +45,7 @@ pub struct DoraNode { cache: VecDeque, dataflow_descriptor: Descriptor, + dynamic: bool, } impl DoraNode { @@ -66,6 +69,50 @@ impl DoraNode { Self::init(node_config) } + /// Initiate a node from a dataflow id and a node id. 
+ /// + /// ```no_run + /// use dora_node_api::DoraNode; + /// use dora_node_api::dora_core::config::NodeId; + /// + /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot"); + /// ``` + /// + pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { + // If `DORA_NODE_CONFIG` is set, use that config instead of requesting one from the daemon. + if let Ok(node_config_string) = std::env::var("DORA_NODE_CONFIG") { + let node_config: NodeConfig = serde_yaml::from_str(&node_config_string) + .context("failed to deserialize node config")?; + assert!( + node_config.node_id == node_id, + "The node id in the dataflow YAML description does not match the `node_id` passed to `init_from_node_id`. Please either run this node in dynamic mode, or change or remove the `node_id` specification in the code." + ); + return Self::init(node_config); + } + + let daemon_address = (LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT).into(); + + let mut channel = + DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; + let clock = Arc::new(uhlc::HLC::default()); + + let reply = channel + .request(&Timestamped { + inner: DaemonRequest::NodeConfig { node_id }, + timestamp: clock.new_timestamp(), + }) + .wrap_err("failed to request node config from daemon")?; + match reply { + dora_core::daemon_messages::DaemonReply::NodeConfig { + result: Ok(node_config), + } => Self::init(node_config), + dora_core::daemon_messages::DaemonReply::NodeConfig { result: Err(error) } => { + bail!("failed to get node config from daemon: {error}") + } + _ => bail!("unexpected reply from daemon"), + } + } + #[tracing::instrument] pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { let NodeConfig { @@ -74,8 +121,8 @@ impl DoraNode { run_config, daemon_communication, dataflow_descriptor, + dynamic, } = node_config; - let clock = Arc::new(uhlc::HLC::default()); let event_stream = @@ -91,13 +138,13 @@ impl DoraNode { let node = Self 
{ id: node_id, dataflow_id, - node_config: run_config, + node_config: run_config.clone(), control_channel, clock, sent_out_shared_memory: HashMap::new(), drop_stream, cache: VecDeque::new(), - + dynamic, dataflow_descriptor, }; Ok((node, event_stream)) @@ -335,16 +382,18 @@ impl Drop for DoraNode { #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")] fn drop(&mut self) { // close all outputs first to notify subscribers as early as possible - if let Err(err) = self - .control_channel - .report_closed_outputs( - std::mem::take(&mut self.node_config.outputs) - .into_iter() - .collect(), - ) - .context("failed to close outputs on drop") - { - tracing::warn!("{err:?}") + if !self.dynamic { + if let Err(err) = self + .control_channel + .report_closed_outputs( + std::mem::take(&mut self.node_config.outputs) + .into_iter() + .collect(), + ) + .context("failed to close outputs on drop") + { + tracing::warn!("{err:?}") + } } while !self.sent_out_shared_memory.is_empty() { diff --git a/binaries/daemon/src/dynamic_node.rs b/binaries/daemon/src/dynamic_node.rs new file mode 100644 index 00000000..7b1f9dc3 --- /dev/null +++ b/binaries/daemon/src/dynamic_node.rs @@ -0,0 +1,138 @@ +use crate::tcp_utils::{tcp_receive, tcp_send}; +use dora_core::daemon_messages::{DaemonReply, DaemonRequest, DynamicNodeEvent, Timestamped}; +use eyre::Context; +use std::{io::ErrorKind, net::SocketAddr}; +use tokio::{ + net::{TcpListener, TcpStream}, + sync::oneshot, +}; + +#[derive(Debug)] +pub struct DynamicNodeEventWrapper { + pub event: DynamicNodeEvent, + pub reply_tx: oneshot::Sender>, +} + +pub async fn spawn_listener_loop( + bind: SocketAddr, + machine_id: String, + events_tx: flume::Sender>, +) -> eyre::Result { + let socket = match TcpListener::bind(bind).await { + Ok(socket) => socket, + Err(err) => { + return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) + } + }; + let listen_port = socket + .local_addr() + .wrap_err("failed to get 
local addr of socket")? + .port(); + + tokio::spawn(async move { + listener_loop(socket, events_tx).await; + tracing::debug!("Dynamic node listener loop finished for machine `{machine_id}`"); + }); + + Ok(listen_port) +} + +async fn listener_loop( + listener: TcpListener, + events_tx: flume::Sender>, +) { + loop { + match listener + .accept() + .await + .wrap_err("failed to accept new connection") + { + Err(err) => { + tracing::info!("{err:?}"); + } + Ok((connection, _)) => { + tokio::spawn(handle_connection_loop(connection, events_tx.clone())); + } + } + } +} + +async fn handle_connection_loop( + mut connection: TcpStream, + events_tx: flume::Sender>, +) { + if let Err(err) = connection.set_nodelay(true) { + tracing::warn!("failed to set nodelay for connection: {err}"); + } + + loop { + match receive_message(&mut connection).await { + Ok(Some(Timestamped { + inner: DaemonRequest::NodeConfig { node_id }, + timestamp, + })) => { + let (reply_tx, reply_rx) = oneshot::channel(); + if events_tx + .send_async(Timestamped { + inner: DynamicNodeEventWrapper { + event: DynamicNodeEvent::NodeConfig { node_id }, + reply_tx, + }, + timestamp, + }) + .await + .is_err() + { + break; + } + let Ok(reply) = reply_rx.await else { + tracing::warn!("daemon sent no reply"); + continue; + }; + if let Some(reply) = reply { + let serialized = match serde_json::to_vec(&reply) + .wrap_err("failed to serialize DaemonReply") + { + Ok(r) => r, + Err(err) => { + tracing::error!("{err:?}"); + continue; + } + }; + if let Err(err) = tcp_send(&mut connection, &serialized).await { + tracing::warn!("failed to send reply to dynamic node: {err}"); + continue; + }; + } + } + Ok(None) => break, + Err(err) => { + tracing::warn!("{err:?}"); + break; + } + _ => tracing::warn!( + "Unexpected daemon request: the dynamic node listener only supports `NodeConfig` requests" + ), + } + } +} + +async fn receive_message( + connection: &mut TcpStream, +) -> eyre::Result>> { + let raw = match tcp_receive(connection).await { + 
Ok(raw) => raw, + Err(err) => match err.kind() { + ErrorKind::UnexpectedEof + | ErrorKind::ConnectionAborted + | ErrorKind::ConnectionReset => return Ok(None), + _other => { + return Err(err) + .context("unexpected I/O error while trying to receive DynamicNodeEvent") + } + }, + }; + bincode::deserialize(&raw) + .wrap_err("failed to deserialize DynamicNodeEvent") + .map(Some) +} diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 8b85ee8e..2b0ea639 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -2,9 +2,13 @@ use aligned_vec::{AVec, ConstAlign}; use coordinator::CoordinatorEvent; use dora_core::config::{Input, OperatorId}; use dora_core::coordinator_messages::CoordinatorRequest; -use dora_core::daemon_messages::{DataMessage, InterDaemonEvent, Timestamped}; +use dora_core::daemon_messages::{ + DataMessage, DynamicNodeEvent, InterDaemonEvent, NodeConfig, Timestamped, +}; +use dora_core::descriptor::runtime_node_inputs; use dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; +use dora_core::topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -15,7 +19,8 @@ use dora_core::{ descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; -use eyre::{bail, eyre, Context, ContextCompat}; +use dynamic_node::DynamicNodeEventWrapper; +use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; @@ -44,6 +49,7 @@ use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; mod coordinator; +mod dynamic_node; mod inter_daemon; mod log; mod node_communication; @@ -87,7 +93,7 @@ impl Daemon { let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; - // spawn listen loop + // spawn inter daemon listen loop let (events_tx, events_rx) = 
flume::bounded(10); let listen_port = inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; @@ -111,8 +117,25 @@ impl Daemon { }, ); + // Spawn dynamic node listener loop + let (events_tx, events_rx) = flume::bounded(10); + let dynamic_node_address = + SocketAddr::new(LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT); // TODO: Make this config + let _listen_port = + dynamic_node::spawn_listener_loop(dynamic_node_address, machine_id.clone(), events_tx) + .await?; + let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped { + inner: Event::DynamicNode(e.inner), + timestamp: e.timestamp, + }); Self::run_general( - (coordinator_events, ctrlc_events, daemon_events).merge(), + ( + coordinator_events, + ctrlc_events, + daemon_events, + dynamic_node_events, + ) + .merge(), Some(coordinator_addr), machine_id, None, @@ -276,6 +299,7 @@ impl Daemon { RunStatus::Continue => {} RunStatus::Exit => break, }, + Event::DynamicNode(event) => self.handle_dynamic_node_event(event).await?, Event::HeartbeatInterval => { if let Some(connection) = &mut self.coordinator_connection { let msg = serde_json::to_vec(&Timestamped { @@ -437,7 +461,6 @@ impl Daemon { .running .get_mut(&dataflow_id) .wrap_err_with(|| format!("no running dataflow with ID `{dataflow_id}`"))?; - // .stop_all(&self.clock.clone(), grace_duration); let reply = DaemonCoordinatorReply::StopResult(Ok(())); let _ = reply_tx @@ -599,10 +622,8 @@ impl Daemon { .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) { - Ok(pid) => { - dataflow - .running_nodes - .insert(node_id.clone(), RunningNode { pid }); + Ok(running_node) => { + dataflow.running_nodes.insert(node_id, running_node); } Err(err) => { tracing::error!("{err:?}"); @@ -624,6 +645,66 @@ impl Daemon { Ok(()) } + async fn handle_dynamic_node_event( + &mut self, + event: DynamicNodeEventWrapper, + ) -> eyre::Result<()> { + match event { + DynamicNodeEventWrapper { + event: DynamicNodeEvent::NodeConfig { node_id 
}, + reply_tx, + } => { + let number_node_id = self + .running + .iter() + .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id)) + .count(); + + let node_config = match number_node_id { + 2.. => { + let _ = reply_tx.send(Some(DaemonReply::NodeConfig { + result: Err(format!( + "multiple dataflows contain dynamic node id {}. Please only have one running dataflow with the specified node id if you want to use dynamic node", + node_id + ) + .to_string()), + })); + return Ok(()); + } + 1 => self + .running + .iter() + .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id)) + .map(|(_id, dataflow)| -> Result { + Ok(dataflow + .running_nodes + .get(&node_id) + .wrap_err_with(|| format!("no node with ID `{node_id}` within the given dataflow"))? + .node_config + .clone()) + }) + .next() + .wrap_err_with(|| format!("no node with ID `{node_id}`"))? + .context("failed to get node config within given dataflow")?, + 0 => { + let _ = reply_tx.send(Some(DaemonReply::NodeConfig { + result: Err(format!("no node with ID `{node_id}`")), + })); + return Ok(()); + } + }; + + let reply = DaemonReply::NodeConfig { + result: Ok(node_config), + }; + let _ = reply_tx.send(Some(reply)).map_err(|_| { + error!("could not send node config reply from daemon to dynamic node") + }); + Ok(()) + } + } + } + async fn handle_node_event( &mut self, event: DaemonNodeEvent, @@ -920,7 +1001,11 @@ impl Daemon { .await?; dataflow.running_nodes.remove(node_id); - if dataflow.running_nodes.is_empty() { + if dataflow + .running_nodes + .iter() + .all(|(_id, n)| n.node_config.dynamic) + { let result = match self.dataflow_errors.get(&dataflow.id) { None => Ok(()), Some(errors) => { @@ -1227,33 +1312,6 @@ fn node_inputs(node: &ResolvedNode) -> BTreeMap { } } -fn runtime_node_inputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeMap { - n.operators - .iter() - .flat_map(|operator| { - operator.config.inputs.iter().map(|(input_id, mapping)| { - ( - DataId::from(format!("{}/{input_id}", operator.id)), - 
mapping.clone(), - ) - }) - }) - .collect() -} - -fn runtime_node_outputs(n: &dora_core::descriptor::RuntimeNode) -> BTreeSet { - n.operators - .iter() - .flat_map(|operator| { - operator - .config - .outputs - .iter() - .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) - }) - .collect() -} - async fn send_input_closed_events( dataflow: &mut RunningDataflow, inter_daemon_connections: &mut BTreeMap, @@ -1330,7 +1388,8 @@ fn close_input( #[derive(Debug, Clone)] struct RunningNode { - pid: u32, + pid: Option, + node_config: NodeConfig, } pub struct RunningDataflow { @@ -1444,12 +1503,14 @@ impl RunningDataflow { system.refresh_processes(); for (node, node_details) in running_nodes.iter() { - if let Some(process) = system.process(Pid::from(node_details.pid as usize)) { - process.kill(); - warn!( - "{node} was killed due to not stopping within the {:#?} grace period", - duration - ) + if let Some(pid) = node_details.pid { + if let Some(process) = system.process(Pid::from(pid as usize)) { + process.kill(); + warn!( + "{node} was killed due to not stopping within the {:#?} grace period", + duration + ) + } } } }); @@ -1515,6 +1576,7 @@ pub enum Event { Coordinator(CoordinatorEvent), Daemon(InterDaemonEvent), Dora(DoraEvent), + DynamicNode(DynamicNodeEventWrapper), HeartbeatInterval, CtrlC, } diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 8fd200de..9c8ff346 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -6,6 +6,7 @@ use dora_core::{ Timestamped, }, message::uhlc, + topics::LOCALHOST, }; use eyre::{eyre, Context}; use futures::{future, task, Future}; @@ -13,7 +14,6 @@ use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ collections::{BTreeMap, VecDeque}, mem, - net::Ipv4Addr, sync::Arc, task::Poll, }; @@ -39,8 +39,7 @@ pub async fn spawn_listener_loop( ) -> eyre::Result { match config { 
LocalCommunicationConfig::Tcp => { - let localhost = Ipv4Addr::new(127, 0, 0, 1); - let socket = match TcpListener::bind((localhost, 0)).await { + let socket = match TcpListener::bind((LOCALHOST, 0)).await { Ok(socket) => socket, Err(err) => { return Err( @@ -347,6 +346,12 @@ impl Listener { .await .wrap_err("failed to send register reply")?; } + DaemonRequest::NodeConfig { .. } => { + let reply = DaemonReply::Result(Err("unexpected node config message".into())); + self.send_reply(reply, connection) + .await + .wrap_err("failed to send register reply")?; + } DaemonRequest::OutputsDone => { let (reply_sender, reply) = oneshot::channel(); self.process_daemon_event( diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 90751c5b..89c86073 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -1,15 +1,15 @@ use crate::{ - log, node_communication::spawn_listener_loop, node_inputs, runtime_node_inputs, - runtime_node_outputs, DoraEvent, Event, NodeExitStatus, OutputId, + log, node_communication::spawn_listener_loop, node_inputs, DoraEvent, Event, NodeExitStatus, + OutputId, RunningNode, }; use aligned_vec::{AVec, ConstAlign}; use dora_arrow_convert::IntoArrow; use dora_core::{ - config::{DataId, NodeRunConfig}, + config::DataId, daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped}, descriptor::{ resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource, - ResolvedNode, SHELL_SOURCE, + ResolvedNode, DYNAMIC_SOURCE, SHELL_SOURCE, }, get_python_path, message::uhlc::HLC, @@ -42,7 +42,7 @@ pub async fn spawn_node( daemon_tx: mpsc::Sender>, dataflow_descriptor: Descriptor, clock: Arc, -) -> eyre::Result { +) -> eyre::Result { let node_id = node.id.clone(); tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); @@ -63,9 +63,24 @@ pub async fn spawn_node( .send_stdout_as() .context("Could not resolve `send_stdout_as` configuration")?; + let node_config = 
NodeConfig { + dataflow_id, + node_id: node_id.clone(), + run_config: node.kind.run_config(), + daemon_communication, + dataflow_descriptor, + dynamic: node.kind.dynamic(), + }; + let mut child = match node.kind { dora_core::descriptor::CoreNodeKind::Custom(n) => { let mut command = match n.source.as_str() { + DYNAMIC_SOURCE => { + return Ok(RunningNode { + pid: None, + node_config, + }); + } SHELL_SOURCE => { if cfg!(target_os = "windows") { let mut cmd = tokio::process::Command::new("cmd"); @@ -117,17 +132,11 @@ pub async fn spawn_node( command.current_dir(working_dir); command.stdin(Stdio::null()); - let node_config = NodeConfig { - dataflow_id, - node_id: node_id.clone(), - run_config: n.run_config.clone(), - daemon_communication, - dataflow_descriptor, - }; command.env( "DORA_NODE_CONFIG", - serde_yaml::to_string(&node_config).wrap_err("failed to serialize node config")?, + serde_yaml::to_string(&node_config.clone()) + .wrap_err("failed to serialize node config")?, ); // Injecting the env variable defined in the `yaml` into // the node runtime. 
@@ -223,16 +232,7 @@ pub async fn spawn_node( command.current_dir(working_dir); let runtime_config = RuntimeConfig { - node: NodeConfig { - dataflow_id, - node_id: node_id.clone(), - run_config: NodeRunConfig { - inputs: runtime_node_inputs(&n), - outputs: runtime_node_outputs(&n), - }, - daemon_communication, - dataflow_descriptor, - }, + node: node_config.clone(), operators: n.operators, }; command.env( @@ -271,6 +271,10 @@ pub async fn spawn_node( let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); let pid = child.id().unwrap(); + let running_node = RunningNode { + pid: Some(pid), + node_config, + }; let stdout_tx = tx.clone(); // Stdout listener stream @@ -454,5 +458,5 @@ pub async fn spawn_node( .send(()) .map_err(|_| error!("Could not inform that log file thread finished")); }); - Ok(pid) + Ok(running_node) } diff --git a/examples/python-dataflow/dataflow_dynamic.yml b/examples/python-dataflow/dataflow_dynamic.yml new file mode 100644 index 00000000..d43efc83 --- /dev/null +++ b/examples/python-dataflow/dataflow_dynamic.yml @@ -0,0 +1,25 @@ +nodes: + - id: webcam + custom: + source: ./webcam.py + inputs: + tick: + source: dora/timer/millis/50 + queue_size: 1000 + outputs: + - image + + - id: object_detection + custom: + source: ./object_detection.py + inputs: + image: webcam/image + outputs: + - bbox + + - id: plot + custom: + source: dynamic + inputs: + image: webcam/image + bbox: object_detection/bbox diff --git a/examples/python-dataflow/plot_dynamic.py b/examples/python-dataflow/plot_dynamic.py new file mode 100755 index 00000000..b3eda8b7 --- /dev/null +++ b/examples/python-dataflow/plot_dynamic.py @@ -0,0 +1,97 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import os +from dora import Node +from dora import DoraStatus + +import cv2 +import numpy as np +from utils import LABELS + +CI = os.environ.get("CI") + +font = cv2.FONT_HERSHEY_SIMPLEX + + +class Plotter: + """ + Plot image and bounding box 
+ """ + + def __init__(self): + self.image = [] + self.bboxs = [] + + def on_input( + self, + dora_input, + ) -> DoraStatus: + """ + Put image and bounding box on cv2 window. + + Args: + dora_input["id"] (str): Id of the dora_input declared in the yaml configuration + dora_input["value"] (arrow array): message of the dora_input + """ + if dora_input["id"] == "image": + frame = dora_input["value"].to_numpy() + frame = cv2.imdecode(frame, -1) + self.image = frame + + elif dora_input["id"] == "bbox" and len(self.image) != 0: + bboxs = dora_input["value"].to_numpy() + self.bboxs = np.reshape(bboxs, (-1, 6)) + for bbox in self.bboxs: + [ + min_x, + min_y, + max_x, + max_y, + confidence, + label, + ] = bbox + cv2.rectangle( + self.image, + (int(min_x), int(min_y)), + (int(max_x), int(max_y)), + (0, 255, 0), + 2, + ) + + cv2.putText( + self.image, + LABELS[int(label)] + f", {confidence:0.2f}", + (int(max_x), int(max_y)), + font, + 0.75, + (0, 255, 0), + 2, + 1, + ) + + if CI != "true": + cv2.imshow("frame", self.image) + if cv2.waitKey(1) & 0xFF == ord("q"): + return DoraStatus.STOP + + return DoraStatus.CONTINUE + + +plotter = Plotter() + +node = Node("plot") + +for event in node: + event_type = event["type"] + if event_type == "INPUT": + status = plotter.on_input(event) + if status == DoraStatus.CONTINUE: + pass + elif status == DoraStatus.STOP: + print("plotter returned stop status") + break + elif event_type == "STOP": + print("received stop") + else: + print("received unexpected event:", event_type) diff --git a/examples/rust-dataflow/dataflow_dynamic.yml b/examples/rust-dataflow/dataflow_dynamic.yml new file mode 100644 index 00000000..78be0f19 --- /dev/null +++ b/examples/rust-dataflow/dataflow_dynamic.yml @@ -0,0 +1,31 @@ +nodes: + - id: rust-node + custom: + build: cargo build -p rust-dataflow-example-node + source: ../../target/debug/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/100 + outputs: + - random + - id: rust-status-node + custom: + 
build: cargo build -p rust-dataflow-example-status-node + source: ../../target/debug/rust-dataflow-example-status-node + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink-dynamic + custom: + build: cargo build -p rust-dataflow-example-sink-dynamic + source: dynamic + inputs: + message: rust-status-node/status + - id: dora-record + custom: + build: cargo build -p dora-record + source: ../../target/debug/dora-record + inputs: + message: rust-status-node/status + random: rust-node/random diff --git a/examples/rust-dataflow/sink-dynamic/Cargo.toml b/examples/rust-dataflow/sink-dynamic/Cargo.toml new file mode 100644 index 00000000..4e93c428 --- /dev/null +++ b/examples/rust-dataflow/sink-dynamic/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rust-dataflow-example-sink-dynamic" +version.workspace = true +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { workspace = true, features = ["tracing"] } +eyre = "0.6.8" diff --git a/examples/rust-dataflow/sink-dynamic/src/main.rs b/examples/rust-dataflow/sink-dynamic/src/main.rs new file mode 100644 index 00000000..58f36e41 --- /dev/null +++ b/examples/rust-dataflow/sink-dynamic/src/main.rs @@ -0,0 +1,39 @@ +use dora_node_api::{self, dora_core::config::NodeId, DoraNode, Event}; +use eyre::{bail, Context}; + +fn main() -> eyre::Result<()> { + let (_node, mut events) = + DoraNode::init_from_node_id(NodeId::from("rust-sink-dynamic".to_string()))?; + + while let Some(event) = events.recv() { + match event { + Event::Input { + id, + metadata: _, + data, + } => match id.as_str() { + "message" => { + let received_string: &str = + TryFrom::try_from(&data).context("expected string message")?; + println!("sink received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 
'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + }, + Event::Stop => { + println!("Received manual stop"); + } + Event::InputClosed { id } => { + println!("Input `{id}` was closed"); + } + other => eprintln!("Received unexpected input: {other:?}"), + } + } + + Ok(()) +} diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index 91c634cc..ae6fc262 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -14,13 +14,14 @@ use aligned_vec::{AVec, ConstAlign}; use dora_message::{uhlc, Metadata}; use uuid::{NoContext, Timestamp, Uuid}; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct NodeConfig { pub dataflow_id: DataflowId, pub node_id: NodeId, pub run_config: NodeRunConfig, pub daemon_communication: DaemonCommunication, pub dataflow_descriptor: Descriptor, + pub dynamic: bool, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -68,13 +69,18 @@ pub enum DaemonRequest { SubscribeDrop, NextFinishedDropTokens, EventStreamDropped, + NodeConfig { + node_id: NodeId, + }, } impl DaemonRequest { - pub fn expects_tcp_reply(&self) -> bool { + pub fn expects_tcp_bincode_reply(&self) -> bool { #[allow(clippy::match_like_matches_macro)] match self { - DaemonRequest::SendMessage { .. } | DaemonRequest::ReportDropTokens { .. } => false, + DaemonRequest::SendMessage { .. } + | DaemonRequest::NodeConfig { .. } + | DaemonRequest::ReportDropTokens { .. } => false, DaemonRequest::Register { .. 
} | DaemonRequest::Subscribe | DaemonRequest::CloseOutputs(_) @@ -85,6 +91,23 @@ impl DaemonRequest { | DaemonRequest::EventStreamDropped => true, } } + + pub fn expects_tcp_json_reply(&self) -> bool { + #[allow(clippy::match_like_matches_macro)] + match self { + DaemonRequest::NodeConfig { .. } => true, + DaemonRequest::Register { .. } + | DaemonRequest::Subscribe + | DaemonRequest::CloseOutputs(_) + | DaemonRequest::OutputsDone + | DaemonRequest::NextEvent { .. } + | DaemonRequest::SubscribeDrop + | DaemonRequest::NextFinishedDropTokens + | DaemonRequest::ReportDropTokens { .. } + | DaemonRequest::SendMessage { .. } + | DaemonRequest::EventStreamDropped => false, + } + } } #[derive(serde::Serialize, serde::Deserialize, Clone)] @@ -136,6 +159,7 @@ pub enum DaemonReply { PreparedMessage { shared_memory_id: SharedMemoryId }, NextEvents(Vec>), NextDropEvents(Vec>), + NodeConfig { result: Result }, Empty, } @@ -229,6 +253,11 @@ pub enum DaemonCoordinatorEvent { Heartbeat, } +#[derive(Debug, serde::Deserialize, serde::Serialize)] +pub enum DynamicNodeEvent { + NodeConfig { node_id: NodeId }, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum InterDaemonEvent { Output { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index fc836412..61122a25 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -16,6 +16,7 @@ pub use visualize::collect_dora_timers; mod validate; mod visualize; pub const SHELL_SOURCE: &str = "shell"; +pub const DYNAMIC_SOURCE: &str = "dynamic"; /// Dataflow description #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] @@ -319,6 +320,52 @@ pub enum CoreNodeKind { Custom(CustomNode), } +pub fn runtime_node_inputs(n: &RuntimeNode) -> BTreeMap { + n.operators + .iter() + .flat_map(|operator| { + operator.config.inputs.iter().map(|(input_id, mapping)| { + ( + DataId::from(format!("{}/{input_id}", operator.id)), + mapping.clone(), + ) + }) + }) + 
.collect() +} + +fn runtime_node_outputs(n: &RuntimeNode) -> BTreeSet { + n.operators + .iter() + .flat_map(|operator| { + operator + .config + .outputs + .iter() + .map(|output_id| DataId::from(format!("{}/{output_id}", operator.id))) + }) + .collect() +} + +impl CoreNodeKind { + pub fn run_config(&self) -> NodeRunConfig { + match self { + CoreNodeKind::Runtime(n) => NodeRunConfig { + inputs: runtime_node_inputs(n), + outputs: runtime_node_outputs(n), + }, + CoreNodeKind::Custom(n) => n.run_config.clone(), + } + } + + pub fn dynamic(&self) -> bool { + match self { + CoreNodeKind::Runtime(_n) => false, + CoreNodeKind::Custom(n) => n.source == DYNAMIC_SOURCE, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] #[serde(transparent)] pub struct RuntimeNode { diff --git a/libraries/core/src/descriptor/validate.rs b/libraries/core/src/descriptor/validate.rs index fe558096..e1740429 100644 --- a/libraries/core/src/descriptor/validate.rs +++ b/libraries/core/src/descriptor/validate.rs @@ -9,7 +9,7 @@ use eyre::{bail, eyre, Context}; use std::{path::Path, process::Command}; use tracing::info; -use super::{resolve_path, Descriptor, SHELL_SOURCE}; +use super::{resolve_path, Descriptor, DYNAMIC_SOURCE, SHELL_SOURCE}; const VERSION: &str = env!("CARGO_PKG_VERSION"); pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> { @@ -21,6 +21,7 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result match &node.kind { descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() { SHELL_SOURCE => (), + DYNAMIC_SOURCE => (), source => { if source_is_url(source) { info!("{source} is a URL."); // TODO: Implement url check. 
diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 506c1b42..4da3ab7b 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,4 +1,10 @@ -use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; +use std::{ + collections::BTreeSet, + fmt::Display, + net::{IpAddr, Ipv4Addr}, + path::PathBuf, + time::Duration, +}; use uuid::Uuid; use crate::{ @@ -6,7 +12,9 @@ use crate::{ descriptor::Descriptor, }; +pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; +pub const DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT: u16 = 0xD02B; pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; diff --git a/tool_nodes/dora-record/README.md b/tool_nodes/dora-record/README.md new file mode 100644 index 00000000..ae1b948e --- /dev/null +++ b/tool_nodes/dora-record/README.md @@ -0,0 +1,56 @@ +# dora-record + +dora data recording using Apache Arrow Parquet. + +This node is still experimental. + +## Getting Started + +```bash +cargo install dora-record --locked +``` + +## Adding to existing graph: + +```yaml +- id: dora-record + custom: + source: dora-record + inputs: + image: webcam/image + text: webcam/text + # You can add any input and it is going to be logged. +``` + +## Output Files + +Format: Parquet file + +path: `out//.parquet` + +Columns: + +- trace_id: String, representing the id of the current trace +- span_id: String, representing the unique span id +- timestamp_uhlc: u64, representing the timestamp in [Unique Hybrid Logical Clock time](https://github.com/atolab/uhlc-rs) +- timestamp_utc: DataType::Timestamp(Milliseconds), representing the timestamp in Coordinated Universal Time. +- `` : Column containing the input in its defined format.
+ +Example: + +```json +{ + "trace_id": "2fd23ddf1b5d2aa38ddb86ceedb55928", + "span_id": "15aef03e0f052bbf", + "timestamp_uhlc": "7368873278370007008", + "timestamp_utc": 1715699508406, + "random": [1886295351360621740] +} +``` + +## Merging multiple files + +We can merge input files using the `trace_id` that is going to be shared when using opentelemetry features. + +- `trace_id` can also be queried from UI such as jaeger UI, influxDB and so on... +- `trace_id` keeps track of the logical flow of data, compared to timestamp-based merging that might not reflect the actual logical flow of data. From d180aa73457d99b0320dd922eace4ba085cb2b5c Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 8 Jun 2024 18:33:46 +0200 Subject: [PATCH 02/16] Adding configuration for dynamic nodes port --- .github/workflows/ci.yml | 5 +++-- binaries/cli/src/main.rs | 14 +++++++++----- binaries/coordinator/src/lib.rs | 2 +- binaries/daemon/src/lib.rs | 19 +++++++++++-------- binaries/daemon/src/spawn.rs | 4 +++- examples/multiple-daemons/run.rs | 14 ++++++++++---- 6 files changed, 37 insertions(+), 21 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b164270b..b9eae42e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -289,8 +289,9 @@ jobs: dora start dataflow.yml --name ci-rust-test sleep 10 dora stop --name ci-rust-test --grace-duration 5s - dora build ../examples/rust-dataflow/dataflow_dynamic.yml - dora start ../examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic + cd ..
+ dora build examples/rust-dataflow/dataflow_dynamic.yml + dora start examples/rust-dataflow/dataflow_dynamic.yml --name ci-rust-dynamic cargo run -p rust-dataflow-example-sink-dynamic sleep 5 dora stop --name ci-rust-dynamic --grace-duration 5s diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 099aed2e..ca447811 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::Descriptor, topics::{ ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -174,9 +174,12 @@ enum Command { /// Unique identifier for the machine (required for distributed dataflows) #[clap(long)] machine_id: Option, - /// The IP address and port this daemon will bind to. + /// The inter daemon IP address and port this daemon will bind to. #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] addr: SocketAddr, + /// The dynamic node port this daemon will bind to. 
+ #[clap(long, default_value_t = DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT)] + dynamic_node_port: u16, /// Address and port number of the dora coordinator #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] coordinator_addr: SocketAddr, @@ -376,7 +379,7 @@ fn run() -> eyre::Result<()> { match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, - (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, + (None, None) => stop_dataflow_dynamic(grace_duration, &mut *session)?, } } Command::Destroy { @@ -411,6 +414,7 @@ fn run() -> eyre::Result<()> { Command::Daemon { coordinator_addr, addr, + dynamic_node_port, machine_id, run_dataflow, } => { @@ -435,7 +439,7 @@ fn run() -> eyre::Result<()> { if coordinator_addr.ip() == LOCALHOST { tracing::info!("Starting in local mode"); } - Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr, dynamic_node_port).await } } }) @@ -476,7 +480,7 @@ fn start_dataflow( } } -fn stop_dataflow_interactive( +fn stop_dataflow_dynamic( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5032d6a5..4993cdf7 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -655,7 +655,7 @@ async fn send_heartbeat_message( inner: DaemonCoordinatorEvent::Heartbeat, timestamp, }) - .unwrap(); + .context("Could not serialize heartbeat message")?; tcp_send(connection, &message) .await diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 2b0ea639..6c4d58bd 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -8,7 +8,7 @@ use dora_core::daemon_messages::{ use dora_core::descriptor::runtime_node_inputs; use 
dora_core::message::uhlc::{self, HLC}; use dora_core::message::{ArrowTypeInfo, Metadata, MetadataParameters}; -use dora_core::topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}; +use dora_core::topics::LOCALHOST; use dora_core::{ config::{DataId, InputMapping, NodeId}, coordinator_messages::DaemonEvent, @@ -87,7 +87,8 @@ impl Daemon { pub async fn run( coordinator_addr: SocketAddr, machine_id: String, - bind_addr: SocketAddr, + inter_daemon_addr: SocketAddr, + dynamic_node_port: u16, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); @@ -96,7 +97,8 @@ impl Daemon { // spawn inter daemon listen loop let (events_tx, events_rx) = flume::bounded(10); let listen_port = - inter_daemon::spawn_listener_loop(bind_addr, machine_id.clone(), events_tx).await?; + inter_daemon::spawn_listener_loop(inter_daemon_addr, machine_id.clone(), events_tx) + .await?; let daemon_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::Daemon(e.inner), timestamp: e.timestamp, @@ -119,11 +121,12 @@ impl Daemon { // Spawn dynamic node listener loop let (events_tx, events_rx) = flume::bounded(10); - let dynamic_node_address = - SocketAddr::new(LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT); // TODO: Make this config - let _listen_port = - dynamic_node::spawn_listener_loop(dynamic_node_address, machine_id.clone(), events_tx) - .await?; + let _listen_port = dynamic_node::spawn_listener_loop( + (LOCALHOST, dynamic_node_port).into(), + machine_id.clone(), + events_tx, + ) + .await?; let dynamic_node_events = events_rx.into_stream().map(|e| Timestamped { inner: Event::DynamicNode(e.inner), timestamp: e.timestamp, diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 89c86073..92e8b165 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -270,7 +270,9 @@ pub async fn spawn_node( .expect("Failed to create log file"); let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take 
stdout")); - let pid = child.id().unwrap(); + let pid = child.id().context( + "Could not get the pid for the just spawned node and indicate that there is an error", + )?; let running_node = RunningNode { pid: Some(pid), node_config, diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 32a695cc..dae921ee 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -52,8 +52,8 @@ async fn main() -> eyre::Result<()> { ) .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); - let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); - let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); + let daemon_a = run_daemon(coordinator_addr.to_string(), "A", 9843); // Random port + let daemon_b = run_daemon(coordinator_addr.to_string(), "B", 9842); tracing::info!("Spawning coordinator and daemons"); let mut tasks = JoinSet::new(); @@ -211,7 +211,11 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { Ok(()) } -async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { +async fn run_daemon( + coordinator: String, + machine_id: &str, + dynamic_node_port: u16, +) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); cmd.arg("run"); @@ -221,7 +225,9 @@ async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { .arg("--machine-id") .arg(machine_id) .arg("--coordinator-addr") - .arg(coordinator); + .arg(coordinator) + .arg("--dynamic-node-port") + .arg(dynamic_node_port.to_string()); if !cmd.status().await?.success() { bail!("failed to run dataflow"); }; From 7651ef82ad85bfb31e975f54d40ae5292929c856 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 8 Jun 2024 22:00:57 +0200 Subject: [PATCH 03/16] adding requirements installation within the CI --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml 
b/.github/workflows/ci.yml index b9eae42e..76c5dfe7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,6 +318,7 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s + pip install -r ../examples/python-dataflow/requirements.txt dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic python ../examples/python-dataflow/plot_dynamic.py sleep 5 From faed55c42c8d98877ff3a82c2ef99bbb2060f40e Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 9 Jun 2024 09:01:14 +0200 Subject: [PATCH 04/16] Reuse example virtual env --- .github/workflows/ci.yml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 76c5dfe7..c24e22d5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,9 +318,15 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s - pip install -r ../examples/python-dataflow/requirements.txt - dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic - python ../examples/python-dataflow/plot_dynamic.py + cd .. + if [ ! 
-d "examples/.env/bin" ]; then # Reuse Example venv + mv examples/.env/Scripts examples/.env/bin # venv is placed under `Scripts` on Windows + fi + source examples/.env/bin/activate + dora destroy # Restart the daemon with the current virtual environment + dora up + dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + python examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s dora destroy From fe8722b7f5e23af668008566d1ce3364bb55b33b Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 9 Jun 2024 10:00:50 +0200 Subject: [PATCH 05/16] Use tmp dir to free up disk space on /tmp --- .github/workflows/ci.yml | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c24e22d5..c691d990 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,15 +318,9 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s - cd .. - if [ ! -d "examples/.env/bin" ]; then # Reuse Example venv - mv examples/.env/Scripts examples/.env/bin # venv is placed under `Scripts` on Windows - fi - source examples/.env/bin/activate - dora destroy # Restart the daemon with the current virtual environment - dora up - dora start examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic - python examples/python-dataflow/plot_dynamic.py + pip install -r ../examples/python-dataflow/requirements.txt --cache-dir=. 
+ dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic + python ../examples/python-dataflow/plot_dynamic.py sleep 5 dora stop --name ci-python-test --grace-duration 5s dora destroy From 5c97affc65afc8899c9855cc36feb4e3255f36de Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sun, 9 Jun 2024 11:03:37 +0200 Subject: [PATCH 06/16] Remove object detection from dynamic plot for CI purpose --- .github/workflows/ci.yml | 2 +- examples/python-dataflow/dataflow_dynamic.yml | 9 --------- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c691d990..3c04f38a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -318,7 +318,7 @@ jobs: dora start dataflow.yml --name ci-python-test sleep 10 dora stop --name ci-python-test --grace-duration 5s - pip install -r ../examples/python-dataflow/requirements.txt --cache-dir=. + pip install opencv-python dora start ../examples/python-dataflow/dataflow_dynamic.yml --name ci-python-dynamic python ../examples/python-dataflow/plot_dynamic.py sleep 5 diff --git a/examples/python-dataflow/dataflow_dynamic.yml b/examples/python-dataflow/dataflow_dynamic.yml index d43efc83..677f8a7f 100644 --- a/examples/python-dataflow/dataflow_dynamic.yml +++ b/examples/python-dataflow/dataflow_dynamic.yml @@ -9,17 +9,8 @@ nodes: outputs: - image - - id: object_detection - custom: - source: ./object_detection.py - inputs: - image: webcam/image - outputs: - - bbox - - id: plot custom: source: dynamic inputs: image: webcam/image - bbox: object_detection/bbox From 8c77459f5adac369b06da45321f84915811c2804 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 10 Jun 2024 20:07:42 +0200 Subject: [PATCH 07/16] Fix typo in cli stop interactive --- binaries/cli/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index ca447811..d54a43aa 100644 --- a/binaries/cli/src/main.rs 
+++ b/binaries/cli/src/main.rs @@ -379,7 +379,7 @@ fn run() -> eyre::Result<()> { match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, - (None, None) => stop_dataflow_dynamic(grace_duration, &mut *session)?, + (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, } } Command::Destroy { @@ -480,7 +480,7 @@ fn start_dataflow( } } -fn stop_dataflow_dynamic( +fn stop_dataflow_interactive( grace_duration: Option, session: &mut TcpRequestReplyConnection, ) -> eyre::Result<()> { From a0a95b730cd12723075c7c6e6ca3ee8de678e60f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 10 Jun 2024 20:41:04 +0200 Subject: [PATCH 08/16] Rename dynamic node listener -> local listener --- apis/rust/node/src/node/mod.rs | 4 ++-- binaries/cli/src/main.rs | 16 ++++++++-------- binaries/daemon/src/lib.rs | 12 ++++++------ .../src/{dynamic_node.rs => local_listener.rs} | 10 +++++----- examples/multiple-daemons/run.rs | 4 ++-- libraries/core/src/topics.rs | 2 +- 6 files changed, 24 insertions(+), 24 deletions(-) rename binaries/daemon/src/{dynamic_node.rs => local_listener.rs} (93%) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 31fdd8a6..8995a8fc 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -12,7 +12,7 @@ use dora_core::{ daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, descriptor::Descriptor, message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, - topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}, + topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, }; use eyre::{bail, WrapErr}; @@ -90,7 +90,7 @@ impl DoraNode { return Self::init(node_config); } - let daemon_address = (LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT).into(); + let daemon_address = (LOCALHOST, 
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(); let mut channel = DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index d54a43aa..e3f900b2 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::Descriptor, topics::{ ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -176,10 +176,10 @@ enum Command { machine_id: Option, /// The inter daemon IP address and port this daemon will bind to. #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] - addr: SocketAddr, - /// The dynamic node port this daemon will bind to. - #[clap(long, default_value_t = DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT)] - dynamic_node_port: u16, + inter_daemon_addr: SocketAddr, + /// Local listen port for event such as dynamic node. 
+ #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] + local_listen_port: u16, /// Address and port number of the dora coordinator #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] coordinator_addr: SocketAddr, @@ -413,8 +413,8 @@ fn run() -> eyre::Result<()> { } Command::Daemon { coordinator_addr, - addr, - dynamic_node_port, + inter_daemon_addr, + local_listen_port, machine_id, run_dataflow, } => { @@ -439,7 +439,7 @@ fn run() -> eyre::Result<()> { if coordinator_addr.ip() == LOCALHOST { tracing::info!("Starting in local mode"); } - Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr, dynamic_node_port).await + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await } } }) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 6c4d58bd..72c699e3 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -19,11 +19,11 @@ use dora_core::{ descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; -use dynamic_node::DynamicNodeEventWrapper; use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; +use local_listener::DynamicNodeEventWrapper; use pending::PendingNodes; use shared_memory_server::ShmemConf; use std::sync::Arc; @@ -49,8 +49,8 @@ use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; mod coordinator; -mod dynamic_node; mod inter_daemon; +mod local_listener; mod log; mod node_communication; mod pending; @@ -88,7 +88,7 @@ impl Daemon { coordinator_addr: SocketAddr, machine_id: String, inter_daemon_addr: SocketAddr, - dynamic_node_port: u16, + local_listen_port: u16, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); @@ -119,10 +119,10 @@ impl Daemon { }, ); - // Spawn dynamic node listener loop + // Spawn local listener loop let (events_tx, 
events_rx) = flume::bounded(10); - let _listen_port = dynamic_node::spawn_listener_loop( - (LOCALHOST, dynamic_node_port).into(), + let _listen_port = local_listener::spawn_listener_loop( + (LOCALHOST, local_listen_port).into(), machine_id.clone(), events_tx, ) diff --git a/binaries/daemon/src/dynamic_node.rs b/binaries/daemon/src/local_listener.rs similarity index 93% rename from binaries/daemon/src/dynamic_node.rs rename to binaries/daemon/src/local_listener.rs index 7b1f9dc3..dbffe39e 100644 --- a/binaries/daemon/src/dynamic_node.rs +++ b/binaries/daemon/src/local_listener.rs @@ -31,7 +31,7 @@ pub async fn spawn_listener_loop( tokio::spawn(async move { listener_loop(socket, events_tx).await; - tracing::debug!("Dynamic node listener loop finished for machine `{machine_id}`"); + tracing::debug!("Local listener loop finished for machine `{machine_id}`"); }); Ok(listen_port) @@ -100,7 +100,7 @@ async fn handle_connection_loop( } }; if let Err(err) = tcp_send(&mut connection, &serialized).await { - tracing::warn!("failed to send reply to dynamic node: {err}"); + tracing::warn!("failed to send reply: {err}"); continue; }; } @@ -111,7 +111,7 @@ async fn handle_connection_loop( break; } _ => tracing::warn!( - "Unexpected Daemon Request that is not yet by Additional dynamic node controls" + "Unexpected Daemon Request that is not yet by Additional local listener controls" ), } } @@ -128,11 +128,11 @@ async fn receive_message( | ErrorKind::ConnectionReset => return Ok(None), _other => { return Err(err) - .context("unexpected I/O error while trying to receive DynamicNodeEvent") + .context("unexpected I/O error while trying to receive DaemonRequest") } }, }; bincode::deserialize(&raw) - .wrap_err("failed to deserialize DynamicNodeEvent") + .wrap_err("failed to deserialize DaemonRequest") .map(Some) } diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index dae921ee..59e75fb0 100644 --- a/examples/multiple-daemons/run.rs +++ 
b/examples/multiple-daemons/run.rs @@ -214,7 +214,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { async fn run_daemon( coordinator: String, machine_id: &str, - dynamic_node_port: u16, + local_listen_port: u16, ) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); @@ -227,7 +227,7 @@ async fn run_daemon( .arg("--coordinator-addr") .arg(coordinator) .arg("--dynamic-node-port") - .arg(dynamic_node_port.to_string()); + .arg(local_listen_port.to_string()); if !cmd.status().await?.success() { bail!("failed to run dataflow"); }; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 4da3ab7b..8e90e48f 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -14,7 +14,7 @@ use crate::{ pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; -pub const DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT: u16 = 0xD02B; +pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B; pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; From 3f2f274036f0b380d12b7b9957244a0526d60e3f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 10 Jun 2024 20:50:51 +0200 Subject: [PATCH 09/16] Remove unnecessary condition on dynamic node --- apis/rust/node/src/node/mod.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 8995a8fc..3f60f01a 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -45,7 +45,6 @@ pub struct DoraNode { cache: VecDeque, dataflow_descriptor: Descriptor, - dynamic: bool, } impl DoraNode { @@ -121,7 +120,7 @@ impl DoraNode { run_config, daemon_communication, dataflow_descriptor, - dynamic, + dynamic: _, } = node_config; let clock = Arc::new(uhlc::HLC::default()); @@ -144,7 +143,6 @@ 
impl DoraNode { sent_out_shared_memory: HashMap::new(), drop_stream, cache: VecDeque::new(), - dynamic, dataflow_descriptor, }; Ok((node, event_stream)) @@ -382,18 +380,16 @@ impl Drop for DoraNode { #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")] fn drop(&mut self) { // close all outputs first to notify subscribers as early as possible - if !self.dynamic { - if let Err(err) = self - .control_channel - .report_closed_outputs( - std::mem::take(&mut self.node_config.outputs) - .into_iter() - .collect(), - ) - .context("failed to close outputs on drop") - { - tracing::warn!("{err:?}") - } + if let Err(err) = self + .control_channel + .report_closed_outputs( + std::mem::take(&mut self.node_config.outputs) + .into_iter() + .collect(), + ) + .context("failed to close outputs on drop") + { + tracing::warn!("{err:?}") } while !self.sent_out_shared_memory.is_empty() { From a96a5d6b790c8f20efdaf2fa68022824350f3a91 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 12 Jun 2024 12:33:53 +0200 Subject: [PATCH 10/16] Update local listen port argument --- examples/multiple-daemons/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 59e75fb0..049dc7d8 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -226,7 +226,7 @@ async fn run_daemon( .arg(machine_id) .arg("--coordinator-addr") .arg(coordinator) - .arg("--dynamic-node-port") + .arg("--local-listen-port") .arg(local_listen_port.to_string()); if !cmd.status().await?.success() { bail!("failed to run dataflow"); From 0e2858ccd61e84dedccdee445480622e34ac0eb1 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 13:44:02 +0200 Subject: [PATCH 11/16] Add `--quiet` flag to daemon and coordinator Suppresses all log messages to stdout. 
--- binaries/cli/src/main.rs | 23 +++++++++++++++---- .../extensions/telemetry/tracing/src/lib.rs | 14 +++++++++-- 2 files changed, 30 insertions(+), 7 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 099aed2e..5ea7c003 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -12,6 +12,7 @@ use dora_core::{ use dora_daemon::Daemon; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; +use dora_tracing::set_up_tracing_opts; use duration_str::parse; use eyre::{bail, Context}; use std::net::SocketAddr; @@ -182,6 +183,9 @@ enum Command { coordinator_addr: SocketAddr, #[clap(long, hide = true)] run_dataflow: Option, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, }, /// Run runtime Runtime, @@ -199,6 +203,9 @@ enum Command { /// Port number to bind to for control communication #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] control_port: u16, + /// Suppresses all log output to stdout. + #[clap(long)] + quiet: bool, }, } @@ -244,14 +251,16 @@ fn run() -> eyre::Result<()> { #[cfg(feature = "tracing")] match args.command { - Command::Daemon { .. } => { - set_up_tracing("dora-daemon").context("failed to set up tracing subscriber")?; + Command::Daemon { quiet, .. } => { + set_up_tracing_opts("dora-daemon", !quiet) + .context("failed to set up tracing subscriber")?; } Command::Runtime => { // Do not set the runtime in the cli. } - Command::Coordinator { .. } => { - set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?; + Command::Coordinator { quiet, .. 
} => { + set_up_tracing_opts("dora-coordinator", !quiet) + .context("failed to set up tracing subscriber")?; } _ => { set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; @@ -392,6 +401,7 @@ fn run() -> eyre::Result<()> { port, control_interface, control_port, + quiet, } => { let rt = Builder::new_multi_thread() .enable_all() @@ -403,7 +413,9 @@ fn run() -> eyre::Result<()> { let (port, task) = dora_coordinator::start(bind, bind_control, futures::stream::empty::()) .await?; - println!("Listening for incoming daemon connection on {port}"); + if !quiet { + println!("Listening for incoming daemon connection on {port}"); + } task.await }) .context("failed to run dora-coordinator")? @@ -413,6 +425,7 @@ fn run() -> eyre::Result<()> { addr, machine_id, run_dataflow, + quiet: _, } => { let rt = Builder::new_multi_thread() .enable_all() diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index b51b8eb4..93bb09b7 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -14,11 +14,21 @@ use tracing_subscriber::Registry; pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { + set_up_tracing_opts(name, true) +} + +pub fn set_up_tracing_opts(name: &str, stdout: bool) -> eyre::Result<()> { + let stdout_filter = if stdout { + LevelFilter::TRACE + } else { + LevelFilter::OFF + }; // Filter log using `RUST_LOG`. More useful for CLI. 
- let filter = EnvFilter::from_default_env().or(LevelFilter::WARN); + let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); let stdout_log = tracing_subscriber::fmt::layer() .pretty() - .with_filter(filter); + .with_filter(stdout_filter) + .with_filter(env_filter); let registry = Registry::default().with(stdout_log); if let Some(endpoint) = std::env::var_os("DORA_JAEGER_TRACING") { From b158796ac25270042d5a0873f69e54181b4a0e1b Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 13:46:17 +0200 Subject: [PATCH 12/16] Spawn daemon and coordinator in quiet mode on `dora up` --- binaries/cli/src/up.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 66b2bd20..d1376d7c 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -85,6 +85,7 @@ fn start_coordinator() -> eyre::Result<()> { let mut cmd = Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); cmd.arg("coordinator"); + cmd.arg("--quiet"); cmd.spawn().wrap_err("failed to run `dora coordinator`")?; println!("started dora coordinator"); @@ -96,6 +97,7 @@ fn start_daemon() -> eyre::Result<()> { let mut cmd = Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); cmd.arg("daemon"); + cmd.arg("--quiet"); cmd.spawn().wrap_err("failed to run `dora daemon`")?; println!("started dora daemon"); From b7f21dbd4a41b650d1c65e73ea78f00e2b70f4db Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 14:28:21 +0200 Subject: [PATCH 13/16] Log daemon and coordinator output to files Logs the daemon output to `out/dora-daemon.txt` and the coordinator output to `out/dora-coordinator.txt`.
--- binaries/cli/src/main.rs | 4 +- .../extensions/telemetry/tracing/src/lib.rs | 61 ++++++++++++------- 2 files changed, 40 insertions(+), 25 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 5ea7c003..37698422 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -252,14 +252,14 @@ fn run() -> eyre::Result<()> { #[cfg(feature = "tracing")] match args.command { Command::Daemon { quiet, .. } => { - set_up_tracing_opts("dora-daemon", !quiet) + set_up_tracing_opts("dora-daemon", !quiet, true) .context("failed to set up tracing subscriber")?; } Command::Runtime => { // Do not set the runtime in the cli. } Command::Coordinator { quiet, .. } => { - set_up_tracing_opts("dora-coordinator", !quiet) + set_up_tracing_opts("dora-coordinator", !quiet, true) .context("failed to set up tracing subscriber")?; } _ => { diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index 93bb09b7..d7baca3d 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -3,6 +3,8 @@ //! This module init a tracing propagator for Rust code that requires tracing, and is //! able to serialize and deserialize context that has been sent via the middleware. +use std::path::Path; + use eyre::Context as EyreContext; use tracing::metadata::LevelFilter; use tracing_subscriber::{ @@ -14,23 +16,38 @@ use tracing_subscriber::Registry; pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { - set_up_tracing_opts(name, true) + set_up_tracing_opts(name, true, false) } -pub fn set_up_tracing_opts(name: &str, stdout: bool) -> eyre::Result<()> { - let stdout_filter = if stdout { - LevelFilter::TRACE - } else { - LevelFilter::OFF - }; - // Filter log using `RUST_LOG`. More useful for CLI. 
- let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); - let stdout_log = tracing_subscriber::fmt::layer() - .pretty() - .with_filter(stdout_filter) - .with_filter(env_filter); - - let registry = Registry::default().with(stdout_log); +pub fn set_up_tracing_opts(name: &str, stdout: bool, file: bool) -> eyre::Result<()> { + let mut layers = Vec::new(); + + if stdout { + // Filter log using `RUST_LOG`. More useful for CLI. + let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); + let layer = tracing_subscriber::fmt::layer() + .pretty() + .with_filter(env_filter); + layers.push(layer.boxed()); + } + + if file { + let out_dir = Path::new("out"); + std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?; + let path = out_dir.join(format!("{name}.txt")); + let file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path) + .context("failed to create log file")?; + // Filter log using `RUST_LOG`. More useful for CLI. + let layer = tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(file) + .with_filter(LevelFilter::INFO); + layers.push(layer.boxed()); + } + if let Some(endpoint) = std::env::var_os("DORA_JAEGER_TRACING") { let endpoint = endpoint .to_str() @@ -38,13 +55,11 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool) -> eyre::Result<()> { let tracer = crate::telemetry::init_jaeger_tracing(name, endpoint) .wrap_err("Could not instantiate tracing")?; let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - let subscriber = registry.with(telemetry); - tracing::subscriber::set_global_default(subscriber).context(format!( - "failed to set tracing global subscriber for {name}" - )) - } else { - tracing::subscriber::set_global_default(registry).context(format!( - "failed to set tracing global subscriber for {name}" - )) + layers.push(telemetry.boxed()); } + + let registry = Registry::default().with(layers); + 
tracing::subscriber::set_global_default(registry).context(format!( + "failed to set tracing global subscriber for {name}" + )) } From 77f9ba903e4b27f2d3ed8da4fba7e57b09a0e5e4 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 14:28:47 +0200 Subject: [PATCH 14/16] Use compact formatting for stdout logs We now have the full logs in the output file. --- libraries/extensions/telemetry/tracing/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index d7baca3d..c0e83b6d 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -26,7 +26,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, file: bool) -> eyre::Result // Filter log using `RUST_LOG`. More useful for CLI. let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); let layer = tracing_subscriber::fmt::layer() - .pretty() + .compact() .with_filter(env_filter); layers.push(layer.boxed()); } From e81473f223d8e3959a736275919fc82781f226dc Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 12 Jun 2024 14:32:44 +0200 Subject: [PATCH 15/16] Include machine ID in daemon log file name if set --- binaries/cli/src/main.rs | 16 ++++++++++++---- .../extensions/telemetry/tracing/src/lib.rs | 8 ++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 37698422..fda955d2 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -250,16 +250,24 @@ fn run() -> eyre::Result<()> { let args = Args::parse(); #[cfg(feature = "tracing")] - match args.command { - Command::Daemon { quiet, .. } => { - set_up_tracing_opts("dora-daemon", !quiet, true) + match &args.command { + Command::Daemon { + quiet, machine_id, .. 
+ } => { + let name = "dora-daemon"; + let filename = machine_id + .as_ref() + .map(|id| format!("{name}-{id}")) + .unwrap_or(name.to_string()); + set_up_tracing_opts(name, !quiet, Some(&filename)) .context("failed to set up tracing subscriber")?; } Command::Runtime => { // Do not set the runtime in the cli. } Command::Coordinator { quiet, .. } => { - set_up_tracing_opts("dora-coordinator", !quiet, true) + let name = "dora-coordinator"; + set_up_tracing_opts(name, !quiet, Some(name)) .context("failed to set up tracing subscriber")?; } _ => { diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index c0e83b6d..10c27723 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -16,10 +16,10 @@ use tracing_subscriber::Registry; pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { - set_up_tracing_opts(name, true, false) + set_up_tracing_opts(name, true, None) } -pub fn set_up_tracing_opts(name: &str, stdout: bool, file: bool) -> eyre::Result<()> { +pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> { let mut layers = Vec::new(); if stdout { @@ -31,10 +31,10 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, file: bool) -> eyre::Result layers.push(layer.boxed()); } - if file { + if let Some(filename) = filename { let out_dir = Path::new("out"); std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?; - let path = out_dir.join(format!("{name}.txt")); + let path = out_dir.join(filename).with_extension("txt"); let file = std::fs::OpenOptions::new() .create(true) .append(true) From 7d3a5de1c50b277ddc48322f0608a1b404480c51 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Wed, 12 Jun 2024 15:52:06 +0200 Subject: [PATCH 16/16] Make rust flexible logic and make sure to always use env variables DORA_NODE_CONFIG if it exists. 
This removes the complexity of having to rename the node id when used within a yaml configuration. --- apis/python/node/src/lib.rs | 5 +++-- apis/rust/node/src/node/mod.rs | 25 +++++++++++++------------ binaries/daemon/src/lib.rs | 12 ++++++++---- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index b48ec762..67c642f9 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -35,9 +35,10 @@ impl Node { #[new] pub fn new(node_id: Option) -> eyre::Result { let (node, events) = if let Some(node_id) = node_id { - DoraNode::init_from_node_id(NodeId::from(node_id))? + DoraNode::init_flexible(NodeId::from(node_id)) + .context("Could not setup node from node id. Make sure to have a running dataflow with this dynamic node")? } else { - DoraNode::init_from_env()? + DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")? }; Ok(Node { diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 3f60f01a..9eb4b18e 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -23,6 +23,7 @@ use std::{ sync::Arc, time::Duration, }; +use tracing::info; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; @@ -58,8 +59,9 @@ impl DoraNode { /// pub fn init_from_env() -> eyre::Result<(Self, EventStream)> { let node_config: NodeConfig = { - let raw = std::env::var("DORA_NODE_CONFIG") - .wrap_err("env variable DORA_NODE_CONFIG must be set")?; + let raw = std::env::var("DORA_NODE_CONFIG").wrap_err( + "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?", + )?; serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
}; #[cfg(feature = "tracing")] @@ -79,16 +81,6 @@ impl DoraNode { /// pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { // Make sure that the node is initialized outside of dora start. - if let Ok(node_config_string) = std::env::var("DORA_NODE_CONFIG") { - let node_config: NodeConfig = serde_yaml::from_str(&node_config_string) - .context("failed to deserialize operator config")?; - assert!( - node_config.node_id == node_id, - "Node id within the yaml description and the node_id does not match. Please either run this node in either dynamic mode or change or remove `node_id` specification in the code." - ); - return Self::init(node_config); - } - let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(); let mut channel = @@ -112,6 +104,15 @@ impl DoraNode { } } + pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { + if std::env::var("DORA_NODE_CONFIG").is_ok() { + info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); + Self::init_from_env() + } else { + Self::init_from_node_id(node_id) + } + } + #[tracing::instrument] pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> { let NodeConfig { diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 72c699e3..521b5bd8 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -678,17 +678,21 @@ impl Daemon { .running .iter() .filter(|(_id, dataflow)| dataflow.running_nodes.contains_key(&node_id)) - .map(|(_id, dataflow)| -> Result { - Ok(dataflow + .map(|(id, dataflow)| -> Result { + let node_config = dataflow .running_nodes .get(&node_id) .context("no node with ID `{node_id}` within the given dataflow")? .node_config - .clone()) + .clone(); + if !node_config.dynamic { + bail!("node with ID `{node_id}` in {id} is not dynamic"); + } + Ok(node_config) }) .next() .context("no node with ID `{node_id}`")? 
- .context("failed to get node config within given dataflow")?, + .context("failed to get dynamic node config within given dataflow")?, 0 => { let _ = reply_tx.send(Some(DaemonReply::NodeConfig { result: Err("no node with ID `{node_id}`".to_string()),