
Forward outputs through the coordinator to target machines and nodes

tags/v0.2.3-rc
Philipp Oppermann, 2 years ago
commit 7397bcc4c3
5 changed files with 263 additions and 82 deletions
  1. binaries/coordinator/src/lib.rs (+57, -1)
  2. binaries/coordinator/src/listener.rs (+24, -0)
  3. binaries/daemon/src/lib.rs (+157, -80)
  4. libraries/core/src/coordinator_messages.rs (+18, -1)
  5. libraries/core/src/daemon_messages.rs (+7, -0)

binaries/coordinator/src/lib.rs (+57, -1)

@@ -4,9 +4,10 @@ use crate::{
};
use control::ControlEvent;
use dora_core::{
config::{CommunicationConfig, NodeId, OperatorId},
config::{CommunicationConfig, DataId, NodeId, OperatorId},
coordinator_messages::RegisterResult,
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
message::Metadata,
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
@@ -165,6 +166,27 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
}
}
}
DataflowEvent::Output {
    machine_id,
    source_node,
    output_id,
    metadata,
    data,
    target_machines,
} => {
    for target_machine in target_machines {
        match daemon_connections.get_mut(&target_machine) {
            Some(connection) => {
                tracing::trace!(
                    "forwarding output `{uuid}/{source_node}/{output_id}` \
                    from machine `{machine_id}` to machine `{target_machine}`"
                );
                forward_output(connection, uuid, source_node.clone(), output_id.clone(), metadata.clone(), data.clone()).await?;
            },
            None => tracing::warn!("received output event for unknown target machine `{target_machine}`"),
        }
    }
}
},

Event::Control(event) => match event {
@@ -347,6 +369,30 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
Ok(())
}

async fn forward_output(
    connection: &mut TcpStream,
    uuid: Uuid,
    source_node: NodeId,
    output_id: DataId,
    metadata: Metadata<'static>,
    data: Option<Vec<u8>>,
) -> eyre::Result<()> {
    let message = serde_json::to_vec(&DaemonCoordinatorEvent::Output {
        dataflow_id: uuid,
        node_id: source_node,
        output_id,
        metadata,
        data,
    })
    .wrap_err("failed to serialize output message")?;

    tcp_send(connection, &message)
        .await
        .wrap_err("failed to send output message to daemon")?;

    Ok(())
}

fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> {
let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1);

@@ -570,6 +616,16 @@ pub enum DataflowEvent {
machine_id: String,
result: eyre::Result<()>,
},
Output {
    machine_id: String,
    source_node: NodeId,
    output_id: DataId,

    metadata: Metadata<'static>,
    data: Option<Vec<u8>>,

    target_machines: BTreeSet<String>,
},
}

#[derive(Debug)]


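Both `forward_output` above and the daemon-side sender further down go through an existing `tcp_send` helper whose implementation is not part of this commit. As a rough, hypothetical sketch of what such a helper could look like (length-prefixed JSON frames; the actual framing used by dora is an assumption here, not taken from this diff):

// Hypothetical sketch only: a length-prefixed send, as `tcp_send` might be implemented.
// The real helper lives elsewhere in the repository and may use a different framing.
use eyre::WrapErr;
use tokio::{io::AsyncWriteExt, net::TcpStream};

async fn tcp_send_sketch(connection: &mut TcpStream, message: &[u8]) -> eyre::Result<()> {
    // Send the payload length first so the receiver knows how many bytes to expect.
    let len = message.len() as u64;
    connection
        .write_all(&len.to_le_bytes())
        .await
        .wrap_err("failed to send message length")?;
    // Then send the serialized `DaemonCoordinatorEvent::Output` payload itself.
    connection
        .write_all(message)
        .await
        .wrap_err("failed to send message payload")?;
    connection.flush().await.wrap_err("failed to flush stream")?;
    Ok(())
}
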
binaries/coordinator/src/listener.rs (+24, -0)

@@ -82,6 +82,30 @@ pub async fn handle_connection(mut connection: TcpStream, events_tx: mpsc::Sende
.await;
}
},
coordinator_messages::CoordinatorRequest::Output {
    source_machine,
    dataflow_id,
    source_node,
    output_id,
    metadata,
    data,
    target_machines,
} => {
    let event = Event::Dataflow {
        uuid: dataflow_id,
        event: DataflowEvent::Output {
            machine_id: source_machine,
            source_node,
            output_id,
            metadata,
            data,
            target_machines,
        },
    };
    if events_tx.send(event).await.is_err() {
        break;
    }
}
};
}
}

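The match above assumes `handle_connection` has already read and deserialized one `CoordinatorRequest` per loop iteration. A minimal sketch of that receive step, assuming the same length-prefixed framing as in the send sketch above (the actual listener code may differ):

// Sketch of the receive side, under the same (assumed) length-prefixed framing.
use dora_core::coordinator_messages::CoordinatorRequest;
use eyre::WrapErr;
use tokio::{io::AsyncReadExt, net::TcpStream};

async fn receive_request_sketch(connection: &mut TcpStream) -> eyre::Result<CoordinatorRequest> {
    // Read the 8-byte length prefix.
    let mut len_buf = [0u8; 8];
    connection
        .read_exact(&mut len_buf)
        .await
        .wrap_err("failed to read message length")?;
    let len = u64::from_le_bytes(len_buf) as usize;

    // Read exactly that many payload bytes and deserialize the request.
    let mut payload = vec![0u8; len];
    connection
        .read_exact(&mut payload)
        .await
        .wrap_err("failed to read message payload")?;
    serde_json::from_slice(&payload).wrap_err("failed to deserialize CoordinatorRequest")
}
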
binaries/daemon/src/lib.rs (+157, -80)

@@ -212,7 +212,9 @@ impl Daemon {
match event {
Event::Coordinator(CoordinatorEvent { event, reply_tx }) => {
let (reply, status) = self.handle_coordinator_event(event).await;
let _ = reply_tx.send(reply);
if let Some(reply) = reply {
let _ = reply_tx.send(reply);
}
match status {
RunStatus::Continue => {}
RunStatus::Exit => break,
@@ -259,7 +261,7 @@ impl Daemon {
async fn handle_coordinator_event(
&mut self,
event: DaemonCoordinatorEvent,
) -> (DaemonCoordinatorReply, RunStatus) {
) -> (Option<DaemonCoordinatorReply>, RunStatus) {
match event {
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
dataflow_id,
@@ -275,7 +277,7 @@ impl Daemon {
}
let reply =
DaemonCoordinatorReply::SpawnResult(result.map_err(|err| format!("{err:?}")));
(reply, RunStatus::Continue)
(Some(reply), RunStatus::Continue)
}
DaemonCoordinatorEvent::ReloadDataflow {
dataflow_id,
@@ -285,7 +287,7 @@ impl Daemon {
let result = self.send_reload(dataflow_id, node_id, operator_id).await;
let reply =
DaemonCoordinatorReply::ReloadResult(result.map_err(|err| format!("{err:?}")));
(reply, RunStatus::Continue)
(Some(reply), RunStatus::Continue)
}
DaemonCoordinatorEvent::StopDataflow { dataflow_id } => {
let stop = async {
@@ -299,15 +301,45 @@ impl Daemon {
let reply = DaemonCoordinatorReply::StopResult(
stop.await.map_err(|err| format!("{err:?}")),
);
(reply, RunStatus::Continue)
(Some(reply), RunStatus::Continue)
}
DaemonCoordinatorEvent::Destroy => {
tracing::info!("received destroy command -> exiting");
let reply = DaemonCoordinatorReply::DestroyResult(Ok(()));
(reply, RunStatus::Exit)
(Some(reply), RunStatus::Exit)
}
DaemonCoordinatorEvent::Watchdog => {
(DaemonCoordinatorReply::WatchdogAck, RunStatus::Continue)
DaemonCoordinatorEvent::Watchdog => (
Some(DaemonCoordinatorReply::WatchdogAck),
RunStatus::Continue,
),
DaemonCoordinatorEvent::Output {
    dataflow_id,
    node_id,
    output_id,
    metadata,
    data,
} => {
    let inner = async {
        let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
            format!("send out failed: no running dataflow with ID `{dataflow_id}`")
        })?;
        send_output_to_local_receivers(
            node_id.clone(),
            output_id.clone(),
            dataflow,
            &metadata,
            data.map(Data::Vec),
        )
        .await?;
        Result::<_, eyre::Report>::Ok(())
    };
    if let Err(err) = inner
        .await
        .wrap_err("failed to forward remote output to local receivers")
    {
        tracing::warn!("{err:?}")
    }
    (None, RunStatus::Continue)
}
}
}
@@ -328,11 +360,11 @@ impl Daemon {
};

for node in nodes {
if node.deploy.machine == self.machine_id {
dataflow.running_nodes.insert(node.id.clone());
let inputs = node_inputs(&node);
let local = node.deploy.machine == self.machine_id;

for (input_id, input) in inputs {
let inputs = node_inputs(&node);
for (input_id, input) in inputs {
if local {
dataflow
.open_inputs
.entry(node.id.clone())
@@ -354,8 +386,15 @@ impl Daemon {
.insert((node.id.clone(), input_id));
}
}
} else if let InputMapping::User(mapping) = input.mapping {
dataflow
.external_mappings
.entry(OutputId(mapping.source, mapping.output))
.or_default()
.insert(node.deploy.machine.clone());
}

}
if local {
let node_id = node.id.clone();
spawn::spawn_node(
dataflow_id,
@@ -367,6 +406,7 @@ impl Daemon {
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
dataflow.running_nodes.insert(node_id);
} else {
dataflow.external_nodes.insert(node.id.clone(), node);
}
@@ -520,75 +560,37 @@ impl Daemon {
let dataflow = self.running.get_mut(&dataflow_id).wrap_err_with(|| {
format!("send out failed: no running dataflow with ID `{dataflow_id}`")
})?;
let data_bytes = send_output_to_local_receivers(
node_id.clone(),
output_id.clone(),
dataflow,
&metadata,
data,
)
.await?;

let empty_set = BTreeSet::new();
let output_id = OutputId(node_id, output_id);
let local_receivers = { dataflow.mappings.get(&output_id).unwrap_or(&empty_set) };
let OutputId(node_id, _) = output_id;
let mut closed = Vec::new();
for (receiver_id, input_id) in local_receivers {
if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
let item = daemon_messages::NodeEvent::Input {
id: input_id.clone(),
metadata: metadata.clone(),
data: data.clone(),
};
match channel.send(item) {
Ok(()) => {
if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
dataflow
.pending_drop_tokens
.entry(token)
.or_insert_with(|| DropTokenInformation {
owner: node_id.clone(),
pending_nodes: Default::default(),
})
.pending_nodes
.insert(receiver_id.clone());
}
}
Err(_) => {
closed.push(receiver_id);
}
}
}
}
for id in closed {
dataflow.subscribe_channels.remove(id);
}
let (data_bytes, drop_token) = match data {
None => (None, None),
Some(Data::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => {
let memory = ShmemConf::new()
.os_id(shared_memory_id)
.open()
.wrap_err("failed to map shared memory output")?;
let data = Some(unsafe { memory.as_slice() }[..len].to_owned());
(data, Some(drop_token))
}
Some(Data::Vec(v)) => (Some(v), None),
};
if let Some(token) = drop_token {
// insert token into `pending_drop_tokens` even if there are no local subscribers
dataflow
.pending_drop_tokens
.entry(token)
.or_insert_with(|| DropTokenInformation {
owner: node_id.clone(),
pending_nodes: Default::default(),
});
// check if all local subscribers are finished with the token
dataflow.check_drop_token(token).await?;
}

if !dataflow.external_nodes.is_empty() {
// TODO: Send the data to remote daemon instances if the dataflow
// is split across multiple machines
let _data_bytes = data_bytes;
todo!("send to remote nodes");
let remote_receivers = dataflow
.external_mappings
.get(&output_id)
.unwrap_or(&empty_set);
if !remote_receivers.is_empty() {
let Some(connection) = &mut self.coordinator_connection else {
bail!("no coordinator connection to forward output to remote receivers");
};
let msg = serde_json::to_vec(&CoordinatorRequest::Output {
source_machine: self.machine_id.clone(),
dataflow_id,
source_node: output_id.0,
output_id: output_id.1,
metadata,
data: data_bytes,
target_machines: remote_receivers.clone(),
})?;
tcp_send(connection, &msg)
.await
.wrap_err("failed to send output message to dora-coordinator")?;
}

Ok(())
@@ -796,6 +798,79 @@ impl Daemon {
}
}

async fn send_output_to_local_receivers(
    node_id: NodeId,
    output_id: DataId,
    dataflow: &mut RunningDataflow,
    metadata: &dora_core::message::Metadata<'static>,
    data: Option<Data>,
) -> Result<Option<Vec<u8>>, eyre::ErrReport> {
    let empty_set = BTreeSet::new();
    let output_id = OutputId(node_id, output_id);
    let local_receivers = dataflow.mappings.get(&output_id).unwrap_or(&empty_set);
    let OutputId(node_id, _) = output_id;
    let mut closed = Vec::new();
    for (receiver_id, input_id) in local_receivers {
        if let Some(channel) = dataflow.subscribe_channels.get(receiver_id) {
            let item = daemon_messages::NodeEvent::Input {
                id: input_id.clone(),
                metadata: metadata.clone(),
                data: data.clone(),
            };
            match channel.send(item) {
                Ok(()) => {
                    if let Some(token) = data.as_ref().and_then(|d| d.drop_token()) {
                        dataflow
                            .pending_drop_tokens
                            .entry(token)
                            .or_insert_with(|| DropTokenInformation {
                                owner: node_id.clone(),
                                pending_nodes: Default::default(),
                            })
                            .pending_nodes
                            .insert(receiver_id.clone());
                    }
                }
                Err(_) => {
                    closed.push(receiver_id);
                }
            }
        }
    }
    for id in closed {
        dataflow.subscribe_channels.remove(id);
    }
    let (data_bytes, drop_token) = match data {
        None => (None, None),
        Some(Data::SharedMemory {
            shared_memory_id,
            len,
            drop_token,
        }) => {
            let memory = ShmemConf::new()
                .os_id(shared_memory_id)
                .open()
                .wrap_err("failed to map shared memory output")?;
            let data = Some(unsafe { memory.as_slice() }[..len].to_owned());
            (data, Some(drop_token))
        }
        Some(Data::Vec(v)) => (Some(v), None),
    };
    if let Some(token) = drop_token {
        // insert token into `pending_drop_tokens` even if there are no local subscribers
        dataflow
            .pending_drop_tokens
            .entry(token)
            .or_insert_with(|| DropTokenInformation {
                owner: node_id.clone(),
                pending_nodes: Default::default(),
            });
        // check if all local subscribers are finished with the token
        dataflow.check_drop_token(token).await?;
    }
    Ok(data_bytes)
}

fn node_inputs(node: &ResolvedNode) -> BTreeMap<DataId, Input> {
match &node.kind {
CoreNodeKind::Custom(n) => n.run_config.inputs.clone(),
@@ -879,6 +954,7 @@ pub struct RunningDataflow {
running_nodes: BTreeSet<NodeId>,

external_nodes: BTreeMap<NodeId, ResolvedNode>,
external_mappings: HashMap<OutputId, BTreeSet<String>>,

pending_drop_tokens: HashMap<DropToken, DropTokenInformation>,

@@ -905,6 +981,7 @@ impl RunningDataflow {
open_inputs: BTreeMap::new(),
running_nodes: BTreeSet::new(),
external_nodes: BTreeMap::new(),
external_mappings: HashMap::new(),
pending_drop_tokens: HashMap::new(),
_timer_handles: Vec::new(),
stop_sent: false,


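The new `external_mappings` field is the piece of bookkeeping that makes the remote forwarding above possible: during spawn, every remote input mapping records which machine needs a copy of which output, and the send-out path consults it alongside the existing node-local `mappings`. A small self-contained illustration of that data structure (simplified NodeId/DataId stand-ins, not code from this commit):

// Illustration only: how a per-output set of remote machines is built and queried.
use std::collections::{BTreeSet, HashMap};

// Simplified stand-ins for the real NodeId/DataId newtypes.
type NodeId = String;
type DataId = String;

#[derive(PartialEq, Eq, Hash, Clone)]
struct OutputId(NodeId, DataId);

fn main() {
    let mut external_mappings: HashMap<OutputId, BTreeSet<String>> = HashMap::new();

    // During spawn: an input of a node deployed on machine "b" maps to the
    // output `camera/image`, so machine "b" becomes a remote receiver of it.
    external_mappings
        .entry(OutputId("camera".into(), "image".into()))
        .or_default()
        .insert("b".to_string());

    // During send-out: look up which machines (besides the local one) need the data.
    let remote = external_mappings.get(&OutputId("camera".into(), "image".into()));
    assert!(remote.map(|set| set.contains("b")).unwrap_or(false));
}
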
libraries/core/src/coordinator_messages.rs (+18, -1)

@@ -1,6 +1,12 @@
use std::collections::BTreeSet;

use dora_message::Metadata;
use eyre::eyre;

use crate::daemon_messages::DataflowId;
use crate::{
config::{DataId, NodeId},
daemon_messages::DataflowId,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum CoordinatorRequest {
@@ -12,6 +18,17 @@ pub enum CoordinatorRequest {
machine_id: String,
event: DaemonEvent,
},
Output {
    source_machine: String,
    dataflow_id: DataflowId,
    source_node: NodeId,
    output_id: DataId,

    metadata: Metadata<'static>,
    data: Option<Vec<u8>>,

    target_machines: BTreeSet<String>,
},
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]


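For reviewers who want a feel for what this request looks like on the wire: with serde's default externally-tagged enum representation (no #[serde(tag = ...)] attribute is declared here), `CoordinatorRequest::Output` serializes as a JSON object keyed by the variant name. The standalone sketch below uses simplified field types and omits metadata, so it is illustrative rather than the exact payload:

// Standalone illustration (not from the commit): simplified field types, metadata omitted.
use std::collections::BTreeSet;

#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum CoordinatorRequestSketch {
    Output {
        source_machine: String,
        dataflow_id: String, // the real field is a DataflowId (a UUID)
        source_node: String,
        output_id: String,
        data: Option<Vec<u8>>,
        target_machines: BTreeSet<String>,
    },
}

fn main() -> serde_json::Result<()> {
    let request = CoordinatorRequestSketch::Output {
        source_machine: "machine-a".into(),
        dataflow_id: "00000000-0000-0000-0000-000000000000".into(),
        source_node: "camera".into(),
        output_id: "image".into(),
        data: Some(vec![1, 2, 3]),
        target_machines: BTreeSet::from(["machine-b".to_string()]),
    };
    let json = serde_json::to_string(&request)?;
    // Externally tagged: {"Output":{"source_machine":"machine-a",...}}
    assert!(json.starts_with(r#"{"Output":"#));
    Ok(())
}
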
libraries/core/src/daemon_messages.rs (+7, -0)

@@ -203,6 +203,13 @@ pub enum DaemonCoordinatorEvent {
},
Destroy,
Watchdog,
Output {
    dataflow_id: DataflowId,
    node_id: NodeId,
    output_id: DataId,
    metadata: Metadata<'static>,
    data: Option<Vec<u8>>,
},
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]


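A note on the related daemon change above: because there is no reply variant for forwarded outputs, `handle_coordinator_event` now returns an `Option<DaemonCoordinatorReply>` and answers `Output` events with `None`, while the other coordinator events keep their acknowledgements. A simplified, illustrative summary of that split (stand-in enum without payloads, not code from this commit):

// Simplified stand-in without payloads; the real DaemonCoordinatorEvent variants
// carry the fields shown in the diff above.
enum DaemonCoordinatorEventKind {
    Spawn,
    ReloadDataflow,
    StopDataflow,
    Destroy,
    Watchdog,
    Output,
}

// Which coordinator events the daemon acknowledges with a DaemonCoordinatorReply.
fn needs_reply(event: &DaemonCoordinatorEventKind) -> bool {
    match event {
        // Spawn, Reload, Stop, Destroy, and Watchdog all produce a reply.
        DaemonCoordinatorEventKind::Spawn
        | DaemonCoordinatorEventKind::ReloadDataflow
        | DaemonCoordinatorEventKind::StopDataflow
        | DaemonCoordinatorEventKind::Destroy
        | DaemonCoordinatorEventKind::Watchdog => true,
        // Forwarded outputs are fire-and-forget: they are dispatched to local
        // subscribers and any error is only logged.
        DaemonCoordinatorEventKind::Output => false,
    }
}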