|
|
|
@@ -4,42 +4,107 @@ use dora_node_api::{ |
|
|
|
config::{format_duration, NodeId}, |
|
|
|
}; |
|
|
|
use eyre::{bail, eyre, WrapErr}; |
|
|
|
use futures::{stream::FuturesUnordered, StreamExt}; |
|
|
|
use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; |
|
|
|
use futures_concurrency::stream::Merge; |
|
|
|
use std::{ |
|
|
|
env::consts::EXE_EXTENSION, |
|
|
|
path::{Path, PathBuf}, |
|
|
|
time::Duration, |
|
|
|
}; |
|
|
|
use tokio_stream::wrappers::IntervalStream; |
|
|
|
use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; |
|
|
|
|
|
|
|
#[derive(Debug, Clone, clap::Parser)] |
|
|
|
#[clap(about = "Dora coordinator")] |
|
|
|
pub enum Command { |
|
|
|
#[clap(about = "Run dataflow pipeline")] |
|
|
|
Run { |
|
|
|
dataflow: PathBuf, |
|
|
|
runtime: Option<PathBuf>, |
|
|
|
}, |
|
|
|
pub struct Args { |
|
|
|
pub runtime: Option<PathBuf>, |
|
|
|
pub run_dataflow: Option<PathBuf>, |
|
|
|
} |
|
|
|
|
|
|
|
pub async fn run(command: Command) -> eyre::Result<()> { |
|
|
|
match command { |
|
|
|
Command::Run { dataflow, runtime } => { |
|
|
|
let runtime_path = runtime.unwrap_or_else(|| { |
|
|
|
std::env::args() |
|
|
|
.next() |
|
|
|
.map(PathBuf::from) |
|
|
|
.unwrap_or_default() |
|
|
|
.with_file_name("dora-runtime") |
|
|
|
}); |
|
|
|
run_dataflow(dataflow.clone(), &runtime_path) |
|
|
|
pub async fn run(args: Args) -> eyre::Result<()> { |
|
|
|
let Args { |
|
|
|
runtime, |
|
|
|
run_dataflow, |
|
|
|
} = args; |
|
|
|
|
|
|
|
let runtime_path = runtime.unwrap_or_else(|| { |
|
|
|
std::env::args() |
|
|
|
.next() |
|
|
|
.map(PathBuf::from) |
|
|
|
.unwrap_or_default() |
|
|
|
.with_file_name("dora-runtime") |
|
|
|
}); |
|
|
|
|
|
|
|
match run_dataflow { |
|
|
|
Some(path) => { |
|
|
|
// start the given dataflow directly |
|
|
|
self::run_dataflow(path.clone(), &runtime_path) |
|
|
|
.await |
|
|
|
.wrap_err_with(|| format!("failed to run dataflow at {}", dataflow.display()))? |
|
|
|
.wrap_err_with(|| format!("failed to run dataflow at {}", path.display()))?; |
|
|
|
} |
|
|
|
None => { |
|
|
|
// start in daemon mode |
|
|
|
start(&runtime_path).await?; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn start(runtime_path: &Path) -> eyre::Result<()> { |
|
|
|
let (dataflow_errors_tx, dataflow_errors) = tokio::sync::mpsc::channel(2); |
|
|
|
let mut dataflow_errors_tx = Some(dataflow_errors_tx); |
|
|
|
let dataflow_error_events = ReceiverStream::new(dataflow_errors).map(Event::DataflowError); |
|
|
|
|
|
|
|
let stop_events = tokio::time::sleep(Duration::from_secs(5)) |
|
|
|
.into_stream() |
|
|
|
.map(|()| Event::Stop); |
|
|
|
|
|
|
|
let mut events = (dataflow_error_events, stop_events).merge(); |
|
|
|
|
|
|
|
while let Some(event) = events.next().await { |
|
|
|
match event { |
|
|
|
Event::DataflowError(err) => { |
|
|
|
tracing::error!("{err:?}"); |
|
|
|
} |
|
|
|
Event::StartDataflow { path } => { |
|
|
|
let runtime_path = runtime_path.to_owned(); |
|
|
|
let dataflow_errors_tx = match &dataflow_errors_tx { |
|
|
|
Some(channel) => channel.clone(), |
|
|
|
None => { |
|
|
|
tracing::error!("cannot start new dataflow after receiving stop command"); |
|
|
|
continue; |
|
|
|
} |
|
|
|
}; |
|
|
|
let task = async move { |
|
|
|
let result = run_dataflow(path.clone(), &runtime_path) |
|
|
|
.await |
|
|
|
.wrap_err_with(|| format!("failed to run dataflow at {}", path.display())); |
|
|
|
match result { |
|
|
|
Ok(()) => {} |
|
|
|
Err(err) => { |
|
|
|
let _ = dataflow_errors_tx.send(err).await; |
|
|
|
} |
|
|
|
} |
|
|
|
}; |
|
|
|
tokio::spawn(task); |
|
|
|
} |
|
|
|
Event::Stop => { |
|
|
|
tracing::info!("Received stop command"); |
|
|
|
// ensure that no new dataflows can be started |
|
|
|
dataflow_errors_tx = None; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
enum Event { |
|
|
|
DataflowError(eyre::Report), |
|
|
|
StartDataflow { path: PathBuf }, |
|
|
|
Stop, |
|
|
|
} |
|
|
|
|
|
|
|
async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()> { |
|
|
|
let runtime = runtime.with_extension(EXE_EXTENSION); |
|
|
|
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { |
|
|
|
|