| @@ -64,14 +64,16 @@ impl Daemon { | |||
| coordinator_addr: SocketAddr, | |||
| machine_id: String, | |||
| dora_runtime_path: Option<PathBuf>, | |||
| external_events: impl Stream<Item = Event> + Unpin, | |||
| ) -> eyre::Result<()> { | |||
| // connect to the coordinator | |||
| let coordinator_events = coordinator::register(coordinator_addr, machine_id.clone()) | |||
| .await | |||
| .wrap_err("failed to connect to dora-coordinator")? | |||
| .map(Event::Coordinator); | |||
| Self::run_general( | |||
| coordinator_events, | |||
| (coordinator_events, external_events).merge(), | |||
| Some(coordinator_addr), | |||
| machine_id, | |||
| None, | |||
| @@ -153,23 +155,6 @@ impl Daemon { | |||
| exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | |||
| dora_runtime_path: Option<PathBuf>, | |||
| ) -> eyre::Result<Vec<(Uuid, NodeId, eyre::Report)>> { | |||
| let (dora_events_tx, dora_events_rx) = mpsc::channel(5); | |||
| let ctrlc_tx = dora_events_tx.clone(); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| tracing::warn!("received second ctrlc signal -> aborting immediately"); | |||
| std::process::abort(); | |||
| } else { | |||
| tracing::info!("received ctrlc signal"); | |||
| if ctrlc_tx.blocking_send(Event::CtrlC).is_err() { | |||
| tracing::error!("failed to report ctrl-c event to dora-daemon"); | |||
| } | |||
| ctrlc_sent = true; | |||
| } | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| let coordinator_connection = match coordinator_addr { | |||
| Some(addr) => { | |||
| let stream = TcpStream::connect(addr) | |||
| @@ -183,6 +168,7 @@ impl Daemon { | |||
| None => None, | |||
| }; | |||
| let (dora_events_tx, dora_events_rx) = mpsc::channel(5); | |||
| let daemon = Self { | |||
| running: HashMap::new(), | |||
| events_tx: dora_events_tx, | |||
| @@ -1,9 +1,11 @@ | |||
| use dora_core::topics::DORA_COORDINATOR_PORT_DEFAULT; | |||
| use dora_daemon::Daemon; | |||
| use dora_daemon::{Daemon, Event}; | |||
| #[cfg(feature = "tracing")] | |||
| use dora_tracing::set_up_tracing; | |||
| #[cfg(feature = "tracing")] | |||
| use eyre::Context; | |||
| use tokio::sync::mpsc; | |||
| use tokio_stream::wrappers::ReceiverStream; | |||
| use std::{net::Ipv4Addr, path::PathBuf}; | |||
| @@ -33,6 +35,25 @@ async fn run() -> eyre::Result<()> { | |||
| dora_runtime_path, | |||
| } = clap::Parser::parse(); | |||
| let ctrl_c_events = { | |||
| let (ctrl_c_tx, ctrl_c_rx) = mpsc::channel(1); | |||
| let mut ctrlc_sent = false; | |||
| ctrlc::set_handler(move || { | |||
| if ctrlc_sent { | |||
| tracing::warn!("received second ctrlc signal -> aborting immediately"); | |||
| std::process::abort(); | |||
| } else { | |||
| tracing::info!("received ctrlc signal"); | |||
| if ctrl_c_tx.blocking_send(Event::CtrlC).is_err() { | |||
| tracing::error!("failed to report ctrl-c event to dora-daemon"); | |||
| } | |||
| ctrlc_sent = true; | |||
| } | |||
| }) | |||
| .wrap_err("failed to set ctrl-c handler")?; | |||
| ReceiverStream::new(ctrl_c_rx) | |||
| }; | |||
| match run_dataflow { | |||
| Some(dataflow_path) => { | |||
| tracing::info!("Starting dataflow `{}`", dataflow_path.display()); | |||
| @@ -46,7 +67,13 @@ async fn run() -> eyre::Result<()> { | |||
| let machine_id = String::new(); // TODO | |||
| Daemon::run(coordinator_socket.into(), machine_id, dora_runtime_path).await | |||
| Daemon::run( | |||
| coordinator_socket.into(), | |||
| machine_id, | |||
| dora_runtime_path, | |||
| ctrl_c_events, | |||
| ) | |||
| .await | |||
| } | |||
| } | |||
| } | |||
| @@ -32,9 +32,18 @@ async fn main() -> eyre::Result<()> { | |||
| ) | |||
| .await?; | |||
| let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); | |||
| let daemon_a = | |||
| dora_daemon::Daemon::run(coordinator_addr, "A".into(), dora_runtime_path.clone()); | |||
| let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), dora_runtime_path); | |||
| let daemon_a = dora_daemon::Daemon::run( | |||
| coordinator_addr, | |||
| "A".into(), | |||
| dora_runtime_path.clone(), | |||
| stream::empty(), | |||
| ); | |||
| let daemon_b = dora_daemon::Daemon::run( | |||
| coordinator_addr, | |||
| "B".into(), | |||
| dora_runtime_path, | |||
| stream::empty(), | |||
| ); | |||
| let mut tasks = JoinSet::new(); | |||
| tasks.spawn(coordinator); | |||