Browse Source

Merge pull request #108 from dora-rs/url-source

Implement support for downloading operator sources
tags/v0.0.0-test-pr-120
Philipp Oppermann GitHub 3 years ago
parent
commit
e9b7ca1e7f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 560 additions and 96 deletions
  1. +19
    -0
      .github/workflows/ci.yml
  2. +216
    -18
      Cargo.lock
  3. +5
    -0
      Cargo.toml
  4. +28
    -17
      binaries/cli/src/check.rs
  5. +1
    -0
      binaries/coordinator/Cargo.toml
  6. +34
    -18
      binaries/coordinator/src/run/custom.rs
  7. +1
    -0
      binaries/coordinator/src/run/mod.rs
  8. +1
    -0
      binaries/runtime/Cargo.toml
  9. +23
    -5
      binaries/runtime/src/operator/mod.rs
  10. +23
    -2
      binaries/runtime/src/operator/python.rs
  11. +25
    -3
      binaries/runtime/src/operator/shared_lib.rs
  12. +2
    -2
      examples/c++-dataflow/dataflow.yml
  13. +2
    -2
      examples/c-dataflow/dataflow.yml
  14. +2
    -2
      examples/iceoryx/dataflow.yml
  15. +2
    -2
      examples/python-dataflow/dataflow.yml
  16. +2
    -2
      examples/python-dataflow/dataflow_without_webcam.yml
  17. +27
    -0
      examples/rust-dataflow-url/dataflow.yml
  18. +44
    -0
      examples/rust-dataflow-url/run.rs
  19. +10
    -0
      examples/rust-dataflow-url/sink/Cargo.toml
  20. +28
    -0
      examples/rust-dataflow-url/sink/src/main.rs
  21. +2
    -2
      examples/rust-dataflow/dataflow.yml
  22. +13
    -21
      libraries/core/src/descriptor/mod.rs
  23. +13
    -0
      libraries/extensions/download/Cargo.toml
  24. +37
    -0
      libraries/extensions/download/src/lib.rs

+ 19
- 0
.github/workflows/ci.yml View File

@@ -118,6 +118,25 @@ jobs:
command: run
args: --example iceoryx

examples-remote:
name: "Examples (Remote)"
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2

- name: Install Cap'n Proto and libacl-dev (Linux)
if: runner.os == 'Linux'
run: |
export DEBIAN_FRONTEND=noninteractive
sudo apt-get install -y capnproto libcapnp-dev libacl1-dev

- name: "Remote Rust Dataflow example"
uses: actions-rs/cargo@v1
timeout-minutes: 30
with:
command: run
args: --example rust-dataflow-url

clippy:
name: "Clippy"
runs-on: ubuntu-latest


+ 216
- 18
Cargo.lock View File

@@ -902,6 +902,7 @@ dependencies = [
"bincode",
"clap 3.2.20",
"dora-core",
"dora-download",
"dora-message",
"dora-node-api",
"eyre",
@@ -932,6 +933,16 @@ dependencies = [
"zenoh-config",
]

[[package]]
name = "dora-download"
version = "0.1.0"
dependencies = [
"eyre",
"reqwest",
"tempfile",
"tokio",
]

[[package]]
name = "dora-examples"
version = "0.0.0"
@@ -1054,6 +1065,7 @@ version = "0.1.0"
dependencies = [
"clap 3.2.20",
"dora-core",
"dora-download",
"dora-message",
"dora-metrics",
"dora-node-api",
@@ -1099,6 +1111,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"

[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]

[[package]]
name = "env_logger"
version = "0.9.0"
@@ -1187,6 +1208,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"

[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]

[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"

[[package]]
name = "form_urlencoded"
version = "1.0.1"
@@ -1199,9 +1235,9 @@ dependencies = [

[[package]]
name = "futures"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
@@ -1214,9 +1250,9 @@ dependencies = [

[[package]]
name = "futures-channel"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
@@ -1245,15 +1281,15 @@ dependencies = [

[[package]]
name = "futures-core"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"

[[package]]
name = "futures-executor"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
dependencies = [
"futures-core",
"futures-task",
@@ -1262,9 +1298,9 @@ dependencies = [

[[package]]
name = "futures-io"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"

[[package]]
name = "futures-lite"
@@ -1283,9 +1319,9 @@ dependencies = [

[[package]]
name = "futures-macro"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
@@ -1294,21 +1330,21 @@ dependencies = [

[[package]]
name = "futures-sink"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868"
checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9"

[[package]]
name = "futures-task"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea"

[[package]]
name = "futures-util"
version = "0.3.21"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",
@@ -1564,6 +1600,19 @@ dependencies = [
"tokio-io-timeout",
]

[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]

[[package]]
name = "iceoryx-example-node"
version = "0.1.0"
@@ -1688,6 +1737,12 @@ dependencies = [
"syn",
]

[[package]]
name = "ipnet"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"

[[package]]
name = "ipnetwork"
version = "0.18.0"
@@ -1884,6 +1939,12 @@ dependencies = [
"autocfg 1.1.0",
]

[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"

[[package]]
name = "mini_paste"
version = "0.1.11"
@@ -2023,6 +2084,24 @@ name = "napi-sys"
version = "1.0.0"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"

[[package]]
name = "native-tls"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]

[[package]]
name = "ndk"
version = "0.7.0"
@@ -2232,12 +2311,51 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"

[[package]]
name = "openssl"
version = "0.10.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]

[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"

[[package]]
name = "openssl-sys"
version = "0.9.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce"
dependencies = [
"autocfg 1.1.0",
"cc",
"libc",
"pkg-config",
"vcpkg",
]

[[package]]
name = "opentelemetry"
version = "0.17.0"
@@ -2497,6 +2615,12 @@ dependencies = [
"zeroize",
]

[[package]]
name = "pkg-config"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"

[[package]]
name = "pnet"
version = "0.28.0"
@@ -2969,6 +3093,43 @@ dependencies = [
"winapi",
]

[[package]]
name = "reqwest"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]

[[package]]
name = "ring"
version = "0.16.20"
@@ -3248,6 +3409,18 @@ dependencies = [
"serde",
]

[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]

[[package]]
name = "serde_yaml"
version = "0.8.23"
@@ -3622,6 +3795,16 @@ dependencies = [
"syn",
]

[[package]]
name = "tokio-native-tls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
]

[[package]]
name = "tokio-stream"
version = "0.1.9"
@@ -4021,6 +4204,12 @@ dependencies = [
"version_check",
]

[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"

[[package]]
name = "vec_map"
version = "0.8.2"
@@ -4338,6 +4527,15 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]

[[package]]
name = "with_builtin_macros"
version = "0.0.3"


+ 5
- 0
Cargo.toml View File

@@ -13,6 +13,7 @@ members = [
"libraries/communication-layer",
"libraries/core",
"libraries/message",
"libraries/extensions/download",
"libraries/extensions/telemetry/*",
"libraries/extensions/zenoh-logger",
]
@@ -37,6 +38,10 @@ path = "examples/c-dataflow/run.rs"
name = "rust-dataflow"
path = "examples/rust-dataflow/run.rs"

[[example]]
name = "rust-dataflow-url"
path = "examples/rust-dataflow-url/run.rs"

[[example]]
name = "cxx-dataflow"
path = "examples/c++-dataflow/run.rs"


+ 28
- 17
binaries/cli/src/check.rs View File

@@ -2,7 +2,7 @@ use crate::graph::read_descriptor;
use dora_core::{
adjust_shared_library_path,
config::{InputMapping, UserInputMapping},
descriptor::{self, CoreNodeKind, OperatorSource},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
};
use eyre::{bail, eyre, Context};
use std::{env::consts::EXE_EXTENSION, path::Path};
@@ -39,15 +39,15 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => {
let mut args = node.run.split_ascii_whitespace();
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
let path = if source_is_url(&node.source) {
todo!("check URL");
} else {
raw.to_owned()
let raw = Path::new(&node.source);
if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
}
};
base.join(&path)
.canonicalize()
@@ -57,20 +57,31 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
for operator_definition in &node.operators {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
let path = adjust_shared_library_path(path)?;

if !base.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
if source_is_url(path) {
todo!("check URL");
} else {
let path = adjust_shared_library_path(Path::new(&path))?;
if !base.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
}
}
}
OperatorSource::Python(path) => {
if !base.join(&path).exists() {
bail!("no Python library at `{}`", path.display());
if source_is_url(path) {
todo!("check URL");
} else {
if !base.join(&path).exists() {
bail!("no Python library at `{path}`");
}
}
}
OperatorSource::Wasm(path) => {
if !base.join(&path).exists() {
bail!("no WASM library at `{}`", path.display());
if source_is_url(path) {
todo!("check URL");
} else {
if !base.join(&path).exists() {
bail!("no WASM library at `{path}`");
}
}
}
}


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -27,3 +27,4 @@ tracing-subscriber = "0.3.15"
futures-concurrency = "5.0.1"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
serde_json = "1.0.86"
dora-download = { path = "../../libraries/extensions/download" }

+ 34
- 18
binaries/coordinator/src/run/custom.rs View File

@@ -1,34 +1,46 @@
use super::command_init_common_env;
use dora_core::{config::NodeId, descriptor};
use dora_core::{
config::NodeId,
descriptor::{self, source_is_url},
};
use dora_download::download_file;
use eyre::{eyre, WrapErr};
use std::{env::consts::EXE_EXTENSION, path::Path};

#[tracing::instrument]
pub(super) fn spawn_custom_node(
pub(super) async fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
let cmd = {
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
let path = if source_is_url(&node.source) {
// try to download the shared library
let target_path = Path::new("build")
.join(node_id.to_string())
.with_extension(EXE_EXTENSION);
download_file(&node.source, &target_path)
.await
.wrap_err("failed to download custom node")?;
target_path
} else {
let raw = Path::new(&node.source);
if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
};
working_dir
.join(&path)
.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?
}
};

let cmd = working_dir
.join(&path)
.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?;

let mut command = tokio::process::Command::new(cmd);
command.args(args);
if let Some(args) = &node.args {
command.args(args.split_ascii_whitespace());
}
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
@@ -45,9 +57,13 @@ pub(super) fn spawn_custom_node(
}
}

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
let mut child = command.spawn().wrap_err_with(|| {
format!(
"failed to run executable `{}` with args `{}`",
path.display(),
node.args.as_deref().unwrap_or_default()
)
})?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {


+ 1
- 0
binaries/coordinator/src/run/mod.rs View File

@@ -66,6 +66,7 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul
descriptor::CoreNodeKind::Custom(node) => {
let result =
spawn_custom_node(node_id.clone(), &node, &communication_config, &working_dir)
.await
.wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?;
tasks.push(result);
}


+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -37,6 +37,7 @@ flume = "0.10.14"
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
dora-download = { path = "../../libraries/extensions/download" }

[features]
tracing = ["opentelemetry", "dora-tracing"]


+ 23
- 5
binaries/runtime/src/operator/mod.rs View File

@@ -55,23 +55,41 @@ pub fn spawn_operator(
let tracer = ();

match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
OperatorSource::SharedLibrary(source) => {
shared_lib::spawn(
node_id,
&operator_definition.id,
source,
events_tx,
inputs,
publishers,
tracer,
)
.wrap_err_with(|| {
format!(
"failed to spawn shared library operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
OperatorSource::Python(source) => {
python::spawn(
node_id,
&operator_definition.id,
source,
events_tx,
inputs,
publishers,
tracer,
)
.wrap_err_with(|| {
format!(
"failed to spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(_path) => {
OperatorSource::Wasm(_) => {
tracing::error!("WASM operators are not supported yet");
}
}


+ 23
- 2
binaries/runtime/src/operator/python.rs View File

@@ -1,7 +1,11 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{OperatorEvent, Tracer};
use dora_core::config::DataId;
use dora_core::{
config::{DataId, NodeId, OperatorId},
descriptor::source_is_url,
};
use dora_download::download_file;
use dora_message::uhlc;
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
@@ -35,12 +39,29 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report {
}

pub fn spawn(
path: &Path,
node_id: &NodeId,
operator_id: &OperatorId,
source: &str,
events_tx: Sender<OperatorEvent>,
inputs: flume::Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
tracer: Tracer,
) -> eyre::Result<()> {
let path = if source_is_url(source) {
let target_path = Path::new("build")
.join(node_id.to_string())
.join(operator_id.to_string());
// try to download the shared library
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(source, &target_path))
.wrap_err("failed to download Python operator")?;
target_path
} else {
Path::new(source).to_owned()
};

if !path.exists() {
bail!("No python file exists at {}", path.display());
}


+ 25
- 3
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,5 +1,10 @@
use super::{OperatorEvent, Tracer};
use dora_core::{adjust_shared_library_path, config::DataId};
use dora_core::{
adjust_shared_library_path,
config::{DataId, NodeId, OperatorId},
descriptor::source_is_url,
};
use dora_download::download_file;
use dora_message::uhlc;
use dora_node_api::communication::Publisher;
use dora_operator_api_types::{
@@ -21,13 +26,30 @@ use std::{
use tokio::sync::mpsc::Sender;

pub fn spawn(
path: &Path,
node_id: &NodeId,
operator_id: &OperatorId,
source: &str,
events_tx: Sender<OperatorEvent>,
inputs: Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
tracer: Tracer,
) -> eyre::Result<()> {
let path = adjust_shared_library_path(path)?;
let path = if source_is_url(source) {
let target_path = adjust_shared_library_path(
&Path::new("build")
.join(node_id.to_string())
.join(operator_id.to_string()),
)?;
// try to download the shared library
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
rt.block_on(download_file(source, &target_path))
.wrap_err("failed to download shared library operator")?;
target_path
} else {
adjust_shared_library_path(Path::new(source))?
};

let library = unsafe {
libloading::Library::new(&path)


+ 2
- 2
examples/c++-dataflow/dataflow.yml View File

@@ -5,14 +5,14 @@ communication:
nodes:
- id: cxx-node-rust-api
custom:
run: ../../target/debug/cxx-dataflow-example-node-rust-api
source: ../../target/debug/cxx-dataflow-example-node-rust-api
inputs:
tick: dora/timer/millis/300
outputs:
- counter
- id: cxx-node-c-api
custom:
run: build/node_c_api
source: build/node_c_api
inputs:
tick: dora/timer/millis/300
outputs:


+ 2
- 2
examples/c-dataflow/dataflow.yml View File

@@ -5,7 +5,7 @@ communication:
nodes:
- id: c_node
custom:
run: build/c_node
source: build/c_node
inputs:
timer: dora/timer/secs/1
outputs:
@@ -20,6 +20,6 @@ nodes:
- counter
- id: c_sink
custom:
run: build/c_sink
source: build/c_sink
inputs:
counter: runtime-node/c_operator/counter

+ 2
- 2
examples/iceoryx/dataflow.yml View File

@@ -5,7 +5,7 @@ communication:
nodes:
- id: rust-node
custom:
run: ../../target/debug/iceoryx-example-node
source: ../../target/debug/iceoryx-example-node
inputs:
tick: dora/timer/millis/300
outputs:
@@ -21,6 +21,6 @@ nodes:
- status
- id: rust-sink
custom:
run: ../../target/debug/iceoryx-example-sink
source: ../../target/debug/iceoryx-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 2
- 2
examples/python-dataflow/dataflow.yml View File

@@ -5,12 +5,12 @@ communication:
nodes:
- id: webcam
custom:
run: ./webcam.py
source: ./webcam.py
inputs:
tick: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py


+ 2
- 2
examples/python-dataflow/dataflow_without_webcam.yml View File

@@ -5,12 +5,12 @@ communication:
nodes:
- id: no_webcam
custom:
run: ./no_webcam.py
source: ./no_webcam.py
inputs:
tick: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py


+ 27
- 0
examples/rust-dataflow-url/dataflow.yml View File

@@ -0,0 +1,27 @@
communication:
zenoh:
prefix: /example-rust-dataflow

nodes:
- id: rust-node
custom:
source: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/300
outputs:
- random
- id: runtime-node
operators:
- id: rust-operator
shared-library: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/librust_dataflow_example_operator.so
inputs:
tick: dora/timer/millis/100
random: rust-node/random
outputs:
- status
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 44
- 0
examples/rust-dataflow-url/run.rs View File

@@ -0,0 +1,44 @@
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

let dataflow = Path::new("dataflow.yml");
build_dataflow(dataflow).await?;
build_package("dora-runtime").await?;

dora_coordinator::run(dora_coordinator::Args {
run_dataflow: dataflow.to_owned().into(),
runtime: Some(root.join("target").join("debug").join("dora-runtime")),
})
.await?;

Ok(())
}

async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("run");
cmd.arg("--package").arg("dora-cli");
cmd.arg("--").arg("build").arg(dataflow);
if !cmd.status().await?.success() {
bail!("failed to build dataflow");
};
Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

+ 10
- 0
examples/rust-dataflow-url/sink/Cargo.toml View File

@@ -0,0 +1,10 @@
[package]
name = "rust-dataflow-example-sink"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" }
eyre = "0.6.8"

+ 28
- 0
examples/rust-dataflow-url/sink/src/main.rs View File

@@ -0,0 +1,28 @@
use dora_node_api::{self, DoraNode};
use eyre::{bail, Context};

fn main() -> eyre::Result<()> {
let mut operator = DoraNode::init_from_env()?;

let inputs = operator.inputs()?;

while let Ok(input) = inputs.recv() {
match input.id.as_str() {
"message" => {
let data = input.data();
let received_string =
std::str::from_utf8(&data).wrap_err("received message was not utf8-encoded")?;
println!("received message: {}", received_string);
if !received_string.starts_with("operator received random value ") {
bail!("unexpected message format (should start with 'operator received random value')")
}
if !received_string.ends_with(" ticks") {
bail!("unexpected message format (should end with 'ticks')")
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
}
}

Ok(())
}

+ 2
- 2
examples/rust-dataflow/dataflow.yml View File

@@ -6,7 +6,7 @@ nodes:
- id: rust-node
custom:
build: cargo build -p rust-dataflow-example-node
run: ../../target/debug/rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/300
outputs:
@@ -24,6 +24,6 @@ nodes:
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
run: ../../target/debug/rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 13
- 21
libraries/core/src/descriptor/mod.rs View File

@@ -1,14 +1,12 @@
use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::{
collections::{BTreeMap, BTreeSet},
collections::{BTreeMap, BTreeSet, HashMap},
fmt,
path::PathBuf,
};
pub use visualize::collect_dora_timers;

use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};

mod visualize;

#[derive(Debug, Serialize, Deserialize)]
@@ -166,21 +164,13 @@ pub struct OperatorConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(PathBuf),
Python(PathBuf),
Wasm(PathBuf),
}

impl OperatorSource {
pub fn canonicalize(&mut self) -> std::io::Result<()> {
let path = match self {
OperatorSource::SharedLibrary(path) => path,
OperatorSource::Python(path) => path,
OperatorSource::Wasm(path) => path,
};
*path = path.canonicalize()?;
Ok(())
}
SharedLibrary(String),
Python(String),
Wasm(String),
}

pub fn source_is_url(source: &str) -> bool {
source.contains("://")
}

#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -194,7 +184,9 @@ pub struct PythonOperatorConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomNode {
pub run: String,
pub source: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
pub env: Option<BTreeMap<String, EnvValue>>,
pub working_directory: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]


+ 13
- 0
libraries/extensions/download/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "dora-download"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
eyre = "0.6.8"
tempfile = "3.3.0"
reqwest = "0.11.12"
tokio = { version = "1.17.0" }

+ 37
- 0
libraries/extensions/download/src/lib.rs View File

@@ -0,0 +1,37 @@
use eyre::Context;
#[cfg(unix)]
use std::os::unix::prelude::PermissionsExt;
use std::path::Path;
use tokio::io::AsyncWriteExt;

pub async fn download_file<T>(url: T, target_path: &Path) -> Result<(), eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
if let Some(parent) = target_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.wrap_err("failed to create parent folder")?;
}

let response = reqwest::get(url)
.await
.wrap_err_with(|| format!("failed to request operator from `{url}`"))?
.bytes()
.await
.wrap_err("failed to read operator from `{uri}`")?;
let mut file = tokio::fs::File::create(target_path)
.await
.wrap_err("failed to create target file")?;
file.write_all(&response)
.await
.wrap_err("failed to write downloaded operator to file")?;
file.sync_all().await.wrap_err("failed to `sync_all`")?;

#[cfg(unix)]
file.set_permissions(std::fs::Permissions::from_mode(0o764))
.await
.wrap_err("failed to make downloaded file executable")?;

Ok(())
}

Loading…
Cancel
Save