diff --git a/Cargo.lock b/Cargo.lock index 77c3df57..6e1c7e5b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -904,7 +904,7 @@ dependencies = [ "dora-node-api", "eyre", "futures", - "futures-concurrency", + "futures-concurrency 5.0.1", "rand", "serde", "serde_yaml 0.8.23", @@ -1042,7 +1042,7 @@ dependencies = [ "fern", "flume", "futures", - "futures-concurrency", + "futures-concurrency 2.0.3", "libloading", "pyo3", "serde_yaml 0.8.23", @@ -1209,6 +1209,16 @@ dependencies = [ "pin-project", ] +[[package]] +name = "futures-concurrency" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "407ed2aa475d777e35fb167144b63babd0377b2f9a528ae3ec4bec94f1ce1f1a" +dependencies = [ + "futures-core", + "pin-project", +] + [[package]] name = "futures-core" version = "0.3.21" diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index 1831b02a..ed9ac418 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -19,9 +19,9 @@ tokio-util = { version = "0.7.1", features = ["codec"] } clap = { version = "3.1.8", features = ["derive"] } uuid = "0.8.2" time = "0.3.9" -futures-concurrency = "2.0.3" rand = "0.8.5" dora-core = { version = "0.1.0", path = "../../libraries/core" } dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" +futures-concurrency = "5.0.1" diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 5f8f2d1b..6de19cfb 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -4,42 +4,107 @@ use dora_node_api::{ config::{format_duration, NodeId}, }; use eyre::{bail, eyre, WrapErr}; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; +use futures_concurrency::stream::Merge; use std::{ env::consts::EXE_EXTENSION, path::{Path, PathBuf}, + time::Duration, }; -use tokio_stream::wrappers::IntervalStream; +use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; #[derive(Debug, Clone, clap::Parser)] #[clap(about = "Dora coordinator")] -pub enum Command { - #[clap(about = "Run dataflow pipeline")] - Run { - dataflow: PathBuf, - runtime: Option, - }, +pub struct Args { + pub runtime: Option, + pub run_dataflow: Option, } -pub async fn run(command: Command) -> eyre::Result<()> { - match command { - Command::Run { dataflow, runtime } => { - let runtime_path = runtime.unwrap_or_else(|| { - std::env::args() - .next() - .map(PathBuf::from) - .unwrap_or_default() - .with_file_name("dora-runtime") - }); - run_dataflow(dataflow.clone(), &runtime_path) +pub async fn run(args: Args) -> eyre::Result<()> { + let Args { + runtime, + run_dataflow, + } = args; + + let runtime_path = runtime.unwrap_or_else(|| { + std::env::args() + .next() + .map(PathBuf::from) + .unwrap_or_default() + .with_file_name("dora-runtime") + }); + + match run_dataflow { + Some(path) => { + // start the given dataflow directly + self::run_dataflow(path.clone(), &runtime_path) .await - .wrap_err_with(|| format!("failed to run dataflow at {}", dataflow.display()))? + .wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; + } + None => { + // start in daemon mode + start(&runtime_path).await?; + } + } + + Ok(()) +} + +async fn start(runtime_path: &Path) -> eyre::Result<()> { + let (dataflow_errors_tx, dataflow_errors) = tokio::sync::mpsc::channel(2); + let mut dataflow_errors_tx = Some(dataflow_errors_tx); + let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); + + let stop_events = tokio::time::sleep(Duration::from_secs(5)) + .into_stream() + .map(|()| Event::Stop); + + let mut events = (dataflow_error_events, stop_events).merge(); + + while let Some(event) = events.next().await { + match event { + Event::DataflowError(err) => { + tracing::error!("{err:?}"); + } + Event::StartDataflow { path } => { + let runtime_path = runtime_path.to_owned(); + let dataflow_errors_tx = match &dataflow_errors_tx { + Some(channel) => channel.clone(), + None => { + tracing::error!("cannot start new dataflow after receiving stop command"); + continue; + } + }; + let task = async move { + let result = run_dataflow(path.clone(), &runtime_path) + .await + .wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); + match result { + Ok(()) => {} + Err(err) => { + let _ = dataflow_errors_tx.send(err).await; + } + } + }; + tokio::spawn(task); + } + Event::Stop => { + tracing::info!("Received stop command"); + // ensure that no new dataflows can be started + dataflow_errors_tx = None; + } } } Ok(()) } +enum Event { + DataflowError(eyre::Report), + StartDataflow { path: PathBuf }, + Stop, +} + async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { let runtime = runtime.with_extension(EXE_EXTENSION); let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { diff --git a/binaries/coordinator/src/main.rs b/binaries/coordinator/src/main.rs index 7a8fb1de..dccbde73 100644 --- a/binaries/coordinator/src/main.rs +++ b/binaries/coordinator/src/main.rs @@ -4,8 +4,8 @@ use eyre::Context; async fn main() -> eyre::Result<()> { set_up_tracing().context("failed to set up tracing subscriber")?; - let command = clap::Parser::parse(); - dora_coordinator::run(command).await + let args = clap::Parser::parse(); + dora_coordinator::run(args).await } fn set_up_tracing() -> eyre::Result<()> { diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index 319a286d..c4daf9b3 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -32,8 +32,8 @@ async fn main() -> eyre::Result<()> { build_package("dora-runtime").await?; - dora_coordinator::run(dora_coordinator::Command::Run { - dataflow: Path::new("dataflow.yml").to_owned(), + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: Path::new("dataflow.yml").to_owned().into(), runtime: Some(root.join("target").join("debug").join("dora-runtime")), }) .await?; diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index 18c10e6c..724729fc 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -20,8 +20,8 @@ async fn main() -> eyre::Result<()> { build_c_node(root, "sink.c", "c_sink").await?; build_c_operator().await?; - dora_coordinator::run(dora_coordinator::Command::Run { - dataflow: Path::new("dataflow.yml").to_owned(), + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: Path::new("dataflow.yml").to_owned().into(), runtime: Some(root.join("target").join("debug").join("dora-runtime")), }) .await?; diff --git a/examples/iceoryx/run.rs b/examples/iceoryx/run.rs index e6562fa6..87a7d6fb 100644 --- a/examples/iceoryx/run.rs +++ b/examples/iceoryx/run.rs @@ -12,8 +12,8 @@ async fn main() -> eyre::Result<()> { build_package("iceoryx-example-sink").await?; build_package("dora-runtime").await?; - dora_coordinator::run(dora_coordinator::Command::Run { - dataflow: Path::new("dataflow.yml").to_owned(), + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: Path::new("dataflow.yml").to_owned().into(), runtime: Some(root.join("target").join("debug").join("dora-runtime")), }) .await?; diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index d0b50f4d..9378905c 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -11,8 +11,8 @@ async fn main() -> eyre::Result<()> { build_dataflow(dataflow).await?; build_package("dora-runtime").await?; - dora_coordinator::run(dora_coordinator::Command::Run { - dataflow: dataflow.to_owned(), + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: dataflow.to_owned().into(), runtime: Some(root.join("target").join("debug").join("dora-runtime")), }) .await?;