diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index e96705bb..b6345794 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,6 +1,6 @@ use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; -use dora_message::cli_to_coordinator::ControlRequest; +use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context}; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] @@ -55,10 +55,22 @@ pub(crate) fn destroy( match connect_to_coordinator(coordinator_addr) { Ok(mut session) => { // send destroy command to dora-coordinator - session + let reply_raw = session .request(&serde_json::to_vec(&ControlRequest::Destroy).unwrap()) .wrap_err("failed to send destroy message")?; - println!("Send destroy command to dora-coordinator"); + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DestroyOk => { + println!("Coordinator and daemons destroyed successfully"); + } + ControlRequestReply::Error(err) => { + bail!("Destroy command failed with error: {}", err); + } + _ => { + bail!("Unexpected reply from dora-coordinator"); + } + } } Err(_) => { bail!("Could not connect to dora-coordinator"); diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index b9aae414..aaeee0e9 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -17,8 +17,8 @@ use dora_message::{ coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, }; -use eyre::{bail, eyre, ContextCompat, WrapErr}; -use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; +use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; +use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; use futures_concurrency::stream::Merge; use log_subscriber::LogSubscriber; use run::SpawnedDataflow; @@ -675,9 +675,10 @@ async fn handle_destroy( ) .await?; } - destroy_daemons(daemon_connections, clock.new_timestamp()).await?; + + let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await; *daemon_events_tx = None; - Ok(()) + result } async fn send_heartbeat_message( @@ -916,36 +917,55 @@ async fn start_dataflow( }) } -async fn destroy_daemons( - daemon_connections: &mut HashMap, +async fn destroy_daemon( + machine_id: String, + mut daemon_connection: DaemonConnection, + timestamp: uhlc::Timestamp, -) -> eyre::Result<()> { +) -> Result<()> { let message = serde_json::to_vec(&Timestamped { inner: DaemonCoordinatorEvent::Destroy, timestamp, })?; - for (machine_id, mut daemon_connection) in daemon_connections.drain() { - tcp_send(&mut daemon_connection.stream, &message) - .await - .wrap_err("failed to send destroy message to daemon")?; - - // wait for reply - let reply_raw = tcp_receive(&mut daemon_connection.stream) - .await - .wrap_err("failed to receive destroy reply from daemon")?; - match serde_json::from_slice(&reply_raw) - .wrap_err("failed to deserialize destroy reply from daemon")? - { - DaemonCoordinatorReply::DestroyResult { result, .. } => result - .map_err(|e| eyre!(e)) - .wrap_err("failed to destroy dataflow")?, - other => bail!("unexpected reply after sending `destroy`: {other:?}"), - } + tcp_send(&mut daemon_connection.stream, &message) + .await + .wrap_err(format!( + "failed to send destroy message to daemon `{machine_id}`" + ))?; - tracing::info!("successfully destroyed daemon `{machine_id}`"); + // wait for reply + let reply_raw = tcp_receive(&mut daemon_connection.stream) + .await + .wrap_err("failed to receive destroy reply from daemon")?; + match serde_json::from_slice(&reply_raw) + .wrap_err("failed to deserialize destroy reply from daemon")? + { + DaemonCoordinatorReply::DestroyResult { result, .. } => result + .map_err(|e| eyre!(e)) + .wrap_err("failed to destroy dataflow")?, + other => bail!("unexpected reply after sending `destroy`: {other:?}"), } + tracing::info!("successfully destroyed daemon `{machine_id}`"); + Ok(()) +} + +async fn destroy_daemons( + daemon_connections: &mut HashMap, + timestamp: uhlc::Timestamp, +) -> eyre::Result<()> { + let futures = daemon_connections + .drain() + .map(|(machine_id, daemon_connection)| { + destroy_daemon(machine_id, daemon_connection, timestamp) + }) + .collect::>(); + let results: Vec> = + join_all(futures).await.into_iter().collect::>(); + for result in results { + result?; + } Ok(()) }