diff --git a/Cargo.lock b/Cargo.lock index 50aa206d..4efc9561 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1417,17 +1417,23 @@ dependencies = [ "clap 4.4.6", "communication-layer-request-reply", "ctrlc", + "dora-coordinator", "dora-core", + "dora-daemon", "dora-node-api-c", "dora-operator-api-c", + "dora-runtime", "dora-tracing", "eyre", + "futures", "inquire", "notify", "serde", "serde_json", "serde_yaml 0.9.25", "termcolor", + "tokio", + "tokio-stream", "tracing", "uuid", "webbrowser", @@ -1488,7 +1494,6 @@ dependencies = [ "ctrlc", "dora-core", "dora-download", - "dora-runtime", "dora-tracing", "eyre", "flume", diff --git a/binaries/cli/Cargo.toml b/binaries/cli/Cargo.toml index aa3b23f2..0990b9e3 100644 --- a/binaries/cli/Cargo.toml +++ b/binaries/cli/Cargo.toml @@ -35,3 +35,9 @@ ctrlc = "3.2.5" tracing = "0.1.36" dora-tracing = { workspace = true, optional = true } bat = "0.23.0" +dora-daemon = { workspace = true } +dora-coordinator = { workspace = true } +dora-runtime = { workspace = true } +tokio = { version = "1.20.1", features = ["full"] } +tokio-stream = { version = "0.1.8", features = ["io-util", "net"] } +futures = "0.3.21" diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 96e4435d..f072237e 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,15 +1,25 @@ -use std::path::PathBuf; +use std::{net::Ipv4Addr, path::PathBuf}; use attach::attach_dataflow; use clap::Parser; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; use dora_core::{ + daemon_messages::Timestamped, descriptor::Descriptor, - topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, + message::uhlc::HLC, + topics::{ + control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, + DORA_COORDINATOR_PORT_DEFAULT, + }, }; +use dora_daemon::Daemon; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use eyre::{bail, Context}; +use futures::{Stream, StreamExt}; +use std::net::SocketAddr; +use tokio::{runtime::Builder, sync::mpsc}; +use tokio_stream::wrappers::ReceiverStream; use uuid::Uuid; mod attach; @@ -92,6 +102,24 @@ enum Command { // Stats, // Get, // Upgrade, + /// Run daemon + Daemon { + #[clap(long)] + machine_id: Option, + #[clap(long)] + coordinator_addr: Option, + + #[clap(long)] + run_dataflow: Option, + }, + /// Run runtime + Runtime, + /// Run coordinator + Coordinator { port: Option }, +} + +enum Event { + CtrlC, } #[derive(Debug, clap::Args)] @@ -127,10 +155,26 @@ fn main() { } fn run() -> eyre::Result<()> { - #[cfg(feature = "tracing")] - set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; let args = Args::parse(); + #[cfg(feature = "tracing")] + match args.command { + Command::Daemon { .. } => { + set_up_tracing("dora-daemon").context("failed to set up tracing subscriber")?; + } + Command::Runtime => { + set_up_tracing("dora-runtime").context("failed to set up tracing subscriber")?; + } + Command::Coordinator { .. } => { + set_up_tracing("dora-coordinator").context("failed to set up tracing subscriber")?; + } + _ => { + set_up_tracing("dora-cli").context("failed to set up tracing subscriber")?; + } + }; + + let ctrlc_events = set_up_ctrlc_handler()?; + match args.command { Command::Check { dataflow } => match dataflow { Some(dataflow) => { @@ -227,7 +271,65 @@ fn run() -> eyre::Result<()> { } } Command::Destroy { config } => up::destroy(config.as_deref())?, - } + Command::Coordinator { port } => { + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + let (_, task) = dora_coordinator::start( + port, + ctrlc_events.map(|event| match event { + Event::CtrlC => dora_coordinator::Event::CtrlC, + }), + ) + .await?; + task.await + }) + .context("failed to run dora-coordinator")? + } + Command::Daemon { + coordinator_addr, + machine_id, + run_dataflow, + } => { + let rt = Builder::new_multi_thread() + .enable_io() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { + match run_dataflow { + Some(dataflow_path) => { + tracing::info!("Starting dataflow `{}`", dataflow_path.display()); + + Daemon::run_dataflow(&dataflow_path).await + } + None => { + Daemon::run( + coordinator_addr.unwrap_or_else(|| { + tracing::info!("Starting in local mode"); + let localhost = Ipv4Addr::new(127, 0, 0, 1); + (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() + }), + machine_id.unwrap_or_default(), + ctrlc_events.map(|event| { + let clock = HLC::default(); + match event { + Event::CtrlC => Timestamped { + inner: dora_daemon::Event::CtrlC, + timestamp: clock.new_timestamp(), + }, + } + }), + ) + .await + } + } + }) + .context("failed to run dora-daemon")? + } + Command::Runtime => dora_runtime::main().context("Failed to run dora-runtime")?, + }; Ok(()) } @@ -349,3 +451,25 @@ fn query_running_dataflows( fn connect_to_coordinator() -> std::io::Result> { TcpLayer::new().connect(control_socket_addr()) } + +fn set_up_ctrlc_handler() -> Result, eyre::ErrReport> { + let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); + + let mut ctrlc_sent = false; + ctrlc::set_handler(move || { + if ctrlc_sent { + tracing::warn!("received second ctrlc signal -> aborting immediately"); + std::process::abort(); + } else { + tracing::info!("received ctrlc signal"); + if ctrlc_tx.blocking_send(Event::CtrlC).is_err() { + tracing::error!("failed to report ctrl-c event to dora-coordinator"); + } + + ctrlc_sent = true; + } + }) + .wrap_err("failed to set ctrl-c handler")?; + + Ok(ReceiverStream::new(ctrlc_rx)) +} diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index febb7b0f..056dade8 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -48,7 +48,7 @@ pub struct Args { pub async fn run(args: Args) -> eyre::Result<()> { let ctrlc_events = set_up_ctrlc_handler()?; - let (_, task) = start(args, ctrlc_events).await?; + let (_, task) = start(None, ctrlc_events).await?; task.await?; @@ -56,10 +56,10 @@ pub async fn run(args: Args) -> eyre::Result<()> { } pub async fn start( - args: Args, + port: Option, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { - let port = args.port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); + let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT); let listener = listener::create_listener(port).await?; let port = listener .local_addr() diff --git a/binaries/daemon/Cargo.toml b/binaries/daemon/Cargo.toml index a5ba2491..43255a1f 100644 --- a/binaries/daemon/Cargo.toml +++ b/binaries/daemon/Cargo.toml @@ -25,7 +25,6 @@ futures-concurrency = "7.1.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.86" dora-core = { workspace = true } -dora-runtime = { workspace = true } flume = "0.10.14" dora-download = { workspace = true } dora-tracing = { workspace = true, optional = true }