diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 33efc154..5c31308e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -64,14 +64,16 @@ impl Daemon { coordinator_addr: SocketAddr, machine_id: String, dora_runtime_path: Option, + external_events: impl Stream + Unpin, ) -> eyre::Result<()> { // connect to the coordinator let coordinator_events = coordinator::register(coordinator_addr, machine_id.clone()) .await .wrap_err("failed to connect to dora-coordinator")? .map(Event::Coordinator); + Self::run_general( - coordinator_events, + (coordinator_events, external_events).merge(), Some(coordinator_addr), machine_id, None, @@ -153,23 +155,6 @@ impl Daemon { exit_when_done: Option>, dora_runtime_path: Option, ) -> eyre::Result> { - let (dora_events_tx, dora_events_rx) = mpsc::channel(5); - let ctrlc_tx = dora_events_tx.clone(); - let mut ctrlc_sent = false; - ctrlc::set_handler(move || { - if ctrlc_sent { - tracing::warn!("received second ctrlc signal -> aborting immediately"); - std::process::abort(); - } else { - tracing::info!("received ctrlc signal"); - if ctrlc_tx.blocking_send(Event::CtrlC).is_err() { - tracing::error!("failed to report ctrl-c event to dora-daemon"); - } - ctrlc_sent = true; - } - }) - .wrap_err("failed to set ctrl-c handler")?; - let coordinator_connection = match coordinator_addr { Some(addr) => { let stream = TcpStream::connect(addr) @@ -183,6 +168,7 @@ impl Daemon { None => None, }; + let (dora_events_tx, dora_events_rx) = mpsc::channel(5); let daemon = Self { running: HashMap::new(), events_tx: dora_events_tx, diff --git a/binaries/daemon/src/main.rs b/binaries/daemon/src/main.rs index 0e72fbec..bcb9abe0 100644 --- a/binaries/daemon/src/main.rs +++ b/binaries/daemon/src/main.rs @@ -1,9 +1,11 @@ use dora_core::topics::DORA_COORDINATOR_PORT_DEFAULT; -use dora_daemon::Daemon; +use dora_daemon::{Daemon, Event}; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; #[cfg(feature = "tracing")] use eyre::Context; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; use std::{net::Ipv4Addr, path::PathBuf}; @@ -33,6 +35,25 @@ async fn run() -> eyre::Result<()> { dora_runtime_path, } = clap::Parser::parse(); + let ctrl_c_events = { + let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1); + let mut ctrlc_sent = false; + ctrlc::set_handler(move || { + if ctrlc_sent { + tracing::warn!("received second ctrlc signal -> aborting immediately"); + std::process::abort(); + } else { + tracing::info!("received ctrlc signal"); + if ctrl_c_tx.blocking_send(Event::CtrlC).is_err() { + tracing::error!("failed to report ctrl-c event to dora-daemon"); + } + ctrlc_sent = true; + } + }) + .wrap_err("failed to set ctrl-c handler")?; + ReceiverStream::new(ctrl_c_rx) + }; + match run_dataflow { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); @@ -46,7 +67,13 @@ async fn run() -> eyre::Result<()> { let machine_id = String::new(); // TODO - Daemon::run(coordinator_socket.into(), machine_id, dora_runtime_path).await + Daemon::run( + coordinator_socket.into(), + machine_id, + dora_runtime_path, + ctrl_c_events, + ) + .await } } } diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 0280278f..23056f4a 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -32,9 +32,18 @@ async fn main() -> eyre::Result<()> { ) .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); - let daemon_a = - dora_daemon::Daemon::run(coordinator_addr, "A".into(), dora_runtime_path.clone()); - let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), dora_runtime_path); + let daemon_a = dora_daemon::Daemon::run( + coordinator_addr, + "A".into(), + dora_runtime_path.clone(), + stream::empty(), + ); + let daemon_b = dora_daemon::Daemon::run( + coordinator_addr, + "B".into(), + dora_runtime_path, + stream::empty(), + ); let mut tasks = JoinSet::new(); tasks.spawn(coordinator);