diff --git a/Cargo.lock b/Cargo.lock index f38be17c..7a191bcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3070,6 +3070,7 @@ dependencies = [ name = "dora-examples" version = "0.0.0" dependencies = [ + "dora-cli", "dora-coordinator", "dora-core", "dora-download", @@ -3199,7 +3200,7 @@ name = "dora-node-api-python" version = "0.3.10" dependencies = [ "arrow 54.2.1", - "dora-daemon", + "dora-cli", "dora-download", "dora-node-api", "dora-operator-api-python", diff --git a/Cargo.toml b/Cargo.toml index a353bf17..8ae578fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ dora-metrics = { version = "0.3.10", path = "libraries/extensions/telemetry/metr dora-download = { version = "0.3.10", path = "libraries/extensions/download" } shared-memory-server = { version = "0.3.10", path = "libraries/shared-memory-server" } communication-layer-request-reply = { version = "0.3.10", path = "libraries/communication-layer/request-reply" } +dora-cli = { version = "0.3.10", path = "binaries/cli" } dora-runtime = { version = "0.3.10", path = "binaries/runtime" } dora-daemon = { version = "0.3.10", path = "binaries/daemon" } dora-coordinator = { version = "0.3.10", path = "binaries/coordinator" } @@ -104,6 +105,7 @@ ros2-examples = [] [dev-dependencies] eyre = "0.6.8" tokio = "1.24.2" +dora-cli = { workspace = true } dora-coordinator = { workspace = true } dora-core = { workspace = true } dora-message = { workspace = true } diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index 54ebff5c..c06fbbaa 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -24,7 +24,7 @@ eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] } -dora-daemon = { workspace = true } +dora-cli = { workspace = true } dora-download = { workspace = true } arrow = { workspace = true, features = ["pyarrow"] } pythonize = { workspace = true } diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index e2a249a9..c6165f17 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; -use dora_daemon::Daemon; use dora_download::download_file; use dora_node_api::dora_core::config::NodeId; use dora_node_api::dora_core::descriptor::source_is_url; @@ -382,19 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result { #[pyfunction] #[pyo3(signature = (dataflow_path, uv=None))] pub fn run(dataflow_path: String, uv: Option) -> eyre::Result<()> { - let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?; - let rt = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?; - match result.is_ok() { - true => Ok(()), - false => Err(eyre::eyre!( - "Dataflow failed to run with error: {:?}", - result.node_results - )), - } + dora_cli::run::run(dataflow_path, uv.unwrap_or_default()) } #[pymodule] diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index ca3d2422..28a032a3 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -48,7 +48,8 @@ mod formatting; mod git; mod graph; mod logs; -mod session; +pub mod run; +pub mod session; mod template; mod up; @@ -303,14 +304,14 @@ enum Lang { } pub fn lib_main(args: Args) { - if let Err(err) = run(args) { + if let Err(err) = run_cli(args) { eprintln!("\n\n{}", "[ERROR]".bold().red()); eprintln!("{err:?}"); std::process::exit(1); } } -fn run(args: Args) -> eyre::Result<()> { +fn run_cli(args: Args) -> eyre::Result<()> { #[cfg(feature = "tracing")] match &args.command { Command::Daemon { @@ -403,24 +404,7 @@ fn run(args: Args) -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Run { dataflow, uv } => { - let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; - let dataflow_session = DataflowSession::read_session(&dataflow_path) - .context("failed to read DataflowSession")?; - - let rt = Builder::new_multi_thread() - .enable_all() - .build() - .context("tokio runtime failed")?; - - let result = rt.block_on(Daemon::run_dataflow( - &dataflow_path, - dataflow_session.build_id, - dataflow_session.session_id, - uv, - ))?; - handle_dataflow_result(result, None)? - } + Command::Run { dataflow, uv } => run::run(dataflow, uv)?, Command::Up { config } => { up::up(config.as_deref())?; } diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs index c0069359..0edb8011 100644 --- a/binaries/cli/src/session.rs +++ b/binaries/cli/src/session.rs @@ -42,12 +42,12 @@ impl DataflowSession { pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> { let session_file = session_file_path(dataflow_path)?; std::fs::write(session_file, self.serialize()?) - .context("failed to write dataflow session file"); + .context("failed to write dataflow session file")?; Ok(()) } fn serialize(&self) -> eyre::Result { - Ok(serde_yaml::to_string(&self).context("failed to serialize dataflow session file")?) + serde_yaml::to_string(&self).context("failed to serialize dataflow session file") } } diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index 8e0076bc..b6bed6fe 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -11,12 +11,26 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--release"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index d9e37e59..4d8a40e1 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,3 +1,4 @@ +use dora_cli::session::DataflowSession; use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::{read_as_descriptor, DescriptorExt}, @@ -37,6 +38,7 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); let coordinator_bind = SocketAddr::new( @@ -138,12 +140,17 @@ async fn start_dataflow( .check(&working_dir) .wrap_err("could not validate yaml")?; + let dataflow_session = + DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?; + let (reply_sender, reply) = oneshot::channel(); coordinator_events_tx .send(Event::Control(ControlEvent::IncomingRequest { request: ControlRequest::Start { + build_id: dataflow_session.build_id, + session_id: dataflow_session.session_id, dataflow: dataflow_descriptor, - local_working_dir: working_dir, + local_working_dir: Some(working_dir), name: None, uv: false, }, @@ -227,6 +234,18 @@ async fn destroy(coordinator_events_tx: &Sender) -> eyre::Result<()> { } } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/python-ros2-dataflow/run.rs b/examples/python-ros2-dataflow/run.rs index 2873426e..23b254e2 100644 --- a/examples/python-ros2-dataflow/run.rs +++ b/examples/python-ros2-dataflow/run.rs @@ -40,6 +40,15 @@ async fn main() -> eyre::Result<()> { async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); + // First build the dataflow (install requirements) + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow).arg("--uv"); + if !cmd.status().await?.success() { + bail!("failed to run dataflow"); + }; + let mut cmd = tokio::process::Command::new(&cargo); cmd.arg("run"); cmd.arg("--package").arg("dora-cli"); diff --git a/examples/rust-dataflow-git/run.rs b/examples/rust-dataflow-git/run.rs index 6a6a8782..490c5c57 100644 --- a/examples/rust-dataflow-git/run.rs +++ b/examples/rust-dataflow-git/run.rs @@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> { } else { Path::new("dataflow.yml") }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index e93a5d28..6f511970 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -11,12 +11,25 @@ async fn main() -> eyre::Result<()> { .wrap_err("failed to set working dir")?; let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index 6a6a8782..490c5c57 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> { } else { Path::new("dataflow.yml") }; + build_dataflow(dataflow).await?; run_dataflow(dataflow).await?; Ok(()) } +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo);