Browse Source

Implement download support for custom nodes

tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
64cf49703b
Failed to extract signature
18 changed files with 120 additions and 77 deletions
  1. +12
    -2
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +12
    -12
      binaries/cli/src/check.rs
  4. +1
    -0
      binaries/coordinator/Cargo.toml
  5. +32
    -17
      binaries/coordinator/src/run/custom.rs
  6. +1
    -2
      binaries/runtime/Cargo.toml
  7. +1
    -21
      binaries/runtime/src/operator/mod.rs
  8. +4
    -3
      binaries/runtime/src/operator/python.rs
  9. +4
    -3
      binaries/runtime/src/operator/shared_lib.rs
  10. +2
    -2
      examples/c++-dataflow/dataflow.yml
  11. +2
    -2
      examples/c-dataflow/dataflow.yml
  12. +2
    -2
      examples/iceoryx/dataflow.yml
  13. +2
    -2
      examples/python-dataflow/dataflow.yml
  14. +2
    -2
      examples/python-dataflow/dataflow_without_webcam.yml
  15. +2
    -2
      examples/rust-dataflow/dataflow.yml
  16. +5
    -5
      libraries/core/src/descriptor/mod.rs
  17. +13
    -0
      libraries/extensions/download/Cargo.toml
  18. +22
    -0
      libraries/extensions/download/src/lib.rs

+ 12
- 2
Cargo.lock View File

@@ -902,6 +902,7 @@ dependencies = [
"bincode",
"clap 3.2.20",
"dora-core",
"dora-download",
"dora-message",
"dora-node-api",
"eyre",
@@ -932,6 +933,16 @@ dependencies = [
"zenoh-config",
]

[[package]]
name = "dora-download"
version = "0.1.0"
dependencies = [
"eyre",
"reqwest",
"tempfile",
"tokio",
]

[[package]]
name = "dora-examples"
version = "0.0.0"
@@ -1053,6 +1064,7 @@ version = "0.1.0"
dependencies = [
"clap 3.2.20",
"dora-core",
"dora-download",
"dora-message",
"dora-metrics",
"dora-node-api",
@@ -1068,9 +1080,7 @@ dependencies = [
"opentelemetry",
"opentelemetry-system-metrics",
"pyo3",
"reqwest",
"serde_yaml 0.8.23",
"tempfile",
"tokio",
"tokio-stream",
"tracing",


+ 1
- 0
Cargo.toml View File

@@ -13,6 +13,7 @@ members = [
"libraries/communication-layer",
"libraries/core",
"libraries/message",
"libraries/extensions/download",
"libraries/extensions/telemetry/*",
"libraries/extensions/zenoh-logger",
]


+ 12
- 12
binaries/cli/src/check.rs View File

@@ -2,7 +2,7 @@ use crate::graph::read_descriptor;
use dora_core::{
adjust_shared_library_path,
config::{InputMapping, UserInputMapping},
descriptor::{self, CoreNodeKind, OperatorSource},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
};
use eyre::{bail, eyre, Context};
use std::{env::consts::EXE_EXTENSION, path::Path};
@@ -39,15 +39,15 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => {
let mut args = node.run.split_ascii_whitespace();
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
let path = if source_is_url(&node.source) {
todo!("check URL");
} else {
raw.to_owned()
let raw = Path::new(&node.source);
if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
}
};
base.join(&path)
.canonicalize()
@@ -57,7 +57,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
for operator_definition in &node.operators {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
if OperatorSource::is_url(path) {
if source_is_url(path) {
todo!("check URL");
} else {
let path = adjust_shared_library_path(Path::new(&path))?;
@@ -67,7 +67,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
}
}
OperatorSource::Python(path) => {
if OperatorSource::is_url(path) {
if source_is_url(path) {
todo!("check URL");
} else {
if !base.join(&path).exists() {
@@ -76,7 +76,7 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
}
}
OperatorSource::Wasm(path) => {
if OperatorSource::is_url(path) {
if source_is_url(path) {
todo!("check URL");
} else {
if !base.join(&path).exists() {


+ 1
- 0
binaries/coordinator/Cargo.toml View File

@@ -27,3 +27,4 @@ tracing-subscriber = "0.3.15"
futures-concurrency = "5.0.1"
zenoh = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
serde_json = "1.0.86"
dora-download = { path = "../../libraries/extensions/download" }

+ 32
- 17
binaries/coordinator/src/run/custom.rs View File

@@ -1,5 +1,9 @@
use super::command_init_common_env;
use dora_core::{config::NodeId, descriptor};
use dora_core::{
config::NodeId,
descriptor::{self, source_is_url},
};
use dora_download::download_file;
use eyre::{eyre, WrapErr};
use std::{env::consts::EXE_EXTENSION, path::Path};

@@ -10,25 +14,31 @@ pub(super) fn spawn_custom_node(
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
let cmd = {
let raw = Path::new(
args.next()
.ok_or_else(|| eyre!("`run` field must not be empty"))?,
);
let path = if raw.extension().is_none() {
let mut temp_file = None;
let path = if source_is_url(&node.source) {
// try to download the shared library
let tmp = download_file(&node.source).wrap_err("failed to download custom node")?;
let path = tmp.path().to_owned();
temp_file = Some(tmp);
path
} else {
let raw = Path::new(&node.source);
if raw.extension().is_none() {
raw.with_extension(EXE_EXTENSION)
} else {
raw.to_owned()
};
working_dir
.join(&path)
.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?
}
};

let cmd = working_dir
.join(&path)
.canonicalize()
.wrap_err_with(|| format!("no node exists at `{}`", path.display()))?;

let mut command = tokio::process::Command::new(cmd);
command.args(args);
if let Some(args) = &node.args {
command.args(args.split_ascii_whitespace());
}
command_init_common_env(&mut command, &node_id, communication)?;
command.env(
"DORA_NODE_RUN_CONFIG",
@@ -45,11 +55,16 @@ pub(super) fn spawn_custom_node(
}
}

let mut child = command
.spawn()
.wrap_err_with(|| format!("failed to run command `{}`", &node.run))?;
let mut child = command.spawn().wrap_err_with(|| {
format!(
"failed to run executable `{}` with args `{}`",
node.source,
node.args.as_deref().unwrap_or_default()
)
})?;
let result = tokio::spawn(async move {
let status = child.wait().await.context("child process failed")?;
std::mem::drop(temp_file);
if status.success() {
tracing::info!("node {node_id} finished");
Ok(())


+ 1
- 2
binaries/runtime/Cargo.toml View File

@@ -37,8 +37,7 @@ flume = "0.10.14"
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
tempfile = "3.3.0"
reqwest = "0.11.12"
dora-download = { path = "../../libraries/extensions/download" }

[features]
tracing = ["opentelemetry", "dora-tracing"]


+ 1
- 21
binaries/runtime/src/operator/mod.rs View File

@@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer};
use eyre::Context;
#[cfg(feature = "tracing")]
use opentelemetry::sdk::trace::Tracer;
use std::{any::Any, io::Write};
use std::any::Any;
use tokio::sync::mpsc::Sender;

#[cfg(not(feature = "tracing"))]
@@ -85,23 +85,3 @@ pub enum OperatorEvent {
Panic(Box<dyn Any + Send>),
Finished,
}

fn download_file<T>(url: T) -> Result<tempfile::NamedTempFile, eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
let response = tokio::runtime::Handle::current().block_on(async {
reqwest::get(url)
.await
.wrap_err_with(|| format!("failed to request operator from `{url}`"))?
.bytes()
.await
.wrap_err("failed to read operator from `{uri}`")
})?;
let mut tmp =
tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?;
tmp.as_file_mut()
.write_all(&response)
.wrap_err("failed to write downloaded operator to file")?;
Ok(tmp)
}

+ 4
- 3
binaries/runtime/src/operator/python.rs View File

@@ -1,7 +1,8 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{download_file, OperatorEvent, Tracer};
use dora_core::{config::DataId, descriptor::OperatorSource};
use super::{OperatorEvent, Tracer};
use dora_core::{config::DataId, descriptor::source_is_url};
use dora_download::download_file;
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
use eyre::{bail, eyre, Context};
@@ -41,7 +42,7 @@ pub fn spawn(
tracer: Tracer,
) -> eyre::Result<()> {
let mut temp_file = None;
let path = if OperatorSource::is_url(source) {
let path = if source_is_url(source) {
// try to download the shared library
let tmp = download_file(source).wrap_err("failed to download Python operator")?;
let path = tmp.path().to_owned();


+ 4
- 3
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,5 +1,6 @@
use super::{download_file, OperatorEvent, Tracer};
use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource};
use super::{OperatorEvent, Tracer};
use dora_core::{adjust_shared_library_path, config::DataId, descriptor::source_is_url};
use dora_download::download_file;
use dora_node_api::communication::Publisher;
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
@@ -27,7 +28,7 @@ pub fn spawn(
tracer: Tracer,
) -> eyre::Result<()> {
let mut temp_file = None;
let path = if OperatorSource::is_url(source) {
let path = if source_is_url(source) {
// try to download the shared library
let tmp = download_file(source).wrap_err("failed to download shared library operator")?;
let path = tmp.path().to_owned();


+ 2
- 2
examples/c++-dataflow/dataflow.yml View File

@@ -5,14 +5,14 @@ communication:
nodes:
- id: cxx-node-rust-api
custom:
run: ../../target/debug/cxx-dataflow-example-node-rust-api
source: ../../target/debug/cxx-dataflow-example-node-rust-api
inputs:
tick: dora/timer/millis/300
outputs:
- counter
- id: cxx-node-c-api
custom:
run: build/node_c_api
source: build/node_c_api
inputs:
tick: dora/timer/millis/300
outputs:


+ 2
- 2
examples/c-dataflow/dataflow.yml View File

@@ -5,7 +5,7 @@ communication:
nodes:
- id: c_node
custom:
run: build/c_node
source: build/c_node
inputs:
timer: dora/timer/secs/1
outputs:
@@ -20,6 +20,6 @@ nodes:
- counter
- id: c_sink
custom:
run: build/c_sink
source: build/c_sink
inputs:
counter: runtime-node/c_operator/counter

+ 2
- 2
examples/iceoryx/dataflow.yml View File

@@ -5,7 +5,7 @@ communication:
nodes:
- id: rust-node
custom:
run: ../../target/debug/iceoryx-example-node
source: ../../target/debug/iceoryx-example-node
inputs:
tick: dora/timer/millis/300
outputs:
@@ -21,6 +21,6 @@ nodes:
- status
- id: rust-sink
custom:
run: ../../target/debug/iceoryx-example-sink
source: ../../target/debug/iceoryx-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 2
- 2
examples/python-dataflow/dataflow.yml View File

@@ -5,12 +5,12 @@ communication:
nodes:
- id: webcam
custom:
run: ./webcam.py
source: ./webcam.py
inputs:
tick: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py


+ 2
- 2
examples/python-dataflow/dataflow_without_webcam.yml View File

@@ -5,12 +5,12 @@ communication:
nodes:
- id: no_webcam
custom:
run: ./no_webcam.py
source: ./no_webcam.py
inputs:
tick: dora/timer/millis/100
outputs:
- image
- id: object_detection
operator:
python: object_detection.py


+ 2
- 2
examples/rust-dataflow/dataflow.yml View File

@@ -6,7 +6,7 @@ nodes:
- id: rust-node
custom:
build: cargo build -p rust-dataflow-example-node
run: ../../target/debug/rust-dataflow-example-node
source: ../../target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/300
outputs:
@@ -24,6 +24,6 @@ nodes:
- id: rust-sink
custom:
build: cargo build -p rust-dataflow-example-sink
run: ../../target/debug/rust-dataflow-example-sink
source: ../../target/debug/rust-dataflow-example-sink
inputs:
message: runtime-node/rust-operator/status

+ 5
- 5
libraries/core/src/descriptor/mod.rs View File

@@ -169,10 +169,8 @@ pub enum OperatorSource {
Wasm(String),
}

impl OperatorSource {
pub fn is_url(source: &str) -> bool {
source.contains("://")
}
pub fn source_is_url(source: &str) -> bool {
source.contains("://")
}

#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -186,7 +184,9 @@ pub struct PythonOperatorConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CustomNode {
pub run: String,
pub source: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub args: Option<String>,
pub env: Option<BTreeMap<String, EnvValue>>,
pub working_directory: Option<BTreeMap<String, EnvValue>>,
#[serde(default, skip_serializing_if = "Option::is_none")]


+ 13
- 0
libraries/extensions/download/Cargo.toml View File

@@ -0,0 +1,13 @@
[package]
name = "dora-download"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
eyre = "0.6.8"
tempfile = "3.3.0"
reqwest = "0.11.12"
tokio = { version = "1.17.0" }

+ 22
- 0
libraries/extensions/download/src/lib.rs View File

@@ -0,0 +1,22 @@
use eyre::Context;
use std::io::Write;

pub fn download_file<T>(url: T) -> Result<tempfile::NamedTempFile, eyre::ErrReport>
where
T: reqwest::IntoUrl + std::fmt::Display + Copy,
{
let response = tokio::runtime::Handle::current().block_on(async {
reqwest::get(url)
.await
.wrap_err_with(|| format!("failed to request operator from `{url}`"))?
.bytes()
.await
.wrap_err("failed to read operator from `{uri}`")
})?;
let mut tmp =
tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?;
tmp.as_file_mut()
.write_all(&response)
.wrap_err("failed to write downloaded operator to file")?;
Ok(tmp)
}

Loading…
Cancel
Save