From 13d3a9168316b17b98ff0b6c1184e141ce2500c9 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 19 Oct 2022 12:40:38 +0200 Subject: [PATCH] Implement support for downloading operator sources Makes it possible to set an URL as operator source. The `dora-runtime` will then try to download the operator from the given URL when the dataflow is started. --- Cargo.lock | 201 ++++++++++++++++++++ binaries/cli/src/check.rs | 28 +-- binaries/runtime/Cargo.toml | 3 + binaries/runtime/src/operator/mod.rs | 30 ++- binaries/runtime/src/operator/python.rs | 20 +- binaries/runtime/src/operator/shared_lib.rs | 20 +- libraries/core/Cargo.toml | 2 + libraries/core/src/descriptor/mod.rs | 55 ++++-- 8 files changed, 320 insertions(+), 39 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e3a37e2a..f6423d02 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,6 +926,8 @@ name = "dora-core" version = "0.1.0" dependencies = [ "eyre", + "http", + "http-serde", "once_cell", "serde", "serde_yaml 0.9.11", @@ -1064,11 +1066,14 @@ dependencies = [ "flume", "futures", "futures-concurrency 2.0.3", + "http", "libloading", "opentelemetry", "opentelemetry-system-metrics", "pyo3", + "reqwest", "serde_yaml 0.8.23", + "tempfile", "tokio", "tokio-stream", "tracing", @@ -1098,6 +1103,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.9.0" @@ -1186,6 +1200,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.0.1" @@ -1509,6 +1538,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" +dependencies = [ + "http", + "serde", +] + [[package]] name = "httparse" version = "1.7.1" @@ -1563,6 +1602,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iceoryx-example-node" version = "0.1.0" @@ -1687,6 +1739,12 @@ dependencies = [ "syn", ] +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "ipnetwork" version = "0.18.0" @@ -1883,6 +1941,12 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "mini_paste" version = "0.1.11" @@ -2022,6 +2086,24 @@ name = "napi-sys" version = "1.0.0" source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +[[package]] +name = "native-tls" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "ndk" version = "0.7.0" @@ -2231,12 +2313,51 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.76" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce" +dependencies = [ + "autocfg 1.1.0", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.17.0" @@ -2496,6 +2617,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "pkg-config" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae" + [[package]] name = "pnet" version = "0.28.0" @@ -2968,6 +3095,43 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "log", + "mime", + "native-tls", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-native-tls", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.20" @@ -3247,6 +3411,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "serde_yaml" version = "0.8.23" @@ -3621,6 +3797,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.9" @@ -4006,6 +4192,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" @@ -4323,6 +4515,15 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "with_builtin_macros" version = "0.0.3" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 01ce395c..335751da 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -55,22 +55,24 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } descriptor::CoreNodeKind::Runtime(node) => { for operator_definition in &node.operators { - match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - let path = adjust_shared_library_path(path)?; + if let Some(path) = operator_definition.config.source.as_local_path() { + match &operator_definition.config.source { + OperatorSource::SharedLibrary(_) => { + let path = adjust_shared_library_path(path)?; - if !base.join(&path).exists() { - bail!("no shared library at `{}`", path.display()); + if !base.join(&path).exists() { + bail!("no shared library at `{}`", path.display()); + } } - } - OperatorSource::Python(path) => { - if !base.join(&path).exists() { - bail!("no Python library at `{}`", path.display()); + OperatorSource::Python(_) => { + if !base.join(&path).exists() { + bail!("no Python library at `{}`", path.display()); + } } - } - OperatorSource::Wasm(path) => { - if !base.join(&path).exists() { - bail!("no WASM library at `{}`", path.display()); + OperatorSource::Wasm(_) => { + if !base.join(&path).exists() { + bail!("no WASM library at `{}`", path.display()); + } } } } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 4caea8b6..4119c6b5 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,6 +37,9 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" +http = "0.2.8" +tempfile = "3.3.0" +reqwest = "0.11.12" [features] tracing = ["opentelemetry", "dora-tracing"] diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index f7801df6..b76edbaa 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; -use std::any::Any; +use std::{any::Any, io::Write}; use tokio::sync::mpsc::Sender; #[cfg(not(feature = "tracing"))] @@ -55,23 +55,23 @@ pub fn spawn_operator( let tracer = (); match &operator_definition.config.source { - OperatorSource::SharedLibrary(path) => { - shared_lib::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::SharedLibrary(uri) => { + shared_lib::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn shared library operator for {}", operator_definition.id ) })?; } - OperatorSource::Python(path) => { - python::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::Python(uri) => { + python::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id ) })?; } - OperatorSource::Wasm(_path) => { + OperatorSource::Wasm(_uri) => { tracing::error!("WASM operators are not supported yet"); } } @@ -83,3 +83,21 @@ pub enum OperatorEvent { Panic(Box), Finished, } + +fn download_file(uri: &http::Uri) -> Result { + let uri_str = uri.to_string(); + let response = tokio::runtime::Handle::current().block_on(async { + reqwest::get(&uri_str) + .await + .wrap_err_with(|| format!("failed to request operator from `{uri_str}`"))? + .bytes() + .await + .wrap_err("failed to read operator from `{uri}`") + })?; + let mut tmp = + tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?; + tmp.as_file_mut() + .write_all(&response) + .wrap_err("failed to write downloaded operator to file")?; + Ok(tmp) +} diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 7c60dd97..8801955c 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,7 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use super::{OperatorEvent, Tracer}; -use dora_core::config::DataId; +use super::{download_file, OperatorEvent, Tracer}; +use dora_core::{config::DataId, descriptor::OperatorSource}; use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; @@ -15,7 +15,6 @@ use std::{ borrow::Cow, collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, - path::Path, sync::Arc, thread, }; @@ -34,12 +33,23 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( - path: &Path, + uri: &http::Uri, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { + let mut temp_file = None; + let path = if let Some(path) = OperatorSource::uri_as_local_path(uri) { + path.to_owned() + } else { + // try to download the Python file + let tmp = download_file(uri).wrap_err("failed to download Python operator")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + }; + if !path.exists() { bail!("No python file exists at {}", path.display()); } @@ -87,6 +97,8 @@ pub fn spawn( }; let python_runner = move || { + let _temp_file = temp_file; + let operator = Python::with_gil(init_operator).wrap_err("failed to init python operator")?; diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 7da7670d..d34e1043 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,5 +1,5 @@ -use super::{OperatorEvent, Tracer}; -use dora_core::{adjust_shared_library_path, config::DataId}; +use super::{download_file, OperatorEvent, Tracer}; +use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource}; use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, @@ -13,20 +13,28 @@ use std::{ ffi::c_void, ops::Deref, panic::{catch_unwind, AssertUnwindSafe}, - path::Path, sync::Arc, thread, }; use tokio::sync::mpsc::Sender; pub fn spawn( - path: &Path, + uri: &http::Uri, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { - let path = adjust_shared_library_path(path)?; + let mut temp_file = None; + let path = if let Some(path) = OperatorSource::uri_as_local_path(&uri) { + adjust_shared_library_path(path)? + } else { + // try to download the shared library + let tmp = download_file(uri).wrap_err("failed to download shared library operator")?; + let path = tmp.path().to_owned(); + temp_file = Some(tmp); + path + }; let library = unsafe { libloading::Library::new(&path) @@ -35,6 +43,8 @@ pub fn spawn( thread::spawn(move || { let closure = AssertUnwindSafe(|| { + let _temp_file = temp_file; + let bindings = Bindings::init(&library).context("failed to init operator")?; let operator = SharedLibraryOperator { inputs, bindings }; diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index af7fe3fe..7bd580dd 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -12,3 +12,5 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" once_cell = "1.13.0" zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } +http = "0.2.8" +http-serde = "1.1.2" diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 664184db..714e9c0f 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,14 +1,15 @@ +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; +use std::os::unix::prelude::OsStrExt; +use std::path::Path; use std::{ collections::{BTreeMap, BTreeSet}, path::PathBuf, }; pub use visualize::collect_dora_timers; -use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; - mod visualize; #[derive(Debug, Serialize, Deserialize)] @@ -166,21 +167,53 @@ pub struct OperatorConfig { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub enum OperatorSource { - SharedLibrary(PathBuf), - Python(PathBuf), - Wasm(PathBuf), + SharedLibrary(#[serde(with = "http_serde::uri")] http::Uri), + Python(#[serde(with = "http_serde::uri")] http::Uri), + Wasm(#[serde(with = "http_serde::uri")] http::Uri), } impl OperatorSource { + pub fn as_local_path(&self) -> Option<&Path> { + let uri = self.uri(); + Self::uri_as_local_path(uri) + } + pub fn canonicalize(&mut self) -> std::io::Result<()> { - let path = match self { - OperatorSource::SharedLibrary(path) => path, - OperatorSource::Python(path) => path, - OperatorSource::Wasm(path) => path, - }; - *path = path.canonicalize()?; + if let Some(path) = self.as_local_path() { + *self.uri_mut() = path + .canonicalize()? + .as_os_str() + .as_bytes() + .try_into() + .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; + } Ok(()) } + + fn uri(&self) -> &http::Uri { + match self { + OperatorSource::SharedLibrary(uri) => uri, + OperatorSource::Python(uri) => uri, + OperatorSource::Wasm(uri) => uri, + } + } + + fn uri_mut(&mut self) -> &mut http::Uri { + match self { + OperatorSource::SharedLibrary(uri) => uri, + OperatorSource::Python(uri) => uri, + OperatorSource::Wasm(uri) => uri, + } + } + + pub fn uri_as_local_path(uri: &http::Uri) -> Option<&Path> { + if uri.path() == uri { + // URI is a local path + Some(Path::new(uri.path())) + } else { + None + } + } } #[derive(Debug, Serialize, Deserialize, Clone)]