diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 85d114f1..6361e3ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,6 +118,25 @@ jobs: command: run args: --example iceoryx + examples-remote: + name: "Examples (Remote)" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install Cap'n Proto and libacl-dev (Linux) + if: runner.os == 'Linux' + run: | + export DEBIAN_FRONTEND=noninteractive + sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + + - name: "Remote Rust Dataflow example" + uses: actions-rs/cargo@v1 + timeout-minutes: 30 + with: + command: run + args: --example rust-dataflow-url + clippy: name: "Clippy" runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index ae591b71..b6fb1933 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,6 +902,7 @@ dependencies = [ "bincode", "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-node-api", "eyre", @@ -932,6 +933,16 @@ dependencies = [ "zenoh-config", ] +[[package]] +name = "dora-download" +version = "0.1.0" +dependencies = [ + "eyre", + "reqwest", + "tempfile", + "tokio", +] + [[package]] name = "dora-examples" version = "0.0.0" @@ -1054,6 +1065,7 @@ version = "0.1.0" dependencies = [ "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-metrics", "dora-node-api", @@ -1099,6 +1111,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -1187,6 +1208,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1199,9 +1235,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" dependencies = [ "futures-channel", "futures-core", @@ -1214,9 +1250,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", "futures-sink", @@ -1245,15 +1281,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] name = "futures-executor" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" dependencies = [ "futures-core", "futures-task", @@ -1262,9 +1298,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" [[package]] name = "futures-lite" @@ -1283,9 +1319,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", @@ -1294,21 +1330,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-channel", "futures-core", @@ -1564,6 +1600,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iceoryx-example-node" version = "0.1.0" @@ -1688,6 +1737,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "ipnetwork" version = "0.18.0" @@ -1884,6 +1939,12 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "mini_paste" version = "0.1.11" @@ -2023,6 +2084,24 @@ name = "napi-sys" version = "1.0.0" source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +[[package]] +name = "native-tls" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.7.0" @@ -2232,12 +2311,51 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce" +dependencies = [ + "autocfg 1.1.0", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.17.0" @@ -2497,6 +2615,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "pnet" version = "0.28.0" @@ -2969,6 +3093,43 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -3248,6 +3409,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.8.23" @@ -3622,6 +3795,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.9" @@ -4021,6 +4204,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -4338,6 +4527,15 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "with_builtin_macros" version = "0.0.3" diff --git a/Cargo.toml b/Cargo.toml index 4a806645..3438425c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "libraries/communication-layer", "libraries/core", "libraries/message", + "libraries/extensions/download", "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", ] @@ -37,6 +38,10 @@ path = "examples/c-dataflow/run.rs" name = "rust-dataflow" path = "examples/rust-dataflow/run.rs" +[[example]] +name = "rust-dataflow-url" +path = "examples/rust-dataflow-url/run.rs" + [[example]] name = "cxx-dataflow" path = "examples/c++-dataflow/run.rs" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 01ce395c..d4163d8c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -2,7 +2,7 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, config::{InputMapping, UserInputMapping}, - descriptor::{self, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -39,15 +39,15 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for node in &nodes { match &node.kind { descriptor::CoreNodeKind::Custom(node) => { - let mut args = node.run.split_ascii_whitespace(); - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { - raw.with_extension(EXE_EXTENSION) + let path = if source_is_url(&node.source) { + todo!("check URL"); } else { - raw.to_owned() + let raw = Path::new(&node.source); + if raw.extension().is_none() { + raw.with_extension(EXE_EXTENSION) + } else { + raw.to_owned() + } }; base.join(&path) .canonicalize() @@ -57,20 +57,31 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for operator_definition in &node.operators { match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { - let path = adjust_shared_library_path(path)?; - - if !base.join(&path).exists() { - bail!("no shared library at `{}`", path.display()); + if source_is_url(path) { + todo!("check URL"); + } else { + let path = adjust_shared_library_path(Path::new(&path))?; + if !base.join(&path).exists() { + bail!("no shared library at `{}`", path.display()); + } } } OperatorSource::Python(path) => { - if !base.join(&path).exists() { - bail!("no Python library at `{}`", path.display()); + if source_is_url(path) { + todo!("check URL"); + } else { + if !base.join(&path).exists() { + bail!("no Python library at `{path}`"); + } } } OperatorSource::Wasm(path) => { - if !base.join(&path).exists() { - bail!("no WASM library at `{}`", path.display()); + if source_is_url(path) { + todo!("check URL"); + } else { + if !base.join(&path).exists() { + bail!("no WASM library at `{path}`"); + } } } } diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index a8eba7db..3e242ce7 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -27,3 +27,4 @@ tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } serde_json = "1.0.86" +dora-download = { path = "../../libraries/extensions/download" } diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index e3411353..10ee4ad2 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,34 +1,46 @@ use super::command_init_common_env; -use dora_core::{config::NodeId, descriptor}; +use dora_core::{ + config::NodeId, + descriptor::{self, source_is_url}, +}; +use dora_download::download_file; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; #[tracing::instrument] -pub(super) fn spawn_custom_node( +pub(super) async fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut args = node.run.split_ascii_whitespace(); - let cmd = { - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { + let path = if source_is_url(&node.source) { + // try to download the shared library + let target_path = Path::new("build") + .join(node_id.to_string()) + .with_extension(EXE_EXTENSION); + download_file(&node.source, &target_path) + .await + .wrap_err("failed to download custom node")?; + target_path + } else { + let raw = Path::new(&node.source); + if raw.extension().is_none() { raw.with_extension(EXE_EXTENSION) } else { raw.to_owned() - }; - working_dir - .join(&path) - .canonicalize() - .wrap_err_with(|| format!("no node exists at `{}`", path.display()))? + } }; + let cmd = working_dir + .join(&path) + .canonicalize() + .wrap_err_with(|| format!("no node exists at `{}`", path.display()))?; + let mut command = tokio::process::Command::new(cmd); - command.args(args); + if let Some(args) = &node.args { + command.args(args.split_ascii_whitespace()); + } command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_NODE_RUN_CONFIG", @@ -45,9 +57,13 @@ pub(super) fn spawn_custom_node( } } - let mut child = command - .spawn() - .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?; + let mut child = command.spawn().wrap_err_with(|| { + format!( + "failed to run executable `{}` with args `{}`", + path.display(), + node.args.as_deref().unwrap_or_default() + ) + })?; let result = tokio::spawn(async move { let status = child.wait().await.context("child process failed")?; if status.success() { diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 1851944d..a098107e 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -66,6 +66,7 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul descriptor::CoreNodeKind::Custom(node) => { let result = spawn_custom_node(node_id.clone(), &node, &communication_config, &working_dir) + .await .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; tasks.push(result); } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 4caea8b6..70fa7b92 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,6 +37,7 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" +dora-download = { path = "../../libraries/extensions/download" } [features] tracing = ["opentelemetry", "dora-tracing"] diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index f7801df6..4d8c246c 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -55,23 +55,41 @@ pub fn spawn_operator( let tracer = (); match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - shared_lib::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::SharedLibrary(source) => { + shared_lib::spawn( + node_id, + &operator_definition.id, + source, + events_tx, + inputs, + publishers, + tracer, + ) + .wrap_err_with(|| { format!( "failed to spawn shared library operator for {}", operator_definition.id ) })?; } - OperatorSource::Python(path) => { - python::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::Python(source) => { + python::spawn( + node_id, + &operator_definition.id, + source, + events_tx, + inputs, + publishers, + tracer, + ) + .wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id ) })?; } - OperatorSource::Wasm(_path) => { + OperatorSource::Wasm(_) => { tracing::error!("WASM operators are not supported yet"); } } diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 03362a4a..2f715f83 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,11 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; -use dora_core::config::DataId; +use dora_core::{ + config::{DataId, NodeId, OperatorId}, + descriptor::source_is_url, +}; +use dora_download::download_file; use dora_message::uhlc; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; @@ -35,12 +39,29 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( - path: &Path, + node_id: &NodeId, + operator_id: &OperatorId, + source: &str, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { + let path = if source_is_url(source) { + let target_path = Path::new("build") + .join(node_id.to_string()) + .join(operator_id.to_string()); + // try to download the shared library + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(download_file(source, &target_path)) + .wrap_err("failed to download Python operator")?; + target_path + } else { + Path::new(source).to_owned() + }; + if !path.exists() { bail!("No python file exists at {}", path.display()); } diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 8a399995..71616bed 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,10 @@ use super::{OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId}; +use dora_core::{ + adjust_shared_library_path, + config::{DataId, NodeId, OperatorId}, + descriptor::source_is_url, +}; +use dora_download::download_file; use dora_message::uhlc; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ @@ -21,13 +26,30 @@ use std::{ use tokio::sync::mpsc::Sender; pub fn spawn( - path: &Path, + node_id: &NodeId, + operator_id: &OperatorId, + source: &str, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { - let path = adjust_shared_library_path(path)?; + let path = if source_is_url(source) { + let target_path = adjust_shared_library_path( + &Path::new("build") + .join(node_id.to_string()) + .join(operator_id.to_string()), + )?; + // try to download the shared library + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(download_file(source, &target_path)) + .wrap_err("failed to download shared library operator")?; + target_path + } else { + adjust_shared_library_path(Path::new(source))? + }; let library = unsafe { libloading::Library::new(&path) diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index ab113dd5..23caf7cc 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -5,14 +5,14 @@ communication: nodes: - id: cxx-node-rust-api custom: - run: ../../target/debug/cxx-dataflow-example-node-rust-api + source: ../../target/debug/cxx-dataflow-example-node-rust-api inputs: tick: dora/timer/millis/300 outputs: - counter - id: cxx-node-c-api custom: - run: build/node_c_api + source: build/node_c_api inputs: tick: dora/timer/millis/300 outputs: diff --git a/examples/c-dataflow/dataflow.yml b/examples/c-dataflow/dataflow.yml index ed372971..b2f30e40 100644 --- a/examples/c-dataflow/dataflow.yml +++ b/examples/c-dataflow/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: c_node custom: - run: build/c_node + source: build/c_node inputs: timer: dora/timer/secs/1 outputs: @@ -20,6 +20,6 @@ nodes: - counter - id: c_sink custom: - run: build/c_sink + source: build/c_sink inputs: counter: runtime-node/c_operator/counter diff --git a/examples/iceoryx/dataflow.yml b/examples/iceoryx/dataflow.yml index f8b447db..56b0face 100644 --- a/examples/iceoryx/dataflow.yml +++ b/examples/iceoryx/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: rust-node custom: - run: ../../target/debug/iceoryx-example-node + source: ../../target/debug/iceoryx-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -21,6 +21,6 @@ nodes: - status - id: rust-sink custom: - run: ../../target/debug/iceoryx-example-sink + source: ../../target/debug/iceoryx-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index a31b5b7e..1aa1d0ac 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -5,12 +5,12 @@ communication: nodes: - id: webcam custom: - run: ./webcam.py + source: ./webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index fdf14a2b..6b9a00af 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -5,12 +5,12 @@ communication: nodes: - id: no_webcam custom: - run: ./no_webcam.py + source: ./no_webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/rust-dataflow-url/dataflow.yml b/examples/rust-dataflow-url/dataflow.yml new file mode 100644 index 00000000..81c8d239 --- /dev/null +++ b/examples/rust-dataflow-url/dataflow.yml @@ -0,0 +1,27 @@ +communication: + zenoh: + prefix: /example-rust-dataflow + +nodes: + - id: rust-node + custom: + source: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/300 + outputs: + - random + - id: runtime-node + operators: + - id: rust-operator + shared-library: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/librust_dataflow_example_operator.so + inputs: + tick: dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + custom: + build: cargo build -p rust-dataflow-example-sink + source: ../../target/debug/rust-dataflow-example-sink + inputs: + message: runtime-node/rust-operator/status diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs new file mode 100644 index 00000000..9378905c --- /dev/null +++ b/examples/rust-dataflow-url/run.rs @@ -0,0 +1,44 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; + build_package("dora-runtime").await?; + + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: dataflow.to_owned().into(), + runtime: Some(root.join("target").join("debug").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} diff --git a/examples/rust-dataflow-url/sink/Cargo.toml b/examples/rust-dataflow-url/sink/Cargo.toml new file mode 100644 index 00000000..e80b5a61 --- /dev/null +++ b/examples/rust-dataflow-url/sink/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rust-dataflow-example-sink" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" diff --git a/examples/rust-dataflow-url/sink/src/main.rs b/examples/rust-dataflow-url/sink/src/main.rs new file mode 100644 index 00000000..f9c932a5 --- /dev/null +++ b/examples/rust-dataflow-url/sink/src/main.rs @@ -0,0 +1,28 @@ +use dora_node_api::{self, DoraNode}; +use eyre::{bail, Context}; + +fn main() -> eyre::Result<()> { + let mut operator = DoraNode::init_from_env()?; + + let inputs = operator.inputs()?; + + while let Ok(input) = inputs.recv() { + match input.id.as_str() { + "message" => { + let data = input.data(); + let received_string = + std::str::from_utf8(&data).wrap_err("received message was not utf8-encoded")?; + println!("received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +} diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index be9d4f57..78b8a6cb 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -6,7 +6,7 @@ nodes: - id: rust-node custom: build: cargo build -p rust-dataflow-example-node - run: ../../target/debug/rust-dataflow-example-node + source: ../../target/debug/rust-dataflow-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -24,6 +24,6 @@ nodes: - id: rust-sink custom: build: cargo build -p rust-dataflow-example-sink - run: ../../target/debug/rust-dataflow-example-sink + source: ../../target/debug/rust-dataflow-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 664184db..07506241 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,14 +1,12 @@ +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fmt; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, + fmt, path::PathBuf, }; pub use visualize::collect_dora_timers; -use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; - mod visualize; #[derive(Debug, Serialize, Deserialize)] @@ -166,21 +164,13 @@ pub struct OperatorConfig { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub enum OperatorSource { - SharedLibrary(PathBuf), - Python(PathBuf), - Wasm(PathBuf), -} - -impl OperatorSource { - pub fn canonicalize(&mut self) -> std::io::Result<()> { - let path = match self { - OperatorSource::SharedLibrary(path) => path, - OperatorSource::Python(path) => path, - OperatorSource::Wasm(path) => path, - }; - *path = path.canonicalize()?; - Ok(()) - } + SharedLibrary(String), + Python(String), + Wasm(String), +} + +pub fn source_is_url(source: &str) -> bool { + source.contains("://") } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -194,7 +184,9 @@ pub struct PythonOperatorConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CustomNode { - pub run: String, + pub source: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, pub env: Option>, pub working_directory: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml new file mode 100644 index 00000000..4d1dae0d --- /dev/null +++ b/libraries/extensions/download/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dora-download" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +eyre = "0.6.8" +tempfile = "3.3.0" +reqwest = "0.11.12" +tokio = { version = "1.17.0" } diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs new file mode 100644 index 00000000..a7fdf2cc --- /dev/null +++ b/libraries/extensions/download/src/lib.rs @@ -0,0 +1,37 @@ +use eyre::Context; +#[cfg(unix)] +use std::os::unix::prelude::PermissionsExt; +use std::path::Path; +use tokio::io::AsyncWriteExt; + +pub async fn download_file(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> +where + T: reqwest::IntoUrl + std::fmt::Display + Copy, +{ + if let Some(parent) = target_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .wrap_err("failed to create parent folder")?; + } + + let response = reqwest::get(url) + .await + .wrap_err_with(|| format!("failed to request operator from `{url}`"))? + .bytes() + .await + .wrap_err("failed to read operator from `{uri}`")?; + let mut file = tokio::fs::File::create(target_path) + .await + .wrap_err("failed to create target file")?; + file.write_all(&response) + .await + .wrap_err("failed to write downloaded operator to file")?; + file.sync_all().await.wrap_err("failed to `sync_all`")?; + + #[cfg(unix)] + file.set_permissions(std::fs::Permissions::from_mode(0o764)) + .await + .wrap_err("failed to make downloaded file executable")?; + + Ok(()) +}