| @@ -1437,6 +1437,7 @@ dependencies = [ | |||
| name = "dora-coordinator" | |||
| version = "0.3.1" | |||
| dependencies = [ | |||
| "ctrlc", | |||
| "dora-core", | |||
| "dora-tracing", | |||
| "eyre", | |||
| @@ -1474,6 +1475,7 @@ dependencies = [ | |||
| "aligned-vec", | |||
| "async-trait", | |||
| "bincode", | |||
| "ctrlc", | |||
| "dora-core", | |||
| "dora-download", | |||
| "dora-tracing", | |||
| @@ -4,9 +4,7 @@ use attach::attach_dataflow; | |||
| use clap::Parser; | |||
| use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; | |||
| use dora_core::{ | |||
| daemon_messages::Timestamped, | |||
| descriptor::Descriptor, | |||
| message::uhlc::HLC, | |||
| topics::{ | |||
| control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, | |||
| DORA_COORDINATOR_PORT_DEFAULT, | |||
| @@ -16,10 +14,8 @@ use dora_daemon::Daemon; | |||
| #[cfg(feature = "tracing")] | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use futures::{Stream, StreamExt}; | |||
| use std::net::SocketAddr; | |||
| use tokio::{runtime::Builder, sync::mpsc}; | |||
| use tokio_stream::wrappers::ReceiverStream; | |||
| use tokio::runtime::Builder; | |||
| use uuid::Uuid; | |||
| mod attach; | |||
| @@ -114,10 +110,6 @@ enum Command { | |||
| Coordinator { port: Option<u16> }, | |||
| } | |||
| enum Event { | |||
| CtrlC, | |||
| } | |||
| #[derive(Debug, clap::Args)] | |||
| pub struct CommandNew { | |||
| #[clap(long, value_enum, default_value_t = Kind::Dataflow)] | |||
| @@ -169,8 +161,6 @@ fn run() -> eyre::Result<()> { | |||
| } | |||
| }; | |||
| let ctrlc_events = set_up_ctrlc_handler()?; | |||
| match args.command { | |||
| Command::Check { dataflow } => match dataflow { | |||
| Some(dataflow) => { | |||
| @@ -265,13 +255,7 @@ fn run() -> eyre::Result<()> { | |||
| .build() | |||
| .context("tokio runtime failed")?; | |||
| rt.block_on(async { | |||
| let (_, task) = dora_coordinator::start( | |||
| port, | |||
| ctrlc_events.map(|event| match event { | |||
| Event::CtrlC => dora_coordinator::Event::CtrlC, | |||
| }), | |||
| ) | |||
| .await?; | |||
| let (_, task) = dora_coordinator::start(port).await?; | |||
| task.await | |||
| }) | |||
| .context("failed to run dora-coordinator")? | |||
| @@ -300,15 +284,6 @@ fn run() -> eyre::Result<()> { | |||
| (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() | |||
| }), | |||
| machine_id.unwrap_or_default(), | |||
| ctrlc_events.map(|event| { | |||
| let clock = HLC::default(); | |||
| match event { | |||
| Event::CtrlC => Timestamped { | |||
| inner: dora_daemon::Event::CtrlC, | |||
| timestamp: clock.new_timestamp(), | |||
| }, | |||
| } | |||
| }), | |||
| ) | |||
| .await | |||
| } | |||
| @@ -439,25 +414,3 @@ fn query_running_dataflows( | |||
| fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { | |||
| TcpLayer::new().connect(control_socket_addr()) | |||
| } | |||
| fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> { | |||
| let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| tracing::warn!("received second ctrlc signal -> aborting immediately"); | |||
| std::process::abort(); | |||
| } else { | |||
| tracing::info!("received ctrlc signal"); | |||
| if ctrlc_tx.blocking_send(Event::CtrlC).is_err() { | |||
| tracing::error!("failed to report ctrl-c event to dora-coordinator"); | |||
| } | |||
| ctrlc_sent = true; | |||
| } | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| Ok(ReceiverStream::new(ctrlc_rx)) | |||
| } | |||
| @@ -24,3 +24,4 @@ dora-tracing = { workspace = true, optional = true } | |||
| futures-concurrency = "7.1.0" | |||
| serde_json = "1.0.86" | |||
| names = "0.14.0" | |||
| ctrlc = "3.2.5" | |||
| @@ -40,7 +40,6 @@ mod tcp_utils; | |||
| pub async fn start( | |||
| port: Option<u16>, | |||
| external_events: impl Stream<Item = Event> + Unpin, | |||
| ) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> { | |||
| let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); | |||
| let listener = listener::create_listener(port).await?; | |||
| @@ -49,8 +48,12 @@ pub async fn start( | |||
| .wrap_err("failed to get local addr of listener")? | |||
| .port(); | |||
| let mut tasks = FuturesUnordered::new(); | |||
| // Setup ctrl-c handler | |||
| let ctrlc_events = set_up_ctrlc_handler()?; | |||
| let future = async move { | |||
| start_inner(listener, &tasks, external_events).await?; | |||
| start_inner(listener, &tasks, ctrlc_events).await?; | |||
| tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); | |||
| while let Some(join_result) = tasks.next().await { | |||
| @@ -904,3 +907,25 @@ pub enum DaemonEvent { | |||
| listen_socket: SocketAddr, | |||
| }, | |||
| } | |||
| fn set_up_ctrlc_handler() -> Result<impl Stream<Item = Event>, eyre::ErrReport> { | |||
| let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| tracing::warn!("received second ctrlc signal -> aborting immediately"); | |||
| std::process::abort(); | |||
| } else { | |||
| tracing::info!("received ctrlc signal"); | |||
| if ctrlc_tx.blocking_send(Event::CtrlC).is_err() { | |||
| tracing::error!("failed to report ctrl-c event to dora-coordinator"); | |||
| } | |||
| ctrlc_sent = true; | |||
| } | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| Ok(ReceiverStream::new(ctrlc_rx)) | |||
| } | |||
| @@ -34,3 +34,4 @@ shared-memory-server = { workspace = true } | |||
| bincode = "1.3.3" | |||
| async-trait = "0.1.64" | |||
| aligned-vec = "0.5.0" | |||
| ctrlc = "3.2.5" | |||
| @@ -76,13 +76,11 @@ pub struct Daemon { | |||
| } | |||
| impl Daemon { | |||
| pub async fn run( | |||
| coordinator_addr: SocketAddr, | |||
| machine_id: String, | |||
| external_events: impl Stream<Item = Timestamped<Event>> + Unpin, | |||
| ) -> eyre::Result<()> { | |||
| pub async fn run(coordinator_addr: SocketAddr, machine_id: String) -> eyre::Result<()> { | |||
| let clock = Arc::new(HLC::default()); | |||
| let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; | |||
| // spawn listen loop | |||
| let (events_tx, events_rx) = flume::bounded(10); | |||
| let listen_socket = | |||
| @@ -108,7 +106,7 @@ impl Daemon { | |||
| ); | |||
| Self::run_general( | |||
| (coordinator_events, external_events, daemon_events).merge(), | |||
| (coordinator_events, ctrlc_events, daemon_events).merge(), | |||
| Some(coordinator_addr), | |||
| machine_id, | |||
| None, | |||
| @@ -1515,3 +1513,33 @@ fn send_with_timestamp<T>( | |||
| timestamp: clock.new_timestamp(), | |||
| }) | |||
| } | |||
| fn set_up_ctrlc_handler( | |||
| clock: Arc<HLC>, | |||
| ) -> Result<impl Stream<Item = Timestamped<Event>>, eyre::ErrReport> { | |||
| let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| tracing::warn!("received second ctrlc signal -> aborting immediately"); | |||
| std::process::abort(); | |||
| } else { | |||
| tracing::info!("received ctrlc signal"); | |||
| if ctrlc_tx | |||
| .blocking_send(Timestamped { | |||
| inner: Event::CtrlC, | |||
| timestamp: clock.new_timestamp(), | |||
| }) | |||
| .is_err() | |||
| { | |||
| tracing::error!("failed to report ctrl-c event to dora-coordinator"); | |||
| } | |||
| ctrlc_sent = true; | |||
| } | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| Ok(ReceiverStream::new(ctrlc_rx)) | |||
| } | |||