diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 103f65e2..53de02a8 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -47,6 +47,7 @@ use tokio::{ io::AsyncReadExt, net::TcpStream, sync::{ + broadcast, mpsc::{self, UnboundedSender}, oneshot::{self, Sender}, }, @@ -90,6 +91,7 @@ pub struct Daemon { clock: Arc, zenoh_session: zenoh::Session, + remote_daemon_events_tx: Option>>>, } type DaemonRunResult = BTreeMap>>; @@ -129,6 +131,7 @@ impl Daemon { daemon_id, None, clock, + Some(remote_daemon_events_tx), ) .await .map(|_| ()) @@ -182,6 +185,7 @@ impl Daemon { DaemonId::new(None), Some(exit_when_done), clock.clone(), + None, ); let spawn_result = reply_rx @@ -212,6 +216,7 @@ impl Daemon { daemon_id: DaemonId, exit_when_done: Option>, clock: Arc, + remote_daemon_events_tx: Option>>>, ) -> eyre::Result { let coordinator_connection = match coordinator_addr { Some(addr) => { @@ -244,6 +249,7 @@ impl Daemon { dataflow_node_results: BTreeMap::new(), clock, zenoh_session, + remote_daemon_events_tx, }; let dora_events = ReceiverStream::new(dora_events_rx); @@ -324,6 +330,9 @@ impl Daemon { tracing::warn!("received second ctrlc signal -> exit immediately"); bail!("received second ctrl-c signal"); } + Event::DaemonError(err) => { + tracing::error!("Daemon error: {err:?}"); + } } } @@ -730,7 +739,56 @@ impl Daemon { } } } else { + // wait until node is ready before starting dataflow.pending_nodes.set_external_nodes(true); + + // subscribe to all node outputs that are mapped to some local inputs + for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) { + let tx = self + .remote_daemon_events_tx + .clone() + .wrap_err("no remote_daemon_events_tx channel")?; + let mut finished_rx = dataflow.finished_tx.subscribe(); + let subscriber = self + .zenoh_session + .declare_subscriber(dataflow.output_publish_topic(output_id)) + .await + .map_err(|e| eyre!(e)) + .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?; + tokio::spawn(async move { + let mut finished = pin!(finished_rx.recv()); + loop { + let finished_or_next = + futures::future::select(finished, subscriber.recv_async()); + match finished_or_next.await { + future::Either::Left((finished, _)) => { + match finished { + Err(broadcast::error::RecvError::Closed) => { + tracing::debug!("dataflow finished, breaking from zenoh subscribe task"); + break; + } + other => { + tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}"); + break; + } + } + } + future::Either::Right((sample, f)) => { + finished = f; + let event = sample.map_err(|e| eyre!(e)).and_then(|s| { + Timestamped::deserialize_inter_daemon_event( + &s.payload().to_bytes(), + ) + }); + if tx.send_async(event).await.is_err() { + // daemon finished + break; + } + } + } + } + }); + } } } @@ -1393,13 +1451,20 @@ async fn set_up_event_stream( coordinator_addr: SocketAddr, machine_id: &Option, clock: &Arc, - remote_daemon_events_rx: flume::Receiver>, + remote_daemon_events_rx: flume::Receiver>>, // used for dynamic nodes local_listen_port: u16, ) -> eyre::Result<(DaemonId, impl Stream> + Unpin)> { - let remote_daemon_events = remote_daemon_events_rx.into_stream().map(|e| Timestamped { - inner: Event::Daemon(e.inner), - timestamp: e.timestamp, + let clock_cloned = clock.clone(); + let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e { + Ok(e) => Timestamped { + inner: Event::Daemon(e.inner), + timestamp: e.timestamp, + }, + Err(err) => Timestamped { + inner: Event::DaemonError(err), + timestamp: clock_cloned.new_timestamp(), + }, }); let (daemon_id, coordinator_events) = coordinator::register(coordinator_addr, machine_id.clone(), clock) @@ -1625,10 +1690,13 @@ pub struct RunningDataflow { node_stderr_most_recent: BTreeMap>>, publishers: BTreeMap>, + + finished_tx: broadcast::Sender<()>, } impl RunningDataflow { fn new(dataflow_id: Uuid, daemon_id: DaemonId) -> RunningDataflow { + let (finished_tx, _) = broadcast::channel(1); Self { id: dataflow_id, pending_nodes: PendingNodes::new(dataflow_id, daemon_id), @@ -1648,6 +1716,7 @@ impl RunningDataflow { grace_duration_kills: Default::default(), node_stderr_most_recent: BTreeMap::new(), publishers: Default::default(), + finished_tx, } } @@ -1833,6 +1902,7 @@ pub enum Event { HeartbeatInterval, CtrlC, SecondCtrlC, + DaemonError(eyre::Report), } impl From for Event { diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index a9d5d049..037ac8f2 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -2,9 +2,10 @@ use core::fmt; use std::borrow::Cow; use aligned_vec::{AVec, ConstAlign}; +use eyre::Context as _; use uuid::Uuid; -use crate::{id::NodeId, DataflowId}; +use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId}; pub use log::Level as LogLevel; @@ -138,6 +139,12 @@ where } } +impl Timestamped { + pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result { + bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent") + } +} + pub type SharedMemoryId = String; #[derive(serde::Serialize, serde::Deserialize, Clone)]