From 054ae5afcc54c489d8331a2a42c82a7e852806a1 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 15 Jan 2024 10:59:38 +0100 Subject: [PATCH] Fix multiple daemon example --- binaries/cli/src/main.rs | 4 +++- binaries/coordinator/src/lib.rs | 3 ++- examples/multiple-daemons/run.rs | 6 +++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index bce1c1b0..404d2794 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -3,6 +3,7 @@ use std::{net::Ipv4Addr, path::PathBuf}; use attach::attach_dataflow; use clap::Parser; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; +use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ @@ -255,7 +256,8 @@ fn run() -> eyre::Result<()> { .build() .context("tokio runtime failed")?; rt.block_on(async { - let (_, task) = dora_coordinator::start(port).await?; + let (_, task) = + dora_coordinator::start(port, futures::stream::empty::()).await?; task.await }) .context("failed to run dora-coordinator")? diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index c2e1a383..28484db4 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -40,6 +40,7 @@ mod tcp_utils; pub async fn start( port: Option, + external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); let listener = listener::create_listener(port).await?; @@ -53,7 +54,7 @@ pub async fn start( let ctrlc_events = set_up_ctrlc_handler()?; let future = async move { - start_inner(listener, &tasks, ctrlc_events).await?; + start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?; tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); while let Some(join_result) = tasks.next().await { diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index c3b7dddd..198124e9 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -5,7 +5,7 @@ use dora_core::{ }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; -use futures::stream; + use std::{ collections::BTreeSet, net::{Ipv4Addr, SocketAddr}, @@ -37,8 +37,8 @@ async fn main() -> eyre::Result<()> { let (coordinator_port, coordinator) = dora_coordinator::start(Some(0), ReceiverStream::new(coordinator_events_rx)).await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); - let daemon_a = dora_daemon::Daemon::run(coordinator_addr, "A".into(), stream::empty()); - let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), stream::empty()); + let daemon_a = dora_daemon::Daemon::run(coordinator_addr, "A".into()); + let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into()); tracing::info!("Spawning coordinator and daemons"); let mut tasks = JoinSet::new();