diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 28484db4..3d6be47d 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -852,7 +852,7 @@ async fn destroy_daemons( match serde_json::from_slice(&reply_raw) .wrap_err("failed to deserialize destroy reply from daemon")? { - DaemonCoordinatorReply::DestroyResult(result) => result + DaemonCoordinatorReply::DestroyResult { result, .. } => result .map_err(|e| eyre!(e)) .wrap_err("failed to destroy dataflow")?, other => bail!("unexpected reply after sending `destroy`: {other:?}"), diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 082dfc6b..27ae8e49 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -81,29 +81,6 @@ pub async fn register( timestamp, } = event; let (reply_tx, reply_rx) = oneshot::channel(); - - // Respond to the coordinator's destroy event immediately as the daemon is about to destroy itself. - match event { - DaemonCoordinatorEvent::Destroy => { - tracing::info!("Received destroy event from dora-coordinator"); - let serialized = - match serde_json::to_vec(&DaemonCoordinatorReply::DestroyResult(Ok(()))) - .wrap_err("failed to serialize DaemonCoordinatorReply") - { - Ok(r) => r, - Err(err) => { - tracing::error!("{err:?}"); - continue; - } - }; - if let Err(err) = tcp_send(&mut stream, &serialized).await { - tracing::warn!("failed to send reply to coordinator: {err}"); - continue; - }; - } - _ => {} - } - match tx .send(Timestamped { inner: CoordinatorEvent { event, reply_tx }, @@ -136,6 +113,12 @@ pub async fn register( tracing::warn!("failed to send reply to coordinator: {err}"); continue; }; + if let DaemonCoordinatorReply::DestroyResult { notify, .. } = reply { + if let Some(notify) = notify { + let _ = notify.send(()); + } + break; + } } } }); diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 51c36671..9207e47d 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -431,9 +431,18 @@ impl Daemon { } DaemonCoordinatorEvent::Destroy => { tracing::info!("received destroy command -> exiting"); + let (notify_tx, notify_rx) = oneshot::channel(); + let reply = DaemonCoordinatorReply::DestroyResult { + result: Ok(()), + notify: Some(notify_tx), + }; let _ = reply_tx - .send(None) + .send(Some(reply)) .map_err(|_| error!("could not send destroy reply from daemon to coordinator")); + // wait until the reply is sent out + if notify_rx.await.is_err() { + tracing::warn!("no confirmation received for DestroyReply"); + } RunStatus::Exit } DaemonCoordinatorEvent::Heartbeat => { diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 72529189..25263f65 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -18,5 +18,5 @@ uuid = { version = "1.2.1", features = ["serde"] } dora-message = { workspace = true } tracing = "0.1" serde-with-expand-env = "1.1.0" -tokio = { version = "1.24.1", features = ["fs", "process"] } +tokio = { version = "1.24.1", features = ["fs", "process", "sync"] } aligned-vec = { version = "0.5.0", features = ["serde"] } diff --git a/libraries/core/src/daemon_messages.rs b/libraries/core/src/daemon_messages.rs index a9a6bef8..623ea9c0 100644 --- a/libraries/core/src/daemon_messages.rs +++ b/libraries/core/src/daemon_messages.rs @@ -247,7 +247,11 @@ pub enum DaemonCoordinatorReply { SpawnResult(Result<(), String>), ReloadResult(Result<(), String>), StopResult(Result<(), String>), - DestroyResult(Result<(), String>), + DestroyResult { + result: Result<(), String>, + #[serde(skip)] + notify: Option>, + }, Logs(Result, String>), }