@@ -4,7 +4,7 @@ use crate::{
 };
 use control::ControlEvent;
 use dora_core::{
-    config::CommunicationConfig,
+    config::{CommunicationConfig, NodeId, OperatorId},
     coordinator_messages::RegisterResult,
     daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
     topics::{
@@ -185,6 +185,39 @@ async fn start(tasks: &FuturesUnordered<JoinHandle<()>>) -> eyre::Result<()> {
                     ControlRequestReply::DataflowStarted { uuid }
                 })
            }
+            ControlRequest::Check { dataflow_uuid } => {
+                let status = match &running_dataflows.get(&dataflow_uuid) {
+                    Some(_) => ControlRequestReply::DataflowStarted {
+                        uuid: dataflow_uuid,
+                    },
+                    None => ControlRequestReply::DataflowStopped {
+                        uuid: dataflow_uuid,
+                    },
+                };
+                Ok(status)
+            }
+            ControlRequest::Reload {
+                dataflow_id,
+                node_id,
+                operator_id,
+            } => {
+                let reload = async {
+                    reload_dataflow(
+                        &running_dataflows,
+                        dataflow_id,
+                        node_id,
+                        operator_id,
+                        &mut daemon_connections,
+                    )
+                    .await?;
+                    Result::<_, eyre::Report>::Ok(())
+                };
+                reload
+                    .await
+                    .map(|()| ControlRequestReply::DataflowReloaded {
+                        uuid: dataflow_id,
+                    })
+            }
            ControlRequest::Stop { dataflow_uuid } => {
                let stop = async {
                    stop_dataflow(
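
For context, the new `Reload` arm above is driven by a control client (for example, the `dora` CLI) sending a `ControlRequest::Reload` to the coordinator's control socket and waiting for a `ControlRequestReply::DataflowReloaded` reply. The sketch below is illustrative only: the length-prefixed JSON framing, the import paths, and the `reload_via_coordinator` helper are assumptions rather than part of this patch; only the request and reply variants come from the hunk above.

```rust
use dora_core::{
    config::{NodeId, OperatorId},
    topics::{ControlRequest, ControlRequestReply},
};
use eyre::{bail, WrapErr};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::TcpStream,
};
use uuid::Uuid;

/// Hypothetical client-side helper; the real CLI wiring may differ.
async fn reload_via_coordinator(
    addr: &str,
    dataflow_id: Uuid,
    node_id: NodeId,
    operator_id: Option<OperatorId>,
) -> eyre::Result<()> {
    let mut stream = TcpStream::connect(addr)
        .await
        .wrap_err("failed to connect to coordinator control socket")?;

    // Serialize the new request variant added by this patch.
    let request = serde_json::to_vec(&ControlRequest::Reload {
        dataflow_id,
        node_id,
        operator_id,
    })?;

    // Assumed framing: u64 length prefix followed by the JSON payload.
    stream
        .write_all(&(request.len() as u64).to_le_bytes())
        .await?;
    stream.write_all(&request).await?;

    // Read the reply using the same assumed framing.
    let mut len_buf = [0u8; 8];
    stream.read_exact(&mut len_buf).await?;
    let mut reply_raw = vec![0u8; u64::from_le_bytes(len_buf) as usize];
    stream.read_exact(&mut reply_raw).await?;

    match serde_json::from_slice(&reply_raw)? {
        ControlRequestReply::DataflowReloaded { uuid } => {
            println!("dataflow {uuid} reloaded");
            Ok(())
        }
        other => bail!("unexpected reply to reload request: {other:?}"),
    }
}
```

Whatever wire format the real control channel uses, the request/reply shape matches the handler above: the client supplies the dataflow UUID, the node, and optionally an operator, and gets back `DataflowReloaded` on success.
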
@@ -409,6 +442,48 @@ async fn stop_dataflow(
     Ok(())
 }
 
+async fn reload_dataflow(
+    running_dataflows: &HashMap<Uuid, RunningDataflow>,
+    dataflow_id: Uuid,
+    node_id: NodeId,
+    operator_id: Option<OperatorId>,
+    daemon_connections: &mut HashMap<String, TcpStream>,
+) -> eyre::Result<()> {
+    let Some(dataflow) = running_dataflows.get(&dataflow_id) else {
+        bail!("No running dataflow found with UUID `{dataflow_id}`")
+    };
+    let message = serde_json::to_vec(&DaemonCoordinatorEvent::ReloadDataflow {
+        dataflow_id,
+        node_id,
+        operator_id,
+    })?;
+
+    for machine_id in &dataflow.machines {
+        let daemon_connection = daemon_connections
+            .get_mut(machine_id)
+            .wrap_err("no daemon connection")?; // TODO: take from dataflow spec
+        tcp_send(daemon_connection, &message)
+            .await
+            .wrap_err("failed to send reload message to daemon")?;
+
+        // wait for reply
+        let reply_raw = tcp_receive(daemon_connection)
+            .await
+            .wrap_err("failed to receive reload reply from daemon")?;
+        match serde_json::from_slice(&reply_raw)
+            .wrap_err("failed to deserialize reload reply from daemon")?
+        {
+            DaemonCoordinatorReply::ReloadResult(result) => result
+                .map_err(|e| eyre!(e))
+                .wrap_err("failed to reload dataflow")?,
+            other => bail!("unexpected reply after sending reload: {other:?}"),
+        }
+    }
+    tracing::info!("successfully reloaded dataflow `{dataflow_id}`");
+
+    Ok(())
+}
+
 async fn start_dataflow(
     path: &Path,
     name: Option<String>,
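
The daemon side of this exchange is not part of the excerpt: the coordinator's `reload_dataflow` above sends a `DaemonCoordinatorEvent::ReloadDataflow` and then blocks on a `DaemonCoordinatorReply::ReloadResult`. A minimal sketch of that handler follows, assuming the result payload is a `Result<(), String>` and that the daemon exposes some `reload_node_or_operator` routine; both of those, and the event-loop shape, are assumptions rather than the actual daemon implementation. Only the event and reply variants come from the patch.

```rust
use dora_core::{
    config::{NodeId, OperatorId},
    daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply},
};
use uuid::Uuid;

/// Hypothetical daemon-side dispatch for the new reload event.
async fn handle_coordinator_event(event: DaemonCoordinatorEvent) -> DaemonCoordinatorReply {
    match event {
        DaemonCoordinatorEvent::ReloadDataflow {
            dataflow_id,
            node_id,
            operator_id,
        } => {
            // Run the reload and convert any failure into the string that the
            // coordinator maps back into an eyre error via `.map_err(|e| eyre!(e))`.
            let result = reload_node_or_operator(dataflow_id, node_id, operator_id)
                .await
                .map_err(|err| format!("reload failed: {err}"));
            DaemonCoordinatorReply::ReloadResult(result)
        }
        _ => todo!("other coordinator events are handled elsewhere"),
    }
}

/// Placeholder for the daemon's real reload logic (restarting a node or
/// re-initializing a single operator); assumed for this sketch.
async fn reload_node_or_operator(
    _dataflow_id: Uuid,
    _node_id: NodeId,
    _operator_id: Option<OperatorId>,
) -> eyre::Result<()> {
    Ok(())
}
```

This keeps the protocol symmetric with the coordinator loop above: one reload event per daemon connection, one `ReloadResult` reply per event, and any error string surfaces in the coordinator wrapped as "failed to reload dataflow".
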