From 13d3a9168316b17b98ff0b6c1184e141ce2500c9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 12:40:38 +0200 Subject: [PATCH 1/9] Implement support for downloading operator sources Makes it possible to set a URL as the operator source. The `dora-runtime` will then try to download the operator from the given URL when the dataflow is started. --- Cargo.lock | 201 ++++++++++++++++++++ binaries/cli/src/check.rs | 28 +-- binaries/runtime/Cargo.toml | 3 + binaries/runtime/src/operator/mod.rs | 30 ++- binaries/runtime/src/operator/python.rs | 20 +- binaries/runtime/src/operator/shared_lib.rs | 20 +- libraries/core/Cargo.toml | 2 + libraries/core/src/descriptor/mod.rs | 55 ++++-- 8 files changed, 320 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3a37e2a..f6423d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,6 +926,8 @@ name = "dora-core" version = "0.1.0" dependencies = [ "eyre", + "http", + "http-serde", "once_cell", "serde", "serde_yaml 0.9.11", @@ -1064,11 +1066,14 @@ dependencies = [ "flume", "futures", "futures-concurrency 2.0.3", + "http", "libloading", "opentelemetry", "opentelemetry-system-metrics", "pyo3", + "reqwest", "serde_yaml 0.8.23", + "tempfile", "tokio", "tokio-stream", "tracing", @@ -1098,6 +1103,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -1186,6 +1200,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1509,6 +1538,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" +dependencies = [ + "http", + "serde", +] + [[package]] name = "httparse" version = "1.7.1" @@ -1563,6 +1602,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iceoryx-example-node" version = "0.1.0" @@ -1687,6 +1739,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "ipnetwork" version = "0.18.0" @@ -1883,6 +1941,12 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "mini_paste" version = "0.1.11" @@ -2022,6 +2086,24 @@ name = "napi-sys" version = "1.0.0" source = 
"git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +[[package]] +name = "native-tls" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.7.0" @@ -2231,12 +2313,51 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce" +dependencies = [ + "autocfg 1.1.0", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.17.0" @@ -2496,6 +2617,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkg-config" +version = "0.3.25" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "pnet" version = "0.28.0" @@ -2968,6 +3095,43 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -3247,6 +3411,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.8.23" @@ -3621,6 +3797,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.9" @@ -4006,6 +4192,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -4323,6 +4515,15 @@ version 
= "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "with_builtin_macros" version = "0.0.3" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 01ce395c..335751da 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -55,22 +55,24 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } descriptor::CoreNodeKind::Runtime(node) => { for operator_definition in &node.operators { - match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - let path = adjust_shared_library_path(path)?; + if let Some(path) = operator_definition.config.source.as_local_path() { + match &operator_definition.config.source { + OperatorSource::SharedLibrary(_) => { + let path = adjust_shared_library_path(path)?; - if !base.join(&path).exists() { - bail!("no shared library at `{}`", path.display()); + if !base.join(&path).exists() { + bail!("no shared library at `{}`", path.display()); + } } - } - OperatorSource::Python(path) => { - if !base.join(&path).exists() { - bail!("no Python library at `{}`", path.display()); + OperatorSource::Python(_) => { + if !base.join(&path).exists() { + bail!("no Python library at `{}`", path.display()); + } } - } - OperatorSource::Wasm(path) => { - if !base.join(&path).exists() { - bail!("no WASM library at `{}`", path.display()); + OperatorSource::Wasm(_) => { + if !base.join(&path).exists() { + bail!("no WASM library at `{}`", path.display()); + } } } } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 4caea8b6..4119c6b5 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ 
-37,6 +37,9 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" +http = "0.2.8" +tempfile = "3.3.0" +reqwest = "0.11.12" [features] tracing = ["opentelemetry", "dora-tracing"] diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index f7801df6..b76edbaa 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; -use std::any::Any; +use std::{any::Any, io::Write}; use tokio::sync::mpsc::Sender; #[cfg(not(feature = "tracing"))] @@ -55,23 +55,23 @@ pub fn spawn_operator( let tracer = (); match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - shared_lib::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::SharedLibrary(uri) => { + shared_lib::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn shared library operator for {}", operator_definition.id ) })?; } - OperatorSource::Python(path) => { - python::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::Python(uri) => { + python::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id ) })?; } - OperatorSource::Wasm(_path) => { + OperatorSource::Wasm(_uri) => { tracing::error!("WASM operators are not supported yet"); } } @@ -83,3 +83,21 @@ pub enum OperatorEvent { Panic(Box), Finished, } + +fn download_file(uri: &http::Uri) -> Result { + let uri_str = uri.to_string(); + let response = tokio::runtime::Handle::current().block_on(async { + reqwest::get(&uri_str) + .await + .wrap_err_with(|| format!("failed to request operator from `{uri_str}`"))? 
+ .bytes() + .await + .wrap_err("failed to read operator from `{uri}`") + })?; + let mut tmp = + tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; + tmp.as_file_mut() + .write_all(&response) + .wrap_err("failed to write downloaded operator to file")?; + Ok(tmp) +} diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 7c60dd97..8801955c 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,7 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use super::{OperatorEvent, Tracer}; -use dora_core::config::DataId; +use super::{download_file, OperatorEvent, Tracer}; +use dora_core::{config::DataId, descriptor::OperatorSource}; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; @@ -15,7 +15,6 @@ use std::{ borrow::Cow, collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, - path::Path, sync::Arc, thread, }; @@ -34,12 +33,23 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( - path: &Path, + uri: &http::Uri, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { + let mut temp_file = None; + let path = if let Some(path) = OperatorSource::uri_as_local_path(uri) { + path.to_owned() + } else { + // try to download the Python file + let tmp = download_file(uri).wrap_err("failed to download Python operator")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + }; + if !path.exists() { bail!("No python file exists at {}", path.display()); } @@ -87,6 +97,8 @@ pub fn spawn( }; let python_runner = move || { + let _temp_file = temp_file; + let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 
7da7670d..d34e1043 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,5 @@ -use super::{OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId}; +use super::{download_file, OperatorEvent, Tracer}; +use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource}; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, @@ -13,20 +13,28 @@ use std::{ ffi::c_void, ops::Deref, panic::{catch_unwind, AssertUnwindSafe}, - path::Path, sync::Arc, thread, }; use tokio::sync::mpsc::Sender; pub fn spawn( - path: &Path, + uri: &http::Uri, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { - let path = adjust_shared_library_path(path)?; + let mut temp_file = None; + let path = if let Some(path) = OperatorSource::uri_as_local_path(&uri) { + adjust_shared_library_path(path)? 
+ } else { + // try to download the shared library + let tmp = download_file(uri).wrap_err("failed to download shared library operator")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + }; let library = unsafe { libloading::Library::new(&path) @@ -35,6 +43,8 @@ pub fn spawn( thread::spawn(move || { let closure = AssertUnwindSafe(|| { + let _temp_file = temp_file; + let bindings = Bindings::init(&library).context("failed to init operator")?; let operator = SharedLibraryOperator { inputs, bindings }; diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index af7fe3fe..7bd580dd 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -12,3 +12,5 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" once_cell = "1.13.0" zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +http = "0.2.8" +http-serde = "1.1.2" diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 664184db..714e9c0f 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,14 +1,15 @@ +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; +use std::os::unix::prelude::OsStrExt; +use std::path::Path; use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, }; pub use visualize::collect_dora_timers; -use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; - mod visualize; #[derive(Debug, Serialize, Deserialize)] @@ -166,21 +167,53 @@ pub struct OperatorConfig { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub enum OperatorSource { - SharedLibrary(PathBuf), - Python(PathBuf), - Wasm(PathBuf), + SharedLibrary(#[serde(with = "http_serde::uri")] http::Uri), + Python(#[serde(with = "http_serde::uri")] http::Uri), + 
Wasm(#[serde(with = "http_serde::uri")] http::Uri), } impl OperatorSource { + pub fn as_local_path(&self) -> Option<&Path> { + let uri = self.uri(); + Self::uri_as_local_path(uri) + } + pub fn canonicalize(&mut self) -> std::io::Result<()> { - let path = match self { - OperatorSource::SharedLibrary(path) => path, - OperatorSource::Python(path) => path, - OperatorSource::Wasm(path) => path, - }; - *path = path.canonicalize()?; + if let Some(path) = self.as_local_path() { + *self.uri_mut() = path + .canonicalize()? + .as_os_str() + .as_bytes() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; + } Ok(()) } + + fn uri(&self) -> &http::Uri { + match self { + OperatorSource::SharedLibrary(uri) => uri, + OperatorSource::Python(uri) => uri, + OperatorSource::Wasm(uri) => uri, + } + } + + fn uri_mut(&mut self) -> &mut http::Uri { + match self { + OperatorSource::SharedLibrary(uri) => uri, + OperatorSource::Python(uri) => uri, + OperatorSource::Wasm(uri) => uri, + } + } + + pub fn uri_as_local_path(uri: &http::Uri) -> Option<&Path> { + if uri.path() == uri { + // URI is a local path + Some(Path::new(uri.path())) + } else { + None + } + } } #[derive(Debug, Serialize, Deserialize, Clone)] From ac75e40bd76d81edfb1c46d9ca4d33ea9d766155 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 13:23:51 +0200 Subject: [PATCH 2/9] Fix: Don't use `std::os::unix` import --- libraries/core/src/descriptor/mod.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 714e9c0f..be166979 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,8 +1,8 @@ use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; +use eyre::eyre; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; -use std::os::unix::prelude::OsStrExt; use 
std::path::Path; use std::{ collections::{BTreeMap, BTreeSet}, @@ -182,8 +182,13 @@ impl OperatorSource { if let Some(path) = self.as_local_path() { *self.uri_mut() = path .canonicalize()? - .as_os_str() - .as_bytes() + .to_str() + .ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::Other, + eyre!("operator path is invalid utf8"), + ) + })? .try_into() .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; } From a37d3ee6c88cc84b11d2cabc81fecb0b11b6a773 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 13:41:41 +0200 Subject: [PATCH 3/9] Don't parse operator source always as URI Parsing as URI does not work for relative paths, e.g. `../../foo`. --- Cargo.lock | 13 ----- binaries/cli/src/check.rs | 27 ++++++---- binaries/runtime/Cargo.toml | 1 - binaries/runtime/src/operator/mod.rs | 32 ++++++----- binaries/runtime/src/operator/python.rs | 13 ++--- binaries/runtime/src/operator/shared_lib.rs | 11 ++-- libraries/core/Cargo.toml | 2 - libraries/core/src/descriptor/mod.rs | 60 +++------------------ 8 files changed, 56 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6423d02..b5bb5045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,8 +926,6 @@ name = "dora-core" version = "0.1.0" dependencies = [ "eyre", - "http", - "http-serde", "once_cell", "serde", "serde_yaml 0.9.11", @@ -1066,7 +1064,6 @@ dependencies = [ "flume", "futures", "futures-concurrency 2.0.3", - "http", "libloading", "opentelemetry", "opentelemetry-system-metrics", @@ -1538,16 +1535,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-serde" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" -dependencies = [ - "http", - "serde", -] - [[package]] name = "httparse" version = "1.7.1" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 335751da..f81b5114 100644 --- 
a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -55,23 +55,32 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } descriptor::CoreNodeKind::Runtime(node) => { for operator_definition in &node.operators { - if let Some(path) = operator_definition.config.source.as_local_path() { - match &operator_definition.config.source { - OperatorSource::SharedLibrary(_) => { - let path = adjust_shared_library_path(path)?; - + match &operator_definition.config.source { + OperatorSource::SharedLibrary(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { + let path = adjust_shared_library_path(Path::new(&path))?; if !base.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } } - OperatorSource::Python(_) => { + } + OperatorSource::Python(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { if !base.join(&path).exists() { - bail!("no Python library at `{}`", path.display()); + bail!("no Python library at `{path}`"); } } - OperatorSource::Wasm(_) => { + } + OperatorSource::Wasm(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { if !base.join(&path).exists() { - bail!("no WASM library at `{}`", path.display()); + bail!("no WASM library at `{path}`"); } } } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 4119c6b5..163fa3e6 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,7 +37,6 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" -http = "0.2.8" tempfile = "3.3.0" reqwest = "0.11.12" diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index b76edbaa..2b422ec1 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -55,23 +55,25 @@ pub fn spawn_operator( let tracer = (); match &operator_definition.config.source { - 
OperatorSource::SharedLibrary(uri) => { - shared_lib::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { - format!( - "failed to spawn shared library operator for {}", - operator_definition.id - ) - })?; + OperatorSource::SharedLibrary(source) => { + shared_lib::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with( + || { + format!( + "failed to spawn shared library operator for {}", + operator_definition.id + ) + }, + )?; } - OperatorSource::Python(uri) => { - python::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::Python(source) => { + python::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id ) })?; } - OperatorSource::Wasm(_uri) => { + OperatorSource::Wasm(_) => { tracing::error!("WASM operators are not supported yet"); } } @@ -84,12 +86,14 @@ pub enum OperatorEvent { Finished, } -fn download_file(uri: &http::Uri) -> Result { - let uri_str = uri.to_string(); +fn download_file(url: T) -> Result +where + T: reqwest::IntoUrl + std::fmt::Display + Copy, +{ let response = tokio::runtime::Handle::current().block_on(async { - reqwest::get(&uri_str) + reqwest::get(url) .await - .wrap_err_with(|| format!("failed to request operator from `{uri_str}`"))? + .wrap_err_with(|| format!("failed to request operator from `{url}`"))? 
.bytes() .await .wrap_err("failed to read operator from `{uri}`") diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 8801955c..9ffc6be8 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -15,6 +15,7 @@ use std::{ borrow::Cow, collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, + path::Path, sync::Arc, thread, }; @@ -33,21 +34,21 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( - uri: &http::Uri, + source: &str, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if let Some(path) = OperatorSource::uri_as_local_path(uri) { - path.to_owned() - } else { - // try to download the Python file - let tmp = download_file(uri).wrap_err("failed to download Python operator")?; + let path = if OperatorSource::is_url(source) { + // try to download the shared library + let tmp = download_file(source).wrap_err("failed to download Python operator")?; let path = tmp.path().to_owned(); temp_file = Some(tmp); path + } else { + Path::new(source).to_owned() }; if !path.exists() { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index d34e1043..f2bb51c8 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -13,27 +13,28 @@ use std::{ ffi::c_void, ops::Deref, panic::{catch_unwind, AssertUnwindSafe}, + path::Path, sync::Arc, thread, }; use tokio::sync::mpsc::Sender; pub fn spawn( - uri: &http::Uri, + source: &str, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if let Some(path) = OperatorSource::uri_as_local_path(&uri) { - adjust_shared_library_path(path)? 
- } else { + let path = if OperatorSource::is_url(source) { // try to download the shared library - let tmp = download_file(uri).wrap_err("failed to download shared library operator")?; + let tmp = download_file(source).wrap_err("failed to download shared library operator")?; let path = tmp.path().to_owned(); temp_file = Some(tmp); path + } else { + adjust_shared_library_path(Path::new(source))? }; let library = unsafe { diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 7bd580dd..af7fe3fe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -12,5 +12,3 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" once_cell = "1.13.0" zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } -http = "0.2.8" -http-serde = "1.1.2" diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index be166979..8719229e 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,11 +1,8 @@ use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; -use eyre::eyre; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fmt; -use std::path::Path; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, + fmt, path::PathBuf, }; pub use visualize::collect_dora_timers; @@ -167,57 +164,14 @@ pub struct OperatorConfig { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub enum OperatorSource { - SharedLibrary(#[serde(with = "http_serde::uri")] http::Uri), - Python(#[serde(with = "http_serde::uri")] http::Uri), - Wasm(#[serde(with = "http_serde::uri")] http::Uri), + SharedLibrary(String), + Python(String), + Wasm(String), } impl OperatorSource { - pub fn as_local_path(&self) -> Option<&Path> { - let uri = self.uri(); - Self::uri_as_local_path(uri) - } - - pub fn canonicalize(&mut self) -> std::io::Result<()> 
{ - if let Some(path) = self.as_local_path() { - *self.uri_mut() = path - .canonicalize()? - .to_str() - .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::Other, - eyre!("operator path is invalid utf8"), - ) - })? - .try_into() - .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - } - Ok(()) - } - - fn uri(&self) -> &http::Uri { - match self { - OperatorSource::SharedLibrary(uri) => uri, - OperatorSource::Python(uri) => uri, - OperatorSource::Wasm(uri) => uri, - } - } - - fn uri_mut(&mut self) -> &mut http::Uri { - match self { - OperatorSource::SharedLibrary(uri) => uri, - OperatorSource::Python(uri) => uri, - OperatorSource::Wasm(uri) => uri, - } - } - - pub fn uri_as_local_path(uri: &http::Uri) -> Option<&Path> { - if uri.path() == uri { - // URI is a local path - Some(Path::new(uri.path())) - } else { - None - } + pub fn is_url(source: &str) -> bool { + source.contains("://") } } From 64cf49703bdea7465f6591597fb2614d119fbd08 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 15:38:14 +0200 Subject: [PATCH 4/9] Implement download support for custom nodes --- Cargo.lock | 14 +++++- Cargo.toml | 1 + binaries/cli/src/check.rs | 24 ++++----- binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/run/custom.rs | 49 ++++++++++++------- binaries/runtime/Cargo.toml | 3 +- binaries/runtime/src/operator/mod.rs | 22 +-------- binaries/runtime/src/operator/python.rs | 7 +-- binaries/runtime/src/operator/shared_lib.rs | 7 +-- examples/c++-dataflow/dataflow.yml | 4 +- examples/c-dataflow/dataflow.yml | 4 +- examples/iceoryx/dataflow.yml | 4 +- examples/python-dataflow/dataflow.yml | 4 +- .../dataflow_without_webcam.yml | 4 +- examples/rust-dataflow/dataflow.yml | 4 +- libraries/core/src/descriptor/mod.rs | 10 ++-- libraries/extensions/download/Cargo.toml | 13 +++++ libraries/extensions/download/src/lib.rs | 22 +++++++++ 18 files changed, 120 insertions(+), 77 deletions(-) create mode 100644 
libraries/extensions/download/Cargo.toml create mode 100644 libraries/extensions/download/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b5bb5045..243d6342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,6 +902,7 @@ dependencies = [ "bincode", "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-node-api", "eyre", @@ -932,6 +933,16 @@ dependencies = [ "zenoh-config", ] +[[package]] +name = "dora-download" +version = "0.1.0" +dependencies = [ + "eyre", + "reqwest", + "tempfile", + "tokio", +] + [[package]] name = "dora-examples" version = "0.0.0" @@ -1053,6 +1064,7 @@ version = "0.1.0" dependencies = [ "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-metrics", "dora-node-api", @@ -1068,9 +1080,7 @@ dependencies = [ "opentelemetry", "opentelemetry-system-metrics", "pyo3", - "reqwest", "serde_yaml 0.8.23", - "tempfile", "tokio", "tokio-stream", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 4a806645..fc0ea21b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "libraries/communication-layer", "libraries/core", "libraries/message", + "libraries/extensions/download", "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", ] diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index f81b5114..d4163d8c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -2,7 +2,7 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, config::{InputMapping, UserInputMapping}, - descriptor::{self, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -39,15 +39,15 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for node in &nodes { match &node.kind { descriptor::CoreNodeKind::Custom(node) => { - let mut args = node.run.split_ascii_whitespace(); - let raw = Path::new( - args.next() - 
.ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { - raw.with_extension(EXE_EXTENSION) + let path = if source_is_url(&node.source) { + todo!("check URL"); } else { - raw.to_owned() + let raw = Path::new(&node.source); + if raw.extension().is_none() { + raw.with_extension(EXE_EXTENSION) + } else { + raw.to_owned() + } }; base.join(&path) .canonicalize() @@ -57,7 +57,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for operator_definition in &node.operators { match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { - if OperatorSource::is_url(path) { + if source_is_url(path) { todo!("check URL"); } else { let path = adjust_shared_library_path(Path::new(&path))?; @@ -67,7 +67,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } } OperatorSource::Python(path) => { - if OperatorSource::is_url(path) { + if source_is_url(path) { todo!("check URL"); } else { if !base.join(&path).exists() { @@ -76,7 +76,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } } OperatorSource::Wasm(path) => { - if OperatorSource::is_url(path) { + if source_is_url(path) { todo!("check URL"); } else { if !base.join(&path).exists() { diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index a8eba7db..3e242ce7 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -27,3 +27,4 @@ tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } serde_json = "1.0.86" +dora-download = { path = "../../libraries/extensions/download" } diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index e3411353..b892b271 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,5 +1,9 @@ use super::command_init_common_env; -use dora_core::{config::NodeId, 
descriptor}; +use dora_core::{ + config::NodeId, + descriptor::{self, source_is_url}, +}; +use dora_download::download_file; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -10,25 +14,31 @@ pub(super) fn spawn_custom_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut args = node.run.split_ascii_whitespace(); - let cmd = { - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { + let mut temp_file = None; + let path = if source_is_url(&node.source) { + // try to download the shared library + let tmp = download_file(&node.source).wrap_err("failed to download custom node")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + } else { + let raw = Path::new(&node.source); + if raw.extension().is_none() { raw.with_extension(EXE_EXTENSION) } else { raw.to_owned() - }; - working_dir - .join(&path) - .canonicalize() - .wrap_err_with(|| format!("no node exists at `{}`", path.display()))? 
+ } }; + let cmd = working_dir + .join(&path) + .canonicalize() + .wrap_err_with(|| format!("no node exists at `{}`", path.display()))?; + let mut command = tokio::process::Command::new(cmd); - command.args(args); + if let Some(args) = &node.args { + command.args(args.split_ascii_whitespace()); + } command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_NODE_RUN_CONFIG", @@ -45,11 +55,16 @@ pub(super) fn spawn_custom_node( } } - let mut child = command - .spawn() - .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?; + let mut child = command.spawn().wrap_err_with(|| { + format!( + "failed to run executable `{}` with args `{}`", + node.source, + node.args.as_deref().unwrap_or_default() + ) + })?; let result = tokio::spawn(async move { let status = child.wait().await.context("child process failed")?; + std::mem::drop(temp_file); if status.success() { tracing::info!("node {node_id} finished"); Ok(()) diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 163fa3e6..70fa7b92 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,8 +37,7 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" -tempfile = "3.3.0" -reqwest = "0.11.12" +dora-download = { path = "../../libraries/extensions/download" } [features] tracing = ["opentelemetry", "dora-tracing"] diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 2b422ec1..fcf56729 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; -use std::{any::Any, io::Write}; +use std::any::Any; use tokio::sync::mpsc::Sender; #[cfg(not(feature = "tracing"))] @@ -85,23 +85,3 @@ pub enum OperatorEvent { Panic(Box), Finished, } - -fn 
download_file(url: T) -> Result -where - T: reqwest::IntoUrl + std::fmt::Display + Copy, -{ - let response = tokio::runtime::Handle::current().block_on(async { - reqwest::get(url) - .await - .wrap_err_with(|| format!("failed to request operator from `{url}`"))? - .bytes() - .await - .wrap_err("failed to read operator from `{uri}`") - })?; - let mut tmp = - tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; - tmp.as_file_mut() - .write_all(&response) - .wrap_err("failed to write downloaded operator to file")?; - Ok(tmp) -} diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 9ffc6be8..88370513 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,8 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use super::{download_file, OperatorEvent, Tracer}; -use dora_core::{config::DataId, descriptor::OperatorSource}; +use super::{OperatorEvent, Tracer}; +use dora_core::{config::DataId, descriptor::source_is_url}; +use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; @@ -41,7 +42,7 @@ pub fn spawn( tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if OperatorSource::is_url(source) { + let path = if source_is_url(source) { // try to download the shared library let tmp = download_file(source).wrap_err("failed to download Python operator")?; let path = tmp.path().to_owned(); diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index f2bb51c8..49f8eda6 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,6 @@ -use super::{download_file, OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource}; +use 
super::{OperatorEvent, Tracer}; +use dora_core::{adjust_shared_library_path, config::DataId, descriptor::source_is_url}; +use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, @@ -27,7 +28,7 @@ pub fn spawn( tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if OperatorSource::is_url(source) { + let path = if source_is_url(source) { // try to download the shared library let tmp = download_file(source).wrap_err("failed to download shared library operator")?; let path = tmp.path().to_owned(); diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index ab113dd5..23caf7cc 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -5,14 +5,14 @@ communication: nodes: - id: cxx-node-rust-api custom: - run: ../../target/debug/cxx-dataflow-example-node-rust-api + source: ../../target/debug/cxx-dataflow-example-node-rust-api inputs: tick: dora/timer/millis/300 outputs: - counter - id: cxx-node-c-api custom: - run: build/node_c_api + source: build/node_c_api inputs: tick: dora/timer/millis/300 outputs: diff --git a/examples/c-dataflow/dataflow.yml b/examples/c-dataflow/dataflow.yml index ed372971..b2f30e40 100644 --- a/examples/c-dataflow/dataflow.yml +++ b/examples/c-dataflow/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: c_node custom: - run: build/c_node + source: build/c_node inputs: timer: dora/timer/secs/1 outputs: @@ -20,6 +20,6 @@ nodes: - counter - id: c_sink custom: - run: build/c_sink + source: build/c_sink inputs: counter: runtime-node/c_operator/counter diff --git a/examples/iceoryx/dataflow.yml b/examples/iceoryx/dataflow.yml index f8b447db..56b0face 100644 --- a/examples/iceoryx/dataflow.yml +++ b/examples/iceoryx/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: rust-node custom: - run: 
../../target/debug/iceoryx-example-node + source: ../../target/debug/iceoryx-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -21,6 +21,6 @@ nodes: - status - id: rust-sink custom: - run: ../../target/debug/iceoryx-example-sink + source: ../../target/debug/iceoryx-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index a31b5b7e..1aa1d0ac 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -5,12 +5,12 @@ communication: nodes: - id: webcam custom: - run: ./webcam.py + source: ./webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index fdf14a2b..6b9a00af 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -5,12 +5,12 @@ communication: nodes: - id: no_webcam custom: - run: ./no_webcam.py + source: ./no_webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index be9d4f57..78b8a6cb 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -6,7 +6,7 @@ nodes: - id: rust-node custom: build: cargo build -p rust-dataflow-example-node - run: ../../target/debug/rust-dataflow-example-node + source: ../../target/debug/rust-dataflow-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -24,6 +24,6 @@ nodes: - id: rust-sink custom: build: cargo build -p rust-dataflow-example-sink - run: ../../target/debug/rust-dataflow-example-sink + source: ../../target/debug/rust-dataflow-example-sink inputs: message: runtime-node/rust-operator/status diff --git 
a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 8719229e..07506241 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -169,10 +169,8 @@ pub enum OperatorSource { Wasm(String), } -impl OperatorSource { - pub fn is_url(source: &str) -> bool { - source.contains("://") - } +pub fn source_is_url(source: &str) -> bool { + source.contains("://") } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -186,7 +184,9 @@ pub struct PythonOperatorConfig { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CustomNode { - pub run: String, + pub source: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub args: Option, pub env: Option>, pub working_directory: Option>, #[serde(default, skip_serializing_if = "Option::is_none")] diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml new file mode 100644 index 00000000..4d1dae0d --- /dev/null +++ b/libraries/extensions/download/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dora-download" +version = "0.1.0" +edition = "2021" +license = "Apache-2.0" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +eyre = "0.6.8" +tempfile = "3.3.0" +reqwest = "0.11.12" +tokio = { version = "1.17.0" } diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs new file mode 100644 index 00000000..ab71a39b --- /dev/null +++ b/libraries/extensions/download/src/lib.rs @@ -0,0 +1,22 @@ +use eyre::Context; +use std::io::Write; + +pub fn download_file(url: T) -> Result +where + T: reqwest::IntoUrl + std::fmt::Display + Copy, +{ + let response = tokio::runtime::Handle::current().block_on(async { + reqwest::get(url) + .await + .wrap_err_with(|| format!("failed to request operator from `{url}`"))? 
+ .bytes() + .await + .wrap_err("failed to read operator from `{uri}`") + })?; + let mut tmp = + tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; + tmp.as_file_mut() + .write_all(&response) + .wrap_err("failed to write downloaded operator to file")?; + Ok(tmp) +} From 0b00955ef46d6a905183a7d94264c17ddc7a5b7c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 25 Oct 2022 10:14:17 +0200 Subject: [PATCH 5/9] Fix panic in tokio runtime --- Cargo.lock | 38 ++++++++++++------------ libraries/extensions/download/Cargo.toml | 2 +- libraries/extensions/download/src/lib.rs | 2 +- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 243d6342..a9f31939 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,9 +938,9 @@ name = "dora-download" version = "0.1.0" dependencies = [ "eyre", + "futures", "reqwest", "tempfile", - "tokio", ] [[package]] @@ -1234,9 +1234,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f73fe65f54d1e12b726f517d3e2135ca3125a437b6d998caf1962961f7172d9e" +checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" dependencies = [ "futures-channel", "futures-core", @@ -1249,9 +1249,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3083ce4b914124575708913bca19bfe887522d6e2e6d0952943f5eac4a74010" +checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", "futures-sink", @@ -1280,15 +1280,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c09fd04b7e4073ac7156a9539b57a484a8ea920f79c7c675d05d289ab6110d3" +checksum = 
"04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" [[package]] name = "futures-executor" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9420b90cfa29e327d0429f19be13e7ddb68fa1cccb09d65e5706b8c7a749b8a6" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" dependencies = [ "futures-core", "futures-task", @@ -1297,9 +1297,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc4045962a5a5e935ee2fdedaa4e08284547402885ab326734432bed5d12966b" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" [[package]] name = "futures-lite" @@ -1318,9 +1318,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" dependencies = [ "proc-macro2", "quote", @@ -1329,21 +1329,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21163e139fa306126e6eedaf49ecdb4588f939600f0b1e770f4205ee4b7fa868" +checksum = "39c15cf1a4aa79df40f1bb462fb39676d0ad9e366c2a33b590d7c66f4f81fcf9" [[package]] name = "futures-task" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" +checksum = "2ffb393ac5d9a6eaa9d3fdf37ae2776656b706e200c8e16b1bdb227f5198e6ea" [[package]] name = "futures-util" -version = "0.3.21" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"d8b7abd5d659d9b90c8cba917f6ec750a74e2dc23902ef9cd4cc8c8b22e6036a" +checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ "futures-channel", "futures-core", diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml index 4d1dae0d..b42caccd 100644 --- a/libraries/extensions/download/Cargo.toml +++ b/libraries/extensions/download/Cargo.toml @@ -10,4 +10,4 @@ license = "Apache-2.0" eyre = "0.6.8" tempfile = "3.3.0" reqwest = "0.11.12" -tokio = { version = "1.17.0" } +futures = "0.3.25" diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs index ab71a39b..50419d53 100644 --- a/libraries/extensions/download/src/lib.rs +++ b/libraries/extensions/download/src/lib.rs @@ -5,7 +5,7 @@ pub fn download_file(url: T) -> Result Date: Tue, 25 Oct 2022 10:14:47 +0200 Subject: [PATCH 6/9] Print downloaded executable path instead of URL in error message --- binaries/coordinator/src/run/custom.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index b892b271..c4292cc2 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -58,7 +58,7 @@ pub(super) fn spawn_custom_node( let mut child = command.spawn().wrap_err_with(|| { format!( "failed to run executable `{}` with args `{}`", - node.source, + path.display(), node.args.as_deref().unwrap_or_default() ) })?; From e2c0e97b8f675865be542e92e212aa750465d4bd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 25 Oct 2022 10:43:21 +0200 Subject: [PATCH 7/9] Download files to `build` subfolder and set them as executable --- binaries/coordinator/src/run/custom.rs | 11 +++---- binaries/runtime/src/operator/mod.rs | 34 +++++++++++++++------ binaries/runtime/src/operator/python.rs | 19 +++++++----- binaries/runtime/src/operator/shared_lib.rs | 23 +++++++++----- 
libraries/extensions/download/src/lib.rs | 24 ++++++++++----- 5 files changed, 73 insertions(+), 38 deletions(-) diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index c4292cc2..1c3c6566 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -14,13 +14,13 @@ pub(super) fn spawn_custom_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut temp_file = None; let path = if source_is_url(&node.source) { // try to download the shared library - let tmp = download_file(&node.source).wrap_err("failed to download custom node")?; - let path = tmp.path().to_owned(); - temp_file = Some(tmp); - path + let target_path = Path::new("build") + .join(node_id.to_string()) + .with_extension(EXE_EXTENSION); + download_file(&node.source, &target_path).wrap_err("failed to download custom node")?; + target_path } else { let raw = Path::new(&node.source); if raw.extension().is_none() { @@ -64,7 +64,6 @@ pub(super) fn spawn_custom_node( })?; let result = tokio::spawn(async move { let status = child.wait().await.context("child process failed")?; - std::mem::drop(temp_file); if status.success() { tracing::info!("node {node_id} finished"); Ok(()) diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index fcf56729..4d8c246c 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -56,17 +56,33 @@ pub fn spawn_operator( match &operator_definition.config.source { OperatorSource::SharedLibrary(source) => { - shared_lib::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with( - || { - format!( - "failed to spawn shared library operator for {}", - operator_definition.id - ) - }, - )?; + shared_lib::spawn( + node_id, + &operator_definition.id, + source, + events_tx, + inputs, + publishers, + tracer, + ) + .wrap_err_with(|| { + format!( + "failed to spawn shared library 
operator for {}", + operator_definition.id + ) + })?; } OperatorSource::Python(source) => { - python::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + python::spawn( + node_id, + &operator_definition.id, + source, + events_tx, + inputs, + publishers, + tracer, + ) + .wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 88370513..9136101a 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,10 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; -use dora_core::{config::DataId, descriptor::source_is_url}; +use dora_core::{ + config::{DataId, NodeId, OperatorId}, + descriptor::source_is_url, +}; use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; @@ -35,19 +38,21 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( + node_id: &NodeId, + operator_id: &OperatorId, source: &str, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { - let mut temp_file = None; let path = if source_is_url(source) { + let target_path = Path::new("build") + .join(node_id.to_string()) + .join(operator_id.to_string()); // try to download the shared library - let tmp = download_file(source).wrap_err("failed to download Python operator")?; - let path = tmp.path().to_owned(); - temp_file = Some(tmp); - path + download_file(source, &target_path).wrap_err("failed to download Python operator")?; + target_path } else { Path::new(source).to_owned() }; @@ -99,8 +104,6 @@ pub fn spawn( }; let python_runner = move || { - let _temp_file = temp_file; - let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; diff --git 
a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 49f8eda6..f8efcb28 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,9 @@ use super::{OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId, descriptor::source_is_url}; +use dora_core::{ + adjust_shared_library_path, + config::{DataId, NodeId, OperatorId}, + descriptor::source_is_url, +}; use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ @@ -21,19 +25,24 @@ use std::{ use tokio::sync::mpsc::Sender; pub fn spawn( + node_id: &NodeId, + operator_id: &OperatorId, source: &str, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { - let mut temp_file = None; let path = if source_is_url(source) { + let target_path = adjust_shared_library_path( + &Path::new("build") + .join(node_id.to_string()) + .join(operator_id.to_string()), + )?; // try to download the shared library - let tmp = download_file(source).wrap_err("failed to download shared library operator")?; - let path = tmp.path().to_owned(); - temp_file = Some(tmp); - path + download_file(source, &target_path) + .wrap_err("failed to download shared library operator")?; + target_path } else { adjust_shared_library_path(Path::new(source))? 
}; @@ -45,8 +54,6 @@ pub fn spawn( thread::spawn(move || { let closure = AssertUnwindSafe(|| { - let _temp_file = temp_file; - let bindings = Bindings::init(&library).context("failed to init operator")?; let operator = SharedLibraryOperator { inputs, bindings }; diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs index 50419d53..f4f3145a 100644 --- a/libraries/extensions/download/src/lib.rs +++ b/libraries/extensions/download/src/lib.rs @@ -1,10 +1,16 @@ use eyre::Context; -use std::io::Write; +#[cfg(unix)] +use std::os::unix::prelude::PermissionsExt; +use std::{io::Write, path::Path}; -pub fn download_file(url: T) -> Result +pub fn download_file(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> where T: reqwest::IntoUrl + std::fmt::Display + Copy, { + if let Some(parent) = target_path.parent() { + std::fs::create_dir_all(parent).wrap_err("failed to create parent folder")?; + } + let response = futures::executor::block_on(async { reqwest::get(url) .await @@ -13,10 +19,14 @@ where .await .wrap_err("failed to read operator from `{uri}`") })?; - let mut tmp = - tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; - tmp.as_file_mut() - .write_all(&response) + let mut file = std::fs::File::create(target_path).wrap_err("failed to create target file")?; + file.write_all(&response) .wrap_err("failed to write downloaded operator to file")?; - Ok(tmp) + file.sync_all().wrap_err("failed to `sync_all`")?; + + #[cfg(unix)] + file.set_permissions(std::fs::Permissions::from_mode(0o764)) + .wrap_err("failed to make downloaded file executable")?; + + Ok(()) } From 5cc0e8b8402fe4edc90e0774688cc61b5f3954df Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 25 Oct 2022 11:10:08 +0200 Subject: [PATCH 8/9] Fix: Make `download_file` asynchronous --- Cargo.lock | 2 +- binaries/coordinator/src/run/custom.rs | 6 ++-- binaries/coordinator/src/run/mod.rs | 1 + binaries/runtime/src/operator/python.rs 
| 6 +++- binaries/runtime/src/operator/shared_lib.rs | 5 +++- libraries/extensions/download/Cargo.toml | 2 +- libraries/extensions/download/src/lib.rs | 31 ++++++++++++--------- 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f73718fc..b6fb1933 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -938,9 +938,9 @@ name = "dora-download" version = "0.1.0" dependencies = [ "eyre", - "futures", "reqwest", "tempfile", + "tokio", ] [[package]] diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index 1c3c6566..10ee4ad2 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -8,7 +8,7 @@ use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; #[tracing::instrument] -pub(super) fn spawn_custom_node( +pub(super) async fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, communication: &dora_core::config::CommunicationConfig, @@ -19,7 +19,9 @@ pub(super) fn spawn_custom_node( let target_path = Path::new("build") .join(node_id.to_string()) .with_extension(EXE_EXTENSION); - download_file(&node.source, &target_path).wrap_err("failed to download custom node")?; + download_file(&node.source, &target_path) + .await + .wrap_err("failed to download custom node")?; target_path } else { let raw = Path::new(&node.source); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 1851944d..a098107e 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -66,6 +66,7 @@ pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Resul descriptor::CoreNodeKind::Custom(node) => { let result = spawn_custom_node(node_id.clone(), &node, &communication_config, &working_dir) + .await .wrap_err_with(|| format!("failed to spawn custom node {node_id}"))?; tasks.push(result); } diff --git a/binaries/runtime/src/operator/python.rs 
b/binaries/runtime/src/operator/python.rs index eeeed05c..2f715f83 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -52,7 +52,11 @@ pub fn spawn( .join(node_id.to_string()) .join(operator_id.to_string()); // try to download the shared library - download_file(source, &target_path).wrap_err("failed to download Python operator")?; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(download_file(source, &target_path)) + .wrap_err("failed to download Python operator")?; target_path } else { Path::new(source).to_owned() diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 538ac0ae..71616bed 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -41,7 +41,10 @@ pub fn spawn( .join(operator_id.to_string()), )?; // try to download the shared library - download_file(source, &target_path) + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + rt.block_on(download_file(source, &target_path)) .wrap_err("failed to download shared library operator")?; target_path } else { diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml index b42caccd..4d1dae0d 100644 --- a/libraries/extensions/download/Cargo.toml +++ b/libraries/extensions/download/Cargo.toml @@ -10,4 +10,4 @@ license = "Apache-2.0" eyre = "0.6.8" tempfile = "3.3.0" reqwest = "0.11.12" -futures = "0.3.25" +tokio = { version = "1.17.0" } diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs index f4f3145a..a7fdf2cc 100644 --- a/libraries/extensions/download/src/lib.rs +++ b/libraries/extensions/download/src/lib.rs @@ -1,31 +1,36 @@ use eyre::Context; #[cfg(unix)] use std::os::unix::prelude::PermissionsExt; -use std::{io::Write, path::Path}; +use std::path::Path; +use tokio::io::AsyncWriteExt; -pub 
fn download_file(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> +pub async fn download_file(url: T, target_path: &Path) -> Result<(), eyre::ErrReport> where T: reqwest::IntoUrl + std::fmt::Display + Copy, { if let Some(parent) = target_path.parent() { - std::fs::create_dir_all(parent).wrap_err("failed to create parent folder")?; + tokio::fs::create_dir_all(parent) + .await + .wrap_err("failed to create parent folder")?; } - let response = futures::executor::block_on(async { - reqwest::get(url) - .await - .wrap_err_with(|| format!("failed to request operator from `{url}`"))? - .bytes() - .await - .wrap_err("failed to read operator from `{uri}`") - })?; - let mut file = std::fs::File::create(target_path).wrap_err("failed to create target file")?; + let response = reqwest::get(url) + .await + .wrap_err_with(|| format!("failed to request operator from `{url}`"))? + .bytes() + .await + .wrap_err("failed to read operator from `{uri}`")?; + let mut file = tokio::fs::File::create(target_path) + .await + .wrap_err("failed to create target file")?; file.write_all(&response) + .await .wrap_err("failed to write downloaded operator to file")?; - file.sync_all().wrap_err("failed to `sync_all`")?; + file.sync_all().await.wrap_err("failed to `sync_all`")?; #[cfg(unix)] file.set_permissions(std::fs::Permissions::from_mode(0o764)) + .await .wrap_err("failed to make downloaded file executable")?; Ok(()) From 4701c5a98dd2e5b0575be4518f504726a2a12e22 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Tue, 25 Oct 2022 11:15:43 +0200 Subject: [PATCH 9/9] Add a Rust dataflow example that uses nodes by URL --- .github/workflows/ci.yml | 19 +++++++++ Cargo.toml | 4 ++ examples/rust-dataflow-url/dataflow.yml | 27 +++++++++++++ examples/rust-dataflow-url/run.rs | 44 +++++++++++++++++++++ examples/rust-dataflow-url/sink/Cargo.toml | 10 +++++ examples/rust-dataflow-url/sink/src/main.rs | 28 +++++++++++++ 6 files changed, 132 insertions(+) create mode 100644 
examples/rust-dataflow-url/dataflow.yml create mode 100644 examples/rust-dataflow-url/run.rs create mode 100644 examples/rust-dataflow-url/sink/Cargo.toml create mode 100644 examples/rust-dataflow-url/sink/src/main.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 85d114f1..6361e3ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -118,6 +118,25 @@ jobs: command: run args: --example iceoryx + examples-remote: + name: "Examples (Remote)" + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + + - name: Install Cap'n Proto and libacl-dev (Linux) + if: runner.os == 'Linux' + run: | + export DEBIAN_FRONTEND=noninteractive + sudo apt-get install -y capnproto libcapnp-dev libacl1-dev + + - name: "Remote Rust Dataflow example" + uses: actions-rs/cargo@v1 + timeout-minutes: 30 + with: + command: run + args: --example rust-dataflow-url + clippy: name: "Clippy" runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index fc0ea21b..3438425c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,10 @@ path = "examples/c-dataflow/run.rs" name = "rust-dataflow" path = "examples/rust-dataflow/run.rs" +[[example]] +name = "rust-dataflow-url" +path = "examples/rust-dataflow-url/run.rs" + [[example]] name = "cxx-dataflow" path = "examples/c++-dataflow/run.rs" diff --git a/examples/rust-dataflow-url/dataflow.yml b/examples/rust-dataflow-url/dataflow.yml new file mode 100644 index 00000000..81c8d239 --- /dev/null +++ b/examples/rust-dataflow-url/dataflow.yml @@ -0,0 +1,27 @@ +communication: + zenoh: + prefix: /example-rust-dataflow + +nodes: + - id: rust-node + custom: + source: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/rust-dataflow-example-node + inputs: + tick: dora/timer/millis/300 + outputs: + - random + - id: runtime-node + operators: + - id: rust-operator + shared-library: https://github.com/dora-rs/dora/releases/download/v0.0.0-test.4/librust_dataflow_example_operator.so + inputs: + tick: 
dora/timer/millis/100 + random: rust-node/random + outputs: + - status + - id: rust-sink + custom: + build: cargo build -p rust-dataflow-example-sink + source: ../../target/debug/rust-dataflow-example-sink + inputs: + message: runtime-node/rust-operator/status diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs new file mode 100644 index 00000000..9378905c --- /dev/null +++ b/examples/rust-dataflow-url/run.rs @@ -0,0 +1,44 @@ +use eyre::{bail, Context}; +use std::path::Path; + +#[tokio::main] +async fn main() -> eyre::Result<()> { + let root = Path::new(env!("CARGO_MANIFEST_DIR")); + std::env::set_current_dir(root.join(file!()).parent().unwrap()) + .wrap_err("failed to set working dir")?; + + let dataflow = Path::new("dataflow.yml"); + build_dataflow(dataflow).await?; + build_package("dora-runtime").await?; + + dora_coordinator::run(dora_coordinator::Args { + run_dataflow: dataflow.to_owned().into(), + runtime: Some(root.join("target").join("debug").join("dora-runtime")), + }) + .await?; + + Ok(()) +} + +async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("run"); + cmd.arg("--package").arg("dora-cli"); + cmd.arg("--").arg("build").arg(dataflow); + if !cmd.status().await?.success() { + bail!("failed to build dataflow"); + }; + Ok(()) +} + +async fn build_package(package: &str) -> eyre::Result<()> { + let cargo = std::env::var("CARGO").unwrap(); + let mut cmd = tokio::process::Command::new(&cargo); + cmd.arg("build"); + cmd.arg("--package").arg(package); + if !cmd.status().await?.success() { + bail!("failed to build {package}"); + }; + Ok(()) +} diff --git a/examples/rust-dataflow-url/sink/Cargo.toml b/examples/rust-dataflow-url/sink/Cargo.toml new file mode 100644 index 00000000..e80b5a61 --- /dev/null +++ b/examples/rust-dataflow-url/sink/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "rust-dataflow-example-sink" 
+version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dora-node-api = { version = "0.1.0", path = "../../../apis/rust/node" } +eyre = "0.6.8" diff --git a/examples/rust-dataflow-url/sink/src/main.rs b/examples/rust-dataflow-url/sink/src/main.rs new file mode 100644 index 00000000..f9c932a5 --- /dev/null +++ b/examples/rust-dataflow-url/sink/src/main.rs @@ -0,0 +1,28 @@ +use dora_node_api::{self, DoraNode}; +use eyre::{bail, Context}; + +fn main() -> eyre::Result<()> { + let mut operator = DoraNode::init_from_env()?; + + let inputs = operator.inputs()?; + + while let Ok(input) = inputs.recv() { + match input.id.as_str() { + "message" => { + let data = input.data(); + let received_string = + std::str::from_utf8(&data).wrap_err("received message was not utf8-encoded")?; + println!("received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } + } + other => eprintln!("Ignoring unexpected input `{other}`"), + } + } + + Ok(()) +}