|
|
|
@@ -17,8 +17,8 @@ use dora_message::{ |
|
|
|
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped}, |
|
|
|
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, |
|
|
|
}; |
|
|
|
use eyre::{bail, eyre, ContextCompat, WrapErr}; |
|
|
|
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; |
|
|
|
use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; |
|
|
|
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; |
|
|
|
use futures_concurrency::stream::Merge; |
|
|
|
use log_subscriber::LogSubscriber; |
|
|
|
use run::SpawnedDataflow; |
|
|
|
@@ -675,9 +675,10 @@ async fn handle_destroy( |
|
|
|
) |
|
|
|
.await?; |
|
|
|
} |
|
|
|
destroy_daemons(daemon_connections, clock.new_timestamp()).await?; |
|
|
|
|
|
|
|
let result = destroy_daemons(daemon_connections, clock.new_timestamp()).await; |
|
|
|
*daemon_events_tx = None; |
|
|
|
Ok(()) |
|
|
|
result |
|
|
|
} |
|
|
|
|
|
|
|
async fn send_heartbeat_message( |
|
|
|
@@ -916,36 +917,55 @@ async fn start_dataflow( |
|
|
|
}) |
|
|
|
} |
|
|
|
|
|
|
|
async fn destroy_daemons( |
|
|
|
daemon_connections: &mut HashMap<String, DaemonConnection>, |
|
|
|
async fn destroy_daemon( |
|
|
|
machine_id: String, |
|
|
|
mut daemon_connection: DaemonConnection, |
|
|
|
|
|
|
|
timestamp: uhlc::Timestamp, |
|
|
|
) -> eyre::Result<()> { |
|
|
|
) -> Result<()> { |
|
|
|
let message = serde_json::to_vec(&Timestamped { |
|
|
|
inner: DaemonCoordinatorEvent::Destroy, |
|
|
|
timestamp, |
|
|
|
})?; |
|
|
|
|
|
|
|
for (machine_id, mut daemon_connection) in daemon_connections.drain() { |
|
|
|
tcp_send(&mut daemon_connection.stream, &message) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send destroy message to daemon")?; |
|
|
|
|
|
|
|
// wait for reply |
|
|
|
let reply_raw = tcp_receive(&mut daemon_connection.stream) |
|
|
|
.await |
|
|
|
.wrap_err("failed to receive destroy reply from daemon")?; |
|
|
|
match serde_json::from_slice(&reply_raw) |
|
|
|
.wrap_err("failed to deserialize destroy reply from daemon")? |
|
|
|
{ |
|
|
|
DaemonCoordinatorReply::DestroyResult { result, .. } => result |
|
|
|
.map_err(|e| eyre!(e)) |
|
|
|
.wrap_err("failed to destroy dataflow")?, |
|
|
|
other => bail!("unexpected reply after sending `destroy`: {other:?}"), |
|
|
|
} |
|
|
|
tcp_send(&mut daemon_connection.stream, &message) |
|
|
|
.await |
|
|
|
.wrap_err(format!( |
|
|
|
"failed to send destroy message to daemon `{machine_id}`" |
|
|
|
))?; |
|
|
|
|
|
|
|
tracing::info!("successfully destroyed daemon `{machine_id}`"); |
|
|
|
// wait for reply |
|
|
|
let reply_raw = tcp_receive(&mut daemon_connection.stream) |
|
|
|
.await |
|
|
|
.wrap_err("failed to receive destroy reply from daemon")?; |
|
|
|
match serde_json::from_slice(&reply_raw) |
|
|
|
.wrap_err("failed to deserialize destroy reply from daemon")? |
|
|
|
{ |
|
|
|
DaemonCoordinatorReply::DestroyResult { result, .. } => result |
|
|
|
.map_err(|e| eyre!(e)) |
|
|
|
.wrap_err("failed to destroy dataflow")?, |
|
|
|
other => bail!("unexpected reply after sending `destroy`: {other:?}"), |
|
|
|
} |
|
|
|
|
|
|
|
tracing::info!("successfully destroyed daemon `{machine_id}`"); |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn destroy_daemons( |
|
|
|
daemon_connections: &mut HashMap<String, DaemonConnection>, |
|
|
|
timestamp: uhlc::Timestamp, |
|
|
|
) -> eyre::Result<()> { |
|
|
|
let futures = daemon_connections |
|
|
|
.drain() |
|
|
|
.map(|(machine_id, daemon_connection)| { |
|
|
|
destroy_daemon(machine_id, daemon_connection, timestamp) |
|
|
|
}) |
|
|
|
.collect::<Vec<_>>(); |
|
|
|
let results: Vec<std::result::Result<(), eyre::Error>> = |
|
|
|
join_all(futures).await.into_iter().collect::<Vec<_>>(); |
|
|
|
for result in results { |
|
|
|
result?; |
|
|
|
} |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
|