diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index f639c94b..53e0bcfb 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -23,10 +23,10 @@ uuid = { version = "1.2.1" } rand = "0.8.5" dora-core = { workspace = true } tracing = "0.1.36" +dora-tracing = { workspace = true, optional = true } futures-concurrency = "7.1.0" zenoh = "0.7.0-rc" serde_json = "1.0.86" -dora-tracing = { workspace = true, optional = true } which = "4.3.0" thiserror = "1.0.37" ctrlc = "3.2.5" diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 1822fc9e..e4809721 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -4,7 +4,7 @@ use crate::{ }; use control::ControlEvent; use dora_core::{ - config::CommunicationConfig, + config::{CommunicationConfig, NodeId, OperatorId}, coordinator_messages::RegisterResult, daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply}, topics::{ @@ -185,6 +185,39 @@ async fn start(tasks: &FuturesUnordered>) -> eyre::Result<()> { ControlRequestReply::DataflowStarted { uuid } }) } /* Check request: replies `DataflowStarted` when `dataflow_uuid` is currently a key of `running_dataflows`, and `DataflowStopped` otherwise; this arm never produces an error. NOTE(review): the leading `&` on the match scrutinee looks redundant - confirm and drop. */ + ControlRequest::Check { dataflow_uuid } => { + let status = match &running_dataflows.get(&dataflow_uuid) { + Some(_) => ControlRequestReply::DataflowStarted { + uuid: dataflow_uuid, + }, + None => ControlRequestReply::DataflowStopped { + uuid: dataflow_uuid, + }, + }; + Ok(status) + } /* Reload request: awaits `reload_dataflow` with the requested node/operator and, on success, replies `DataflowReloaded` carrying the same `dataflow_id`; any error from `reload_dataflow` is passed through as the `Err` value of this arm. */ + ControlRequest::Reload { + dataflow_id, + node_id, + operator_id, + } => { + let reload = async { + reload_dataflow( + &running_dataflows, + dataflow_id, + node_id, + operator_id, + &mut daemon_connections, + ) + .await?; + Result::<_, eyre::Report>::Ok(()) + }; + reload + .await + .map(|()| ControlRequestReply::DataflowReloaded { + uuid: dataflow_id, + }) + } ControlRequest::Stop { dataflow_uuid } => { let stop = async { stop_dataflow( @@ -409,6 +442,48 @@ async fn stop_dataflow( Ok(()) } /* reload_dataflow: asks every daemon that hosts the given running dataflow to reload one node (optionally a single operator of that node). Steps, in order: (1) look up `dataflow_id` in `running_dataflows` and bail if it is absent; (2) serialize one `DaemonCoordinatorEvent::ReloadDataflow` message to JSON, reused for all machines; (3) for each entry of `dataflow.machines`, fetch its connection from `daemon_connections` (error "no daemon connection" if missing), send the message, then wait for the reply before moving on to the next machine. Only a `DaemonCoordinatorReply::ReloadResult` is accepted as reply: its inner error, if any, is wrapped with "failed to reload dataflow", and any other reply variant is reported as unexpected. The first failure returns immediately via `?` / `bail!`, so later machines are not contacted. An info-level log line is emitted once all machines have replied successfully. NOTE(review): the generic arguments of the `HashMap` / `Option` parameters are missing in this view of the patch - confirm the key and value types against the real file. */ +async fn reload_dataflow( + running_dataflows: &HashMap, + dataflow_id: Uuid, +
node_id: NodeId, + operator_id: Option, + daemon_connections: &mut HashMap, +) -> eyre::Result<()> { + let Some(dataflow) = running_dataflows.get(&dataflow_id) else { + bail!("No running dataflow found with UUID `{dataflow_id}`") + }; + let message = serde_json::to_vec(&DaemonCoordinatorEvent::ReloadDataflow { + dataflow_id, + node_id, + operator_id, + })?; + + for machine_id in &dataflow.machines { + let daemon_connection = daemon_connections + .get_mut(machine_id) + .wrap_err("no daemon connection")?; // TODO: take from dataflow spec + tcp_send(daemon_connection, &message) + .await + .wrap_err("failed to send reload message to daemon")?; + + // wait for reply + let reply_raw = tcp_receive(daemon_connection) + .await + .wrap_err("failed to receive reload reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize reload reply from daemon")? + { + DaemonCoordinatorReply::ReloadResult(result) => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to reload dataflow")?, + other => bail!("unexpected reply after sending reload: {other:?}"), + } + } + tracing::info!("successfully reloaded dataflow `{dataflow_id}`"); + + Ok(()) +} + async fn start_dataflow( path: &Path, name: Option,