Philipp Oppermann 7 months ago
parent
commit
b65563dd2b
Failed to extract signature
12 changed files with 95 additions and 40 deletions
  1. +2
    -1
      Cargo.lock
  2. +2
    -0
      Cargo.toml
  3. +1
    -1
      apis/python/node/Cargo.toml
  4. +1
    -14
      apis/python/node/src/lib.rs
  5. +5
    -21
      binaries/cli/src/lib.rs
  6. +2
    -2
      binaries/cli/src/session.rs
  7. +14
    -0
      examples/benchmark/run.rs
  8. +20
    -1
      examples/multiple-daemons/run.rs
  9. +9
    -0
      examples/python-ros2-dataflow/run.rs
  10. +13
    -0
      examples/rust-dataflow-git/run.rs
  11. +13
    -0
      examples/rust-dataflow-url/run.rs
  12. +13
    -0
      examples/rust-dataflow/run.rs

+ 2
- 1
Cargo.lock View File

@@ -3070,6 +3070,7 @@ dependencies = [
name = "dora-examples"
version = "0.0.0"
dependencies = [
"dora-cli",
"dora-coordinator",
"dora-core",
"dora-download",
@@ -3199,7 +3200,7 @@ name = "dora-node-api-python"
version = "0.3.10"
dependencies = [
"arrow 54.2.1",
"dora-daemon",
"dora-cli",
"dora-download",
"dora-node-api",
"dora-operator-api-python",


+ 2
- 0
Cargo.toml View File

@@ -69,6 +69,7 @@ dora-metrics = { version = "0.3.10", path = "libraries/extensions/telemetry/metr
dora-download = { version = "0.3.10", path = "libraries/extensions/download" }
shared-memory-server = { version = "0.3.10", path = "libraries/shared-memory-server" }
communication-layer-request-reply = { version = "0.3.10", path = "libraries/communication-layer/request-reply" }
dora-cli = { version = "0.3.10", path = "binaries/cli" }
dora-runtime = { version = "0.3.10", path = "binaries/runtime" }
dora-daemon = { version = "0.3.10", path = "binaries/daemon" }
dora-coordinator = { version = "0.3.10", path = "binaries/coordinator" }
@@ -104,6 +105,7 @@ ros2-examples = []
[dev-dependencies]
eyre = "0.6.8"
tokio = "1.24.2"
dora-cli = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-message = { workspace = true }


+ 1
- 1
apis/python/node/Cargo.toml View File

@@ -24,7 +24,7 @@ eyre = "0.6"
serde_yaml = "0.8.23"
flume = "0.10.14"
dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] }
dora-daemon = { workspace = true }
dora-cli = { workspace = true }
dora-download = { workspace = true }
arrow = { workspace = true, features = ["pyarrow"] }
pythonize = { workspace = true }


+ 1
- 14
apis/python/node/src/lib.rs View File

@@ -6,7 +6,6 @@ use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_daemon::Daemon;
use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url;
@@ -382,19 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
#[pyfunction]
#[pyo3(signature = (dataflow_path, uv=None))]
pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?;
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
match result.is_ok() {
true => Ok(()),
false => Err(eyre::eyre!(
"Dataflow failed to run with error: {:?}",
result.node_results
)),
}
dora_cli::run::run(dataflow_path, uv.unwrap_or_default())
}

#[pymodule]


+ 5
- 21
binaries/cli/src/lib.rs View File

@@ -48,7 +48,8 @@ mod formatting;
mod git;
mod graph;
mod logs;
mod session;
pub mod run;
pub mod session;
mod template;
mod up;

@@ -303,14 +304,14 @@ enum Lang {
}

pub fn lib_main(args: Args) {
if let Err(err) = run(args) {
if let Err(err) = run_cli(args) {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:?}");
std::process::exit(1);
}
}

fn run(args: Args) -> eyre::Result<()> {
fn run_cli(args: Args) -> eyre::Result<()> {
#[cfg(feature = "tracing")]
match &args.command {
Command::Daemon {
@@ -403,24 +404,7 @@ fn run(args: Args) -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow, uv } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_session = DataflowSession::read_session(&dataflow_path)
.context("failed to read DataflowSession")?;

let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;

let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
dataflow_session.session_id,
uv,
))?;
handle_dataflow_result(result, None)?
}
Command::Run { dataflow, uv } => run::run(dataflow, uv)?,
Command::Up { config } => {
up::up(config.as_deref())?;
}


+ 2
- 2
binaries/cli/src/session.rs View File

@@ -42,12 +42,12 @@ impl DataflowSession {
pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> {
let session_file = session_file_path(dataflow_path)?;
std::fs::write(session_file, self.serialize()?)
.context("failed to write dataflow session file");
.context("failed to write dataflow session file")?;
Ok(())
}

fn serialize(&self) -> eyre::Result<String> {
Ok(serde_yaml::to_string(&self).context("failed to serialize dataflow session file")?)
serde_yaml::to_string(&self).context("failed to serialize dataflow session file")
}
}



+ 14
- 0
examples/benchmark/run.rs View File

@@ -11,12 +11,26 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--release");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 20
- 1
examples/multiple-daemons/run.rs View File

@@ -1,3 +1,4 @@
use dora_cli::session::DataflowSession;
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::{read_as_descriptor, DescriptorExt},
@@ -37,6 +38,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
@@ -138,12 +140,17 @@ async fn start_dataflow(
.check(&working_dir)
.wrap_err("could not validate yaml")?;

let dataflow_session =
DataflowSession::read_session(dataflow).context("failed to read DataflowSession")?;

let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::Start {
build_id: dataflow_session.build_id,
session_id: dataflow_session.session_id,
dataflow: dataflow_descriptor,
local_working_dir: working_dir,
local_working_dir: Some(working_dir),
name: None,
uv: false,
},
@@ -227,6 +234,18 @@ async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> {
}
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 9
- 0
examples/python-ros2-dataflow/run.rs View File

@@ -40,6 +40,15 @@ async fn main() -> eyre::Result<()> {
async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();

// First build the dataflow (install requirements)
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow).arg("--uv");
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};

let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");


+ 13
- 0
examples/rust-dataflow-git/run.rs View File

@@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> {
} else {
Path::new("dataflow.yml")
};
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 13
- 0
examples/rust-dataflow-url/run.rs View File

@@ -11,12 +11,25 @@ async fn main() -> eyre::Result<()> {
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


+ 13
- 0
examples/rust-dataflow/run.rs View File

@@ -16,12 +16,25 @@ async fn main() -> eyre::Result<()> {
} else {
Path::new("dataflow.yml")
};
build_dataflow(dataflow).await?;

run_dataflow(dataflow).await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);


Loading…
Cancel
Save