diff --git a/Cargo.lock b/Cargo.lock index f6423d02..b5bb5045 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -926,8 +926,6 @@ name = "dora-core" version = "0.1.0" dependencies = [ "eyre", - "http", - "http-serde", "once_cell", "serde", "serde_yaml 0.9.11", @@ -1066,7 +1064,6 @@ dependencies = [ "flume", "futures", "futures-concurrency 2.0.3", - "http", "libloading", "opentelemetry", "opentelemetry-system-metrics", @@ -1538,16 +1535,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-serde" -version = "1.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e272971f774ba29341db2f686255ff8a979365a26fb9e4277f6b6d9ec0cdd5e" -dependencies = [ - "http", - "serde", -] - [[package]] name = "httparse" version = "1.7.1" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 335751da..f81b5114 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -55,23 +55,32 @@ pub fn check(dataflow_path: &Path, runtime: &Path) -> eyre::Result<()> { } descriptor::CoreNodeKind::Runtime(node) => { for operator_definition in &node.operators { - if let Some(path) = operator_definition.config.source.as_local_path() { - match &operator_definition.config.source { - OperatorSource::SharedLibrary(_) => { - let path = adjust_shared_library_path(path)?; - + match &operator_definition.config.source { + OperatorSource::SharedLibrary(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { + let path = adjust_shared_library_path(Path::new(&path))?; if !base.join(&path).exists() { bail!("no shared library at `{}`", path.display()); } } - OperatorSource::Python(_) => { + } + OperatorSource::Python(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { if !base.join(&path).exists() { - bail!("no Python library at `{}`", path.display()); + bail!("no Python library at `{path}`"); } } - OperatorSource::Wasm(_) => { + } + OperatorSource::Wasm(path) => { + if OperatorSource::is_url(path) { + todo!("check URL"); + } else { if !base.join(&path).exists() { - bail!("no WASM library at `{}`", path.display()); + bail!("no WASM library at `{path}`"); } } } diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index 4119c6b5..163fa3e6 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -37,7 +37,6 @@ flume = "0.10.14" dora-message = { path = "../../libraries/message" } tracing = "0.1.36" tracing-subscriber = "0.3.15" -http = "0.2.8" tempfile = "3.3.0" reqwest = "0.11.12" diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index b76edbaa..2b422ec1 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -55,23 +55,25 @@ pub fn spawn_operator( let tracer = (); match &operator_definition.config.source { - OperatorSource::SharedLibrary(uri) => { - shared_lib::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { - format!( - "failed to spawn shared library operator for {}", - operator_definition.id - ) - })?; + OperatorSource::SharedLibrary(source) => { + shared_lib::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with( + || { + format!( + "failed to spawn shared library operator for {}", + operator_definition.id + ) + }, + )?; } - OperatorSource::Python(uri) => { - python::spawn(uri, events_tx, inputs, publishers, tracer).wrap_err_with(|| { + OperatorSource::Python(source) => { + python::spawn(source, events_tx, inputs, publishers, tracer).wrap_err_with(|| { format!( "failed to spawn Python operator for {}", operator_definition.id ) })?; } - OperatorSource::Wasm(_uri) => { + OperatorSource::Wasm(_) => { tracing::error!("WASM operators are not supported yet"); } } @@ -84,12 +86,14 @@ pub enum OperatorEvent { Finished, } -fn download_file(uri: &http::Uri) -> Result { - let uri_str = uri.to_string(); +fn download_file(url: T) -> Result +where + T: reqwest::IntoUrl + std::fmt::Display + Copy, +{ let response = tokio::runtime::Handle::current().block_on(async { - reqwest::get(&uri_str) + reqwest::get(url) .await - .wrap_err_with(|| format!("failed to request operator from `{uri_str}`"))? + .wrap_err_with(|| format!("failed to request operator from `{url}`"))? .bytes() .await .wrap_err("failed to read operator from `{uri}`") diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 8801955c..9ffc6be8 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -15,6 +15,7 @@ use std::{ borrow::Cow, collections::HashMap, panic::{catch_unwind, AssertUnwindSafe}, + path::Path, sync::Arc, thread, }; @@ -33,21 +34,21 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { } pub fn spawn( - uri: &http::Uri, + source: &str, events_tx: Sender, inputs: flume::Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if let Some(path) = OperatorSource::uri_as_local_path(uri) { - path.to_owned() - } else { - // try to download the Python file - let tmp = download_file(uri).wrap_err("failed to download Python operator")?; + let path = if OperatorSource::is_url(source) { + // try to download the shared library + let tmp = download_file(source).wrap_err("failed to download Python operator")?; let path = tmp.path().to_owned(); temp_file = Some(tmp); path + } else { + Path::new(source).to_owned() }; if !path.exists() { diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index d34e1043..f2bb51c8 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -13,27 +13,28 @@ use std::{ ffi::c_void, ops::Deref, panic::{catch_unwind, AssertUnwindSafe}, + path::Path, sync::Arc, thread, }; use tokio::sync::mpsc::Sender; pub fn spawn( - uri: &http::Uri, + source: &str, events_tx: Sender, inputs: Receiver, publishers: HashMap>, tracer: Tracer, ) -> eyre::Result<()> { let mut temp_file = None; - let path = if let Some(path) = OperatorSource::uri_as_local_path(&uri) { - adjust_shared_library_path(path)? - } else { + let path = if OperatorSource::is_url(source) { // try to download the shared library - let tmp = download_file(uri).wrap_err("failed to download shared library operator")?; + let tmp = download_file(source).wrap_err("failed to download shared library operator")?; let path = tmp.path().to_owned(); temp_file = Some(tmp); path + } else { + adjust_shared_library_path(Path::new(source))? }; let library = unsafe { diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 7bd580dd..af7fe3fe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -12,5 +12,3 @@ serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" once_cell = "1.13.0" zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } -http = "0.2.8" -http-serde = "1.1.2" diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index be166979..8719229e 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,11 +1,8 @@ use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; -use eyre::eyre; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::fmt; -use std::path::Path; use std::{ - collections::{BTreeMap, BTreeSet}, + collections::{BTreeMap, BTreeSet, HashMap}, + fmt, path::PathBuf, }; pub use visualize::collect_dora_timers; @@ -167,57 +164,14 @@ pub struct OperatorConfig { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "kebab-case")] pub enum OperatorSource { - SharedLibrary(#[serde(with = "http_serde::uri")] http::Uri), - Python(#[serde(with = "http_serde::uri")] http::Uri), - Wasm(#[serde(with = "http_serde::uri")] http::Uri), + SharedLibrary(String), + Python(String), + Wasm(String), } impl OperatorSource { - pub fn as_local_path(&self) -> Option<&Path> { - let uri = self.uri(); - Self::uri_as_local_path(uri) - } - - pub fn canonicalize(&mut self) -> std::io::Result<()> { - if let Some(path) = self.as_local_path() { - *self.uri_mut() = path - .canonicalize()? - .to_str() - .ok_or_else(|| { - std::io::Error::new( - std::io::ErrorKind::Other, - eyre!("operator path is invalid utf8"), - ) - })? - .try_into() - .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?; - } - Ok(()) - } - - fn uri(&self) -> &http::Uri { - match self { - OperatorSource::SharedLibrary(uri) => uri, - OperatorSource::Python(uri) => uri, - OperatorSource::Wasm(uri) => uri, - } - } - - fn uri_mut(&mut self) -> &mut http::Uri { - match self { - OperatorSource::SharedLibrary(uri) => uri, - OperatorSource::Python(uri) => uri, - OperatorSource::Wasm(uri) => uri, - } - } - - pub fn uri_as_local_path(uri: &http::Uri) -> Option<&Path> { - if uri.path() == uri { - // URI is a local path - Some(Path::new(uri.path())) - } else { - None - } + pub fn is_url(source: &str) -> bool { + source.contains("://") } }