| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
22f2956b84
|
Remove dev-dependency on `tokio` | 1 year ago |
|
|
fcef458aa3
|
Migrate rust-dataflow-url example to `xshell` | 1 year ago |
|
|
1c3488d977
|
Migrate Python operator example to xshell | 1 year ago |
|
|
6fba72165a
|
Remove tokio dependency from cmake example | 1 year ago |
|
|
4514d6e9d0
|
Migrate C++ dataflow example to `xshell` | 1 year ago |
|
|
61d3f16bb2
|
Remove tokio dependency from Python dataflow example | 1 year ago |
|
|
7ec839fc6f
|
Migrate C dataflow example to `xshell` | 1 year ago |
|
|
b5bd71c17f
|
Migrate benchmark example to xshell | 1 year ago |
|
|
cd41356e76
|
Merge branch 'main' into xshell | 1 year ago |
|
|
46433bb8c5
|
Migrate run.rs script of cmake dataflow example to xshell | 1 year ago |
|
|
dcae010f86
|
Simplify `multiple-daemons` examples using new `connected-machines` command and `xshell` crate | 1 year ago |
|
|
1b665b0c44
|
CLI: Add a hidden `connected-machines` command
This command is useful for testing examples with multiple daemons. |
1 year ago |
|
|
4117e6087e
|
Migrate run.rs script of rust-ros2-dataflow example to xshell | 1 year ago |
|
|
20498f7e1e
|
Remove unused import | 1 year ago |
|
|
1a5fa7fe45
|
Fix: Use correct python/pip binaries after setting up venv
We don't modify the PATH for the current executable anymore, so we cannot use the `get_python_path`/`get_pip_path` functions anymore. |
1 year ago |
|
|
6f85cfb429
|
Use `xshell` for Python dataflow example to try venv activation | 1 year ago |
|
|
179dbf1bbc
|
Simplify the `run.rs` script of the `rust-dataflow` example using `xshell`
The `xshell` crate provides a more convenient way to build and run a `Command`, which is more similar to traditional bash scripts. Using this crate, we can simplify our `run.rs` script while still being platform-independent and not requiring any external dependencies. We can also still run the examples using `cargo run --example`. |
1 year ago |
| @@ -2352,10 +2352,9 @@ dependencies = [ | |||
| "eyre", | |||
| "futures", | |||
| "serde_yaml 0.8.26", | |||
| "tokio", | |||
| "tokio-stream", | |||
| "tracing", | |||
| "uuid", | |||
| "xshell", | |||
| ] | |||
| [[package]] | |||
| @@ -86,7 +86,6 @@ ros2-examples = [] | |||
| [dev-dependencies] | |||
| eyre = "0.6.8" | |||
| tokio = "1.24.2" | |||
| dora-coordinator = { workspace = true } | |||
| dora-core = { workspace = true } | |||
| dora-tracing = { workspace = true } | |||
| @@ -96,7 +95,7 @@ serde_yaml = "0.8.23" | |||
| uuid = { version = "1.7", features = ["v7", "serde"] } | |||
| tracing = "0.1.36" | |||
| futures = "0.3.25" | |||
| tokio-stream = "0.1.11" | |||
| xshell = "0.2.6" | |||
| [[example]] | |||
| name = "c-dataflow" | |||
| @@ -14,7 +14,7 @@ use dora_daemon::Daemon; | |||
| use dora_tracing::set_up_tracing; | |||
| use duration_str::parse; | |||
| use eyre::{bail, Context}; | |||
| use std::net::SocketAddr; | |||
| use std::{collections::BTreeSet, net::SocketAddr}; | |||
| use std::{ | |||
| net::{IpAddr, Ipv4Addr}, | |||
| path::PathBuf, | |||
| @@ -102,6 +102,8 @@ enum Command { | |||
| dataflow: Option<String>, | |||
| node: String, | |||
| }, | |||
| #[clap(hide = true)] | |||
| ConnectedMachines, | |||
| // Metrics, | |||
| // Stats, | |||
| // Get, | |||
| @@ -273,6 +275,12 @@ fn run() -> eyre::Result<()> { | |||
| bail!("No dora coordinator seems to be running."); | |||
| } | |||
| }, | |||
| Command::ConnectedMachines => match connect_to_coordinator() { | |||
| Ok(mut session) => connected_machines(&mut *session)?, | |||
| Err(_) => { | |||
| bail!("No dora coordinator seems to be running."); | |||
| } | |||
| }, | |||
| Command::Stop { | |||
| uuid, | |||
| name, | |||
| @@ -466,6 +474,37 @@ fn query_running_dataflows( | |||
| Ok(ids) | |||
| } | |||
| fn connected_machines(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { | |||
| let machines = query_connected_machines(session)?; | |||
| if machines.is_empty() { | |||
| eprintln!("No machines are connected"); | |||
| } else { | |||
| for id in machines { | |||
| println!("{id}"); | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| fn query_connected_machines( | |||
| session: &mut TcpRequestReplyConnection, | |||
| ) -> eyre::Result<BTreeSet<String>> { | |||
| let reply_raw = session | |||
| .request(&serde_json::to_vec(&ControlRequest::ConnectedMachines).unwrap()) | |||
| .wrap_err("failed to send connected machines message")?; | |||
| let reply: ControlRequestReply = | |||
| serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||
| let ids = match reply { | |||
| ControlRequestReply::ConnectedMachines(ids) => ids, | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected connected machines reply: {other:?}"), | |||
| }; | |||
| Ok(ids) | |||
| } | |||
| fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { | |||
| TcpLayer::new().connect(control_socket_addr()) | |||
| } | |||
| @@ -1,46 +1,43 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("benchmark-runner").wrap_err("failed to set up tracing subscriber")?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora_optimized(&sh)?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| // build the dataflow using `dora build` | |||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| // start running the dataflow.yml | |||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||
| run_dataflow(dataflow).await?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| Ok(()) | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora_optimized(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli --release").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("release").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,12 +1,9 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::{ | |||
| env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, | |||
| path::Path, | |||
| }; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| use eyre::Context; | |||
| use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| fn main() -> eyre::Result<()> { | |||
| set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; | |||
| if cfg!(windows) { | |||
| @@ -16,284 +13,168 @@ async fn main() -> eyre::Result<()> { | |||
| return Ok(()); | |||
| } | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| cmd!(sh, "cargo build --package dora-node-api-cxx").run()?; | |||
| cmd!(sh, "cargo build --package dora-operator-api-cxx").run()?; | |||
| cmd!(sh, "cargo build --package dora-node-api-c").run()?; | |||
| cmd!(sh, "cargo build --package dora-operator-api-c").run()?; | |||
| sh.create_dir("build")?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let target = root.join("target"); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| let build_dir = Path::new("build"); | |||
| let target_debug = target.join("debug"); | |||
| build_package("dora-node-api-cxx").await?; | |||
| let node_cxxbridge = target | |||
| .join("cxxbridge") | |||
| .join("dora-node-api-cxx") | |||
| .join("src"); | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.cc"), | |||
| build_dir.join("node-bridge.cc"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::copy( | |||
| node_cxxbridge.join("lib.rs.h"), | |||
| build_dir.join("dora-node-api.h"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::write( | |||
| build_dir.join("operator.h"), | |||
| sh.copy_file(node_cxxbridge.join("lib.rs.cc"), "build/node-bridge.cc")?; | |||
| sh.copy_file(node_cxxbridge.join("lib.rs.h"), "build/dora-node-api.h")?; | |||
| sh.write_file( | |||
| "build/operator.h", | |||
| r###"#include "../operator-rust-api/operator.h""###, | |||
| ) | |||
| .await?; | |||
| )?; | |||
| build_package("dora-operator-api-cxx").await?; | |||
| let operator_cxxbridge = target | |||
| .join("cxxbridge") | |||
| .join("dora-operator-api-cxx") | |||
| .join("src"); | |||
| tokio::fs::copy( | |||
| sh.copy_file( | |||
| operator_cxxbridge.join("lib.rs.cc"), | |||
| build_dir.join("operator-bridge.cc"), | |||
| ) | |||
| .await?; | |||
| tokio::fs::copy( | |||
| "build/operator-bridge.cc", | |||
| )?; | |||
| sh.copy_file( | |||
| operator_cxxbridge.join("lib.rs.h"), | |||
| build_dir.join("dora-operator-api.h"), | |||
| ) | |||
| .await?; | |||
| "build/dora-operator-api.h", | |||
| )?; | |||
| build_package("dora-node-api-c").await?; | |||
| build_package("dora-operator-api-c").await?; | |||
| build_cxx_node( | |||
| root, | |||
| &[ | |||
| &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, | |||
| &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, | |||
| ], | |||
| "node_rust_api", | |||
| &["-l", "dora_node_api_cxx"], | |||
| ) | |||
| .await?; | |||
| build_cxx_node( | |||
| root, | |||
| &[&dunce::canonicalize( | |||
| Path::new("node-c-api").join("main.cc"), | |||
| )?], | |||
| "node_c_api", | |||
| &["-l", "dora_node_api_c"], | |||
| ) | |||
| .await?; | |||
| build_cxx_operator( | |||
| // compile nodes | |||
| let args: &[&str] = if cfg!(target_os = "linux") { | |||
| &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] | |||
| } else if cfg!(target_os = "windows") { | |||
| &[ | |||
| &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, | |||
| &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, | |||
| ], | |||
| "operator_rust_api", | |||
| "-ladvapi32", | |||
| "-luserenv", | |||
| "-lkernel32", | |||
| "-lws2_32", | |||
| "-lbcrypt", | |||
| "-lncrypt", | |||
| "-lschannel", | |||
| "-lntdll", | |||
| "-liphlpapi", | |||
| "-lcfgmgr32", | |||
| "-lcredui", | |||
| "-lcrypt32", | |||
| "-lcryptnet", | |||
| "-lfwpuclnt", | |||
| "-lgdi32", | |||
| "-lmsimg32", | |||
| "-lmswsock", | |||
| "-lole32", | |||
| "-loleaut32", | |||
| "-lopengl32", | |||
| "-lsecur32", | |||
| "-lshell32", | |||
| "-lsynchronization", | |||
| "-luser32", | |||
| "-lwinspool", | |||
| "-Wl,-nodefaultlib:libcmt", | |||
| "-D_DLL", | |||
| "-lmsvcrt", | |||
| ] | |||
| } else if cfg!(target_os = "macos") { | |||
| &[ | |||
| "-framework", | |||
| "CoreServices", | |||
| "-framework", | |||
| "Security", | |||
| "-l", | |||
| "dora_operator_api_cxx", | |||
| "-L", | |||
| root.join("target").join("debug").to_str().unwrap(), | |||
| ], | |||
| ) | |||
| .await?; | |||
| build_cxx_operator( | |||
| &[&dunce::canonicalize( | |||
| Path::new("operator-c-api").join("operator.cc"), | |||
| )?], | |||
| "operator_c_api", | |||
| &[ | |||
| "System", | |||
| "-l", | |||
| "resolv", | |||
| "-l", | |||
| "pthread", | |||
| "-l", | |||
| "c", | |||
| "-l", | |||
| "dora_operator_api_c", | |||
| "-L", | |||
| root.join("target").join("debug").to_str().unwrap(), | |||
| ], | |||
| "m", | |||
| ] | |||
| } else { | |||
| panic!("unsupported target platform") | |||
| }; | |||
| cmd!( | |||
| sh, | |||
| "clang++ node-rust-api/main.cc build/node-bridge.cc -std=c++17 -l dora_node_api_cxx {args...} -L {target_debug} --output build/node_rust_api{EXE_SUFFIX}" | |||
| ) | |||
| .await?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| build_package("dora-runtime").await?; | |||
| run_dataflow(&dataflow).await?; | |||
| Ok(()) | |||
| } | |||
| .run()?; | |||
| cmd!( | |||
| sh, | |||
| "clang++ node-c-api/main.cc -std=c++17 -l dora_node_api_c {args...} -L {target_debug} --output build/node_c_api{EXE_SUFFIX}" | |||
| ) | |||
| .run()?; | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| // compile operators | |||
| let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; | |||
| cmd!( | |||
| sh, | |||
| "clang++ -c build/operator-bridge.cc -std=c++17 -o build/operator-bridge.o {operator_args...}" | |||
| ) | |||
| .run()?; | |||
| cmd!( | |||
| sh, | |||
| "clang++ -c operator-rust-api/operator.cc -o operator-rust-api/operator.o -std=c++17 {operator_args...}" | |||
| ) | |||
| .run()?; | |||
| cmd!( | |||
| sh, | |||
| "clang++ -c operator-c-api/operator.cc -o operator-c-api/operator.o -std=c++17 {operator_args...}" | |||
| ) | |||
| .run()?; | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| // link operators | |||
| cmd!( | |||
| sh, | |||
| "clang++ -shared operator-rust-api/operator.o build/operator-bridge.o -l dora_operator_api_cxx {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_rust_api{DLL_SUFFIX}" | |||
| ) | |||
| .run()?; | |||
| cmd!( | |||
| sh, | |||
| "clang++ -shared operator-c-api/operator.o -l dora_operator_api_c {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_c_api{DLL_SUFFIX}" | |||
| ) | |||
| .run()?; | |||
| async fn build_cxx_node( | |||
| root: &Path, | |||
| paths: &[&Path], | |||
| out_name: &str, | |||
| args: &[&str], | |||
| ) -> eyre::Result<()> { | |||
| let mut clang = tokio::process::Command::new("clang++"); | |||
| clang.args(paths); | |||
| clang.arg("-std=c++17"); | |||
| #[cfg(target_os = "linux")] | |||
| { | |||
| clang.arg("-l").arg("m"); | |||
| clang.arg("-l").arg("rt"); | |||
| clang.arg("-l").arg("dl"); | |||
| clang.arg("-pthread"); | |||
| } | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| clang.arg("-ladvapi32"); | |||
| clang.arg("-luserenv"); | |||
| clang.arg("-lkernel32"); | |||
| clang.arg("-lws2_32"); | |||
| clang.arg("-lbcrypt"); | |||
| clang.arg("-lncrypt"); | |||
| clang.arg("-lschannel"); | |||
| clang.arg("-lntdll"); | |||
| clang.arg("-liphlpapi"); | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| clang.arg("-lcfgmgr32"); | |||
| clang.arg("-lcredui"); | |||
| clang.arg("-lcrypt32"); | |||
| clang.arg("-lcryptnet"); | |||
| clang.arg("-lfwpuclnt"); | |||
| clang.arg("-lgdi32"); | |||
| clang.arg("-lmsimg32"); | |||
| clang.arg("-lmswsock"); | |||
| clang.arg("-lole32"); | |||
| clang.arg("-lopengl32"); | |||
| clang.arg("-lsecur32"); | |||
| clang.arg("-lshell32"); | |||
| clang.arg("-lsynchronization"); | |||
| clang.arg("-luser32"); | |||
| clang.arg("-lwinspool"); | |||
| // start running the dataflow.yml | |||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||
| clang.arg("-D_DLL"); | |||
| clang.arg("-lmsvcrt"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| clang.arg("-framework").arg("CoreServices"); | |||
| clang.arg("-framework").arg("Security"); | |||
| clang.arg("-l").arg("System"); | |||
| clang.arg("-l").arg("resolv"); | |||
| clang.arg("-l").arg("pthread"); | |||
| clang.arg("-l").arg("c"); | |||
| clang.arg("-l").arg("m"); | |||
| } | |||
| clang.args(args); | |||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||
| clang | |||
| .arg("--output") | |||
| .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||
| if let Some(parent) = paths[0].parent() { | |||
| clang.current_dir(parent); | |||
| } | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| if !clang.status().await?.success() { | |||
| bail!("failed to compile c++ node"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_cxx_operator( | |||
| paths: &[&Path], | |||
| out_name: &str, | |||
| link_args: &[&str], | |||
| ) -> eyre::Result<()> { | |||
| let mut object_file_paths = Vec::new(); | |||
| for path in paths { | |||
| let mut compile = tokio::process::Command::new("clang++"); | |||
| compile.arg("-c").arg(path); | |||
| compile.arg("-std=c++17"); | |||
| let object_file_path = path.with_extension("o"); | |||
| compile.arg("-o").arg(&object_file_path); | |||
| #[cfg(unix)] | |||
| compile.arg("-fPIC"); | |||
| if let Some(parent) = path.parent() { | |||
| compile.current_dir(parent); | |||
| } | |||
| if !compile.status().await?.success() { | |||
| bail!("failed to compile cxx operator"); | |||
| }; | |||
| object_file_paths.push(object_file_path); | |||
| } | |||
| let mut link = tokio::process::Command::new("clang++"); | |||
| link.arg("-shared").args(&object_file_paths); | |||
| link.args(link_args); | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| link.arg("-ladvapi32"); | |||
| link.arg("-luserenv"); | |||
| link.arg("-lkernel32"); | |||
| link.arg("-lws2_32"); | |||
| link.arg("-lbcrypt"); | |||
| link.arg("-lncrypt"); | |||
| link.arg("-lschannel"); | |||
| link.arg("-lntdll"); | |||
| link.arg("-liphlpapi"); | |||
| link.arg("-lcfgmgr32"); | |||
| link.arg("-lcredui"); | |||
| link.arg("-lcrypt32"); | |||
| link.arg("-lcryptnet"); | |||
| link.arg("-lfwpuclnt"); | |||
| link.arg("-lgdi32"); | |||
| link.arg("-lmsimg32"); | |||
| link.arg("-lmswsock"); | |||
| link.arg("-lole32"); | |||
| link.arg("-lopengl32"); | |||
| link.arg("-lsecur32"); | |||
| link.arg("-lshell32"); | |||
| link.arg("-lsynchronization"); | |||
| link.arg("-luser32"); | |||
| link.arg("-lwinspool"); | |||
| link.arg("-Wl,-nodefaultlib:libcmt"); | |||
| link.arg("-D_DLL"); | |||
| link.arg("-lmsvcrt"); | |||
| link.arg("-fms-runtime-lib=static"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| link.arg("-framework").arg("CoreServices"); | |||
| link.arg("-framework").arg("Security"); | |||
| link.arg("-l").arg("System"); | |||
| link.arg("-l").arg("resolv"); | |||
| link.arg("-l").arg("pthread"); | |||
| link.arg("-l").arg("c"); | |||
| link.arg("-l").arg("m"); | |||
| } | |||
| link.arg("-o") | |||
| .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); | |||
| if let Some(parent) = paths[0].parent() { | |||
| link.current_dir(parent); | |||
| } | |||
| if !link.status().await?.success() { | |||
| bail!("failed to create shared library from cxx operator (c api)"); | |||
| }; | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,186 +1,178 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::{ | |||
| env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, | |||
| path::Path, | |||
| }; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("c-dataflow-runner").wrap_err("failed to set up tracing")?; | |||
| use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| fn main() -> eyre::Result<()> { | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| build_package("dora-node-api-c").await?; | |||
| build_c_node(root, "node.c", "c_node").await?; | |||
| build_c_node(root, "sink.c", "c_sink").await?; | |||
| build_package("dora-operator-api-c").await?; | |||
| build_c_operator(root).await?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| run_dataflow(&dataflow).await?; | |||
| Ok(()) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| cmd!(sh, "cargo build --package dora-node-api-c").run()?; | |||
| cmd!(sh, "cargo build --package dora-operator-api-c").run()?; | |||
| sh.create_dir("build")?; | |||
| let target_debug = root.join("target").join("debug"); | |||
| // compile nodes | |||
| let args: &[&str] = if cfg!(target_os = "linux") { | |||
| &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] | |||
| } else if cfg!(target_os = "windows") { | |||
| &[ | |||
| "-ladvapi32", | |||
| "-luserenv", | |||
| "-lkernel32", | |||
| "-lws2_32", | |||
| "-lbcrypt", | |||
| "-lncrypt", | |||
| "-lschannel", | |||
| "-lntdll", | |||
| "-liphlpapi", | |||
| "-lcfgmgr32", | |||
| "-lcredui", | |||
| "-lcrypt32", | |||
| "-lcryptnet", | |||
| "-lfwpuclnt", | |||
| "-lgdi32", | |||
| "-lmsimg32", | |||
| "-lmswsock", | |||
| "-lole32", | |||
| "-loleaut32", | |||
| "-lopengl32", | |||
| "-lsecur32", | |||
| "-lshell32", | |||
| "-lsynchronization", | |||
| "-luser32", | |||
| "-lwinspool", | |||
| "-Wl,-nodefaultlib:libcmt", | |||
| "-D_DLL", | |||
| "-lmsvcrt", | |||
| ] | |||
| } else if cfg!(target_os = "macos") { | |||
| &[ | |||
| "-framework", | |||
| "CoreServices", | |||
| "-framework", | |||
| "Security", | |||
| "-l", | |||
| "System", | |||
| "-l", | |||
| "resolv", | |||
| "-l", | |||
| "pthread", | |||
| "-l", | |||
| "c", | |||
| "-l", | |||
| "m", | |||
| ] | |||
| } else { | |||
| panic!("unsupported target platform") | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| cmd!( | |||
| sh, | |||
| "clang node.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_node{EXE_SUFFIX}" | |||
| ) | |||
| .run()?; | |||
| cmd!( | |||
| sh, | |||
| "clang sink.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_sink{EXE_SUFFIX}" | |||
| ) | |||
| .run()?; | |||
| // compile operator | |||
| let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; | |||
| cmd!( | |||
| sh, | |||
| "clang -c operator.c -o build/operator.o -fdeclspec {operator_args...}" | |||
| ) | |||
| .run()?; | |||
| // link operator | |||
| let operator_link_args: &[&str] = if cfg!(target_os = "windows") { | |||
| &[ | |||
| "-ladvapi32", | |||
| "-luserenv", | |||
| "-lkernel32", | |||
| "-lws2_32", | |||
| "-lbcrypt", | |||
| "-lncrypt", | |||
| "-lschannel", | |||
| "-lntdll", | |||
| "-liphlpapi", | |||
| "-lcfgmgr32", | |||
| "-lcredui", | |||
| "-lcrypt32", | |||
| "-lcryptnet", | |||
| "-lfwpuclnt", | |||
| "-lgdi32", | |||
| "-lmsimg32", | |||
| "-lmswsock", | |||
| "-lole32", | |||
| "-loleaut32", | |||
| "-lopengl32", | |||
| "-lsecur32", | |||
| "-lshell32", | |||
| "-lsynchronization", | |||
| "-luser32", | |||
| "-lwinspool", | |||
| "-Wl,-nodefaultlib:libcmt", | |||
| "-D_DLL", | |||
| "-lmsvcrt", | |||
| ] | |||
| } else if cfg!(target_os = "macos") { | |||
| &[ | |||
| "-framework", | |||
| "CoreServices", | |||
| "-framework", | |||
| "Security", | |||
| "-l", | |||
| "System", | |||
| "-l", | |||
| "resolv", | |||
| "-l", | |||
| "pthread", | |||
| "-l", | |||
| "c", | |||
| "-l", | |||
| "m", | |||
| ] | |||
| } else { | |||
| &[] | |||
| }; | |||
| Ok(()) | |||
| } | |||
| cmd!( | |||
| sh, | |||
| "clang -shared build/operator.o -L {target_debug} -l dora_operator_api_c {operator_link_args...} -o build/{DLL_PREFIX}operator{DLL_SUFFIX}" | |||
| ).run()?; | |||
| async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> { | |||
| let mut clang = tokio::process::Command::new("clang"); | |||
| clang.arg(name); | |||
| clang.arg("-l").arg("dora_node_api_c"); | |||
| #[cfg(target_os = "linux")] | |||
| { | |||
| clang.arg("-l").arg("m"); | |||
| clang.arg("-l").arg("rt"); | |||
| clang.arg("-l").arg("dl"); | |||
| clang.arg("-pthread"); | |||
| } | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| clang.arg("-ladvapi32"); | |||
| clang.arg("-luserenv"); | |||
| clang.arg("-lkernel32"); | |||
| clang.arg("-lws2_32"); | |||
| clang.arg("-lbcrypt"); | |||
| clang.arg("-lncrypt"); | |||
| clang.arg("-lschannel"); | |||
| clang.arg("-lntdll"); | |||
| clang.arg("-liphlpapi"); | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| clang.arg("-lcfgmgr32"); | |||
| clang.arg("-lcredui"); | |||
| clang.arg("-lcrypt32"); | |||
| clang.arg("-lcryptnet"); | |||
| clang.arg("-lfwpuclnt"); | |||
| clang.arg("-lgdi32"); | |||
| clang.arg("-lmsimg32"); | |||
| clang.arg("-lmswsock"); | |||
| clang.arg("-lole32"); | |||
| clang.arg("-loleaut32"); | |||
| clang.arg("-lopengl32"); | |||
| clang.arg("-lsecur32"); | |||
| clang.arg("-lshell32"); | |||
| clang.arg("-lsynchronization"); | |||
| clang.arg("-luser32"); | |||
| clang.arg("-lwinspool"); | |||
| // start running the dataflow.yml | |||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||
| clang.arg("-D_DLL"); | |||
| clang.arg("-lmsvcrt"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| clang.arg("-framework").arg("CoreServices"); | |||
| clang.arg("-framework").arg("Security"); | |||
| clang.arg("-l").arg("System"); | |||
| clang.arg("-l").arg("resolv"); | |||
| clang.arg("-l").arg("pthread"); | |||
| clang.arg("-l").arg("c"); | |||
| clang.arg("-l").arg("m"); | |||
| } | |||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||
| clang | |||
| .arg("--output") | |||
| .arg(Path::new("build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||
| if !clang.status().await?.success() { | |||
| bail!("failed to compile c node"); | |||
| }; | |||
| Ok(()) | |||
| } | |||
| async fn build_c_operator(root: &Path) -> eyre::Result<()> { | |||
| let mut compile = tokio::process::Command::new("clang"); | |||
| compile.arg("-c").arg("operator.c"); | |||
| compile.arg("-o").arg("build/operator.o"); | |||
| compile.arg("-fdeclspec"); | |||
| #[cfg(unix)] | |||
| compile.arg("-fPIC"); | |||
| if !compile.status().await?.success() { | |||
| bail!("failed to compile c operator"); | |||
| }; | |||
| let mut link = tokio::process::Command::new("clang"); | |||
| link.arg("-shared").arg("build/operator.o"); | |||
| link.arg("-L").arg(root.join("target").join("debug")); | |||
| link.arg("-l").arg("dora_operator_api_c"); | |||
| #[cfg(target_os = "windows")] | |||
| { | |||
| link.arg("-ladvapi32"); | |||
| link.arg("-luserenv"); | |||
| link.arg("-lkernel32"); | |||
| link.arg("-lws2_32"); | |||
| link.arg("-lbcrypt"); | |||
| link.arg("-lncrypt"); | |||
| link.arg("-lschannel"); | |||
| link.arg("-lntdll"); | |||
| link.arg("-liphlpapi"); | |||
| link.arg("-lcfgmgr32"); | |||
| link.arg("-lcredui"); | |||
| link.arg("-lcrypt32"); | |||
| link.arg("-lcryptnet"); | |||
| link.arg("-lfwpuclnt"); | |||
| link.arg("-lgdi32"); | |||
| link.arg("-lmsimg32"); | |||
| link.arg("-lmswsock"); | |||
| link.arg("-lole32"); | |||
| link.arg("-loleaut32"); | |||
| link.arg("-lopengl32"); | |||
| link.arg("-lsecur32"); | |||
| link.arg("-lshell32"); | |||
| link.arg("-lsynchronization"); | |||
| link.arg("-luser32"); | |||
| link.arg("-lwinspool"); | |||
| link.arg("-Wl,-nodefaultlib:libcmt"); | |||
| link.arg("-D_DLL"); | |||
| link.arg("-lmsvcrt"); | |||
| } | |||
| #[cfg(target_os = "macos")] | |||
| { | |||
| link.arg("-framework").arg("CoreServices"); | |||
| link.arg("-framework").arg("Security"); | |||
| link.arg("-l").arg("System"); | |||
| link.arg("-l").arg("resolv"); | |||
| link.arg("-l").arg("pthread"); | |||
| link.arg("-l").arg("c"); | |||
| link.arg("-l").arg("m"); | |||
| } | |||
| link.arg("-o") | |||
| .arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}"))); | |||
| if !link.status().await?.success() { | |||
| bail!("failed to link c operator"); | |||
| }; | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,9 +1,9 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| use eyre::{Context, ContextCompat}; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| fn main() -> eyre::Result<()> { | |||
| set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?; | |||
| if cfg!(windows) { | |||
| @@ -13,60 +13,55 @@ async fn main() -> eyre::Result<()> { | |||
| return Ok(()); | |||
| } | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build C++ source code using cmake | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| cmd!(sh, "cmake -DDORA_ROOT_DIR={root} -B build .").run()?; | |||
| cmd!(sh, "cmake --build build").run()?; | |||
| cmd!(sh, "cmake --install build").run()?; | |||
| tokio::fs::create_dir_all("build").await?; | |||
| let mut cmd = tokio::process::Command::new("cmake"); | |||
| cmd.arg(format!("-DDORA_ROOT_DIR={}", root.display())); | |||
| cmd.arg("-B").arg("build"); | |||
| cmd.arg("."); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to generating make file"); | |||
| } | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| let mut cmd = tokio::process::Command::new("cmake"); | |||
| cmd.arg("--build").arg("build"); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build a cmake-generated project binary tree"); | |||
| } | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| let mut cmd = tokio::process::Command::new("cmake"); | |||
| cmd.arg("--install").arg("build"); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build a cmake-generated project binary tree"); | |||
| } | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||
| let uuid = output.lines().next().context("no output")?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||
| build_package("dora-runtime").await?; | |||
| run_dataflow(&dataflow).await?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_runtime-node-2.txt")?; | |||
| if sink_output.lines().count() < 20 { | |||
| eyre::bail!("sink did not receive the expected number of messages") | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("build"); | |||
| cmd.arg("--package").arg(package); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build {package}"); | |||
| } | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,219 +1,96 @@ | |||
| use dora_coordinator::{ControlEvent, Event}; | |||
| use dora_core::{ | |||
| descriptor::Descriptor, | |||
| topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, | |||
| }; | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use eyre::ContextCompat; | |||
| use std::{ | |||
| collections::BTreeSet, | |||
| net::{IpAddr, Ipv4Addr, SocketAddr}, | |||
| path::Path, | |||
| path::{Path, PathBuf}, | |||
| process::Command, | |||
| time::Duration, | |||
| }; | |||
| use tokio::{ | |||
| sync::{ | |||
| mpsc::{self, Sender}, | |||
| oneshot, | |||
| }, | |||
| task::JoinSet, | |||
| }; | |||
| use tokio_stream::wrappers::ReceiverStream; | |||
| use uuid::Uuid; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| use xshell::{cmd, Shell}; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); | |||
| let coordinator_bind = SocketAddr::new( | |||
| IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), | |||
| DORA_COORDINATOR_PORT_DEFAULT, | |||
| ); | |||
| let (coordinator_port, coordinator) = | |||
| dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) | |||
| .await?; | |||
| let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); | |||
| let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); | |||
| let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); | |||
| // build the dataflow using `dora build` | |||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||
| tracing::info!("Spawning coordinator and daemons"); | |||
| let mut tasks = JoinSet::new(); | |||
| tasks.spawn(coordinator); | |||
| tasks.spawn(daemon_a); | |||
| tasks.spawn(daemon_b); | |||
| tracing::info!("waiting until daemons are connected to coordinator"); | |||
| let mut retries = 0; | |||
| // start the dora coordinator (in background) | |||
| Command::from(cmd!(sh, "{dora} coordinator")).spawn()?; | |||
| // wait until coordinator is ready | |||
| loop { | |||
| let connected_machines = connected_machines(&coordinator_events_tx).await?; | |||
| if connected_machines.contains("A") && connected_machines.contains("B") { | |||
| break; | |||
| } else if retries > 20 { | |||
| bail!("daemon not connected after {retries} retries"); | |||
| } else { | |||
| std::thread::sleep(Duration::from_millis(500)); | |||
| retries += 1 | |||
| match cmd!(sh, "{dora} list").quiet().ignore_stderr().run() { | |||
| Ok(_) => { | |||
| println!("coordinator connected"); | |||
| break; | |||
| } | |||
| Err(_) => { | |||
| eprintln!("waiting for coordinator"); | |||
| std::thread::sleep(Duration::from_millis(100)) | |||
| } | |||
| } | |||
| } | |||
| tracing::info!("starting dataflow"); | |||
| let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?; | |||
| tracing::info!("started dataflow under ID `{uuid}`"); | |||
| let running = running_dataflows(&coordinator_events_tx).await?; | |||
| if !running.iter().map(|d| d.uuid).any(|id| id == uuid) { | |||
| bail!("dataflow `{uuid}` is not running"); | |||
| } | |||
| tracing::info!("waiting for dataflow `{uuid}` to finish"); | |||
| let mut retries = 0; | |||
| // start two daemons (in background) | |||
| Command::from(cmd!(sh, "{dora} daemon --machine-id A")).spawn()?; | |||
| Command::from(cmd!(sh, "{dora} daemon --machine-id B")).spawn()?; | |||
| // wait until both daemons are connected | |||
| loop { | |||
| let running = running_dataflows(&coordinator_events_tx).await?; | |||
| if running.is_empty() { | |||
| break; | |||
| } else if retries > 100 { | |||
| bail!("dataflow not finished after {retries} retries"); | |||
| } else { | |||
| tracing::debug!("not done yet"); | |||
| std::thread::sleep(Duration::from_millis(500)); | |||
| retries += 1 | |||
| let output = cmd!(sh, "{dora} connected-machines") | |||
| .quiet() | |||
| .ignore_stderr() | |||
| .read(); | |||
| match output { | |||
| Ok(output) => { | |||
| let connected: Vec<&str> = output.lines().collect(); | |||
| if connected == ["A", "B"] { | |||
| println!("both daemons connected"); | |||
| break; | |||
| } else { | |||
| eprintln!("not all daemons connected yet (connected: {connected:?})"); | |||
| } | |||
| } | |||
| Err(err) => eprintln!("failed to query connected-machines: {err:?}"), | |||
| } | |||
| std::thread::sleep(Duration::from_millis(100)); | |||
| } | |||
| tracing::info!("dataflow `{uuid}` finished, destroying coordinator"); | |||
| destroy(&coordinator_events_tx).await?; | |||
| tracing::info!("joining tasks"); | |||
| while let Some(res) = tasks.join_next().await { | |||
| res.unwrap()?; | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| println!("starting dataflow"); | |||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||
| println!("dataflow finished successfully"); | |||
| let uuid = output.lines().next().context("no output")?; | |||
| // stop the coordinator and both daemons again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_rust-sink.txt")?; | |||
| if sink_output.lines().count() < 50 { | |||
| eyre::bail!("sink did not receive the expected number of messages") | |||
| } | |||
| tracing::info!("done"); | |||
| Ok(()) | |||
| } | |||
| async fn start_dataflow( | |||
| dataflow: &Path, | |||
| coordinator_events_tx: &Sender<Event>, | |||
| ) -> eyre::Result<Uuid> { | |||
| let dataflow_descriptor = Descriptor::read(dataflow) | |||
| .await | |||
| .wrap_err("failed to read yaml dataflow")?; | |||
| let working_dir = dataflow | |||
| .canonicalize() | |||
| .context("failed to canonicalize dataflow path")? | |||
| .parent() | |||
| .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? | |||
| .to_owned(); | |||
| dataflow_descriptor | |||
| .check(&working_dir) | |||
| .wrap_err("could not validate yaml")?; | |||
| let (reply_sender, reply) = oneshot::channel(); | |||
| coordinator_events_tx | |||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||
| request: ControlRequest::Start { | |||
| dataflow: dataflow_descriptor, | |||
| local_working_dir: working_dir, | |||
| name: None, | |||
| }, | |||
| reply_sender, | |||
| })) | |||
| .await?; | |||
| let result = reply.await??; | |||
| let uuid = match result { | |||
| ControlRequestReply::DataflowStarted { uuid } => uuid, | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
| }; | |||
| Ok(uuid) | |||
| } | |||
| async fn connected_machines( | |||
| coordinator_events_tx: &Sender<Event>, | |||
| ) -> eyre::Result<BTreeSet<String>> { | |||
| let (reply_sender, reply) = oneshot::channel(); | |||
| coordinator_events_tx | |||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||
| request: ControlRequest::ConnectedMachines, | |||
| reply_sender, | |||
| })) | |||
| .await?; | |||
| let result = reply.await??; | |||
| let machines = match result { | |||
| ControlRequestReply::ConnectedMachines(machines) => machines, | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
| }; | |||
| Ok(machines) | |||
| } | |||
| async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Result<Vec<DataflowId>> { | |||
| let (reply_sender, reply) = oneshot::channel(); | |||
| coordinator_events_tx | |||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||
| request: ControlRequest::List, | |||
| reply_sender, | |||
| })) | |||
| .await?; | |||
| let result = reply.await??; | |||
| let dataflows = match result { | |||
| ControlRequestReply::DataflowList { dataflows } => dataflows, | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
| }; | |||
| Ok(dataflows) | |||
| } | |||
| async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> { | |||
| let (reply_sender, reply) = oneshot::channel(); | |||
| coordinator_events_tx | |||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||
| request: ControlRequest::Destroy, | |||
| reply_sender, | |||
| })) | |||
| .await?; | |||
| let result = reply.await??; | |||
| match result { | |||
| ControlRequestReply::DestroyOk => Ok(()), | |||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||
| } | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--machine-id") | |||
| .arg(machine_id) | |||
| .arg("--coordinator-addr") | |||
| .arg(coordinator); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,30 +1,73 @@ | |||
| use dora_core::{get_pip_path, get_python_path, run}; | |||
| use dora_download::download_file; | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, ContextCompat, WrapErr}; | |||
| use std::path::Path; | |||
| use dora_core::get_python_path; | |||
| use dora_download::download_file_sync; | |||
| use eyre::{ContextCompat, WrapErr}; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("python-dataflow-runner")?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| // prepare Python virtual environment | |||
| prepare_venv(&sh)?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| // install/upgrade pip, then install requirements | |||
| cmd!(sh, "python -m pip install --upgrade pip").run()?; | |||
| cmd!(sh, "pip install -r requirements.txt").run()?; | |||
| // build the dora Python package (you can skip this if you installed the Python dora package) | |||
| { | |||
| let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) | |||
| .join("apis") | |||
| .join("python") | |||
| .join("node"); | |||
| let _dir = sh.push_dir(python_node_api_dir); | |||
| cmd!(sh, "maturin develop").run()?; | |||
| } | |||
| run( | |||
| get_python_path().context("Could not get python binary")?, | |||
| &["-m", "venv", "../.env"], | |||
| None, | |||
| download_file_sync( | |||
| "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", | |||
| Path::new("yolov8n.pt"), | |||
| ) | |||
| .await | |||
| .context("failed to create venv")?; | |||
| let venv = &root.join("examples").join(".env"); | |||
| std::env::set_var( | |||
| .context("Could not download weights.")?; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||
| let uuid = output.lines().next().context("no output")?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_object_detection.txt")?; | |||
| if sink_output.lines().count() < 50 { | |||
| eyre::bail!("object dectection node did not receive the expected number of messages") | |||
| } | |||
| Ok(()) | |||
| } | |||
| /// Prepares a Python virtual environment. | |||
| /// | |||
| /// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` | |||
| /// if you're running bash. | |||
| fn prepare_venv(sh: &Shell) -> eyre::Result<()> { | |||
| let python = get_python_path().context("Could not get python binary")?; | |||
| cmd!(sh, "{python} -m venv ../.env").run()?; | |||
| let venv = sh.current_dir().parent().unwrap().join(".env"); | |||
| sh.set_var( | |||
| "VIRTUAL_ENV", | |||
| venv.to_str().context("venv path not valid unicode")?, | |||
| ); | |||
| let orig_path = std::env::var("PATH")?; | |||
| // bin folder is named Scripts on windows. | |||
| // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | |||
| let venv_bin = if cfg!(windows) { | |||
| @@ -32,71 +75,36 @@ async fn main() -> eyre::Result<()> { | |||
| } else { | |||
| venv.join("bin") | |||
| }; | |||
| let path_separator = if cfg!(windows) { ';' } else { ':' }; | |||
| if cfg!(windows) { | |||
| std::env::set_var( | |||
| "PATH", | |||
| format!( | |||
| "{};{orig_path}", | |||
| venv_bin.to_str().context("venv path not valid unicode")? | |||
| ), | |||
| ); | |||
| } else { | |||
| std::env::set_var( | |||
| "PATH", | |||
| format!( | |||
| "{}:{orig_path}", | |||
| venv_bin.to_str().context("venv path not valid unicode")? | |||
| ), | |||
| ); | |||
| } | |||
| run( | |||
| get_python_path().context("Could not get pip binary")?, | |||
| &["-m", "pip", "install", "--upgrade", "pip"], | |||
| None, | |||
| ) | |||
| .await | |||
| .context("failed to install pip")?; | |||
| run( | |||
| get_pip_path().context("Could not get pip binary")?, | |||
| &["install", "-r", "requirements.txt"], | |||
| None, | |||
| ) | |||
| .await | |||
| .context("pip install failed")?; | |||
| run( | |||
| "maturin", | |||
| &["develop"], | |||
| Some(&root.join("apis").join("python").join("node")), | |||
| ) | |||
| .await | |||
| .context("maturin develop failed")?; | |||
| download_file( | |||
| "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", | |||
| Path::new("yolov8n.pt"), | |||
| ) | |||
| .await | |||
| .context("Could not download weights.")?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| run_dataflow(dataflow).await?; | |||
| sh.set_var( | |||
| "PATH", | |||
| format!( | |||
| "{}{path_separator}{}", | |||
| venv_bin.to_str().context("venv path not valid unicode")?, | |||
| std::env::var("PATH")? | |||
| ), | |||
| ); | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,29 +1,72 @@ | |||
| use dora_core::{get_pip_path, get_python_path, run}; | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, ContextCompat, WrapErr}; | |||
| use std::path::Path; | |||
| use dora_core::get_python_path; | |||
| use eyre::{ContextCompat, WrapErr}; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("python-operator-dataflow-runner")?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| run( | |||
| get_python_path().context("Could not get python binary")?, | |||
| &["-m", "venv", "../.env"], | |||
| None, | |||
| ) | |||
| .await | |||
| .context("failed to create venv")?; | |||
| let venv = &root.join("examples").join(".env"); | |||
| std::env::set_var( | |||
| // prepare Python virtual environment | |||
| prepare_venv(&sh)?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| // install/upgrade pip, then install requirements | |||
| cmd!(sh, "python -m pip install --upgrade pip").run()?; | |||
| cmd!(sh, "pip install -r requirements.txt").run()?; | |||
| // build the dora Python package (you can skip this if you installed the Python dora package) | |||
| { | |||
| let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) | |||
| .join("apis") | |||
| .join("python") | |||
| .join("node"); | |||
| let _dir = sh.push_dir(python_node_api_dir); | |||
| cmd!(sh, "maturin develop").run()?; | |||
| } | |||
| let dataflow = if std::env::var("CONDA_EXE").is_ok() { | |||
| Path::new("dataflow.yml") | |||
| } else { | |||
| Path::new("dataflow_conda.yml") | |||
| }; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| let output = cmd!(sh, "{dora} start {dataflow} --attach").read_stderr()?; | |||
| let uuid = output.lines().next().context("no output")?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_object_detection.txt")?; | |||
| if sink_output.lines().count() < 50 { | |||
| eyre::bail!("object dectection node did not receive the expected number of messages") | |||
| } | |||
| Ok(()) | |||
| } | |||
| /// Prepares a Python virtual environment. | |||
| /// | |||
| /// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` | |||
| /// if you're running bash. | |||
| fn prepare_venv(sh: &Shell) -> eyre::Result<()> { | |||
| let python = get_python_path().context("Could not get python binary")?; | |||
| cmd!(sh, "{python} -m venv ../.env").run()?; | |||
| let venv = sh.current_dir().parent().unwrap().join(".env"); | |||
| sh.set_var( | |||
| "VIRTUAL_ENV", | |||
| venv.to_str().context("venv path not valid unicode")?, | |||
| ); | |||
| let orig_path = std::env::var("PATH")?; | |||
| // bin folder is named Scripts on windows. | |||
| // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | |||
| let venv_bin = if cfg!(windows) { | |||
| @@ -31,70 +74,36 @@ async fn main() -> eyre::Result<()> { | |||
| } else { | |||
| venv.join("bin") | |||
| }; | |||
| let path_separator = if cfg!(windows) { ';' } else { ':' }; | |||
| if cfg!(windows) { | |||
| std::env::set_var( | |||
| "PATH", | |||
| format!( | |||
| "{};{orig_path}", | |||
| venv_bin.to_str().context("venv path not valid unicode")? | |||
| ), | |||
| ); | |||
| } else { | |||
| std::env::set_var( | |||
| "PATH", | |||
| format!( | |||
| "{}:{orig_path}", | |||
| venv_bin.to_str().context("venv path not valid unicode")? | |||
| ), | |||
| ); | |||
| } | |||
| run( | |||
| get_python_path().context("Could not get pip binary")?, | |||
| &["-m", "pip", "install", "--upgrade", "pip"], | |||
| None, | |||
| ) | |||
| .await | |||
| .context("failed to install pip")?; | |||
| run( | |||
| get_pip_path().context("Could not get pip binary")?, | |||
| &["install", "-r", "requirements.txt"], | |||
| None, | |||
| ) | |||
| .await | |||
| .context("pip install failed")?; | |||
| run( | |||
| "maturin", | |||
| &["develop"], | |||
| Some(&root.join("apis").join("python").join("node")), | |||
| ) | |||
| .await | |||
| .context("maturin develop failed")?; | |||
| if std::env::var("CONDA_EXE").is_ok() { | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| run_dataflow(dataflow).await?; | |||
| } else { | |||
| let dataflow = Path::new("dataflow_conda.yml"); | |||
| run_dataflow(dataflow).await?; | |||
| } | |||
| sh.set_var( | |||
| "PATH", | |||
| format!( | |||
| "{}{path_separator}{}", | |||
| venv_bin.to_str().context("venv path not valid unicode")?, | |||
| std::env::var("PATH")? | |||
| ), | |||
| ); | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,46 +1,41 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| use std::path::PathBuf; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("rust-dataflow-url-runner").wrap_err("failed to set up tracing")?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| // build the dataflow using `dora build` | |||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| run_dataflow(dataflow).await?; | |||
| // start running the dataflow.yml | |||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||
| Ok(()) | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,46 +1,53 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| use eyre::ContextCompat; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; | |||
| fn main() -> eyre::Result<()> { | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| // build the dataflow using `dora build` | |||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||
| let uuid = output.lines().next().context("no output")?; | |||
| run_dataflow(dataflow).await?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_rust-sink.txt")?; | |||
| if sink_output.lines().count() < 50 { | |||
| eyre::bail!("sink did not receive the expected number of messages") | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -1,46 +1,57 @@ | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{bail, Context}; | |||
| use std::path::Path; | |||
| use eyre::ContextCompat; | |||
| use std::path::{Path, PathBuf}; | |||
| use xshell::{cmd, Shell}; | |||
| #[tokio::main] | |||
| async fn main() -> eyre::Result<()> { | |||
| set_up_tracing("rust-ros2-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; | |||
| // create a new shell in this folder | |||
| let sh = prepare_shell()?; | |||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||
| let dora = prepare_dora(&sh)?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||
| .wrap_err("failed to set working dir")?; | |||
| // build the dataflow using `dora build` | |||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||
| // start up the dora daemon and coordinator | |||
| cmd!(sh, "{dora} up").run()?; | |||
| let dataflow = Path::new("dataflow.yml"); | |||
| build_dataflow(dataflow).await?; | |||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||
| let uuid = output.lines().next().context("no output")?; | |||
| run_dataflow(dataflow).await?; | |||
| // stop the dora daemon and coordinator again | |||
| cmd!(sh, "{dora} destroy").run()?; | |||
| // verify that the node output was written to `out` | |||
| sh.change_dir("out"); | |||
| sh.change_dir(uuid); | |||
| let sink_output = sh.read_file("log_rust-node.txt")?; | |||
| if !sink_output | |||
| .lines() | |||
| .any(|l| l.starts_with("received pose event: Ok(")) | |||
| { | |||
| eyre::bail!("node did not receive any pose events") | |||
| } | |||
| Ok(()) | |||
| } | |||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--").arg("build").arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to build dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||
| /// | |||
| /// You can use your system shell instead (e.g. `bash`); | |||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||
| let sh = Shell::new()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||
| Ok(sh) | |||
| } | |||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow); | |||
| if !cmd.status().await?.success() { | |||
| bail!("failed to run dataflow"); | |||
| }; | |||
| Ok(()) | |||
| /// Build the `dora` command-line executable from this repo. | |||
| /// | |||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||
| let dora = root.join("target").join("debug").join("dora"); | |||
| Ok(dora) | |||
| } | |||
| @@ -5,6 +5,13 @@ use std::path::Path; | |||
| use tokio::io::AsyncWriteExt; | |||
| use tracing::info; | |||
| pub fn download_file_sync<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> | |||
| where | |||
| T: reqwest::IntoUrl + std::fmt::Display + Copy, | |||
| { | |||
| tokio::runtime::Runtime::new()?.block_on(download_file(url, target_path)) | |||
| } | |||
| pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> | |||
| where | |||
| T: reqwest::IntoUrl + std::fmt::Display + Copy, | |||