|
|
|
@@ -1,7 +1,7 @@ |
|
|
|
use communication::CommunicationLayer; |
|
|
|
use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; |
|
|
|
use eyre::WrapErr; |
|
|
|
use futures::{stream::FuturesUnordered, StreamExt}; |
|
|
|
use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; |
|
|
|
use futures_concurrency::Merge; |
|
|
|
use std::collections::HashSet; |
|
|
|
|
|
|
|
@@ -84,7 +84,12 @@ impl DoraNode { |
|
|
|
.wrap_err_with(|| format!("failed to subscribe on {topic}"))?; |
|
|
|
stop_messages.push(sub.into_future()); |
|
|
|
} |
|
|
|
let finished = Box::pin(stop_messages.all(|_| async { true })); |
|
|
|
let node_id = self.id.clone(); |
|
|
|
let finished = Box::pin( |
|
|
|
stop_messages |
|
|
|
.all(|_| async { true }) |
|
|
|
.map(move |_| println!("all inputs finished for node {node_id}")), |
|
|
|
); |
|
|
|
|
|
|
|
Ok(streams.merge().take_until(finished)) |
|
|
|
} |
|
|
|
@@ -121,8 +126,12 @@ impl Drop for DoraNode { |
|
|
|
.communication |
|
|
|
.publish_sync(&topic, &[]) |
|
|
|
.wrap_err_with(|| format!("failed to send stop message for source `{self_id}`")); |
|
|
|
if let Err(err) = result { |
|
|
|
tracing::error!("{err}") |
|
|
|
match result { |
|
|
|
Ok(()) => println!("sent stop message for {self_id}"), |
|
|
|
Err(err) => { |
|
|
|
println!("error sending stop message for {self_id}: {err:?}"); |
|
|
|
tracing::error!("{err:?}") |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|