diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 4a11e003..949df240 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -4,9 +4,10 @@ use crate::{ }; use control::ControlEvent; use dora_core::{ - config::{CommunicationConfig, NodeId, OperatorId}, + config::{CommunicationConfig, DataId, NodeId, OperatorId}, coordinator_messages::RegisterResult, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply}, + message::Metadata, topics::{ control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT, @@ -165,6 +166,27 @@ async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { } } } + DataflowEvent::Output { + machine_id, + source_node, + output_id, + metadata, + data, + target_machines, + } => { + for target_machine in target_machines { + match daemon_connections.get_mut(&target_machine) { + Some(connection) => { + tracing::trace!( + "forwarding output `{uuid}/{source_node}/{output_id}` \ + from machine `{machine_id}` to machine `{target_machine}`" + ); + forward_output(connection, uuid, source_node.clone(), output_id.clone(), metadata.clone(), data.clone()).await?; + }, + None => tracing::warn!("received output event for unknown target machine `{target_machine}`"), + } + } + } }, Event::Control(event) => match event { @@ -347,6 +369,30 @@ async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { Ok(()) } +async fn forward_output( + connection: &mut TcpStream, + uuid: Uuid, + source_node: NodeId, + output_id: DataId, + metadata: Metadata<'static>, + data: Option>, +) -> eyre::Result<()> { + let message = serde_json::to_vec(&DaemonCoordinatorEvent::Output { + dataflow_id: uuid, + node_id: source_node, + output_id, + metadata, + data, + }) + .wrap_err("failed to serialize output message")?; + + tcp_send(connection, &message) + .await + .wrap_err("failed to send output message to daemon")?; + + Ok(()) +} + fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> { let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); @@ -570,6 +616,16 @@ pub enum DataflowEvent { machine_id: String, result: eyre::Result<()>, }, + Output { + machine_id: String, + source_node: NodeId, + output_id: DataId, + + metadata: Metadata<'static>, + data: Option>, + + target_machines: BTreeSet, + }, } #[derive(Debug)] diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 01e91404..a71fdf71 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -82,6 +82,30 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende .await; } }, + coordinator_messages::CoordinatorRequest::Output { + source_machine, + dataflow_id, + source_node, + output_id, + metadata, + data, + target_machines, + } => { + let event = Event::Dataflow { + uuid: dataflow_id, + event: DataflowEvent::Output { + machine_id: source_machine, + source_node, + output_id, + metadata, + data, + target_machines, + }, + }; + if events_tx.send(event).await.is_err() { + break; + } + } }; } } diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 4b320d30..33efc154 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -212,7 +212,9 @@ impl Daemon { match event { Event::Coordinator(CoordinatorEvent { event, reply_tx }) => { let (reply, status) = self.handle_coordinator_event(event).await; - let _ = reply_tx.send(reply); + if let Some(reply) = reply { + let _ = reply_tx.send(reply); + } match status { RunStatus::Continue => {} RunStatus::Exit => break, @@ -259,7 +261,7 @@ impl Daemon { async fn handle_coordinator_event( &mut self, event: DaemonCoordinatorEvent, - ) -> (DaemonCoordinatorReply, RunStatus) { + ) -> (Option, RunStatus) { match event { DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes { dataflow_id, @@ -275,7 +277,7 @@ impl Daemon { } let reply = DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}"))); - (reply, RunStatus::Continue) + (Some(reply), RunStatus::Continue) } DaemonCoordinatorEvent::ReloadDataflow { dataflow_id, @@ -285,7 +287,7 @@ impl Daemon { let result = self.send_reload(dataflow_id, node_id, operator_id).await; let reply = DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}"))); - (reply, RunStatus::Continue) + (Some(reply), RunStatus::Continue) } DaemonCoordinatorEvent::StopDataflow { dataflow_id } => { let stop = async { @@ -299,15 +301,45 @@ impl Daemon { let reply = DaemonCoordinatorReply::StopResult( stop.await.map_err(|err| format!("{err:?}")), ); - (reply, RunStatus::Continue) + (Some(reply), RunStatus::Continue) } DaemonCoordinatorEvent::Destroy => { tracing::info!("received destroy command -> exiting"); let reply = DaemonCoordinatorReply::DestroyResult(Ok(())); - (reply, RunStatus::Exit) + (Some(reply), RunStatus::Exit) } - DaemonCoordinatorEvent::Watchdog => { - (DaemonCoordinatorReply::WatchdogAck, RunStatus::Continue) + DaemonCoordinatorEvent::Watchdog => ( + Some(DaemonCoordinatorReply::WatchdogAck), + RunStatus::Continue, + ), + DaemonCoordinatorEvent::Output { + dataflow_id, + node_id, + output_id, + metadata, + data, + } => { + let inner = async { + let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { + format!("send out failed: no running dataflow with ID `{dataflow_id}`") + })?; + send_output_to_local_receivers( + node_id.clone(), + output_id.clone(), + dataflow, + &metadata, + data.map(Data::Vec), + ) + .await?; + Result::<_, eyre::Report>::Ok(()) + }; + if let Err(err) = inner + .await + .wrap_err("failed to forward remote output to local receivers") + { + tracing::warn!("{err:?}") + } + (None, RunStatus::Continue) } } } @@ -328,11 +360,11 @@ impl Daemon { }; for node in nodes { - if node.deploy.machine == self.machine_id { - dataflow.running_nodes.insert(node.id.clone()); - let inputs = node_inputs(&node); + let local = node.deploy.machine == self.machine_id; - for (input_id, input) in inputs { + let inputs = node_inputs(&node); + for (input_id, input) in inputs { + if local { dataflow .open_inputs .entry(node.id.clone()) @@ -354,8 +386,15 @@ impl Daemon { .insert((node.id.clone(), input_id)); } } + } else if let InputMapping::User(mapping) = input.mapping { + dataflow + .external_mappings + .entry(OutputId(mapping.source, mapping.output)) + .or_default() + .insert(node.deploy.machine.clone()); } - + } + if local { let node_id = node.id.clone(); spawn::spawn_node( dataflow_id, @@ -367,6 +406,7 @@ impl Daemon { ) .await .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?; + dataflow.running_nodes.insert(node_id); } else { dataflow.external_nodes.insert(node.id.clone(), node); } @@ -520,75 +560,37 @@ impl Daemon { let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| { format!("send out failed: no running dataflow with ID `{dataflow_id}`") })?; + let data_bytes = send_output_to_local_receivers( + node_id.clone(), + output_id.clone(), + dataflow, + &metadata, + data, + ) + .await?; + let empty_set = BTreeSet::new(); let output_id = OutputId(node_id, output_id); - let local_receivers = { dataflow.mappings.get(&output_id).unwrap_or(&empty_set) }; - let OutputId(node_id, _) = output_id; - let mut closed = Vec::new(); - for (receiver_id, input_id) in local_receivers { - if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { - let item = daemon_messages::NodeEvent::Input { - id: input_id.clone(), - metadata: metadata.clone(), - data: data.clone(), - }; - match channel.send(item) { - Ok(()) => { - if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) { - dataflow - .pending_drop_tokens - .entry(token) - .or_insert_with(|| DropTokenInformation { - owner: node_id.clone(), - pending_nodes: Default::default(), - }) - .pending_nodes - .insert(receiver_id.clone()); - } - } - Err(_) => { - closed.push(receiver_id); - } - } - } - } - for id in closed { - dataflow.subscribe_channels.remove(id); - } - let (data_bytes, drop_token) = match data { - None => (None, None), - Some(Data::SharedMemory { - shared_memory_id, - len, - drop_token, - }) => { - let memory = ShmemConf::new() - .os_id(shared_memory_id) - .open() - .wrap_err("failed to map shared memory output")?; - let data = Some(unsafe { memory.as_slice() }[..len].to_owned()); - (data, Some(drop_token)) - } - Some(Data::Vec(v)) => (Some(v), None), - }; - if let Some(token) = drop_token { - // insert token into `pending_drop_tokens` even if there are no local subscribers - dataflow - .pending_drop_tokens - .entry(token) - .or_insert_with(|| DropTokenInformation { - owner: node_id.clone(), - pending_nodes: Default::default(), - }); - // check if all local subscribers are finished with the token - dataflow.check_drop_token(token).await?; - } - - if !dataflow.external_nodes.is_empty() { - // TODO: Send the data to remote daemon instances if the dataflow - // is split across multiple machines - let _data_bytes = data_bytes; - todo!("send to remote nodes"); + let remote_receivers = dataflow + .external_mappings + .get(&output_id) + .unwrap_or(&empty_set); + if !remote_receivers.is_empty() { + let Some(connection) = &mut self.coordinator_connection else { + bail!("no coordinator connection to forward output to remote receivers"); + }; + let msg = serde_json::to_vec(&CoordinatorRequest::Output { + source_machine: self.machine_id.clone(), + dataflow_id, + source_node: output_id.0, + output_id: output_id.1, + metadata, + data: data_bytes, + target_machines: remote_receivers.clone(), + })?; + tcp_send(connection, &msg) + .await + .wrap_err("failed to send output message to dora-coordinator")?; } Ok(()) @@ -796,6 +798,79 @@ impl Daemon { } } +async fn send_output_to_local_receivers( + node_id: NodeId, + output_id: DataId, + dataflow: &mut RunningDataflow, + metadata: &dora_core::message::Metadata<'static>, + data: Option, +) -> Result>, eyre::ErrReport> { + let empty_set = BTreeSet::new(); + let output_id = OutputId(node_id, output_id); + let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set); + let OutputId(node_id, _) = output_id; + let mut closed = Vec::new(); + for (receiver_id, input_id) in local_receivers { + if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) { + let item = daemon_messages::NodeEvent::Input { + id: input_id.clone(), + metadata: metadata.clone(), + data: data.clone(), + }; + match channel.send(item) { + Ok(()) => { + if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) { + dataflow + .pending_drop_tokens + .entry(token) + .or_insert_with(|| DropTokenInformation { + owner: node_id.clone(), + pending_nodes: Default::default(), + }) + .pending_nodes + .insert(receiver_id.clone()); + } + } + Err(_) => { + closed.push(receiver_id); + } + } + } + } + for id in closed { + dataflow.subscribe_channels.remove(id); + } + let (data_bytes, drop_token) = match data { + None => (None, None), + Some(Data::SharedMemory { + shared_memory_id, + len, + drop_token, + }) => { + let memory = ShmemConf::new() + .os_id(shared_memory_id) + .open() + .wrap_err("failed to map shared memory output")?; + let data = Some(unsafe { memory.as_slice() }[..len].to_owned()); + (data, Some(drop_token)) + } + Some(Data::Vec(v)) => (Some(v), None), + }; + if let Some(token) = drop_token { + // insert token into `pending_drop_tokens` even if there are no local subscribers + dataflow + .pending_drop_tokens + .entry(token) + .or_insert_with(|| DropTokenInformation { + owner: node_id.clone(), + pending_nodes: Default::default(), + }); + // check if all local subscribers are finished with the token + dataflow.check_drop_token(token).await?; + } + Ok(data_bytes) +} + fn node_inputs(node: &ResolvedNode) -> BTreeMap { match &node.kind { CoreNodeKind::Custom(n) => n.run_config.inputs.clone(), @@ -879,6 +954,7 @@ pub struct RunningDataflow { running_nodes: BTreeSet, external_nodes: BTreeMap, + external_mappings: HashMap>, pending_drop_tokens: HashMap, @@ -905,6 +981,7 @@ impl RunningDataflow { open_inputs: BTreeMap::new(), running_nodes: BTreeSet::new(), external_nodes: BTreeMap::new(), + external_mappings: HashMap::new(), pending_drop_tokens: HashMap::new(), _timer_handles: Vec::new(), stop_sent: false, diff --git a/libraries/core/src/coordinator_messages.rs b/libraries/core/src/coordinator_messages.rs index 32494bbd..5af65c79 100644 --- a/libraries/core/src/coordinator_messages.rs +++ b/libraries/core/src/coordinator_messages.rs @@ -1,6 +1,12 @@ +use std::collections::BTreeSet; + +use dora_message::Metadata; use eyre::eyre; -use crate::daemon_messages::DataflowId; +use crate::{ + config::{DataId, NodeId}, + daemon_messages::DataflowId, +}; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum CoordinatorRequest { @@ -12,6 +18,17 @@ pub enum CoordinatorRequest { machine_id: String, event: DaemonEvent, }, + Output { + source_machine: String, + dataflow_id: DataflowId, + source_node: NodeId, + output_id: DataId, + + metadata: Metadata<'static>, + data: Option>, + + target_machines: BTreeSet, + }, } #[derive(Debug, serde::Serialize, serde::Deserialize)] diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index f4f15060..9bd34187 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -203,6 +203,13 @@ pub enum DaemonCoordinatorEvent { }, Destroy, Watchdog, + Output { + dataflow_id: DataflowId, + node_id: NodeId, + output_id: DataId, + metadata: Metadata<'static>, + data: Option>, + }, } #[derive(Debug, serde::Deserialize, serde::Serialize)]