Browse Source

Implement support for downloading operator sources

Makes it possible to set an URL as operator source. The `dora-runtime` will then try to download the operator from the given URL when the dataflow is started.
tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
13d3a91683
Failed to extract signature
8 changed files with 320 additions and 39 deletions
  1. +201
    -0
      Cargo.lock
  2. +15
    -13
      binaries/cli/src/check.rs
  3. +3
    -0
      binaries/runtime/Cargo.toml
  4. +24
    -6
      binaries/runtime/src/operator/mod.rs
  5. +16
    -4
      binaries/runtime/src/operator/python.rs
  6. +15
    -5
      binaries/runtime/src/operator/shared_lib.rs
  7. +2
    -0
      libraries/core/Cargo.toml
  8. +44
    -11
      libraries/core/src/descriptor/mod.rs

+ 201
- 0
Cargo.lock View File

@@ -926,6 +926,8 @@ name = "dora-core"
version = "0.1.0"
dependencies = [
"eyre",
"http",
"http-serde",
"once_cell",
"serde",
"serde_yaml 0.9.11",
@@ -1064,11 +1066,14 @@ dependencies = [
"flume",
"futures",
"futures-concurrency 2.0.3",
"http",
"libloading",
"opentelemetry",
"opentelemetry-system-metrics",
"pyo3",
"reqwest",
"serde_yaml 0.8.23",
"tempfile",
"tokio",
"tokio-stream",
"tracing",
@@ -1098,6 +1103,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"

[[package]]
name = "encoding_rs"
version = "0.8.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b"
dependencies = [
"cfg-if",
]

[[package]]
name = "env_logger"
version = "0.9.0"
@@ -1186,6 +1200,21 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"

[[package]]
name = "foreign-types"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1"
dependencies = [
"foreign-types-shared",
]

[[package]]
name = "foreign-types-shared"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b"

[[package]]
name = "form_urlencoded"
version = "1.0.1"
@@ -1509,6 +1538,16 @@ dependencies = [
"pin-project-lite",
]

[[package]]
name = "http-serde"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e"
dependencies = [
"http",
"serde",
]

[[package]]
name = "httparse"
version = "1.7.1"
@@ -1563,6 +1602,19 @@ dependencies = [
"tokio-io-timeout",
]

[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper",
"native-tls",
"tokio",
"tokio-native-tls",
]

[[package]]
name = "iceoryx-example-node"
version = "0.1.0"
@@ -1687,6 +1739,12 @@ dependencies = [
"syn",
]

[[package]]
name = "ipnet"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b"

[[package]]
name = "ipnetwork"
version = "0.18.0"
@@ -1883,6 +1941,12 @@ dependencies = [
"autocfg 1.1.0",
]

[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"

[[package]]
name = "mini_paste"
version = "0.1.11"
@@ -2022,6 +2086,24 @@ name = "napi-sys"
version = "1.0.0"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"

[[package]]
name = "native-tls"
version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd7e2f3618557f980e0b17e8856252eee3c97fa12c54dff0ca290fb6266ca4a9"
dependencies = [
"lazy_static",
"libc",
"log",
"openssl",
"openssl-probe",
"openssl-sys",
"schannel",
"security-framework",
"security-framework-sys",
"tempfile",
]

[[package]]
name = "ndk"
version = "0.7.0"
@@ -2231,12 +2313,51 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"

[[package]]
name = "openssl"
version = "0.10.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12fc0523e3bd51a692c8850d075d74dc062ccf251c0110668cbd921917118a13"
dependencies = [
"bitflags",
"cfg-if",
"foreign-types",
"libc",
"once_cell",
"openssl-macros",
"openssl-sys",
]

[[package]]
name = "openssl-macros"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "openssl-probe"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"

[[package]]
name = "openssl-sys"
version = "0.9.76"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5230151e44c0f05157effb743e8d517472843121cf9243e8b81393edb5acd9ce"
dependencies = [
"autocfg 1.1.0",
"cc",
"libc",
"pkg-config",
"vcpkg",
]

[[package]]
name = "opentelemetry"
version = "0.17.0"
@@ -2496,6 +2617,12 @@ dependencies = [
"zeroize",
]

[[package]]
name = "pkg-config"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df8c4ec4b0627e53bdf214615ad287367e482558cf84b109250b37464dc03ae"

[[package]]
name = "pnet"
version = "0.28.0"
@@ -2968,6 +3095,43 @@ dependencies = [
"winapi",
]

[[package]]
name = "reqwest"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]

[[package]]
name = "ring"
version = "0.16.20"
@@ -3247,6 +3411,18 @@ dependencies = [
"serde",
]

[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]

[[package]]
name = "serde_yaml"
version = "0.8.23"
@@ -3621,6 +3797,16 @@ dependencies = [
"syn",
]

[[package]]
name = "tokio-native-tls"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d995660bd2b7f8c1568414c1126076c13fbb725c40112dc0120b78eb9b717b"
dependencies = [
"native-tls",
"tokio",
]

[[package]]
name = "tokio-stream"
version = "0.1.9"
@@ -4006,6 +4192,12 @@ dependencies = [
"version_check",
]

[[package]]
name = "vcpkg"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"

[[package]]
name = "vec_map"
version = "0.8.2"
@@ -4323,6 +4515,15 @@ version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"

[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]

[[package]]
name = "with_builtin_macros"
version = "0.0.3"


+ 15
- 13
binaries/cli/src/check.rs View File

@@ -55,22 +55,24 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> {
}
descriptor::CoreNodeKind::Runtime(node) => {
for operator_definition in &node.operators {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
let path = adjust_shared_library_path(path)?;
if let Some(path) = operator_definition.config.source.as_local_path() {
match &operator_definition.config.source {
OperatorSource::SharedLibrary(_) => {
let path = adjust_shared_library_path(path)?;

if !base.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
if !base.join(&path).exists() {
bail!("no shared library at `{}`", path.display());
}
}
}
OperatorSource::Python(path) => {
if !base.join(&path).exists() {
bail!("no Python library at `{}`", path.display());
OperatorSource::Python(_) => {
if !base.join(&path).exists() {
bail!("no Python library at `{}`", path.display());
}
}
}
OperatorSource::Wasm(path) => {
if !base.join(&path).exists() {
bail!("no WASM library at `{}`", path.display());
OperatorSource::Wasm(_) => {
if !base.join(&path).exists() {
bail!("no WASM library at `{}`", path.display());
}
}
}
}


+ 3
- 0
binaries/runtime/Cargo.toml View File

@@ -37,6 +37,9 @@ flume = "0.10.14"
dora-message = { path = "../../libraries/message" }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
http = "0.2.8"
tempfile = "3.3.0"
reqwest = "0.11.12"

[features]
tracing = ["opentelemetry", "dora-tracing"]


+ 24
- 6
binaries/runtime/src/operator/mod.rs View File

@@ -6,7 +6,7 @@ use dora_node_api::communication::{self, CommunicationLayer};
use eyre::Context;
#[cfg(feature = "tracing")]
use opentelemetry::sdk::trace::Tracer;
use std::any::Any;
use std::{any::Any, io::Write};
use tokio::sync::mpsc::Sender;

#[cfg(not(feature = "tracing"))]
@@ -55,23 +55,23 @@ pub fn spawn_operator(
let tracer = ();

match &operator_definition.config.source {
OperatorSource::SharedLibrary(path) => {
shared_lib::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
OperatorSource::SharedLibrary(uri) => {
shared_lib::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
format!(
"failed to spawn shared library operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Python(path) => {
python::spawn(path, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
OperatorSource::Python(uri) => {
python::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| {
format!(
"failed to spawn Python operator for {}",
operator_definition.id
)
})?;
}
OperatorSource::Wasm(_path) => {
OperatorSource::Wasm(_uri) => {
tracing::error!("WASM operators are not supported yet");
}
}
@@ -83,3 +83,21 @@ pub enum OperatorEvent {
Panic(Box<dyn Any + Send>),
Finished,
}

fn download_file(uri: &http::Uri) -> Result<tempfile::NamedTempFile, eyre::ErrReport> {
let uri_str = uri.to_string();
let response = tokio::runtime::Handle::current().block_on(async {
reqwest::get(&uri_str)
.await
.wrap_err_with(|| format!("failed to request operator from `{uri_str}`"))?
.bytes()
.await
.wrap_err("failed to read operator from `{uri}`")
})?;
let mut tmp =
tempfile::NamedTempFile::new().wrap_err("failed to create temp file for operator")?;
tmp.as_file_mut()
.write_all(&response)
.wrap_err("failed to write downloaded operator to file")?;
Ok(tmp)
}

+ 16
- 4
binaries/runtime/src/operator/python.rs View File

@@ -1,7 +1,7 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{OperatorEvent, Tracer};
use dora_core::config::DataId;
use super::{download_file, OperatorEvent, Tracer};
use dora_core::{config::DataId, descriptor::OperatorSource};
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
use eyre::{bail, eyre, Context};
@@ -15,7 +15,6 @@ use std::{
borrow::Cow,
collections::HashMap,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
sync::Arc,
thread,
};
@@ -34,12 +33,23 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report {
}

pub fn spawn(
path: &Path,
uri: &http::Uri,
events_tx: Sender<OperatorEvent>,
inputs: flume::Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
tracer: Tracer,
) -> eyre::Result<()> {
let mut temp_file = None;
let path = if let Some(path) = OperatorSource::uri_as_local_path(uri) {
path.to_owned()
} else {
// try to download the Python file
let tmp = download_file(uri).wrap_err("failed to download Python operator")?;
let path = tmp.path().to_owned();
temp_file = Some(tmp);
path
};

if !path.exists() {
bail!("No python file exists at {}", path.display());
}
@@ -87,6 +97,8 @@ pub fn spawn(
};

let python_runner = move || {
let _temp_file = temp_file;

let operator =
Python::with_gil(init_operator).wrap_err("failed to init python operator")?;



+ 15
- 5
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,5 +1,5 @@
use super::{OperatorEvent, Tracer};
use dora_core::{adjust_shared_library_path, config::DataId};
use super::{download_file, OperatorEvent, Tracer};
use dora_core::{adjust_shared_library_path, config::DataId, descriptor::OperatorSource};
use dora_node_api::communication::Publisher;
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
@@ -13,20 +13,28 @@ use std::{
ffi::c_void,
ops::Deref,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
sync::Arc,
thread,
};
use tokio::sync::mpsc::Sender;

pub fn spawn(
path: &Path,
uri: &http::Uri,
events_tx: Sender<OperatorEvent>,
inputs: Receiver<dora_node_api::Input>,
publishers: HashMap<DataId, Box<dyn Publisher>>,
tracer: Tracer,
) -> eyre::Result<()> {
let path = adjust_shared_library_path(path)?;
let mut temp_file = None;
let path = if let Some(path) = OperatorSource::uri_as_local_path(&uri) {
adjust_shared_library_path(path)?
} else {
// try to download the shared library
let tmp = download_file(uri).wrap_err("failed to download shared library operator")?;
let path = tmp.path().to_owned();
temp_file = Some(tmp);
path
};

let library = unsafe {
libloading::Library::new(&path)
@@ -35,6 +43,8 @@ pub fn spawn(

thread::spawn(move || {
let closure = AssertUnwindSafe(|| {
let _temp_file = temp_file;

let bindings = Bindings::init(&library).context("failed to init operator")?;

let operator = SharedLibraryOperator { inputs, bindings };


+ 2
- 0
libraries/core/Cargo.toml View File

@@ -12,3 +12,5 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
once_cell = "1.13.0"
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }
http = "0.2.8"
http-serde = "1.1.2"

+ 44
- 11
libraries/core/src/descriptor/mod.rs View File

@@ -1,14 +1,15 @@
use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
use std::os::unix::prelude::OsStrExt;
use std::path::Path;
use std::{
collections::{BTreeMap, BTreeSet},
path::PathBuf,
};
pub use visualize::collect_dora_timers;

use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};

mod visualize;

#[derive(Debug, Serialize, Deserialize)]
@@ -166,21 +167,53 @@ pub struct OperatorConfig {
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(PathBuf),
Python(PathBuf),
Wasm(PathBuf),
SharedLibrary(#[serde(with = "http_serde::uri")] http::Uri),
Python(#[serde(with = "http_serde::uri")] http::Uri),
Wasm(#[serde(with = "http_serde::uri")] http::Uri),
}

impl OperatorSource {
pub fn as_local_path(&self) -> Option<&Path> {
let uri = self.uri();
Self::uri_as_local_path(uri)
}

pub fn canonicalize(&mut self) -> std::io::Result<()> {
let path = match self {
OperatorSource::SharedLibrary(path) => path,
OperatorSource::Python(path) => path,
OperatorSource::Wasm(path) => path,
};
*path = path.canonicalize()?;
if let Some(path) = self.as_local_path() {
*self.uri_mut() = path
.canonicalize()?
.as_os_str()
.as_bytes()
.try_into()
.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
}
Ok(())
}

fn uri(&self) -> &http::Uri {
match self {
OperatorSource::SharedLibrary(uri) => uri,
OperatorSource::Python(uri) => uri,
OperatorSource::Wasm(uri) => uri,
}
}

fn uri_mut(&mut self) -> &mut http::Uri {
match self {
OperatorSource::SharedLibrary(uri) => uri,
OperatorSource::Python(uri) => uri,
OperatorSource::Wasm(uri) => uri,
}
}

pub fn uri_as_local_path(uri: &http::Uri) -> Option<&Path> {
if uri.path() == uri {
// URI is a local path
Some(Path::new(uri.path()))
} else {
None
}
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]


Loading…
Cancel
Save