| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
22f2956b84
|
Remove dev-dependency on `tokio` | 1 year ago |
|
|
fcef458aa3
|
Migrate rust-dataflow-url example to `xshell` | 1 year ago |
|
|
1c3488d977
|
Migrate Python operator example to xshell | 1 year ago |
|
|
6fba72165a
|
Remove tokio dependency from cmake example | 1 year ago |
|
|
4514d6e9d0
|
Migrate C++ dataflow example to `xshell` | 1 year ago |
|
|
61d3f16bb2
|
Remove tokio dependency from Python dataflow example | 1 year ago |
|
|
7ec839fc6f
|
Migrate C dataflow example to `xshell` | 1 year ago |
|
|
b5bd71c17f
|
Migrate benchmark example to xshell | 1 year ago |
|
|
cd41356e76
|
Merge branch 'main' into xshell | 1 year ago |
|
|
46433bb8c5
|
Migrate run.rs script of cmake dataflow example to xshell | 1 year ago |
|
|
dcae010f86
|
Simplify `multiple-daemons` examples using new `connected-machines` command and `xshell` crate | 1 year ago |
|
|
1b665b0c44
|
CLI: Add a hidden `connected-machines` command
This command is useful for testing examples with multiple daemons. |
1 year ago |
|
|
4117e6087e
|
Migrate run.rs script of rust-ros2-dataflow example to xshell | 1 year ago |
|
|
20498f7e1e
|
Remove unused import | 1 year ago |
|
|
1a5fa7fe45
|
Fix: Use correct python/pip binaries after setting up venv
We don't modify the PATH for the current executable anymore, so we cannot use the `get_python_path`/`get_pip_path` functions anymore. |
1 year ago |
|
|
6f85cfb429
|
Use `xshell` for Python dataflow example to try venv activation | 1 year ago |
|
|
179dbf1bbc
|
Simplify the `run.rs` script of the `rust-dataflow` example using `xshell`
The `xshell` crate provides a more convenient way to build and run a `Command`, which is more similar to traditional bash scripts. Using this crate, we can simplify our `run.rs` script while still being platform-independent and not requiring any external dependencies. We can also still run the examples using `cargo run --example`. |
1 year ago |
| @@ -2352,10 +2352,9 @@ dependencies = [ | |||||
| "eyre", | "eyre", | ||||
| "futures", | "futures", | ||||
| "serde_yaml 0.8.26", | "serde_yaml 0.8.26", | ||||
| "tokio", | |||||
| "tokio-stream", | |||||
| "tracing", | "tracing", | ||||
| "uuid", | "uuid", | ||||
| "xshell", | |||||
| ] | ] | ||||
| [[package]] | [[package]] | ||||
| @@ -86,7 +86,6 @@ ros2-examples = [] | |||||
| [dev-dependencies] | [dev-dependencies] | ||||
| eyre = "0.6.8" | eyre = "0.6.8" | ||||
| tokio = "1.24.2" | |||||
| dora-coordinator = { workspace = true } | dora-coordinator = { workspace = true } | ||||
| dora-core = { workspace = true } | dora-core = { workspace = true } | ||||
| dora-tracing = { workspace = true } | dora-tracing = { workspace = true } | ||||
| @@ -96,7 +95,7 @@ serde_yaml = "0.8.23" | |||||
| uuid = { version = "1.7", features = ["v7", "serde"] } | uuid = { version = "1.7", features = ["v7", "serde"] } | ||||
| tracing = "0.1.36" | tracing = "0.1.36" | ||||
| futures = "0.3.25" | futures = "0.3.25" | ||||
| tokio-stream = "0.1.11" | |||||
| xshell = "0.2.6" | |||||
| [[example]] | [[example]] | ||||
| name = "c-dataflow" | name = "c-dataflow" | ||||
| @@ -14,7 +14,7 @@ use dora_daemon::Daemon; | |||||
| use dora_tracing::set_up_tracing; | use dora_tracing::set_up_tracing; | ||||
| use duration_str::parse; | use duration_str::parse; | ||||
| use eyre::{bail, Context}; | use eyre::{bail, Context}; | ||||
| use std::net::SocketAddr; | |||||
| use std::{collections::BTreeSet, net::SocketAddr}; | |||||
| use std::{ | use std::{ | ||||
| net::{IpAddr, Ipv4Addr}, | net::{IpAddr, Ipv4Addr}, | ||||
| path::PathBuf, | path::PathBuf, | ||||
| @@ -102,6 +102,8 @@ enum Command { | |||||
| dataflow: Option<String>, | dataflow: Option<String>, | ||||
| node: String, | node: String, | ||||
| }, | }, | ||||
| #[clap(hide = true)] | |||||
| ConnectedMachines, | |||||
| // Metrics, | // Metrics, | ||||
| // Stats, | // Stats, | ||||
| // Get, | // Get, | ||||
| @@ -273,6 +275,12 @@ fn run() -> eyre::Result<()> { | |||||
| bail!("No dora coordinator seems to be running."); | bail!("No dora coordinator seems to be running."); | ||||
| } | } | ||||
| }, | }, | ||||
| Command::ConnectedMachines => match connect_to_coordinator() { | |||||
| Ok(mut session) => connected_machines(&mut *session)?, | |||||
| Err(_) => { | |||||
| bail!("No dora coordinator seems to be running."); | |||||
| } | |||||
| }, | |||||
| Command::Stop { | Command::Stop { | ||||
| uuid, | uuid, | ||||
| name, | name, | ||||
| @@ -466,6 +474,37 @@ fn query_running_dataflows( | |||||
| Ok(ids) | Ok(ids) | ||||
| } | } | ||||
| fn connected_machines(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> { | |||||
| let machines = query_connected_machines(session)?; | |||||
| if machines.is_empty() { | |||||
| eprintln!("No machines are connected"); | |||||
| } else { | |||||
| for id in machines { | |||||
| println!("{id}"); | |||||
| } | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| fn query_connected_machines( | |||||
| session: &mut TcpRequestReplyConnection, | |||||
| ) -> eyre::Result<BTreeSet<String>> { | |||||
| let reply_raw = session | |||||
| .request(&serde_json::to_vec(&ControlRequest::ConnectedMachines).unwrap()) | |||||
| .wrap_err("failed to send connected machines message")?; | |||||
| let reply: ControlRequestReply = | |||||
| serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; | |||||
| let ids = match reply { | |||||
| ControlRequestReply::ConnectedMachines(ids) => ids, | |||||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||||
| other => bail!("unexpected connected machines reply: {other:?}"), | |||||
| }; | |||||
| Ok(ids) | |||||
| } | |||||
| fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { | fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { | ||||
| TcpLayer::new().connect(control_socket_addr()) | TcpLayer::new().connect(control_socket_addr()) | ||||
| } | } | ||||
| @@ -1,46 +1,43 @@ | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("benchmark-runner").wrap_err("failed to set up tracing subscriber")?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora_optimized(&sh)?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| // build the dataflow using `dora build` | |||||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| build_dataflow(dataflow).await?; | |||||
| // start running the dataflow.yml | |||||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||||
| run_dataflow(dataflow).await?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora_optimized(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli --release").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("release").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,12 +1,9 @@ | |||||
| use dora_tracing::set_up_tracing; | use dora_tracing::set_up_tracing; | ||||
| use eyre::{bail, Context}; | |||||
| use std::{ | |||||
| env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, | |||||
| path::Path, | |||||
| }; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| use eyre::Context; | |||||
| use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; | set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?; | ||||
| if cfg!(windows) { | if cfg!(windows) { | ||||
| @@ -16,284 +13,168 @@ async fn main() -> eyre::Result<()> { | |||||
| return Ok(()); | return Ok(()); | ||||
| } | } | ||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| cmd!(sh, "cargo build --package dora-node-api-cxx").run()?; | |||||
| cmd!(sh, "cargo build --package dora-operator-api-cxx").run()?; | |||||
| cmd!(sh, "cargo build --package dora-node-api-c").run()?; | |||||
| cmd!(sh, "cargo build --package dora-operator-api-c").run()?; | |||||
| sh.create_dir("build")?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | let root = Path::new(env!("CARGO_MANIFEST_DIR")); | ||||
| let target = root.join("target"); | let target = root.join("target"); | ||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| tokio::fs::create_dir_all("build").await?; | |||||
| let build_dir = Path::new("build"); | |||||
| let target_debug = target.join("debug"); | |||||
| build_package("dora-node-api-cxx").await?; | |||||
| let node_cxxbridge = target | let node_cxxbridge = target | ||||
| .join("cxxbridge") | .join("cxxbridge") | ||||
| .join("dora-node-api-cxx") | .join("dora-node-api-cxx") | ||||
| .join("src"); | .join("src"); | ||||
| tokio::fs::copy( | |||||
| node_cxxbridge.join("lib.rs.cc"), | |||||
| build_dir.join("node-bridge.cc"), | |||||
| ) | |||||
| .await?; | |||||
| tokio::fs::copy( | |||||
| node_cxxbridge.join("lib.rs.h"), | |||||
| build_dir.join("dora-node-api.h"), | |||||
| ) | |||||
| .await?; | |||||
| tokio::fs::write( | |||||
| build_dir.join("operator.h"), | |||||
| sh.copy_file(node_cxxbridge.join("lib.rs.cc"), "build/node-bridge.cc")?; | |||||
| sh.copy_file(node_cxxbridge.join("lib.rs.h"), "build/dora-node-api.h")?; | |||||
| sh.write_file( | |||||
| "build/operator.h", | |||||
| r###"#include "../operator-rust-api/operator.h""###, | r###"#include "../operator-rust-api/operator.h""###, | ||||
| ) | |||||
| .await?; | |||||
| )?; | |||||
| build_package("dora-operator-api-cxx").await?; | |||||
| let operator_cxxbridge = target | let operator_cxxbridge = target | ||||
| .join("cxxbridge") | .join("cxxbridge") | ||||
| .join("dora-operator-api-cxx") | .join("dora-operator-api-cxx") | ||||
| .join("src"); | .join("src"); | ||||
| tokio::fs::copy( | |||||
| sh.copy_file( | |||||
| operator_cxxbridge.join("lib.rs.cc"), | operator_cxxbridge.join("lib.rs.cc"), | ||||
| build_dir.join("operator-bridge.cc"), | |||||
| ) | |||||
| .await?; | |||||
| tokio::fs::copy( | |||||
| "build/operator-bridge.cc", | |||||
| )?; | |||||
| sh.copy_file( | |||||
| operator_cxxbridge.join("lib.rs.h"), | operator_cxxbridge.join("lib.rs.h"), | ||||
| build_dir.join("dora-operator-api.h"), | |||||
| ) | |||||
| .await?; | |||||
| "build/dora-operator-api.h", | |||||
| )?; | |||||
| build_package("dora-node-api-c").await?; | |||||
| build_package("dora-operator-api-c").await?; | |||||
| build_cxx_node( | |||||
| root, | |||||
| &[ | |||||
| &dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?, | |||||
| &dunce::canonicalize(build_dir.join("node-bridge.cc"))?, | |||||
| ], | |||||
| "node_rust_api", | |||||
| &["-l", "dora_node_api_cxx"], | |||||
| ) | |||||
| .await?; | |||||
| build_cxx_node( | |||||
| root, | |||||
| &[&dunce::canonicalize( | |||||
| Path::new("node-c-api").join("main.cc"), | |||||
| )?], | |||||
| "node_c_api", | |||||
| &["-l", "dora_node_api_c"], | |||||
| ) | |||||
| .await?; | |||||
| build_cxx_operator( | |||||
| // compile nodes | |||||
| let args: &[&str] = if cfg!(target_os = "linux") { | |||||
| &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] | |||||
| } else if cfg!(target_os = "windows") { | |||||
| &[ | &[ | ||||
| &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, | |||||
| &dunce::canonicalize(build_dir.join("operator-bridge.cc"))?, | |||||
| ], | |||||
| "operator_rust_api", | |||||
| "-ladvapi32", | |||||
| "-luserenv", | |||||
| "-lkernel32", | |||||
| "-lws2_32", | |||||
| "-lbcrypt", | |||||
| "-lncrypt", | |||||
| "-lschannel", | |||||
| "-lntdll", | |||||
| "-liphlpapi", | |||||
| "-lcfgmgr32", | |||||
| "-lcredui", | |||||
| "-lcrypt32", | |||||
| "-lcryptnet", | |||||
| "-lfwpuclnt", | |||||
| "-lgdi32", | |||||
| "-lmsimg32", | |||||
| "-lmswsock", | |||||
| "-lole32", | |||||
| "-loleaut32", | |||||
| "-lopengl32", | |||||
| "-lsecur32", | |||||
| "-lshell32", | |||||
| "-lsynchronization", | |||||
| "-luser32", | |||||
| "-lwinspool", | |||||
| "-Wl,-nodefaultlib:libcmt", | |||||
| "-D_DLL", | |||||
| "-lmsvcrt", | |||||
| ] | |||||
| } else if cfg!(target_os = "macos") { | |||||
| &[ | &[ | ||||
| "-framework", | |||||
| "CoreServices", | |||||
| "-framework", | |||||
| "Security", | |||||
| "-l", | "-l", | ||||
| "dora_operator_api_cxx", | |||||
| "-L", | |||||
| root.join("target").join("debug").to_str().unwrap(), | |||||
| ], | |||||
| ) | |||||
| .await?; | |||||
| build_cxx_operator( | |||||
| &[&dunce::canonicalize( | |||||
| Path::new("operator-c-api").join("operator.cc"), | |||||
| )?], | |||||
| "operator_c_api", | |||||
| &[ | |||||
| "System", | |||||
| "-l", | |||||
| "resolv", | |||||
| "-l", | |||||
| "pthread", | |||||
| "-l", | |||||
| "c", | |||||
| "-l", | "-l", | ||||
| "dora_operator_api_c", | |||||
| "-L", | |||||
| root.join("target").join("debug").to_str().unwrap(), | |||||
| ], | |||||
| "m", | |||||
| ] | |||||
| } else { | |||||
| panic!("unsupported target platform") | |||||
| }; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ node-rust-api/main.cc build/node-bridge.cc -std=c++17 -l dora_node_api_cxx {args...} -L {target_debug} --output build/node_rust_api{EXE_SUFFIX}" | |||||
| ) | ) | ||||
| .await?; | |||||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||||
| build_package("dora-runtime").await?; | |||||
| run_dataflow(&dataflow).await?; | |||||
| Ok(()) | |||||
| } | |||||
| .run()?; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ node-c-api/main.cc -std=c++17 -l dora_node_api_c {args...} -L {target_debug} --output build/node_c_api{EXE_SUFFIX}" | |||||
| ) | |||||
| .run()?; | |||||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("build"); | |||||
| cmd.arg("--package").arg(package); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build {package}"); | |||||
| }; | |||||
| Ok(()) | |||||
| } | |||||
| // compile operators | |||||
| let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ -c build/operator-bridge.cc -std=c++17 -o build/operator-bridge.o {operator_args...}" | |||||
| ) | |||||
| .run()?; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ -c operator-rust-api/operator.cc -o operator-rust-api/operator.o -std=c++17 {operator_args...}" | |||||
| ) | |||||
| .run()?; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ -c operator-c-api/operator.cc -o operator-c-api/operator.o -std=c++17 {operator_args...}" | |||||
| ) | |||||
| .run()?; | |||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| } | |||||
| // link operators | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ -shared operator-rust-api/operator.o build/operator-bridge.o -l dora_operator_api_cxx {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_rust_api{DLL_SUFFIX}" | |||||
| ) | |||||
| .run()?; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang++ -shared operator-c-api/operator.o -l dora_operator_api_c {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_c_api{DLL_SUFFIX}" | |||||
| ) | |||||
| .run()?; | |||||
| async fn build_cxx_node( | |||||
| root: &Path, | |||||
| paths: &[&Path], | |||||
| out_name: &str, | |||||
| args: &[&str], | |||||
| ) -> eyre::Result<()> { | |||||
| let mut clang = tokio::process::Command::new("clang++"); | |||||
| clang.args(paths); | |||||
| clang.arg("-std=c++17"); | |||||
| #[cfg(target_os = "linux")] | |||||
| { | |||||
| clang.arg("-l").arg("m"); | |||||
| clang.arg("-l").arg("rt"); | |||||
| clang.arg("-l").arg("dl"); | |||||
| clang.arg("-pthread"); | |||||
| } | |||||
| #[cfg(target_os = "windows")] | |||||
| { | |||||
| clang.arg("-ladvapi32"); | |||||
| clang.arg("-luserenv"); | |||||
| clang.arg("-lkernel32"); | |||||
| clang.arg("-lws2_32"); | |||||
| clang.arg("-lbcrypt"); | |||||
| clang.arg("-lncrypt"); | |||||
| clang.arg("-lschannel"); | |||||
| clang.arg("-lntdll"); | |||||
| clang.arg("-liphlpapi"); | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| clang.arg("-lcfgmgr32"); | |||||
| clang.arg("-lcredui"); | |||||
| clang.arg("-lcrypt32"); | |||||
| clang.arg("-lcryptnet"); | |||||
| clang.arg("-lfwpuclnt"); | |||||
| clang.arg("-lgdi32"); | |||||
| clang.arg("-lmsimg32"); | |||||
| clang.arg("-lmswsock"); | |||||
| clang.arg("-lole32"); | |||||
| clang.arg("-lopengl32"); | |||||
| clang.arg("-lsecur32"); | |||||
| clang.arg("-lshell32"); | |||||
| clang.arg("-lsynchronization"); | |||||
| clang.arg("-luser32"); | |||||
| clang.arg("-lwinspool"); | |||||
| // start running the dataflow.yml | |||||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||||
| clang.arg("-D_DLL"); | |||||
| clang.arg("-lmsvcrt"); | |||||
| } | |||||
| #[cfg(target_os = "macos")] | |||||
| { | |||||
| clang.arg("-framework").arg("CoreServices"); | |||||
| clang.arg("-framework").arg("Security"); | |||||
| clang.arg("-l").arg("System"); | |||||
| clang.arg("-l").arg("resolv"); | |||||
| clang.arg("-l").arg("pthread"); | |||||
| clang.arg("-l").arg("c"); | |||||
| clang.arg("-l").arg("m"); | |||||
| } | |||||
| clang.args(args); | |||||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||||
| clang | |||||
| .arg("--output") | |||||
| .arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||||
| if let Some(parent) = paths[0].parent() { | |||||
| clang.current_dir(parent); | |||||
| } | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| if !clang.status().await?.success() { | |||||
| bail!("failed to compile c++ node"); | |||||
| }; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_cxx_operator( | |||||
| paths: &[&Path], | |||||
| out_name: &str, | |||||
| link_args: &[&str], | |||||
| ) -> eyre::Result<()> { | |||||
| let mut object_file_paths = Vec::new(); | |||||
| for path in paths { | |||||
| let mut compile = tokio::process::Command::new("clang++"); | |||||
| compile.arg("-c").arg(path); | |||||
| compile.arg("-std=c++17"); | |||||
| let object_file_path = path.with_extension("o"); | |||||
| compile.arg("-o").arg(&object_file_path); | |||||
| #[cfg(unix)] | |||||
| compile.arg("-fPIC"); | |||||
| if let Some(parent) = path.parent() { | |||||
| compile.current_dir(parent); | |||||
| } | |||||
| if !compile.status().await?.success() { | |||||
| bail!("failed to compile cxx operator"); | |||||
| }; | |||||
| object_file_paths.push(object_file_path); | |||||
| } | |||||
| let mut link = tokio::process::Command::new("clang++"); | |||||
| link.arg("-shared").args(&object_file_paths); | |||||
| link.args(link_args); | |||||
| #[cfg(target_os = "windows")] | |||||
| { | |||||
| link.arg("-ladvapi32"); | |||||
| link.arg("-luserenv"); | |||||
| link.arg("-lkernel32"); | |||||
| link.arg("-lws2_32"); | |||||
| link.arg("-lbcrypt"); | |||||
| link.arg("-lncrypt"); | |||||
| link.arg("-lschannel"); | |||||
| link.arg("-lntdll"); | |||||
| link.arg("-liphlpapi"); | |||||
| link.arg("-lcfgmgr32"); | |||||
| link.arg("-lcredui"); | |||||
| link.arg("-lcrypt32"); | |||||
| link.arg("-lcryptnet"); | |||||
| link.arg("-lfwpuclnt"); | |||||
| link.arg("-lgdi32"); | |||||
| link.arg("-lmsimg32"); | |||||
| link.arg("-lmswsock"); | |||||
| link.arg("-lole32"); | |||||
| link.arg("-lopengl32"); | |||||
| link.arg("-lsecur32"); | |||||
| link.arg("-lshell32"); | |||||
| link.arg("-lsynchronization"); | |||||
| link.arg("-luser32"); | |||||
| link.arg("-lwinspool"); | |||||
| link.arg("-Wl,-nodefaultlib:libcmt"); | |||||
| link.arg("-D_DLL"); | |||||
| link.arg("-lmsvcrt"); | |||||
| link.arg("-fms-runtime-lib=static"); | |||||
| } | |||||
| #[cfg(target_os = "macos")] | |||||
| { | |||||
| link.arg("-framework").arg("CoreServices"); | |||||
| link.arg("-framework").arg("Security"); | |||||
| link.arg("-l").arg("System"); | |||||
| link.arg("-l").arg("resolv"); | |||||
| link.arg("-l").arg("pthread"); | |||||
| link.arg("-l").arg("c"); | |||||
| link.arg("-l").arg("m"); | |||||
| } | |||||
| link.arg("-o") | |||||
| .arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}"))); | |||||
| if let Some(parent) = paths[0].parent() { | |||||
| link.current_dir(parent); | |||||
| } | |||||
| if !link.status().await?.success() { | |||||
| bail!("failed to create shared library from cxx operator (c api)"); | |||||
| }; | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,186 +1,178 @@ | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use std::{ | |||||
| env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, | |||||
| path::Path, | |||||
| }; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("c-dataflow-runner").wrap_err("failed to set up tracing")?; | |||||
| use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| fn main() -> eyre::Result<()> { | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | let root = Path::new(env!("CARGO_MANIFEST_DIR")); | ||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| tokio::fs::create_dir_all("build").await?; | |||||
| build_package("dora-node-api-c").await?; | |||||
| build_c_node(root, "node.c", "c_node").await?; | |||||
| build_c_node(root, "sink.c", "c_sink").await?; | |||||
| build_package("dora-operator-api-c").await?; | |||||
| build_c_operator(root).await?; | |||||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||||
| run_dataflow(&dataflow).await?; | |||||
| Ok(()) | |||||
| } | |||||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("build"); | |||||
| cmd.arg("--package").arg(package); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build {package}"); | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| cmd!(sh, "cargo build --package dora-node-api-c").run()?; | |||||
| cmd!(sh, "cargo build --package dora-operator-api-c").run()?; | |||||
| sh.create_dir("build")?; | |||||
| let target_debug = root.join("target").join("debug"); | |||||
| // compile nodes | |||||
| let args: &[&str] = if cfg!(target_os = "linux") { | |||||
| &["-l", "m", "-l", "rt", "-l", "dl", "-pthread"] | |||||
| } else if cfg!(target_os = "windows") { | |||||
| &[ | |||||
| "-ladvapi32", | |||||
| "-luserenv", | |||||
| "-lkernel32", | |||||
| "-lws2_32", | |||||
| "-lbcrypt", | |||||
| "-lncrypt", | |||||
| "-lschannel", | |||||
| "-lntdll", | |||||
| "-liphlpapi", | |||||
| "-lcfgmgr32", | |||||
| "-lcredui", | |||||
| "-lcrypt32", | |||||
| "-lcryptnet", | |||||
| "-lfwpuclnt", | |||||
| "-lgdi32", | |||||
| "-lmsimg32", | |||||
| "-lmswsock", | |||||
| "-lole32", | |||||
| "-loleaut32", | |||||
| "-lopengl32", | |||||
| "-lsecur32", | |||||
| "-lshell32", | |||||
| "-lsynchronization", | |||||
| "-luser32", | |||||
| "-lwinspool", | |||||
| "-Wl,-nodefaultlib:libcmt", | |||||
| "-D_DLL", | |||||
| "-lmsvcrt", | |||||
| ] | |||||
| } else if cfg!(target_os = "macos") { | |||||
| &[ | |||||
| "-framework", | |||||
| "CoreServices", | |||||
| "-framework", | |||||
| "Security", | |||||
| "-l", | |||||
| "System", | |||||
| "-l", | |||||
| "resolv", | |||||
| "-l", | |||||
| "pthread", | |||||
| "-l", | |||||
| "c", | |||||
| "-l", | |||||
| "m", | |||||
| ] | |||||
| } else { | |||||
| panic!("unsupported target platform") | |||||
| }; | }; | ||||
| Ok(()) | |||||
| } | |||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| cmd!( | |||||
| sh, | |||||
| "clang node.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_node{EXE_SUFFIX}" | |||||
| ) | |||||
| .run()?; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang sink.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_sink{EXE_SUFFIX}" | |||||
| ) | |||||
| .run()?; | |||||
| // compile operator | |||||
| let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] }; | |||||
| cmd!( | |||||
| sh, | |||||
| "clang -c operator.c -o build/operator.o -fdeclspec {operator_args...}" | |||||
| ) | |||||
| .run()?; | |||||
| // link operator | |||||
| let operator_link_args: &[&str] = if cfg!(target_os = "windows") { | |||||
| &[ | |||||
| "-ladvapi32", | |||||
| "-luserenv", | |||||
| "-lkernel32", | |||||
| "-lws2_32", | |||||
| "-lbcrypt", | |||||
| "-lncrypt", | |||||
| "-lschannel", | |||||
| "-lntdll", | |||||
| "-liphlpapi", | |||||
| "-lcfgmgr32", | |||||
| "-lcredui", | |||||
| "-lcrypt32", | |||||
| "-lcryptnet", | |||||
| "-lfwpuclnt", | |||||
| "-lgdi32", | |||||
| "-lmsimg32", | |||||
| "-lmswsock", | |||||
| "-lole32", | |||||
| "-loleaut32", | |||||
| "-lopengl32", | |||||
| "-lsecur32", | |||||
| "-lshell32", | |||||
| "-lsynchronization", | |||||
| "-luser32", | |||||
| "-lwinspool", | |||||
| "-Wl,-nodefaultlib:libcmt", | |||||
| "-D_DLL", | |||||
| "-lmsvcrt", | |||||
| ] | |||||
| } else if cfg!(target_os = "macos") { | |||||
| &[ | |||||
| "-framework", | |||||
| "CoreServices", | |||||
| "-framework", | |||||
| "Security", | |||||
| "-l", | |||||
| "System", | |||||
| "-l", | |||||
| "resolv", | |||||
| "-l", | |||||
| "pthread", | |||||
| "-l", | |||||
| "c", | |||||
| "-l", | |||||
| "m", | |||||
| ] | |||||
| } else { | |||||
| &[] | |||||
| }; | }; | ||||
| Ok(()) | |||||
| } | |||||
| cmd!( | |||||
| sh, | |||||
| "clang -shared build/operator.o -L {target_debug} -l dora_operator_api_c {operator_link_args...} -o build/{DLL_PREFIX}operator{DLL_SUFFIX}" | |||||
| ).run()?; | |||||
| async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> { | |||||
| let mut clang = tokio::process::Command::new("clang"); | |||||
| clang.arg(name); | |||||
| clang.arg("-l").arg("dora_node_api_c"); | |||||
| #[cfg(target_os = "linux")] | |||||
| { | |||||
| clang.arg("-l").arg("m"); | |||||
| clang.arg("-l").arg("rt"); | |||||
| clang.arg("-l").arg("dl"); | |||||
| clang.arg("-pthread"); | |||||
| } | |||||
| #[cfg(target_os = "windows")] | |||||
| { | |||||
| clang.arg("-ladvapi32"); | |||||
| clang.arg("-luserenv"); | |||||
| clang.arg("-lkernel32"); | |||||
| clang.arg("-lws2_32"); | |||||
| clang.arg("-lbcrypt"); | |||||
| clang.arg("-lncrypt"); | |||||
| clang.arg("-lschannel"); | |||||
| clang.arg("-lntdll"); | |||||
| clang.arg("-liphlpapi"); | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| clang.arg("-lcfgmgr32"); | |||||
| clang.arg("-lcredui"); | |||||
| clang.arg("-lcrypt32"); | |||||
| clang.arg("-lcryptnet"); | |||||
| clang.arg("-lfwpuclnt"); | |||||
| clang.arg("-lgdi32"); | |||||
| clang.arg("-lmsimg32"); | |||||
| clang.arg("-lmswsock"); | |||||
| clang.arg("-lole32"); | |||||
| clang.arg("-loleaut32"); | |||||
| clang.arg("-lopengl32"); | |||||
| clang.arg("-lsecur32"); | |||||
| clang.arg("-lshell32"); | |||||
| clang.arg("-lsynchronization"); | |||||
| clang.arg("-luser32"); | |||||
| clang.arg("-lwinspool"); | |||||
| // start running the dataflow.yml | |||||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| clang.arg("-Wl,-nodefaultlib:libcmt"); | |||||
| clang.arg("-D_DLL"); | |||||
| clang.arg("-lmsvcrt"); | |||||
| } | |||||
| #[cfg(target_os = "macos")] | |||||
| { | |||||
| clang.arg("-framework").arg("CoreServices"); | |||||
| clang.arg("-framework").arg("Security"); | |||||
| clang.arg("-l").arg("System"); | |||||
| clang.arg("-l").arg("resolv"); | |||||
| clang.arg("-l").arg("pthread"); | |||||
| clang.arg("-l").arg("c"); | |||||
| clang.arg("-l").arg("m"); | |||||
| } | |||||
| clang.arg("-L").arg(root.join("target").join("debug")); | |||||
| clang | |||||
| .arg("--output") | |||||
| .arg(Path::new("build").join(format!("{out_name}{EXE_SUFFIX}"))); | |||||
| if !clang.status().await?.success() { | |||||
| bail!("failed to compile c node"); | |||||
| }; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_c_operator(root: &Path) -> eyre::Result<()> { | |||||
| let mut compile = tokio::process::Command::new("clang"); | |||||
| compile.arg("-c").arg("operator.c"); | |||||
| compile.arg("-o").arg("build/operator.o"); | |||||
| compile.arg("-fdeclspec"); | |||||
| #[cfg(unix)] | |||||
| compile.arg("-fPIC"); | |||||
| if !compile.status().await?.success() { | |||||
| bail!("failed to compile c operator"); | |||||
| }; | |||||
| let mut link = tokio::process::Command::new("clang"); | |||||
| link.arg("-shared").arg("build/operator.o"); | |||||
| link.arg("-L").arg(root.join("target").join("debug")); | |||||
| link.arg("-l").arg("dora_operator_api_c"); | |||||
| #[cfg(target_os = "windows")] | |||||
| { | |||||
| link.arg("-ladvapi32"); | |||||
| link.arg("-luserenv"); | |||||
| link.arg("-lkernel32"); | |||||
| link.arg("-lws2_32"); | |||||
| link.arg("-lbcrypt"); | |||||
| link.arg("-lncrypt"); | |||||
| link.arg("-lschannel"); | |||||
| link.arg("-lntdll"); | |||||
| link.arg("-liphlpapi"); | |||||
| link.arg("-lcfgmgr32"); | |||||
| link.arg("-lcredui"); | |||||
| link.arg("-lcrypt32"); | |||||
| link.arg("-lcryptnet"); | |||||
| link.arg("-lfwpuclnt"); | |||||
| link.arg("-lgdi32"); | |||||
| link.arg("-lmsimg32"); | |||||
| link.arg("-lmswsock"); | |||||
| link.arg("-lole32"); | |||||
| link.arg("-loleaut32"); | |||||
| link.arg("-lopengl32"); | |||||
| link.arg("-lsecur32"); | |||||
| link.arg("-lshell32"); | |||||
| link.arg("-lsynchronization"); | |||||
| link.arg("-luser32"); | |||||
| link.arg("-lwinspool"); | |||||
| link.arg("-Wl,-nodefaultlib:libcmt"); | |||||
| link.arg("-D_DLL"); | |||||
| link.arg("-lmsvcrt"); | |||||
| } | |||||
| #[cfg(target_os = "macos")] | |||||
| { | |||||
| link.arg("-framework").arg("CoreServices"); | |||||
| link.arg("-framework").arg("Security"); | |||||
| link.arg("-l").arg("System"); | |||||
| link.arg("-l").arg("resolv"); | |||||
| link.arg("-l").arg("pthread"); | |||||
| link.arg("-l").arg("c"); | |||||
| link.arg("-l").arg("m"); | |||||
| } | |||||
| link.arg("-o") | |||||
| .arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}"))); | |||||
| if !link.status().await?.success() { | |||||
| bail!("failed to link c operator"); | |||||
| }; | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,9 +1,9 @@ | |||||
| use dora_tracing::set_up_tracing; | use dora_tracing::set_up_tracing; | ||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | |||||
| use eyre::{Context, ContextCompat}; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?; | set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?; | ||||
| if cfg!(windows) { | if cfg!(windows) { | ||||
| @@ -13,60 +13,55 @@ async fn main() -> eyre::Result<()> { | |||||
| return Ok(()); | return Ok(()); | ||||
| } | } | ||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build C++ source code using cmake | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | let root = Path::new(env!("CARGO_MANIFEST_DIR")); | ||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| cmd!(sh, "cmake -DDORA_ROOT_DIR={root} -B build .").run()?; | |||||
| cmd!(sh, "cmake --build build").run()?; | |||||
| cmd!(sh, "cmake --install build").run()?; | |||||
| tokio::fs::create_dir_all("build").await?; | |||||
| let mut cmd = tokio::process::Command::new("cmake"); | |||||
| cmd.arg(format!("-DDORA_ROOT_DIR={}", root.display())); | |||||
| cmd.arg("-B").arg("build"); | |||||
| cmd.arg("."); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to generating make file"); | |||||
| } | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| let mut cmd = tokio::process::Command::new("cmake"); | |||||
| cmd.arg("--build").arg("build"); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build a cmake-generated project binary tree"); | |||||
| } | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| let mut cmd = tokio::process::Command::new("cmake"); | |||||
| cmd.arg("--install").arg("build"); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build a cmake-generated project binary tree"); | |||||
| } | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| let dataflow = Path::new("dataflow.yml").to_owned(); | |||||
| build_package("dora-runtime").await?; | |||||
| run_dataflow(&dataflow).await?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_runtime-node-2.txt")?; | |||||
| if sink_output.lines().count() < 20 { | |||||
| eyre::bail!("sink did not receive the expected number of messages") | |||||
| } | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_package(package: &str) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("build"); | |||||
| cmd.arg("--package").arg(package); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build {package}"); | |||||
| } | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,219 +1,96 @@ | |||||
| use dora_coordinator::{ControlEvent, Event}; | |||||
| use dora_core::{ | |||||
| descriptor::Descriptor, | |||||
| topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, | |||||
| }; | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use eyre::ContextCompat; | |||||
| use std::{ | use std::{ | ||||
| collections::BTreeSet, | |||||
| net::{IpAddr, Ipv4Addr, SocketAddr}, | |||||
| path::Path, | |||||
| path::{Path, PathBuf}, | |||||
| process::Command, | |||||
| time::Duration, | time::Duration, | ||||
| }; | }; | ||||
| use tokio::{ | |||||
| sync::{ | |||||
| mpsc::{self, Sender}, | |||||
| oneshot, | |||||
| }, | |||||
| task::JoinSet, | |||||
| }; | |||||
| use tokio_stream::wrappers::ReceiverStream; | |||||
| use uuid::Uuid; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| use xshell::{cmd, Shell}; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| build_dataflow(dataflow).await?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); | |||||
| let coordinator_bind = SocketAddr::new( | |||||
| IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), | |||||
| DORA_COORDINATOR_PORT_DEFAULT, | |||||
| ); | |||||
| let (coordinator_port, coordinator) = | |||||
| dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) | |||||
| .await?; | |||||
| let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); | |||||
| let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); | |||||
| let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); | |||||
| // build the dataflow using `dora build` | |||||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||||
| tracing::info!("Spawning coordinator and daemons"); | |||||
| let mut tasks = JoinSet::new(); | |||||
| tasks.spawn(coordinator); | |||||
| tasks.spawn(daemon_a); | |||||
| tasks.spawn(daemon_b); | |||||
| tracing::info!("waiting until daemons are connected to coordinator"); | |||||
| let mut retries = 0; | |||||
| // start the dora coordinator (in background) | |||||
| Command::from(cmd!(sh, "{dora} coordinator")).spawn()?; | |||||
| // wait until coordinator is ready | |||||
| loop { | loop { | ||||
| let connected_machines = connected_machines(&coordinator_events_tx).await?; | |||||
| if connected_machines.contains("A") && connected_machines.contains("B") { | |||||
| break; | |||||
| } else if retries > 20 { | |||||
| bail!("daemon not connected after {retries} retries"); | |||||
| } else { | |||||
| std::thread::sleep(Duration::from_millis(500)); | |||||
| retries += 1 | |||||
| match cmd!(sh, "{dora} list").quiet().ignore_stderr().run() { | |||||
| Ok(_) => { | |||||
| println!("coordinator connected"); | |||||
| break; | |||||
| } | |||||
| Err(_) => { | |||||
| eprintln!("waiting for coordinator"); | |||||
| std::thread::sleep(Duration::from_millis(100)) | |||||
| } | |||||
| } | } | ||||
| } | } | ||||
| tracing::info!("starting dataflow"); | |||||
| let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?; | |||||
| tracing::info!("started dataflow under ID `{uuid}`"); | |||||
| let running = running_dataflows(&coordinator_events_tx).await?; | |||||
| if !running.iter().map(|d| d.uuid).any(|id| id == uuid) { | |||||
| bail!("dataflow `{uuid}` is not running"); | |||||
| } | |||||
| tracing::info!("waiting for dataflow `{uuid}` to finish"); | |||||
| let mut retries = 0; | |||||
| // start two daemons (in background) | |||||
| Command::from(cmd!(sh, "{dora} daemon --machine-id A")).spawn()?; | |||||
| Command::from(cmd!(sh, "{dora} daemon --machine-id B")).spawn()?; | |||||
| // wait until both daemons are connected | |||||
| loop { | loop { | ||||
| let running = running_dataflows(&coordinator_events_tx).await?; | |||||
| if running.is_empty() { | |||||
| break; | |||||
| } else if retries > 100 { | |||||
| bail!("dataflow not finished after {retries} retries"); | |||||
| } else { | |||||
| tracing::debug!("not done yet"); | |||||
| std::thread::sleep(Duration::from_millis(500)); | |||||
| retries += 1 | |||||
| let output = cmd!(sh, "{dora} connected-machines") | |||||
| .quiet() | |||||
| .ignore_stderr() | |||||
| .read(); | |||||
| match output { | |||||
| Ok(output) => { | |||||
| let connected: Vec<&str> = output.lines().collect(); | |||||
| if connected == ["A", "B"] { | |||||
| println!("both daemons connected"); | |||||
| break; | |||||
| } else { | |||||
| eprintln!("not all daemons connected yet (connected: {connected:?})"); | |||||
| } | |||||
| } | |||||
| Err(err) => eprintln!("failed to query connected-machines: {err:?}"), | |||||
| } | } | ||||
| std::thread::sleep(Duration::from_millis(100)); | |||||
| } | } | ||||
| tracing::info!("dataflow `{uuid}` finished, destroying coordinator"); | |||||
| destroy(&coordinator_events_tx).await?; | |||||
| tracing::info!("joining tasks"); | |||||
| while let Some(res) = tasks.join_next().await { | |||||
| res.unwrap()?; | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| println!("starting dataflow"); | |||||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||||
| println!("dataflow finished successfully"); | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| // stop the coordinator and both daemons again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_rust-sink.txt")?; | |||||
| if sink_output.lines().count() < 50 { | |||||
| eyre::bail!("sink did not receive the expected number of messages") | |||||
| } | } | ||||
| tracing::info!("done"); | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn start_dataflow( | |||||
| dataflow: &Path, | |||||
| coordinator_events_tx: &Sender<Event>, | |||||
| ) -> eyre::Result<Uuid> { | |||||
| let dataflow_descriptor = Descriptor::read(dataflow) | |||||
| .await | |||||
| .wrap_err("failed to read yaml dataflow")?; | |||||
| let working_dir = dataflow | |||||
| .canonicalize() | |||||
| .context("failed to canonicalize dataflow path")? | |||||
| .parent() | |||||
| .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? | |||||
| .to_owned(); | |||||
| dataflow_descriptor | |||||
| .check(&working_dir) | |||||
| .wrap_err("could not validate yaml")?; | |||||
| let (reply_sender, reply) = oneshot::channel(); | |||||
| coordinator_events_tx | |||||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||||
| request: ControlRequest::Start { | |||||
| dataflow: dataflow_descriptor, | |||||
| local_working_dir: working_dir, | |||||
| name: None, | |||||
| }, | |||||
| reply_sender, | |||||
| })) | |||||
| .await?; | |||||
| let result = reply.await??; | |||||
| let uuid = match result { | |||||
| ControlRequestReply::DataflowStarted { uuid } => uuid, | |||||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||||
| }; | |||||
| Ok(uuid) | |||||
| } | |||||
| async fn connected_machines( | |||||
| coordinator_events_tx: &Sender<Event>, | |||||
| ) -> eyre::Result<BTreeSet<String>> { | |||||
| let (reply_sender, reply) = oneshot::channel(); | |||||
| coordinator_events_tx | |||||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||||
| request: ControlRequest::ConnectedMachines, | |||||
| reply_sender, | |||||
| })) | |||||
| .await?; | |||||
| let result = reply.await??; | |||||
| let machines = match result { | |||||
| ControlRequestReply::ConnectedMachines(machines) => machines, | |||||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||||
| }; | |||||
| Ok(machines) | |||||
| } | |||||
| async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Result<Vec<DataflowId>> { | |||||
| let (reply_sender, reply) = oneshot::channel(); | |||||
| coordinator_events_tx | |||||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||||
| request: ControlRequest::List, | |||||
| reply_sender, | |||||
| })) | |||||
| .await?; | |||||
| let result = reply.await??; | |||||
| let dataflows = match result { | |||||
| ControlRequestReply::DataflowList { dataflows } => dataflows, | |||||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||||
| }; | |||||
| Ok(dataflows) | |||||
| } | |||||
| async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> { | |||||
| let (reply_sender, reply) = oneshot::channel(); | |||||
| coordinator_events_tx | |||||
| .send(Event::Control(ControlEvent::IncomingRequest { | |||||
| request: ControlRequest::Destroy, | |||||
| reply_sender, | |||||
| })) | |||||
| .await?; | |||||
| let result = reply.await??; | |||||
| match result { | |||||
| ControlRequestReply::DestroyOk => Ok(()), | |||||
| ControlRequestReply::Error(err) => bail!("{err}"), | |||||
| other => bail!("unexpected start dataflow reply: {other:?}"), | |||||
| } | |||||
| } | |||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--machine-id") | |||||
| .arg(machine_id) | |||||
| .arg("--coordinator-addr") | |||||
| .arg(coordinator); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,30 +1,73 @@ | |||||
| use dora_core::{get_pip_path, get_python_path, run}; | |||||
| use dora_download::download_file; | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, ContextCompat, WrapErr}; | |||||
| use std::path::Path; | |||||
| use dora_core::get_python_path; | |||||
| use dora_download::download_file_sync; | |||||
| use eyre::{ContextCompat, WrapErr}; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("python-dataflow-runner")?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| // prepare Python virtual environment | |||||
| prepare_venv(&sh)?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| // install/upgrade pip, then install requirements | |||||
| cmd!(sh, "python -m pip install --upgrade pip").run()?; | |||||
| cmd!(sh, "pip install -r requirements.txt").run()?; | |||||
| // build the dora Python package (you can skip this if you installed the Python dora package) | |||||
| { | |||||
| let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) | |||||
| .join("apis") | |||||
| .join("python") | |||||
| .join("node"); | |||||
| let _dir = sh.push_dir(python_node_api_dir); | |||||
| cmd!(sh, "maturin develop").run()?; | |||||
| } | |||||
| run( | |||||
| get_python_path().context("Could not get python binary")?, | |||||
| &["-m", "venv", "../.env"], | |||||
| None, | |||||
| download_file_sync( | |||||
| "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", | |||||
| Path::new("yolov8n.pt"), | |||||
| ) | ) | ||||
| .await | |||||
| .context("failed to create venv")?; | |||||
| let venv = &root.join("examples").join(".env"); | |||||
| std::env::set_var( | |||||
| .context("Could not download weights.")?; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_object_detection.txt")?; | |||||
| if sink_output.lines().count() < 50 { | |||||
| eyre::bail!("object dectection node did not receive the expected number of messages") | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| /// Prepares a Python virtual environment. | |||||
| /// | |||||
| /// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` | |||||
| /// if you're running bash. | |||||
| fn prepare_venv(sh: &Shell) -> eyre::Result<()> { | |||||
| let python = get_python_path().context("Could not get python binary")?; | |||||
| cmd!(sh, "{python} -m venv ../.env").run()?; | |||||
| let venv = sh.current_dir().parent().unwrap().join(".env"); | |||||
| sh.set_var( | |||||
| "VIRTUAL_ENV", | "VIRTUAL_ENV", | ||||
| venv.to_str().context("venv path not valid unicode")?, | venv.to_str().context("venv path not valid unicode")?, | ||||
| ); | ); | ||||
| let orig_path = std::env::var("PATH")?; | |||||
| // bin folder is named Scripts on windows. | // bin folder is named Scripts on windows. | ||||
| // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | ||||
| let venv_bin = if cfg!(windows) { | let venv_bin = if cfg!(windows) { | ||||
| @@ -32,71 +75,36 @@ async fn main() -> eyre::Result<()> { | |||||
| } else { | } else { | ||||
| venv.join("bin") | venv.join("bin") | ||||
| }; | }; | ||||
| let path_separator = if cfg!(windows) { ';' } else { ':' }; | |||||
| if cfg!(windows) { | |||||
| std::env::set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{};{orig_path}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")? | |||||
| ), | |||||
| ); | |||||
| } else { | |||||
| std::env::set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{}:{orig_path}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")? | |||||
| ), | |||||
| ); | |||||
| } | |||||
| run( | |||||
| get_python_path().context("Could not get pip binary")?, | |||||
| &["-m", "pip", "install", "--upgrade", "pip"], | |||||
| None, | |||||
| ) | |||||
| .await | |||||
| .context("failed to install pip")?; | |||||
| run( | |||||
| get_pip_path().context("Could not get pip binary")?, | |||||
| &["install", "-r", "requirements.txt"], | |||||
| None, | |||||
| ) | |||||
| .await | |||||
| .context("pip install failed")?; | |||||
| run( | |||||
| "maturin", | |||||
| &["develop"], | |||||
| Some(&root.join("apis").join("python").join("node")), | |||||
| ) | |||||
| .await | |||||
| .context("maturin develop failed")?; | |||||
| download_file( | |||||
| "https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt", | |||||
| Path::new("yolov8n.pt"), | |||||
| ) | |||||
| .await | |||||
| .context("Could not download weights.")?; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| run_dataflow(dataflow).await?; | |||||
| sh.set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{}{path_separator}{}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")?, | |||||
| std::env::var("PATH")? | |||||
| ), | |||||
| ); | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,29 +1,72 @@ | |||||
| use dora_core::{get_pip_path, get_python_path, run}; | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, ContextCompat, WrapErr}; | |||||
| use std::path::Path; | |||||
| use dora_core::get_python_path; | |||||
| use eyre::{ContextCompat, WrapErr}; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("python-operator-dataflow-runner")?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| run( | |||||
| get_python_path().context("Could not get python binary")?, | |||||
| &["-m", "venv", "../.env"], | |||||
| None, | |||||
| ) | |||||
| .await | |||||
| .context("failed to create venv")?; | |||||
| let venv = &root.join("examples").join(".env"); | |||||
| std::env::set_var( | |||||
| // prepare Python virtual environment | |||||
| prepare_venv(&sh)?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| // install/upgrade pip, then install requirements | |||||
| cmd!(sh, "python -m pip install --upgrade pip").run()?; | |||||
| cmd!(sh, "pip install -r requirements.txt").run()?; | |||||
| // build the dora Python package (you can skip this if you installed the Python dora package) | |||||
| { | |||||
| let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR")) | |||||
| .join("apis") | |||||
| .join("python") | |||||
| .join("node"); | |||||
| let _dir = sh.push_dir(python_node_api_dir); | |||||
| cmd!(sh, "maturin develop").run()?; | |||||
| } | |||||
| let dataflow = if std::env::var("CONDA_EXE").is_ok() { | |||||
| Path::new("dataflow.yml") | |||||
| } else { | |||||
| Path::new("dataflow_conda.yml") | |||||
| }; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| let output = cmd!(sh, "{dora} start {dataflow} --attach").read_stderr()?; | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_object_detection.txt")?; | |||||
| if sink_output.lines().count() < 50 { | |||||
| eyre::bail!("object dectection node did not receive the expected number of messages") | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| /// Prepares a Python virtual environment. | |||||
| /// | |||||
| /// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate` | |||||
| /// if you're running bash. | |||||
| fn prepare_venv(sh: &Shell) -> eyre::Result<()> { | |||||
| let python = get_python_path().context("Could not get python binary")?; | |||||
| cmd!(sh, "{python} -m venv ../.env").run()?; | |||||
| let venv = sh.current_dir().parent().unwrap().join(".env"); | |||||
| sh.set_var( | |||||
| "VIRTUAL_ENV", | "VIRTUAL_ENV", | ||||
| venv.to_str().context("venv path not valid unicode")?, | venv.to_str().context("venv path not valid unicode")?, | ||||
| ); | ); | ||||
| let orig_path = std::env::var("PATH")?; | |||||
| // bin folder is named Scripts on windows. | // bin folder is named Scripts on windows. | ||||
| // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | // 🤦♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1 | ||||
| let venv_bin = if cfg!(windows) { | let venv_bin = if cfg!(windows) { | ||||
| @@ -31,70 +74,36 @@ async fn main() -> eyre::Result<()> { | |||||
| } else { | } else { | ||||
| venv.join("bin") | venv.join("bin") | ||||
| }; | }; | ||||
| let path_separator = if cfg!(windows) { ';' } else { ':' }; | |||||
| if cfg!(windows) { | |||||
| std::env::set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{};{orig_path}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")? | |||||
| ), | |||||
| ); | |||||
| } else { | |||||
| std::env::set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{}:{orig_path}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")? | |||||
| ), | |||||
| ); | |||||
| } | |||||
| run( | |||||
| get_python_path().context("Could not get pip binary")?, | |||||
| &["-m", "pip", "install", "--upgrade", "pip"], | |||||
| None, | |||||
| ) | |||||
| .await | |||||
| .context("failed to install pip")?; | |||||
| run( | |||||
| get_pip_path().context("Could not get pip binary")?, | |||||
| &["install", "-r", "requirements.txt"], | |||||
| None, | |||||
| ) | |||||
| .await | |||||
| .context("pip install failed")?; | |||||
| run( | |||||
| "maturin", | |||||
| &["develop"], | |||||
| Some(&root.join("apis").join("python").join("node")), | |||||
| ) | |||||
| .await | |||||
| .context("maturin develop failed")?; | |||||
| if std::env::var("CONDA_EXE").is_ok() { | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| run_dataflow(dataflow).await?; | |||||
| } else { | |||||
| let dataflow = Path::new("dataflow_conda.yml"); | |||||
| run_dataflow(dataflow).await?; | |||||
| } | |||||
| sh.set_var( | |||||
| "PATH", | |||||
| format!( | |||||
| "{}{path_separator}{}", | |||||
| venv_bin.to_str().context("venv path not valid unicode")?, | |||||
| std::env::var("PATH")? | |||||
| ), | |||||
| ); | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,46 +1,41 @@ | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | use std::path::Path; | ||||
| use std::path::PathBuf; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("rust-dataflow-url-runner").wrap_err("failed to set up tracing")?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| // build the dataflow using `dora build` | |||||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| build_dataflow(dataflow).await?; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| run_dataflow(dataflow).await?; | |||||
| // start running the dataflow.yml | |||||
| cmd!(sh, "{dora} start dataflow.yml --attach").run()?; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,46 +1,53 @@ | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | |||||
| use eyre::ContextCompat; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | |||||
| async fn main() -> eyre::Result<()> { | |||||
| set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; | |||||
| fn main() -> eyre::Result<()> { | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| // build the dataflow using `dora build` | |||||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| build_dataflow(dataflow).await?; | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| run_dataflow(dataflow).await?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_rust-sink.txt")?; | |||||
| if sink_output.lines().count() < 50 { | |||||
| eyre::bail!("sink did not receive the expected number of messages") | |||||
| } | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -1,46 +1,57 @@ | |||||
| use dora_tracing::set_up_tracing; | |||||
| use eyre::{bail, Context}; | |||||
| use std::path::Path; | |||||
| use eyre::ContextCompat; | |||||
| use std::path::{Path, PathBuf}; | |||||
| use xshell::{cmd, Shell}; | |||||
| #[tokio::main] | #[tokio::main] | ||||
| async fn main() -> eyre::Result<()> { | async fn main() -> eyre::Result<()> { | ||||
| set_up_tracing("rust-ros2-dataflow-runner").wrap_err("failed to set up tracing subscriber")?; | |||||
| // create a new shell in this folder | |||||
| let sh = prepare_shell()?; | |||||
| // build the `dora` binary (you can skip this if you use `cargo install dora-cli`) | |||||
| let dora = prepare_dora(&sh)?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| std::env::set_current_dir(root.join(file!()).parent().unwrap()) | |||||
| .wrap_err("failed to set working dir")?; | |||||
| // build the dataflow using `dora build` | |||||
| cmd!(sh, "{dora} build dataflow.yml").run()?; | |||||
| // start up the dora daemon and coordinator | |||||
| cmd!(sh, "{dora} up").run()?; | |||||
| let dataflow = Path::new("dataflow.yml"); | |||||
| build_dataflow(dataflow).await?; | |||||
| // start running the dataflow.yml -> outputs the UUID assigned to the dataflow | |||||
| let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?; | |||||
| let uuid = output.lines().next().context("no output")?; | |||||
| run_dataflow(dataflow).await?; | |||||
| // stop the dora daemon and coordinator again | |||||
| cmd!(sh, "{dora} destroy").run()?; | |||||
| // verify that the node output was written to `out` | |||||
| sh.change_dir("out"); | |||||
| sh.change_dir(uuid); | |||||
| let sink_output = sh.read_file("log_rust-node.txt")?; | |||||
| if !sink_output | |||||
| .lines() | |||||
| .any(|l| l.starts_with("received pose event: Ok(")) | |||||
| { | |||||
| eyre::bail!("node did not receive any pose events") | |||||
| } | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--").arg("build").arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to build dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Prepares a shell and set the working directory to the parent folder of this file. | |||||
| /// | |||||
| /// You can use your system shell instead (e.g. `bash`); | |||||
| fn prepare_shell() -> Result<Shell, eyre::Error> { | |||||
| let sh = Shell::new()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| sh.change_dir(root.join(file!()).parent().unwrap()); | |||||
| Ok(sh) | |||||
| } | } | ||||
| async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| let cargo = std::env::var("CARGO").unwrap(); | |||||
| let mut cmd = tokio::process::Command::new(&cargo); | |||||
| cmd.arg("run"); | |||||
| cmd.arg("--package").arg("dora-cli"); | |||||
| cmd.arg("--") | |||||
| .arg("daemon") | |||||
| .arg("--run-dataflow") | |||||
| .arg(dataflow); | |||||
| if !cmd.status().await?.success() { | |||||
| bail!("failed to run dataflow"); | |||||
| }; | |||||
| Ok(()) | |||||
| /// Build the `dora` command-line executable from this repo. | |||||
| /// | |||||
| /// You can skip this step and run `cargo install dora-cli --locked` instead. | |||||
| fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> { | |||||
| cmd!(sh, "cargo build --package dora-cli").run()?; | |||||
| let root = Path::new(env!("CARGO_MANIFEST_DIR")); | |||||
| let dora = root.join("target").join("debug").join("dora"); | |||||
| Ok(dora) | |||||
| } | } | ||||
| @@ -5,6 +5,13 @@ use std::path::Path; | |||||
| use tokio::io::AsyncWriteExt; | use tokio::io::AsyncWriteExt; | ||||
| use tracing::info; | use tracing::info; | ||||
| pub fn download_file_sync<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> | |||||
| where | |||||
| T: reqwest::IntoUrl + std::fmt::Display + Copy, | |||||
| { | |||||
| tokio::runtime::Runtime::new()?.block_on(download_file(url, target_path)) | |||||
| } | |||||
| pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> | pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> | ||||
| where | where | ||||
| T: reqwest::IntoUrl + std::fmt::Display + Copy, | T: reqwest::IntoUrl + std::fmt::Display + Copy, | ||||