Compare commits

...

17 Commits
main ... xshell

Author SHA1 Message Date
  Philipp Oppermann 22f2956b84
Remove dev-dependency on `tokio` 1 year ago
  Philipp Oppermann fcef458aa3
Migrate rust-dataflow-url example to `xshell` 1 year ago
  Philipp Oppermann 1c3488d977
Migrate Python operator example to xshell 1 year ago
  Philipp Oppermann 6fba72165a
Remove tokio dependency from cmake example 1 year ago
  Philipp Oppermann 4514d6e9d0
Migrate C++ dataflow example to `xshell` 1 year ago
  Philipp Oppermann 61d3f16bb2
Remove tokio dependency from Python dataflow example 1 year ago
  Philipp Oppermann 7ec839fc6f
Migrate C dataflow example to `xshell` 1 year ago
  Philipp Oppermann b5bd71c17f
Migrate benchmark example to xshell 1 year ago
  Philipp Oppermann cd41356e76
Merge branch 'main' into xshell 1 year ago
  Philipp Oppermann 46433bb8c5
Migrate run.rs script of cmake dataflow example to xshell 1 year ago
  Philipp Oppermann dcae010f86
Simplify `multiple-daemons` examples using new `connected-machines` command and `xshell` crate 1 year ago
  Philipp Oppermann 1b665b0c44
CLI: Add a hidden `connected-machines` command 1 year ago
  Philipp Oppermann 4117e6087e
Migrate run.rs script of rust-ros2-dataflow example to xshell 1 year ago
  Philipp Oppermann 20498f7e1e
Remove unused import 1 year ago
  Philipp Oppermann 1a5fa7fe45
Fix: Use correct python/pip binaries after setting up venv 1 year ago
  Philipp Oppermann 6f85cfb429
Use `xshell` for Python dataflow example to try venv activation 1 year ago
  Philipp Oppermann 179dbf1bbc
Simplify the `run.rs` script of the `rust-dataflow` example using `xshell` 1 year ago
14 changed files with 798 additions and 982 deletions
Split View
  1. +1
    -2
      Cargo.lock
  2. +1
    -2
      Cargo.toml
  3. +40
    -1
      binaries/cli/src/main.rs
  4. +32
    -35
      examples/benchmark/run.rs
  5. +138
    -257
      examples/c++-dataflow/run.rs
  6. +165
    -173
      examples/c-dataflow/run.rs
  7. +44
    -49
      examples/cmake-dataflow/run.rs
  8. +74
    -197
      examples/multiple-daemons/run.rs
  9. +90
    -82
      examples/python-dataflow/run.rs
  10. +91
    -82
      examples/python-operator-dataflow/run.rs
  11. +29
    -34
      examples/rust-dataflow-url/run.rs
  12. +42
    -35
      examples/rust-dataflow/run.rs
  13. +44
    -33
      examples/rust-ros2-dataflow/run.rs
  14. +7
    -0
      libraries/extensions/download/src/lib.rs

+ 1
- 2
Cargo.lock View File

@@ -2352,10 +2352,9 @@ dependencies = [
"eyre",
"futures",
"serde_yaml 0.8.26",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"xshell",
]

[[package]]


+ 1
- 2
Cargo.toml View File

@@ -86,7 +86,6 @@ ros2-examples = []

[dev-dependencies]
eyre = "0.6.8"
tokio = "1.24.2"
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dora-tracing = { workspace = true }
@@ -96,7 +95,7 @@ serde_yaml = "0.8.23"
uuid = { version = "1.7", features = ["v7", "serde"] }
tracing = "0.1.36"
futures = "0.3.25"
tokio-stream = "0.1.11"
xshell = "0.2.6"

[[example]]
name = "c-dataflow"


+ 40
- 1
binaries/cli/src/main.rs View File

@@ -14,7 +14,7 @@ use dora_daemon::Daemon;
use dora_tracing::set_up_tracing;
use duration_str::parse;
use eyre::{bail, Context};
use std::net::SocketAddr;
use std::{collections::BTreeSet, net::SocketAddr};
use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
@@ -102,6 +102,8 @@ enum Command {
dataflow: Option<String>,
node: String,
},
#[clap(hide = true)]
ConnectedMachines,
// Metrics,
// Stats,
// Get,
@@ -273,6 +275,12 @@ fn run() -> eyre::Result<()> {
bail!("No dora coordinator seems to be running.");
}
},
Command::ConnectedMachines => match connect_to_coordinator() {
Ok(mut session) => connected_machines(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
}
},
Command::Stop {
uuid,
name,
@@ -466,6 +474,37 @@ fn query_running_dataflows(
Ok(ids)
}

fn connected_machines(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let machines = query_connected_machines(session)?;

if machines.is_empty() {
eprintln!("No machines are connected");
} else {
for id in machines {
println!("{id}");
}
}

Ok(())
}

fn query_connected_machines(
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<BTreeSet<String>> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::ConnectedMachines).unwrap())
.wrap_err("failed to send connected machines message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::ConnectedMachines(ids) => ids,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected connected machines reply: {other:?}"),
};

Ok(ids)
}

fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
}

+ 32
- 35
examples/benchmark/run.rs View File

@@ -1,46 +1,43 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("benchmark-runner").wrap_err("failed to set up tracing subscriber")?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora_optimized(&sh)?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
// build the dataflow using `dora build`
cmd!(sh, "{dora} build dataflow.yml").run()?;

// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
// start running the dataflow.yml
cmd!(sh, "{dora} start dataflow.yml --attach").run()?;

run_dataflow(dataflow).await?;
// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora_optimized(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli --release").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("release").join("dora");
Ok(dora)
}

+ 138
- 257
examples/c++-dataflow/run.rs View File

@@ -1,12 +1,9 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::{
env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX},
path::Path,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
use eyre::Context;
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX};
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};
fn main() -> eyre::Result<()> {
set_up_tracing("c++-dataflow-runner").wrap_err("failed to set up tracing")?;

if cfg!(windows) {
@@ -16,284 +13,168 @@ async fn main() -> eyre::Result<()> {
return Ok(());
}

// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

cmd!(sh, "cargo build --package dora-node-api-cxx").run()?;
cmd!(sh, "cargo build --package dora-operator-api-cxx").run()?;
cmd!(sh, "cargo build --package dora-node-api-c").run()?;
cmd!(sh, "cargo build --package dora-operator-api-c").run()?;

sh.create_dir("build")?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let target = root.join("target");
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

tokio::fs::create_dir_all("build").await?;
let build_dir = Path::new("build");
let target_debug = target.join("debug");

build_package("dora-node-api-cxx").await?;
let node_cxxbridge = target
.join("cxxbridge")
.join("dora-node-api-cxx")
.join("src");
tokio::fs::copy(
node_cxxbridge.join("lib.rs.cc"),
build_dir.join("node-bridge.cc"),
)
.await?;
tokio::fs::copy(
node_cxxbridge.join("lib.rs.h"),
build_dir.join("dora-node-api.h"),
)
.await?;
tokio::fs::write(
build_dir.join("operator.h"),
sh.copy_file(node_cxxbridge.join("lib.rs.cc"), "build/node-bridge.cc")?;
sh.copy_file(node_cxxbridge.join("lib.rs.h"), "build/dora-node-api.h")?;
sh.write_file(
"build/operator.h",
r###"#include "../operator-rust-api/operator.h""###,
)
.await?;
)?;

build_package("dora-operator-api-cxx").await?;
let operator_cxxbridge = target
.join("cxxbridge")
.join("dora-operator-api-cxx")
.join("src");
tokio::fs::copy(
sh.copy_file(
operator_cxxbridge.join("lib.rs.cc"),
build_dir.join("operator-bridge.cc"),
)
.await?;
tokio::fs::copy(
"build/operator-bridge.cc",
)?;
sh.copy_file(
operator_cxxbridge.join("lib.rs.h"),
build_dir.join("dora-operator-api.h"),
)
.await?;
"build/dora-operator-api.h",
)?;

build_package("dora-node-api-c").await?;
build_package("dora-operator-api-c").await?;
build_cxx_node(
root,
&[
&dunce::canonicalize(Path::new("node-rust-api").join("main.cc"))?,
&dunce::canonicalize(build_dir.join("node-bridge.cc"))?,
],
"node_rust_api",
&["-l", "dora_node_api_cxx"],
)
.await?;
build_cxx_node(
root,
&[&dunce::canonicalize(
Path::new("node-c-api").join("main.cc"),
)?],
"node_c_api",
&["-l", "dora_node_api_c"],
)
.await?;
build_cxx_operator(
// compile nodes
let args: &[&str] = if cfg!(target_os = "linux") {
&["-l", "m", "-l", "rt", "-l", "dl", "-pthread"]
} else if cfg!(target_os = "windows") {
&[
&dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?,
&dunce::canonicalize(build_dir.join("operator-bridge.cc"))?,
],
"operator_rust_api",
"-ladvapi32",
"-luserenv",
"-lkernel32",
"-lws2_32",
"-lbcrypt",
"-lncrypt",
"-lschannel",
"-lntdll",
"-liphlpapi",
"-lcfgmgr32",
"-lcredui",
"-lcrypt32",
"-lcryptnet",
"-lfwpuclnt",
"-lgdi32",
"-lmsimg32",
"-lmswsock",
"-lole32",
"-loleaut32",
"-lopengl32",
"-lsecur32",
"-lshell32",
"-lsynchronization",
"-luser32",
"-lwinspool",
"-Wl,-nodefaultlib:libcmt",
"-D_DLL",
"-lmsvcrt",
]
} else if cfg!(target_os = "macos") {
&[
"-framework",
"CoreServices",
"-framework",
"Security",
"-l",
"dora_operator_api_cxx",
"-L",
root.join("target").join("debug").to_str().unwrap(),
],
)
.await?;
build_cxx_operator(
&[&dunce::canonicalize(
Path::new("operator-c-api").join("operator.cc"),
)?],
"operator_c_api",
&[
"System",
"-l",
"resolv",
"-l",
"pthread",
"-l",
"c",
"-l",
"dora_operator_api_c",
"-L",
root.join("target").join("debug").to_str().unwrap(),
],
"m",
]
} else {
panic!("unsupported target platform")
};
cmd!(
sh,
"clang++ node-rust-api/main.cc build/node-bridge.cc -std=c++17 -l dora_node_api_cxx {args...} -L {target_debug} --output build/node_rust_api{EXE_SUFFIX}"
)
.await?;

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;
run_dataflow(&dataflow).await?;

Ok(())
}
.run()?;
cmd!(
sh,
"clang++ node-c-api/main.cc -std=c++17 -l dora_node_api_c {args...} -L {target_debug} --output build/node_c_api{EXE_SUFFIX}"
)
.run()?;

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}
// compile operators
let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] };
cmd!(
sh,
"clang++ -c build/operator-bridge.cc -std=c++17 -o build/operator-bridge.o {operator_args...}"
)
.run()?;
cmd!(
sh,
"clang++ -c operator-rust-api/operator.cc -o operator-rust-api/operator.o -std=c++17 {operator_args...}"
)
.run()?;
cmd!(
sh,
"clang++ -c operator-c-api/operator.cc -o operator-c-api/operator.o -std=c++17 {operator_args...}"
)
.run()?;

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
}
// link operators
cmd!(
sh,
"clang++ -shared operator-rust-api/operator.o build/operator-bridge.o -l dora_operator_api_cxx {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_rust_api{DLL_SUFFIX}"
)
.run()?;
cmd!(
sh,
"clang++ -shared operator-c-api/operator.o -l dora_operator_api_c {args...} -L {target_debug} --output build/{DLL_PREFIX}operator_c_api{DLL_SUFFIX}"
)
.run()?;

async fn build_cxx_node(
root: &Path,
paths: &[&Path],
out_name: &str,
args: &[&str],
) -> eyre::Result<()> {
let mut clang = tokio::process::Command::new("clang++");
clang.args(paths);
clang.arg("-std=c++17");
#[cfg(target_os = "linux")]
{
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]
{
clang.arg("-ladvapi32");
clang.arg("-luserenv");
clang.arg("-lkernel32");
clang.arg("-lws2_32");
clang.arg("-lbcrypt");
clang.arg("-lncrypt");
clang.arg("-lschannel");
clang.arg("-lntdll");
clang.arg("-liphlpapi");
// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

clang.arg("-lcfgmgr32");
clang.arg("-lcredui");
clang.arg("-lcrypt32");
clang.arg("-lcryptnet");
clang.arg("-lfwpuclnt");
clang.arg("-lgdi32");
clang.arg("-lmsimg32");
clang.arg("-lmswsock");
clang.arg("-lole32");
clang.arg("-lopengl32");
clang.arg("-lsecur32");
clang.arg("-lshell32");
clang.arg("-lsynchronization");
clang.arg("-luser32");
clang.arg("-lwinspool");
// start running the dataflow.yml
cmd!(sh, "{dora} start dataflow.yml --attach").run()?;

clang.arg("-Wl,-nodefaultlib:libcmt");
clang.arg("-D_DLL");
clang.arg("-lmsvcrt");
}
#[cfg(target_os = "macos")]
{
clang.arg("-framework").arg("CoreServices");
clang.arg("-framework").arg("Security");
clang.arg("-l").arg("System");
clang.arg("-l").arg("resolv");
clang.arg("-l").arg("pthread");
clang.arg("-l").arg("c");
clang.arg("-l").arg("m");
}
clang.args(args);
clang.arg("-L").arg(root.join("target").join("debug"));
clang
.arg("--output")
.arg(Path::new("../build").join(format!("{out_name}{EXE_SUFFIX}")));
if let Some(parent) = paths[0].parent() {
clang.current_dir(parent);
}
// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

if !clang.status().await?.success() {
bail!("failed to compile c++ node");
};
Ok(())
}

async fn build_cxx_operator(
paths: &[&Path],
out_name: &str,
link_args: &[&str],
) -> eyre::Result<()> {
let mut object_file_paths = Vec::new();

for path in paths {
let mut compile = tokio::process::Command::new("clang++");
compile.arg("-c").arg(path);
compile.arg("-std=c++17");
let object_file_path = path.with_extension("o");
compile.arg("-o").arg(&object_file_path);
#[cfg(unix)]
compile.arg("-fPIC");
if let Some(parent) = path.parent() {
compile.current_dir(parent);
}
if !compile.status().await?.success() {
bail!("failed to compile cxx operator");
};
object_file_paths.push(object_file_path);
}

let mut link = tokio::process::Command::new("clang++");
link.arg("-shared").args(&object_file_paths);
link.args(link_args);
#[cfg(target_os = "windows")]
{
link.arg("-ladvapi32");
link.arg("-luserenv");
link.arg("-lkernel32");
link.arg("-lws2_32");
link.arg("-lbcrypt");
link.arg("-lncrypt");
link.arg("-lschannel");
link.arg("-lntdll");
link.arg("-liphlpapi");

link.arg("-lcfgmgr32");
link.arg("-lcredui");
link.arg("-lcrypt32");
link.arg("-lcryptnet");
link.arg("-lfwpuclnt");
link.arg("-lgdi32");
link.arg("-lmsimg32");
link.arg("-lmswsock");
link.arg("-lole32");
link.arg("-lopengl32");
link.arg("-lsecur32");
link.arg("-lshell32");
link.arg("-lsynchronization");
link.arg("-luser32");
link.arg("-lwinspool");

link.arg("-Wl,-nodefaultlib:libcmt");
link.arg("-D_DLL");
link.arg("-lmsvcrt");
link.arg("-fms-runtime-lib=static");
}
#[cfg(target_os = "macos")]
{
link.arg("-framework").arg("CoreServices");
link.arg("-framework").arg("Security");
link.arg("-l").arg("System");
link.arg("-l").arg("resolv");
link.arg("-l").arg("pthread");
link.arg("-l").arg("c");
link.arg("-l").arg("m");
}
link.arg("-o")
.arg(Path::new("../build").join(format!("{DLL_PREFIX}{out_name}{DLL_SUFFIX}")));
if let Some(parent) = paths[0].parent() {
link.current_dir(parent);
}
if !link.status().await?.success() {
bail!("failed to create shared library from cxx operator (c api)");
};
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 165
- 173
examples/c-dataflow/run.rs View File

@@ -1,186 +1,178 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::{
env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX},
path::Path,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("c-dataflow-runner").wrap_err("failed to set up tracing")?;
use std::env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX};
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

tokio::fs::create_dir_all("build").await?;

build_package("dora-node-api-c").await?;
build_c_node(root, "node.c", "c_node").await?;
build_c_node(root, "sink.c", "c_sink").await?;

build_package("dora-operator-api-c").await?;
build_c_operator(root).await?;

let dataflow = Path::new("dataflow.yml").to_owned();
run_dataflow(&dataflow).await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

cmd!(sh, "cargo build --package dora-node-api-c").run()?;
cmd!(sh, "cargo build --package dora-operator-api-c").run()?;

sh.create_dir("build")?;
let target_debug = root.join("target").join("debug");

// compile nodes
let args: &[&str] = if cfg!(target_os = "linux") {
&["-l", "m", "-l", "rt", "-l", "dl", "-pthread"]
} else if cfg!(target_os = "windows") {
&[
"-ladvapi32",
"-luserenv",
"-lkernel32",
"-lws2_32",
"-lbcrypt",
"-lncrypt",
"-lschannel",
"-lntdll",
"-liphlpapi",
"-lcfgmgr32",
"-lcredui",
"-lcrypt32",
"-lcryptnet",
"-lfwpuclnt",
"-lgdi32",
"-lmsimg32",
"-lmswsock",
"-lole32",
"-loleaut32",
"-lopengl32",
"-lsecur32",
"-lshell32",
"-lsynchronization",
"-luser32",
"-lwinspool",
"-Wl,-nodefaultlib:libcmt",
"-D_DLL",
"-lmsvcrt",
]
} else if cfg!(target_os = "macos") {
&[
"-framework",
"CoreServices",
"-framework",
"Security",
"-l",
"System",
"-l",
"resolv",
"-l",
"pthread",
"-l",
"c",
"-l",
"m",
]
} else {
panic!("unsupported target platform")
};
Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
cmd!(
sh,
"clang node.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_node{EXE_SUFFIX}"
)
.run()?;
cmd!(
sh,
"clang sink.c -l dora_node_api_c {args...} -L {target_debug} --output build/c_sink{EXE_SUFFIX}"
)
.run()?;

// compile operator
let operator_args: &[&str] = if cfg!(unix) { &["-fPIC"] } else { &[] };
cmd!(
sh,
"clang -c operator.c -o build/operator.o -fdeclspec {operator_args...}"
)
.run()?;
// link operator
let operator_link_args: &[&str] = if cfg!(target_os = "windows") {
&[
"-ladvapi32",
"-luserenv",
"-lkernel32",
"-lws2_32",
"-lbcrypt",
"-lncrypt",
"-lschannel",
"-lntdll",
"-liphlpapi",
"-lcfgmgr32",
"-lcredui",
"-lcrypt32",
"-lcryptnet",
"-lfwpuclnt",
"-lgdi32",
"-lmsimg32",
"-lmswsock",
"-lole32",
"-loleaut32",
"-lopengl32",
"-lsecur32",
"-lshell32",
"-lsynchronization",
"-luser32",
"-lwinspool",
"-Wl,-nodefaultlib:libcmt",
"-D_DLL",
"-lmsvcrt",
]
} else if cfg!(target_os = "macos") {
&[
"-framework",
"CoreServices",
"-framework",
"Security",
"-l",
"System",
"-l",
"resolv",
"-l",
"pthread",
"-l",
"c",
"-l",
"m",
]
} else {
&[]
};
Ok(())
}
cmd!(
sh,
"clang -shared build/operator.o -L {target_debug} -l dora_operator_api_c {operator_link_args...} -o build/{DLL_PREFIX}operator{DLL_SUFFIX}"
).run()?;

async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> {
let mut clang = tokio::process::Command::new("clang");
clang.arg(name);
clang.arg("-l").arg("dora_node_api_c");
#[cfg(target_os = "linux")]
{
clang.arg("-l").arg("m");
clang.arg("-l").arg("rt");
clang.arg("-l").arg("dl");
clang.arg("-pthread");
}
#[cfg(target_os = "windows")]
{
clang.arg("-ladvapi32");
clang.arg("-luserenv");
clang.arg("-lkernel32");
clang.arg("-lws2_32");
clang.arg("-lbcrypt");
clang.arg("-lncrypt");
clang.arg("-lschannel");
clang.arg("-lntdll");
clang.arg("-liphlpapi");
// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

clang.arg("-lcfgmgr32");
clang.arg("-lcredui");
clang.arg("-lcrypt32");
clang.arg("-lcryptnet");
clang.arg("-lfwpuclnt");
clang.arg("-lgdi32");
clang.arg("-lmsimg32");
clang.arg("-lmswsock");
clang.arg("-lole32");
clang.arg("-loleaut32");
clang.arg("-lopengl32");
clang.arg("-lsecur32");
clang.arg("-lshell32");
clang.arg("-lsynchronization");
clang.arg("-luser32");
clang.arg("-lwinspool");
// start running the dataflow.yml
cmd!(sh, "{dora} start dataflow.yml --attach").run()?;

// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

clang.arg("-Wl,-nodefaultlib:libcmt");
clang.arg("-D_DLL");
clang.arg("-lmsvcrt");
}
#[cfg(target_os = "macos")]
{
clang.arg("-framework").arg("CoreServices");
clang.arg("-framework").arg("Security");
clang.arg("-l").arg("System");
clang.arg("-l").arg("resolv");
clang.arg("-l").arg("pthread");
clang.arg("-l").arg("c");
clang.arg("-l").arg("m");
}
clang.arg("-L").arg(root.join("target").join("debug"));
clang
.arg("--output")
.arg(Path::new("build").join(format!("{out_name}{EXE_SUFFIX}")));
if !clang.status().await?.success() {
bail!("failed to compile c node");
};
Ok(())
}

async fn build_c_operator(root: &Path) -> eyre::Result<()> {
let mut compile = tokio::process::Command::new("clang");
compile.arg("-c").arg("operator.c");
compile.arg("-o").arg("build/operator.o");
compile.arg("-fdeclspec");
#[cfg(unix)]
compile.arg("-fPIC");
if !compile.status().await?.success() {
bail!("failed to compile c operator");
};

let mut link = tokio::process::Command::new("clang");
link.arg("-shared").arg("build/operator.o");
link.arg("-L").arg(root.join("target").join("debug"));
link.arg("-l").arg("dora_operator_api_c");
#[cfg(target_os = "windows")]
{
link.arg("-ladvapi32");
link.arg("-luserenv");
link.arg("-lkernel32");
link.arg("-lws2_32");
link.arg("-lbcrypt");
link.arg("-lncrypt");
link.arg("-lschannel");
link.arg("-lntdll");
link.arg("-liphlpapi");

link.arg("-lcfgmgr32");
link.arg("-lcredui");
link.arg("-lcrypt32");
link.arg("-lcryptnet");
link.arg("-lfwpuclnt");
link.arg("-lgdi32");
link.arg("-lmsimg32");
link.arg("-lmswsock");
link.arg("-lole32");
link.arg("-loleaut32");
link.arg("-lopengl32");
link.arg("-lsecur32");
link.arg("-lshell32");
link.arg("-lsynchronization");
link.arg("-luser32");
link.arg("-lwinspool");

link.arg("-Wl,-nodefaultlib:libcmt");
link.arg("-D_DLL");
link.arg("-lmsvcrt");
}
#[cfg(target_os = "macos")]
{
link.arg("-framework").arg("CoreServices");
link.arg("-framework").arg("Security");
link.arg("-l").arg("System");
link.arg("-l").arg("resolv");
link.arg("-l").arg("pthread");
link.arg("-l").arg("c");
link.arg("-l").arg("m");
}
link.arg("-o")
.arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}")));
if !link.status().await?.success() {
bail!("failed to link c operator");
};
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 44
- 49
examples/cmake-dataflow/run.rs View File

@@ -1,9 +1,9 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;
use eyre::{Context, ContextCompat};
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
fn main() -> eyre::Result<()> {
set_up_tracing("cmake-dataflow-runner").wrap_err("failed to set up tracing")?;

if cfg!(windows) {
@@ -13,60 +13,55 @@ async fn main() -> eyre::Result<()> {
return Ok(());
}

// create a new shell in this folder
let sh = prepare_shell()?;

// build C++ source code using cmake
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
cmd!(sh, "cmake -DDORA_ROOT_DIR={root} -B build .").run()?;
cmd!(sh, "cmake --build build").run()?;
cmd!(sh, "cmake --install build").run()?;

tokio::fs::create_dir_all("build").await?;
let mut cmd = tokio::process::Command::new("cmake");
cmd.arg(format!("-DDORA_ROOT_DIR={}", root.display()));
cmd.arg("-B").arg("build");
cmd.arg(".");
if !cmd.status().await?.success() {
bail!("failed to generating make file");
}
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

let mut cmd = tokio::process::Command::new("cmake");
cmd.arg("--build").arg("build");
if !cmd.status().await?.success() {
bail!("failed to build a cmake-generated project binary tree");
}
// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

let mut cmd = tokio::process::Command::new("cmake");
cmd.arg("--install").arg("build");
if !cmd.status().await?.success() {
bail!("failed to build a cmake-generated project binary tree");
}
// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
let uuid = output.lines().next().context("no output")?;

// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;
run_dataflow(&dataflow).await?;
// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_runtime-node-2.txt")?;
if sink_output.lines().count() < 20 {
eyre::bail!("sink did not receive the expected number of messages")
}

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
}
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 74
- 197
examples/multiple-daemons/run.rs View File

@@ -1,219 +1,96 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};

use eyre::ContextCompat;
use std::{
collections::BTreeSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
path::{Path, PathBuf},
process::Command,
time::Duration,
};
use tokio::{
sync::{
mpsc::{self, Sender},
oneshot,
},
task::JoinSet,
};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("multiple-daemon-runner").wrap_err("failed to set up tracing subscriber")?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
use xshell::{cmd, Shell};

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1);
let coordinator_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_port, coordinator) =
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A");
let daemon_b = run_daemon(coordinator_addr.to_string(), "B");
// build the dataflow using `dora build`
cmd!(sh, "{dora} build dataflow.yml").run()?;

tracing::info!("Spawning coordinator and daemons");
let mut tasks = JoinSet::new();
tasks.spawn(coordinator);
tasks.spawn(daemon_a);
tasks.spawn(daemon_b);

tracing::info!("waiting until daemons are connected to coordinator");
let mut retries = 0;
// start the dora coordinator (in background)
Command::from(cmd!(sh, "{dora} coordinator")).spawn()?;
// wait until coordinator is ready
loop {
let connected_machines = connected_machines(&coordinator_events_tx).await?;
if connected_machines.contains("A") && connected_machines.contains("B") {
break;
} else if retries > 20 {
bail!("daemon not connected after {retries} retries");
} else {
std::thread::sleep(Duration::from_millis(500));
retries += 1
match cmd!(sh, "{dora} list").quiet().ignore_stderr().run() {
Ok(_) => {
println!("coordinator connected");
break;
}
Err(_) => {
eprintln!("waiting for coordinator");
std::thread::sleep(Duration::from_millis(100))
}
}
}

tracing::info!("starting dataflow");
let uuid = start_dataflow(dataflow, &coordinator_events_tx).await?;
tracing::info!("started dataflow under ID `{uuid}`");

let running = running_dataflows(&coordinator_events_tx).await?;
if !running.iter().map(|d| d.uuid).any(|id| id == uuid) {
bail!("dataflow `{uuid}` is not running");
}

tracing::info!("waiting for dataflow `{uuid}` to finish");
let mut retries = 0;
// start two daemons (in background)
Command::from(cmd!(sh, "{dora} daemon --machine-id A")).spawn()?;
Command::from(cmd!(sh, "{dora} daemon --machine-id B")).spawn()?;
// wait until both daemons are connected
loop {
let running = running_dataflows(&coordinator_events_tx).await?;
if running.is_empty() {
break;
} else if retries > 100 {
bail!("dataflow not finished after {retries} retries");
} else {
tracing::debug!("not done yet");
std::thread::sleep(Duration::from_millis(500));
retries += 1
let output = cmd!(sh, "{dora} connected-machines")
.quiet()
.ignore_stderr()
.read();
match output {
Ok(output) => {
let connected: Vec<&str> = output.lines().collect();
if connected == ["A", "B"] {
println!("both daemons connected");
break;
} else {
eprintln!("not all daemons connected yet (connected: {connected:?})");
}
}
Err(err) => eprintln!("failed to query connected-machines: {err:?}"),
}
std::thread::sleep(Duration::from_millis(100));
}
tracing::info!("dataflow `{uuid}` finished, destroying coordinator");
destroy(&coordinator_events_tx).await?;

tracing::info!("joining tasks");
while let Some(res) = tasks.join_next().await {
res.unwrap()?;
// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
println!("starting dataflow");
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
println!("dataflow finished successfully");
let uuid = output.lines().next().context("no output")?;

// stop the coordinator and both daemons again
cmd!(sh, "{dora} destroy").run()?;

// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_rust-sink.txt")?;
if sink_output.lines().count() < 50 {
eyre::bail!("sink did not receive the expected number of messages")
}

tracing::info!("done");
Ok(())
}

async fn start_dataflow(
dataflow: &Path,
coordinator_events_tx: &Sender<Event>,
) -> eyre::Result<Uuid> {
let dataflow_descriptor = Descriptor::read(dataflow)
.await
.wrap_err("failed to read yaml dataflow")?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
dataflow_descriptor
.check(&working_dir)
.wrap_err("could not validate yaml")?;

let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::Start {
dataflow: dataflow_descriptor,
local_working_dir: working_dir,
name: None,
},
reply_sender,
}))
.await?;
let result = reply.await??;
let uuid = match result {
ControlRequestReply::DataflowStarted { uuid } => uuid,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};
Ok(uuid)
}

async fn connected_machines(
coordinator_events_tx: &Sender<Event>,
) -> eyre::Result<BTreeSet<String>> {
let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::ConnectedMachines,
reply_sender,
}))
.await?;
let result = reply.await??;
let machines = match result {
ControlRequestReply::ConnectedMachines(machines) => machines,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};
Ok(machines)
}

async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Result<Vec<DataflowId>> {
let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::List,
reply_sender,
}))
.await?;
let result = reply.await??;
let dataflows = match result {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};
Ok(dataflows)
}

async fn destroy(coordinator_events_tx: &Sender<Event>) -> eyre::Result<()> {
let (reply_sender, reply) = oneshot::channel();
coordinator_events_tx
.send(Event::Control(ControlEvent::IncomingRequest {
request: ControlRequest::Destroy,
reply_sender,
}))
.await?;
let result = reply.await??;
match result {
ControlRequestReply::DestroyOk => Ok(()),
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--machine-id")
.arg(machine_id)
.arg("--coordinator-addr")
.arg(coordinator);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 90
- 82
examples/python-dataflow/run.rs View File

@@ -1,30 +1,73 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_download::download_file;
use dora_tracing::set_up_tracing;
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;
use dora_core::get_python_path;
use dora_download::download_file_sync;
use eyre::{ContextCompat, WrapErr};
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("python-dataflow-runner")?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
// prepare Python virtual environment
prepare_venv(&sh)?;

// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

// install/upgrade pip, then install requirements
cmd!(sh, "python -m pip install --upgrade pip").run()?;
cmd!(sh, "pip install -r requirements.txt").run()?;

// build the dora Python package (you can skip this if you installed the Python dora package)
{
let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("apis")
.join("python")
.join("node");
let _dir = sh.push_dir(python_node_api_dir);
cmd!(sh, "maturin develop").run()?;
}

run(
get_python_path().context("Could not get python binary")?,
&["-m", "venv", "../.env"],
None,
download_file_sync(
"https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt",
Path::new("yolov8n.pt"),
)
.await
.context("failed to create venv")?;
let venv = &root.join("examples").join(".env");
std::env::set_var(
.context("Could not download weights.")?;

// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
let uuid = output.lines().next().context("no output")?;

// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_object_detection.txt")?;
if sink_output.lines().count() < 50 {
eyre::bail!("object dectection node did not receive the expected number of messages")
}

Ok(())
}

/// Prepares a Python virtual environment.
///
/// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate`
/// if you're running bash.
fn prepare_venv(sh: &Shell) -> eyre::Result<()> {
let python = get_python_path().context("Could not get python binary")?;
cmd!(sh, "{python} -m venv ../.env").run()?;
let venv = sh.current_dir().parent().unwrap().join(".env");
sh.set_var(
"VIRTUAL_ENV",
venv.to_str().context("venv path not valid unicode")?,
);
let orig_path = std::env::var("PATH")?;
// bin folder is named Scripts on windows.
// 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1
let venv_bin = if cfg!(windows) {
@@ -32,71 +75,36 @@ async fn main() -> eyre::Result<()> {
} else {
venv.join("bin")
};
let path_separator = if cfg!(windows) { ';' } else { ':' };

if cfg!(windows) {
std::env::set_var(
"PATH",
format!(
"{};{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);
} else {
std::env::set_var(
"PATH",
format!(
"{}:{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);
}

run(
get_python_path().context("Could not get pip binary")?,
&["-m", "pip", "install", "--upgrade", "pip"],
None,
)
.await
.context("failed to install pip")?;
run(
get_pip_path().context("Could not get pip binary")?,
&["install", "-r", "requirements.txt"],
None,
)
.await
.context("pip install failed")?;

run(
"maturin",
&["develop"],
Some(&root.join("apis").join("python").join("node")),
)
.await
.context("maturin develop failed")?;
download_file(
"https://github.com/ultralytics/assets/releases/download/v0.0.0/yolov8n.pt",
Path::new("yolov8n.pt"),
)
.await
.context("Could not download weights.")?;

let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;
sh.set_var(
"PATH",
format!(
"{}{path_separator}{}",
venv_bin.to_str().context("venv path not valid unicode")?,
std::env::var("PATH")?
),
);

Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 91
- 82
examples/python-operator-dataflow/run.rs View File

@@ -1,29 +1,72 @@
use dora_core::{get_pip_path, get_python_path, run};
use dora_tracing::set_up_tracing;
use eyre::{bail, ContextCompat, WrapErr};
use std::path::Path;
use dora_core::get_python_path;
use eyre::{ContextCompat, WrapErr};
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("python-operator-dataflow-runner")?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

run(
get_python_path().context("Could not get python binary")?,
&["-m", "venv", "../.env"],
None,
)
.await
.context("failed to create venv")?;
let venv = &root.join("examples").join(".env");
std::env::set_var(
// prepare Python virtual environment
prepare_venv(&sh)?;

// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

// install/upgrade pip, then install requirements
cmd!(sh, "python -m pip install --upgrade pip").run()?;
cmd!(sh, "pip install -r requirements.txt").run()?;

// build the dora Python package (you can skip this if you installed the Python dora package)
{
let python_node_api_dir = Path::new(env!("CARGO_MANIFEST_DIR"))
.join("apis")
.join("python")
.join("node");
let _dir = sh.push_dir(python_node_api_dir);
cmd!(sh, "maturin develop").run()?;
}

let dataflow = if std::env::var("CONDA_EXE").is_ok() {
Path::new("dataflow.yml")
} else {
Path::new("dataflow_conda.yml")
};

// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
let output = cmd!(sh, "{dora} start {dataflow} --attach").read_stderr()?;
let uuid = output.lines().next().context("no output")?;

// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_object_detection.txt")?;
if sink_output.lines().count() < 50 {
eyre::bail!("object dectection node did not receive the expected number of messages")
}

Ok(())
}

/// Prepares a Python virtual environment.
///
/// You can use the normal `python3 -m venv .venv` + `source .venv/bin/activate`
/// if you're running bash.
fn prepare_venv(sh: &Shell) -> eyre::Result<()> {
let python = get_python_path().context("Could not get python binary")?;
cmd!(sh, "{python} -m venv ../.env").run()?;
let venv = sh.current_dir().parent().unwrap().join(".env");
sh.set_var(
"VIRTUAL_ENV",
venv.to_str().context("venv path not valid unicode")?,
);
let orig_path = std::env::var("PATH")?;
// bin folder is named Scripts on windows.
// 🤦‍♂️ See: https://github.com/pypa/virtualenv/commit/993ba1316a83b760370f5a3872b3f5ef4dd904c1
let venv_bin = if cfg!(windows) {
@@ -31,70 +74,36 @@ async fn main() -> eyre::Result<()> {
} else {
venv.join("bin")
};
let path_separator = if cfg!(windows) { ';' } else { ':' };

if cfg!(windows) {
std::env::set_var(
"PATH",
format!(
"{};{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);
} else {
std::env::set_var(
"PATH",
format!(
"{}:{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);
}

run(
get_python_path().context("Could not get pip binary")?,
&["-m", "pip", "install", "--upgrade", "pip"],
None,
)
.await
.context("failed to install pip")?;
run(
get_pip_path().context("Could not get pip binary")?,
&["install", "-r", "requirements.txt"],
None,
)
.await
.context("pip install failed")?;

run(
"maturin",
&["develop"],
Some(&root.join("apis").join("python").join("node")),
)
.await
.context("maturin develop failed")?;

if std::env::var("CONDA_EXE").is_ok() {
let dataflow = Path::new("dataflow.yml");
run_dataflow(dataflow).await?;
} else {
let dataflow = Path::new("dataflow_conda.yml");
run_dataflow(dataflow).await?;
}
sh.set_var(
"PATH",
format!(
"{}{path_separator}{}",
venv_bin.to_str().context("venv path not valid unicode")?,
std::env::var("PATH")?
),
);

Ok(())
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 29
- 34
examples/rust-dataflow-url/run.rs View File

@@ -1,46 +1,41 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;
use std::path::PathBuf;
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("rust-dataflow-url-runner").wrap_err("failed to set up tracing")?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
// build the dataflow using `dora build`
cmd!(sh, "{dora} build dataflow.yml").run()?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

run_dataflow(dataflow).await?;
// start running the dataflow.yml
cmd!(sh, "{dora} start dataflow.yml --attach").run()?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 42
- 35
examples/rust-dataflow/run.rs View File

@@ -1,46 +1,53 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;
use eyre::ContextCompat;
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("rust-dataflow-runner").wrap_err("failed to set up tracing subscriber")?;
fn main() -> eyre::Result<()> {
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
// build the dataflow using `dora build`
cmd!(sh, "{dora} build dataflow.yml").run()?;

// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
let uuid = output.lines().next().context("no output")?;

run_dataflow(dataflow).await?;
// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_rust-sink.txt")?;
if sink_output.lines().count() < 50 {
eyre::bail!("sink did not receive the expected number of messages")
}

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 44
- 33
examples/rust-ros2-dataflow/run.rs View File

@@ -1,46 +1,57 @@
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::Path;
use eyre::ContextCompat;
use std::path::{Path, PathBuf};
use xshell::{cmd, Shell};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing("rust-ros2-dataflow-runner").wrap_err("failed to set up tracing subscriber")?;
// create a new shell in this folder
let sh = prepare_shell()?;
// build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
let dora = prepare_dora(&sh)?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;
// build the dataflow using `dora build`
cmd!(sh, "{dora} build dataflow.yml").run()?;

// start up the dora daemon and coordinator
cmd!(sh, "{dora} up").run()?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
// start running the dataflow.yml -> outputs the UUID assigned to the dataflow
let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
let uuid = output.lines().next().context("no output")?;

run_dataflow(dataflow).await?;
// stop the dora daemon and coordinator again
cmd!(sh, "{dora} destroy").run()?;

// verify that the node output was written to `out`
sh.change_dir("out");
sh.change_dir(uuid);
let sink_output = sh.read_file("log_rust-node.txt")?;
if !sink_output
.lines()
.any(|l| l.starts_with("received pose event: Ok("))
{
eyre::bail!("node did not receive any pose events")
}

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
/// Prepares a shell and set the working directory to the parent folder of this file.
///
/// You can use your system shell instead (e.g. `bash`);
fn prepare_shell() -> Result<Shell, eyre::Error> {
let sh = Shell::new()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
sh.change_dir(root.join(file!()).parent().unwrap());
Ok(sh)
}

async fn run_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--")
.arg("daemon")
.arg("--run-dataflow")
.arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to run dataflow");
};
Ok(())
/// Build the `dora` command-line executable from this repo.
///
/// You can skip this step and run `cargo install dora-cli --locked` instead.
fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
cmd!(sh, "cargo build --package dora-cli").run()?;
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
let dora = root.join("target").join("debug").join("dora");
Ok(dora)
}

+ 7
- 0
libraries/extensions/download/src/lib.rs View File

@@ -5,6 +5,13 @@ use std::path::Path;
use tokio::io::AsyncWriteExt;
use tracing::info;

pub fn download_file_sync<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
tokio::runtime::Runtime::new()?.block_on(download_file(url, target_path))
}

pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,


Loading…
Cancel
Save