
Subscribe to required outputs of remote nodes when spawning dataflow

tags/v0.3.10-rc3
Philipp Oppermann · 11 months ago
commit 33163c2fe7
2 changed files with 82 additions and 5 deletions:
  1. binaries/daemon/src/lib.rs (+74, -4)
  2. libraries/message/src/common.rs (+8, -1)

binaries/daemon/src/lib.rs (+74, -4)

@@ -47,6 +47,7 @@ use tokio::{
    io::AsyncReadExt,
    net::TcpStream,
    sync::{
        broadcast,
        mpsc::{self, UnboundedSender},
        oneshot::{self, Sender},
    },
@@ -90,6 +91,7 @@ pub struct Daemon {
    clock: Arc<uhlc::HLC>,

    zenoh_session: zenoh::Session,
    remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
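
The new `remote_daemon_events_tx` field is an `Option` because, per the call sites below, one path passes `Some(remote_daemon_events_tx)` and the other `None`. Its item type is `eyre::Result<Timestamped<InterDaemonEvent>>`, so subscriber tasks can forward deserialization failures through the same channel as decoded events. A minimal sketch of that channel shape, with a hypothetical `RemoteEvent` stand-in for dora's `Timestamped<InterDaemonEvent>`:

use eyre::{eyre, Result};

// Hypothetical stand-in for `Timestamped<InterDaemonEvent>`.
#[derive(Debug)]
struct RemoteEvent {
    payload: Vec<u8>,
}

#[tokio::main]
async fn main() -> Result<()> {
    // The channel items are `Result`s, so errors travel the same path as events.
    let (tx, rx) = flume::unbounded::<Result<RemoteEvent>>();

    // A producer (standing in for a zenoh subscriber task) forwards events and errors alike.
    tx.send_async(Ok(RemoteEvent { payload: vec![1, 2, 3] }))
        .await
        .map_err(|_| eyre!("receiver dropped"))?;
    tx.send_async(Err(eyre!("failed to deserialize sample")))
        .await
        .map_err(|_| eyre!("receiver dropped"))?;
    drop(tx);

    // The consumer (standing in for the daemon event loop) sees both cases.
    while let Ok(item) = rx.recv_async().await {
        match item {
            Ok(event) => println!("remote event: {event:?}"),
            Err(err) => eprintln!("remote daemon error: {err:?}"),
        }
    }
    Ok(())
}
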
@@ -129,6 +131,7 @@ impl Daemon {
            daemon_id,
            None,
            clock,
            Some(remote_daemon_events_tx),
        )
        .await
        .map(|_| ())
@@ -182,6 +185,7 @@ impl Daemon {
            DaemonId::new(None),
            Some(exit_when_done),
            clock.clone(),
            None,
        );

        let spawn_result = reply_rx
@@ -212,6 +216,7 @@ impl Daemon {
        daemon_id: DaemonId,
        exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
        clock: Arc<HLC>,
        remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
    ) -> eyre::Result<DaemonRunResult> {
        let coordinator_connection = match coordinator_addr {
            Some(addr) => {
@@ -244,6 +249,7 @@ impl Daemon {
            dataflow_node_results: BTreeMap::new(),
            clock,
            zenoh_session,
            remote_daemon_events_tx,
        };

        let dora_events = ReceiverStream::new(dora_events_rx);
@@ -324,6 +330,9 @@ impl Daemon {
                    tracing::warn!("received second ctrlc signal -> exit immediately");
                    bail!("received second ctrl-c signal");
                }
                Event::DaemonError(err) => {
                    tracing::error!("Daemon error: {err:?}");
                }
            }
        }

@@ -730,7 +739,56 @@ impl Daemon {
                    }
                }
            } else {
                // wait until node is ready before starting
                dataflow.pending_nodes.set_external_nodes(true);

                // subscribe to all node outputs that are mapped to some local inputs
                for output_id in dataflow.mappings.keys().filter(|o| o.0 == node.id) {
                    let tx = self
                        .remote_daemon_events_tx
                        .clone()
                        .wrap_err("no remote_daemon_events_tx channel")?;
                    let mut finished_rx = dataflow.finished_tx.subscribe();
                    let subscriber = self
                        .zenoh_session
                        .declare_subscriber(dataflow.output_publish_topic(output_id))
                        .await
                        .map_err(|e| eyre!(e))
                        .wrap_err_with(|| format!("failed to subscribe to {output_id:?}"))?;
                    tokio::spawn(async move {
                        let mut finished = pin!(finished_rx.recv());
                        loop {
                            let finished_or_next =
                                futures::future::select(finished, subscriber.recv_async());
                            match finished_or_next.await {
                                future::Either::Left((finished, _)) => {
                                    match finished {
                                        Err(broadcast::error::RecvError::Closed) => {
                                            tracing::debug!("dataflow finished, breaking from zenoh subscribe task");
                                            break;
                                        }
                                        other => {
                                            tracing::warn!("unexpected return value of dataflow finished_rx channel: {other:?}");
                                            break;
                                        }
                                    }
                                }
                                future::Either::Right((sample, f)) => {
                                    finished = f;
                                    let event = sample.map_err(|e| eyre!(e)).and_then(|s| {
                                        Timestamped::deserialize_inter_daemon_event(
                                            &s.payload().to_bytes(),
                                        )
                                    });
                                    if tx.send_async(event).await.is_err() {
                                        // daemon finished
                                        break;
                                    }
                                }
                            }
                        }
                    });
                }
            }
        }
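
Each spawned task races the dataflow's `finished` broadcast receiver against the zenoh subscriber and forwards every sample (or error) into the daemon's flume channel, exiting when the dataflow finishes or when the daemon side of the channel is dropped. Below is a self-contained sketch of that select-and-forward loop; a plain flume channel stands in for the zenoh subscriber and raw byte payloads replace dora's event types, so all names are illustrative:

use std::pin::pin;

use eyre::{eyre, Result};
use futures::future::{self, Either};
use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Stand-in for the zenoh subscriber: a channel of raw payload bytes.
    let (sample_tx, sample_rx) = flume::unbounded::<Vec<u8>>();
    // Channel into the daemon event loop (items are `Result`s, so errors are forwarded too).
    let (event_tx, event_rx) = flume::unbounded::<Result<Vec<u8>>>();
    // Broadcast channel used only as a "dataflow finished" signal.
    let (finished_tx, _) = broadcast::channel::<()>(1);

    let mut finished_rx = finished_tx.subscribe();
    let forwarder = tokio::spawn(async move {
        let mut finished = pin!(finished_rx.recv());
        loop {
            match future::select(finished, sample_rx.recv_async()).await {
                // All senders of `finished_tx` were dropped -> the dataflow is done.
                Either::Left((Err(broadcast::error::RecvError::Closed), _)) => break,
                Either::Left((other, _)) => {
                    eprintln!("unexpected finished signal: {other:?}");
                    break;
                }
                Either::Right((sample, still_pending)) => {
                    finished = still_pending;
                    // The real task deserializes the payload here; this sketch
                    // forwards the bytes (or the receive error) unchanged.
                    let event = sample.map_err(|e| eyre!(e));
                    if event_tx.send_async(event).await.is_err() {
                        break; // daemon event loop is gone
                    }
                }
            }
        }
    });

    sample_tx.send_async(vec![1, 2, 3]).await.unwrap();
    println!("forwarded: {:?}", event_rx.recv_async().await.unwrap());

    drop(finished_tx); // signals "dataflow finished" to the forwarder
    forwarder.await.unwrap();
}
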

@@ -1393,13 +1451,20 @@ async fn set_up_event_stream(
    coordinator_addr: SocketAddr,
    machine_id: &Option<String>,
    clock: &Arc<HLC>,
-    remote_daemon_events_rx: flume::Receiver<Timestamped<InterDaemonEvent>>,
+    remote_daemon_events_rx: flume::Receiver<eyre::Result<Timestamped<InterDaemonEvent>>>,
    // used for dynamic nodes
    local_listen_port: u16,
) -> eyre::Result<(DaemonId, impl Stream<Item = Timestamped<Event>> + Unpin)> {
-    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(|e| Timestamped {
-        inner: Event::Daemon(e.inner),
-        timestamp: e.timestamp,
+    let clock_cloned = clock.clone();
+    let remote_daemon_events = remote_daemon_events_rx.into_stream().map(move |e| match e {
+        Ok(e) => Timestamped {
+            inner: Event::Daemon(e.inner),
+            timestamp: e.timestamp,
+        },
+        Err(err) => Timestamped {
+            inner: Event::DaemonError(err),
+            timestamp: clock_cloned.new_timestamp(),
+        },
    });
    let (daemon_id, coordinator_events) =
        coordinator::register(coordinator_addr, machine_id.clone(), clock)
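
With the channel now carrying `Result`s, `set_up_event_stream` maps successful items into `Event::Daemon` with their original timestamp and failures into the new `Event::DaemonError` with a fresh timestamp from the local HLC. A sketch of the same mapping over a flume receiver stream, using simplified stand-ins for dora's `Event` and `Timestamped` types:

use std::sync::Arc;

use futures::StreamExt;

// Illustrative stand-ins for dora's `Event` and `Timestamped<T>`.
#[derive(Debug)]
enum Event {
    Daemon(String),
    DaemonError(eyre::Report),
}

#[derive(Debug)]
struct Timestamped<T> {
    inner: T,
    timestamp: uhlc::Timestamp,
}

#[tokio::main]
async fn main() {
    let clock = Arc::new(uhlc::HLC::default());
    let (tx, rx) = flume::unbounded::<eyre::Result<Timestamped<String>>>();

    tx.send(Ok(Timestamped {
        inner: "remote output".to_string(),
        timestamp: clock.new_timestamp(),
    }))
    .unwrap();
    tx.send(Err(eyre::eyre!("deserialization failed"))).unwrap();
    drop(tx);

    // Successful events keep their original timestamp; errors get a fresh one
    // from the local clock, mirroring the closure above.
    let clock_cloned = clock.clone();
    let mut events = rx.into_stream().map(move |e| match e {
        Ok(e) => Timestamped {
            inner: Event::Daemon(e.inner),
            timestamp: e.timestamp,
        },
        Err(err) => Timestamped {
            inner: Event::DaemonError(err),
            timestamp: clock_cloned.new_timestamp(),
        },
    });

    while let Some(event) = events.next().await {
        println!("{event:?}");
    }
}
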
@@ -1625,10 +1690,13 @@ pub struct RunningDataflow {
    node_stderr_most_recent: BTreeMap<NodeId, Arc<ArrayQueue<String>>>,

    publishers: BTreeMap<OutputId, zenoh::pubsub::Publisher<'static>>,

    finished_tx: broadcast::Sender<()>,
}

impl RunningDataflow {
    fn new(dataflow_id: Uuid, daemon_id: DaemonId) -> RunningDataflow {
        let (finished_tx, _) = broadcast::channel(1);
        Self {
            id: dataflow_id,
            pending_nodes: PendingNodes::new(dataflow_id, daemon_id),
@@ -1648,6 +1716,7 @@ impl RunningDataflow {
            grace_duration_kills: Default::default(),
            node_stderr_most_recent: BTreeMap::new(),
            publishers: Default::default(),
            finished_tx,
        }
    }
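
`RunningDataflow` stores only the broadcast sender, and the hunks above never send a value on it; the subscriber tasks hold receivers and treat `RecvError::Closed` (all senders dropped) as the finished signal. A small sketch of that shutdown-by-drop pattern:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    // Capacity 1 is enough: the channel is used purely as a completion signal.
    let (finished_tx, _) = broadcast::channel::<()>(1);

    let mut task_rx = finished_tx.subscribe();
    let waiter = tokio::spawn(async move {
        // Once every sender is dropped, `recv` returns `RecvError::Closed`.
        match task_rx.recv().await {
            Err(broadcast::error::RecvError::Closed) => println!("dataflow finished"),
            other => println!("unexpected signal: {other:?}"),
        }
    });

    // Dropping the last sender wakes every subscribed task without sending a value.
    drop(finished_tx);
    waiter.await.unwrap();
}
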

@@ -1833,6 +1902,7 @@ pub enum Event {
    HeartbeatInterval,
    CtrlC,
    SecondCtrlC,
    DaemonError(eyre::Report),
}

impl From<DoraEvent> for Event {


libraries/message/src/common.rs (+8, -1)

@@ -2,9 +2,10 @@ use core::fmt;
use std::borrow::Cow;

use aligned_vec::{AVec, ConstAlign};
+use eyre::Context as _;
use uuid::Uuid;

-use crate::{id::NodeId, DataflowId};
+use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};

pub use log::Level as LogLevel;

@@ -138,6 +139,12 @@ where
    }
}

impl Timestamped<InterDaemonEvent> {
    pub fn deserialize_inter_daemon_event(bytes: &[u8]) -> eyre::Result<Self> {
        bincode::deserialize(bytes).wrap_err("failed to deserialize InterDaemonEvent")
    }
}

pub type SharedMemoryId = String;

#[derive(serde::Serialize, serde::Deserialize, Clone)]
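
The new helper wraps `bincode::deserialize` with an eyre context message. The same pattern, round-tripped with the bincode serialize/deserialize API the diff itself uses, on a hypothetical serde type standing in for `Timestamped<InterDaemonEvent>`:

use eyre::Context as _;
use serde::{Deserialize, Serialize};

// Hypothetical stand-in for `Timestamped<InterDaemonEvent>`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct Sample {
    dataflow: String,
    output: String,
    data: Vec<u8>,
}

fn deserialize_sample(bytes: &[u8]) -> eyre::Result<Sample> {
    // Mirrors `deserialize_inter_daemon_event`: bincode plus a context message.
    bincode::deserialize(bytes).wrap_err("failed to deserialize Sample")
}

fn main() -> eyre::Result<()> {
    let sample = Sample {
        dataflow: "example".into(),
        output: "camera/image".into(),
        data: vec![0, 1, 2],
    };
    let bytes = bincode::serialize(&sample).wrap_err("failed to serialize Sample")?;
    assert_eq!(deserialize_sample(&bytes)?, sample);

    // A truncated payload surfaces as an error carrying the added context.
    assert!(deserialize_sample(&bytes[..1]).is_err());
    Ok(())
}
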

