From 64cf49703bdea7465f6591597fb2614d119fbd08 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 15:38:14 +0200 Subject: [PATCH] Implement download support for custom nodes --- Cargo.lock | 14 +++++- Cargo.toml | 1 + binaries/cli/src/check.rs | 24 ++++----- binaries/coordinator/Cargo.toml | 1 + binaries/coordinator/src/run/custom.rs | 49 ++++++++++++------- binaries/runtime/Cargo.toml | 3 +- binaries/runtime/src/operator/mod.rs | 22 +-------- binaries/runtime/src/operator/python.rs | 7 +-- binaries/runtime/src/operator/shared_lib.rs | 7 +-- examples/c++-dataflow/dataflow.yml | 4 +- examples/c-dataflow/dataflow.yml | 4 +- examples/iceoryx/dataflow.yml | 4 +- examples/python-dataflow/dataflow.yml | 4 +- .../dataflow_without_webcam.yml | 4 +- examples/rust-dataflow/dataflow.yml | 4 +- libraries/core/src/descriptor/mod.rs | 10 ++-- libraries/extensions/download/Cargo.toml | 13 +++++ libraries/extensions/download/src/lib.rs | 22 +++++++++ 18 files changed, 120 insertions(+), 77 deletions(-) create mode 100644 libraries/extensions/download/Cargo.toml create mode 100644 libraries/extensions/download/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index b5bb5045..243d6342 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -902,6 +902,7 @@ dependencies = [ "bincode", "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-node-api", "eyre", @@ -932,6 +933,16 @@ dependencies = [ "zenoh-config", ] +[[package]] +name = "dora-download" +version = "0.1.0" +dependencies = [ + "eyre", + "reqwest", + "tempfile", + "tokio", +] + [[package]] name = "dora-examples" version = "0.0.0" @@ -1053,6 +1064,7 @@ version = "0.1.0" dependencies = [ "clap 3.2.20", "dora-core", + "dora-download", "dora-message", "dora-metrics", "dora-node-api", @@ -1068,9 +1080,7 @@ dependencies = [ "opentelemetry", "opentelemetry-system-metrics", "pyo3", - "reqwest", "serde_yaml 0.8.23", - "tempfile", "tokio", "tokio-stream", "tracing", diff --git a/Cargo.toml b/Cargo.toml index 
4a806645..fc0ea21b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "libraries/communication-layer", "libraries/core", "libraries/message", + "libraries/extensions/download", "libraries/extensions/telemetry/*", "libraries/extensions/zenoh-logger", ] diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index f81b5114..d4163d8c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -2,7 +2,7 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, config::{InputMapping, UserInputMapping}, - descriptor::{self, CoreNodeKind, OperatorSource}, + descriptor::{self, source_is_url, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -39,15 +39,15 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for node in &nodes { match &node.kind { descriptor::CoreNodeKind::Custom(node) => { - let mut args = node.run.split_ascii_whitespace(); - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { - raw.with_extension(EXE_EXTENSION) + let path = if source_is_url(&node.source) { + todo!("check URL"); } else { - raw.to_owned() + let raw = Path::new(&node.source); + if raw.extension().is_none() { + raw.with_extension(EXE_EXTENSION) + } else { + raw.to_owned() + } }; base.join(&path) .canonicalize() @@ -57,7 +57,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { for operator_definition in &node.operators { match &operator_definition.config.source { OperatorSource::SharedLibrary(path) => { - if OperatorSource::is_url(path) { + if source_is_url(path) { todo!("check URL"); } else { let path = adjust_shared_library_path(Path::new(&path))?; @@ -67,7 +67,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } } OperatorSource::Python(path) => { - if OperatorSource::is_url(path) { + if 
source_is_url(path) { todo!("check URL"); } else { if !base.join(&path).exists() { @@ -76,7 +76,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } } OperatorSource::Wasm(path) => { - if OperatorSource::is_url(path) { + if source_is_url(path) { todo!("check URL"); } else { if !base.join(&path).exists() { diff --git a/binaries/coordinator/Cargo.toml b/binaries/coordinator/Cargo.toml index a8eba7db..3e242ce7 100644 --- a/binaries/coordinator/Cargo.toml +++ b/binaries/coordinator/Cargo.toml @@ -27,3 +27,4 @@ tracing-subscriber = "0.3.15" futures-concurrency = "5.0.1" zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" } serde_json = "1.0.86" +dora-download = { path = "../../libraries/extensions/download" } diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index e3411353..b892b271 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,5 +1,9 @@ use super::command_init_common_env; -use dora_core::{config::NodeId, descriptor}; +use dora_core::{ + config::NodeId, + descriptor::{self, source_is_url}, +}; +use dora_download::download_file; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -10,25 +14,31 @@ pub(super) fn spawn_custom_node( communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { - let mut args = node.run.split_ascii_whitespace(); - let cmd = { - let raw = Path::new( - args.next() - .ok_or_else(|| eyre!("`run` field must not be empty"))?, - ); - let path = if raw.extension().is_none() { + let mut temp_file = None; + let path = if source_is_url(&node.source) { + // try to download the shared library + let tmp = download_file(&node.source).wrap_err("failed to download custom node")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + } else { + let raw = Path::new(&node.source); + if raw.extension().is_none() { raw.with_extension(EXE_EXTENSION) } 
else { raw.to_owned() - }; - working_dir - .join(&path) - .canonicalize() - .wrap_err_with(|| format!("no node exists at `{}`", path.display()))? + } }; + let cmd = working_dir + .join(&path) + .canonicalize() + .wrap_err_with(|| format!("no node exists at `{}`", path.display()))?; + let mut command = tokio::process::Command::new(cmd); - command.args(args); + if let Some(args) = &node.args { + command.args(args.split_ascii_whitespace()); + } command_init_common_env(&mut command, &node_id, communication)?; command.env( "DORA_NODE_RUN_CONFIG", @@ -45,11 +55,16 @@ pub(super) fn spawn_custom_node( } } - let mut child = command - .spawn() - .wrap_err_with(|| format!("failed to run command `{}`", &node.run))?; + let mut child = command.spawn().wrap_err_with(|| { + format!( + "failed to run executable `{}` with args `{}`", + node.source, + node.args.as_deref().unwrap_or_default() + ) + })?; let result = tokio::spawn(async move { let status = child.wait().await.context("child process failed")?; + std::mem::drop(temp_file); if status.success() { tracing::info!("node {node_id} finished"); Ok(()) diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 163fa3e6..70fa7b92 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,8 +37,7 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" -tempfile = "3.3.0" -reqwest = "0.11.12" +dora-download = { path = "../../libraries/extensions/download" } [features] tracing = ["opentelemetry", "dora-tracing"] diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 2b422ec1..fcf56729 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; -use std::{any::Any, io::Write}; +use std::any::Any; 
use tokio::sync::mpsc::Sender; #[cfg(not(feature = "tracing"))] @@ -85,23 +85,3 @@ pub enum OperatorEvent { Panic(Box), Finished, } - -fn download_file(url: T) -> Result -where - T: reqwest::IntoUrl + std::fmt::Display + Copy, -{ - let response = tokio::runtime::Handle::current().block_on(async { - reqwest::get(url) - .await - .wrap_err_with(|| format!("failed to request operator from `{url}`"))? - .bytes() - .await - .wrap_err("failed to read operator from `{uri}`") - })?; - let mut tmp = - tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; - tmp.as_file_mut() - .write_all(&response) - .wrap_err("failed to write downloaded operator to file")?; - Ok(tmp) -} diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 9ffc6be8..88370513 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,8 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use super::{download_file, OperatorEvent, Tracer}; -use dora_core::{config::DataId, descriptor::OperatorSource}; +use super::{OperatorEvent, Tracer}; +use dora_core::{config::DataId, descriptor::source_is_url}; +use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; @@ -41,7 +42,7 @@ pub fn spawn( tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if OperatorSource::is_url(source) { + let path = if source_is_url(source) { // try to download the shared library let tmp = download_file(source).wrap_err("failed to download Python operator")?; let path = tmp.path().to_owned(); diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index f2bb51c8..49f8eda6 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,6 @@ -use 
super::{download_file, OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource}; +use super::{OperatorEvent, Tracer}; +use dora_core::{adjust_shared_library_path, config::DataId, descriptor::source_is_url}; +use dora_download::download_file; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, @@ -27,7 +28,7 @@ pub fn spawn( tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if OperatorSource::is_url(source) { + let path = if source_is_url(source) { // try to download the shared library let tmp = download_file(source).wrap_err("failed to download shared library operator")?; let path = tmp.path().to_owned(); diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index ab113dd5..23caf7cc 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -5,14 +5,14 @@ communication: nodes: - id: cxx-node-rust-api custom: - run: ../../target/debug/cxx-dataflow-example-node-rust-api + source: ../../target/debug/cxx-dataflow-example-node-rust-api inputs: tick: dora/timer/millis/300 outputs: - counter - id: cxx-node-c-api custom: - run: build/node_c_api + source: build/node_c_api inputs: tick: dora/timer/millis/300 outputs: diff --git a/examples/c-dataflow/dataflow.yml b/examples/c-dataflow/dataflow.yml index ed372971..b2f30e40 100644 --- a/examples/c-dataflow/dataflow.yml +++ b/examples/c-dataflow/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: c_node custom: - run: build/c_node + source: build/c_node inputs: timer: dora/timer/secs/1 outputs: @@ -20,6 +20,6 @@ nodes: - counter - id: c_sink custom: - run: build/c_sink + source: build/c_sink inputs: counter: runtime-node/c_operator/counter diff --git a/examples/iceoryx/dataflow.yml b/examples/iceoryx/dataflow.yml index f8b447db..56b0face 100644 --- 
a/examples/iceoryx/dataflow.yml +++ b/examples/iceoryx/dataflow.yml @@ -5,7 +5,7 @@ communication: nodes: - id: rust-node custom: - run: ../../target/debug/iceoryx-example-node + source: ../../target/debug/iceoryx-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -21,6 +21,6 @@ nodes: - status - id: rust-sink custom: - run: ../../target/debug/iceoryx-example-sink + source: ../../target/debug/iceoryx-example-sink inputs: message: runtime-node/rust-operator/status diff --git a/examples/python-dataflow/dataflow.yml b/examples/python-dataflow/dataflow.yml index a31b5b7e..1aa1d0ac 100644 --- a/examples/python-dataflow/dataflow.yml +++ b/examples/python-dataflow/dataflow.yml @@ -5,12 +5,12 @@ communication: nodes: - id: webcam custom: - run: ./webcam.py + source: ./webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/python-dataflow/dataflow_without_webcam.yml b/examples/python-dataflow/dataflow_without_webcam.yml index fdf14a2b..6b9a00af 100644 --- a/examples/python-dataflow/dataflow_without_webcam.yml +++ b/examples/python-dataflow/dataflow_without_webcam.yml @@ -5,12 +5,12 @@ communication: nodes: - id: no_webcam custom: - run: ./no_webcam.py + source: ./no_webcam.py inputs: tick: dora/timer/millis/100 outputs: - image - + - id: object_detection operator: python: object_detection.py diff --git a/examples/rust-dataflow/dataflow.yml b/examples/rust-dataflow/dataflow.yml index be9d4f57..78b8a6cb 100644 --- a/examples/rust-dataflow/dataflow.yml +++ b/examples/rust-dataflow/dataflow.yml @@ -6,7 +6,7 @@ nodes: - id: rust-node custom: build: cargo build -p rust-dataflow-example-node - run: ../../target/debug/rust-dataflow-example-node + source: ../../target/debug/rust-dataflow-example-node inputs: tick: dora/timer/millis/300 outputs: @@ -24,6 +24,6 @@ nodes: - id: rust-sink custom: build: cargo build -p rust-dataflow-example-sink - run: 
../../target/debug/rust-dataflow-example-sink
+      source: ../../target/debug/rust-dataflow-example-sink
       inputs:
         message: runtime-node/rust-operator/status
diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs
index 8719229e..07506241 100644
--- a/libraries/core/src/descriptor/mod.rs
+++ b/libraries/core/src/descriptor/mod.rs
@@ -169,10 +169,8 @@ pub enum OperatorSource {
     Wasm(String),
 }
 
-impl OperatorSource {
-    pub fn is_url(source: &str) -> bool {
-        source.contains("://")
-    }
+pub fn source_is_url(source: &str) -> bool {
+    source.contains("://")
 }
 
 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -186,7 +184,9 @@ pub struct PythonOperatorConfig {
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct CustomNode {
-    pub run: String,
+    pub source: String,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub args: Option,
     pub env: Option>,
     pub working_directory: Option>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
diff --git a/libraries/extensions/download/Cargo.toml b/libraries/extensions/download/Cargo.toml
new file mode 100644
index 00000000..4d1dae0d
--- /dev/null
+++ b/libraries/extensions/download/Cargo.toml
@@ -0,0 +1,13 @@
+[package]
+name = "dora-download"
+version = "0.1.0"
+edition = "2021"
+license = "Apache-2.0"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+eyre = "0.6.8"
+tempfile = "3.3.0"
+reqwest = "0.11.12"
+tokio = { version = "1.17.0" }
diff --git a/libraries/extensions/download/src/lib.rs b/libraries/extensions/download/src/lib.rs
new file mode 100644
index 00000000..ab71a39b
--- /dev/null
+++ b/libraries/extensions/download/src/lib.rs
@@ -0,0 +1,45 @@
+use eyre::Context;
+use std::io::Write;
+
+/// Downloads the resource at `url` into a newly created named temporary file.
+///
+/// The complete response body is held in memory before it is written out. The
+/// returned `NamedTempFile` owns the file; `tempfile` deletes it when the handle
+/// is dropped, so keep the handle alive for as long as the path is in use.
+///
+/// # Errors
+///
+/// Fails if the request cannot be sent, the server answers with a 4xx/5xx
+/// status, the body cannot be read, or the temporary file cannot be created or
+/// written.
+///
+/// # Panics
+///
+/// The request is driven with `tokio::runtime::Handle::current().block_on`, so
+/// this panics when no Tokio runtime is active and when it is called directly
+/// from asynchronous code (per the Tokio `Handle` documentation).
+pub fn download_file<T>(url: T) -> eyre::Result<tempfile::NamedTempFile>
+where
+    T: reqwest::IntoUrl + std::fmt::Display + Copy,
+{
+    let response = tokio::runtime::Handle::current().block_on(async {
+        reqwest::get(url)
+            .await
+            .wrap_err_with(|| format!("failed to request operator from `{url}`"))?
+            // `reqwest::get` also returns `Ok` for 4xx/5xx replies; reject them so
+            // an error page is never stored as if it were the requested file.
+            .error_for_status()
+            .wrap_err_with(|| format!("request for `{url}` returned an error status"))?
+            .bytes()
+            .await
+            // The message must go through `format!`: a plain `wrap_err` string is
+            // not a format string and would print the placeholder verbatim.
+            .wrap_err_with(|| format!("failed to read operator from `{url}`"))
+    })?;
+    let mut tmp =
+        tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?;
+    tmp.as_file_mut()
+        .write_all(&response)
+        .wrap_err("failed to write downloaded operator to file")?;
+    Ok(tmp)
+}