Browse Source

Merge pull request #413 from dora-rs/sync-before-destroy

Wait until `DestroyResult` is sent before exiting dora-daemon
tags/v0.3.2-rc2
Haixuan Xavier Tao GitHub 2 years ago
parent
commit
87c52160a2
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
5 changed files with 23 additions and 27 deletions
  1. +1
    -1
      binaries/coordinator/src/lib.rs
  2. +6
    -23
      binaries/daemon/src/coordinator.rs
  3. +10
    -1
      binaries/daemon/src/lib.rs
  4. +1
    -1
      libraries/core/Cargo.toml
  5. +5
    -1
      libraries/core/src/daemon_messages.rs

+ 1
- 1
binaries/coordinator/src/lib.rs View File

@@ -852,7 +852,7 @@ async fn destroy_daemons(
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize destroy reply from daemon")?
{
DaemonCoordinatorReply::DestroyResult(result) => result
DaemonCoordinatorReply::DestroyResult { result, .. } => result
.map_err(|e| eyre!(e))
.wrap_err("failed to destroy dataflow")?,
other => bail!("unexpected reply after sending `destroy`: {other:?}"),


+ 6
- 23
binaries/daemon/src/coordinator.rs View File

@@ -81,29 +81,6 @@ pub async fn register(
timestamp,
} = event;
let (reply_tx, reply_rx) = oneshot::channel();

// Respond to the coordinator's destroy event immediately as the daemon is about to destroy itself.
match event {
DaemonCoordinatorEvent::Destroy => {
tracing::info!("Received destroy event from dora-coordinator");
let serialized =
match serde_json::to_vec(&DaemonCoordinatorReply::DestroyResult(Ok(())))
.wrap_err("failed to serialize DaemonCoordinatorReply")
{
Ok(r) => r,
Err(err) => {
tracing::error!("{err:?}");
continue;
}
};
if let Err(err) = tcp_send(&mut stream, &serialized).await {
tracing::warn!("failed to send reply to coordinator: {err}");
continue;
};
}
_ => {}
}

match tx
.send(Timestamped {
inner: CoordinatorEvent { event, reply_tx },
@@ -136,6 +113,12 @@ pub async fn register(
tracing::warn!("failed to send reply to coordinator: {err}");
continue;
};
if let DaemonCoordinatorReply::DestroyResult { notify, .. } = reply {
if let Some(notify) = notify {
let _ = notify.send(());
}
break;
}
}
}
});


+ 10
- 1
binaries/daemon/src/lib.rs View File

@@ -431,9 +431,18 @@ impl Daemon {
}
DaemonCoordinatorEvent::Destroy => {
tracing::info!("received destroy command -> exiting");
let (notify_tx, notify_rx) = oneshot::channel();
let reply = DaemonCoordinatorReply::DestroyResult {
result: Ok(()),
notify: Some(notify_tx),
};
let _ = reply_tx
.send(None)
.send(Some(reply))
.map_err(|_| error!("could not send destroy reply from daemon to coordinator"));
// wait until the reply is sent out
if notify_rx.await.is_err() {
tracing::warn!("no confirmation received for DestroyReply");
}
RunStatus::Exit
}
DaemonCoordinatorEvent::Heartbeat => {


+ 1
- 1
libraries/core/Cargo.toml View File

@@ -18,5 +18,5 @@ uuid = { version = "1.2.1", features = ["serde"] }
dora-message = { workspace = true }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process"] }
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
aligned-vec = { version = "0.5.0", features = ["serde"] }

+ 5
- 1
libraries/core/src/daemon_messages.rs View File

@@ -247,7 +247,11 @@ pub enum DaemonCoordinatorReply {
SpawnResult(Result<(), String>),
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),
DestroyResult(Result<(), String>),
DestroyResult {
result: Result<(), String>,
#[serde(skip)]
notify: Option<tokio::sync::oneshot::Sender<()>>,
},
Logs(Result<Vec<u8>, String>),
}



Loading…
Cancel
Save