From 97b79b9680ea71cab6d81bd3bfdb76f10a108e9e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 27 May 2022 13:21:21 +0200 Subject: [PATCH] Create a `dora-operator-api` crate Rename the previous `dora-api` crate to `dora-node-api`. --- Cargo.lock | 51 +++++++++++-------- Cargo.toml | 5 +- api/{ => node}/Cargo.toml | 2 +- api/{ => node}/src/communication.rs | 0 api/{ => node}/src/config.rs | 0 api/{ => node}/src/lib.rs | 0 api/operator/Cargo.toml | 8 +++ api/operator/src/lib.rs | 42 +++++++++++++++ common/Cargo.toml | 2 +- common/src/descriptor/mod.rs | 2 +- common/src/descriptor/visualize.rs | 2 +- coordinator/Cargo.toml | 2 +- coordinator/examples/example_sink_logger.rs | 2 +- coordinator/examples/example_source_timer.rs | 2 +- coordinator/examples/random_number.rs | 2 +- coordinator/examples/rate_limit.rs | 2 +- coordinator/src/main.rs | 8 +-- runtime/Cargo.toml | 2 +- .../examples/example-operator/Cargo.toml | 1 + .../examples/example-operator/src/lib.rs | 35 +------------ runtime/src/main.rs | 4 +- runtime/src/operator/mod.rs | 2 +- 22 files changed, 102 insertions(+), 74 deletions(-) rename api/{ => node}/Cargo.toml (96%) rename api/{ => node}/src/communication.rs (100%) rename api/{ => node}/src/config.rs (100%) rename api/{ => node}/src/lib.rs (100%) create mode 100644 api/operator/Cargo.toml create mode 100644 api/operator/src/lib.rs rename {coordinator => runtime}/examples/example-operator/Cargo.toml (79%) rename {coordinator => runtime}/examples/example-operator/src/lib.rs (75%) diff --git a/Cargo.lock b/Cargo.lock index b6ef8d50..3805cdc1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -546,30 +546,11 @@ dependencies = [ "winapi", ] -[[package]] -name = "dora-api" -version = "0.1.0" -dependencies = [ - "async-trait", - "eyre", - "futures", - "futures-concurrency", - "serde", - "serde_yaml", - "thiserror", - "tokio", - "tokio-stream", - "tracing", - "uuid", - "zenoh", - "zenoh-config", -] - [[package]] name = "dora-common" version = "0.1.0" dependencies = [ - "dora-api", + "dora-node-api", "eyre", "serde", ] @@ -580,8 +561,8 @@ version = "0.1.0" dependencies = [ "bincode", "clap 3.1.12", - "dora-api", "dora-common", + "dora-node-api", "eyre", "futures", "futures-concurrency", @@ -595,6 +576,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "dora-node-api" +version = "0.1.0" +dependencies = [ + "async-trait", + "eyre", + "futures", + "futures-concurrency", + "serde", + "serde_yaml", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "uuid", + "zenoh", + "zenoh-config", +] + +[[package]] +name = "dora-operator-api" +version = "0.1.0" + [[package]] name = "dora-rs" version = "0.1.0" @@ -621,8 +625,8 @@ name = "dora-runtime" version = "0.1.0" dependencies = [ "clap 3.1.12", - "dora-api", "dora-common", + "dora-node-api", "eyre", "futures", "futures-concurrency", @@ -665,6 +669,9 @@ checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71" [[package]] name = "example-operator" version = "0.1.0" +dependencies = [ + "dora-operator-api", +] [[package]] name = "eyre" diff --git a/Cargo.toml b/Cargo.toml index 0dd0520e..34be3b1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,12 @@ edition = "2021" [workspace] members = [ - "api", + "api/node", + "api/operator", "coordinator", "common", "runtime", - "coordinator/examples/example-operator", + "runtime/examples/example-operator", ] [dependencies] diff --git a/api/Cargo.toml b/api/node/Cargo.toml similarity index 96% rename from api/Cargo.toml rename to api/node/Cargo.toml index 7ff5527a..6e2594d9 100644 --- a/api/Cargo.toml +++ b/api/node/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dora-api" +name = "dora-node-api" version = "0.1.0" edition = "2021" diff --git a/api/src/communication.rs b/api/node/src/communication.rs similarity index 100% rename from api/src/communication.rs rename to api/node/src/communication.rs diff --git a/api/src/config.rs b/api/node/src/config.rs similarity index 100% rename from api/src/config.rs rename to api/node/src/config.rs diff --git a/api/src/lib.rs b/api/node/src/lib.rs similarity index 100% rename from api/src/lib.rs rename to api/node/src/lib.rs diff --git a/api/operator/Cargo.toml b/api/operator/Cargo.toml new file mode 100644 index 00000000..baf6c0e3 --- /dev/null +++ b/api/operator/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "dora-operator-api" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/api/operator/src/lib.rs b/api/operator/src/lib.rs new file mode 100644 index 00000000..d9fbc904 --- /dev/null +++ b/api/operator/src/lib.rs @@ -0,0 +1,42 @@ +use std::ffi::c_void; + +pub trait DoraOperator { + fn on_input( + &mut self, + id: &str, + data: &[u8], + output_sender: &mut DoraOutputSender, + ) -> Result<(), ()>; +} + +pub struct DoraOutputSender { + pub output_fn_raw: OutputFnRaw, + pub output_context: *const c_void, +} + +impl DoraOutputSender { + pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> { + println!("operator sending output.."); + let result = unsafe { + (self.output_fn_raw)( + id.as_ptr(), + id.len(), + data.as_ptr(), + data.len(), + self.output_context, + ) + }; + match result { + 0 => Ok(()), + other => Err(other), + } + } +} + +pub type OutputFnRaw = unsafe extern "C" fn( + id_start: *const u8, + id_len: usize, + data_start: *const u8, + data_len: usize, + output_context: *const c_void, +) -> isize; diff --git a/common/Cargo.toml b/common/Cargo.toml index 9f576fa1..27de99d7 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -6,6 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-api = { version = "0.1.0", path = "../api" } +dora-node-api = { version = "0.1.0", path = "../api/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } diff --git a/common/src/descriptor/mod.rs b/common/src/descriptor/mod.rs index df4512be..b1524476 100644 --- a/common/src/descriptor/mod.rs +++ b/common/src/descriptor/mod.rs @@ -1,4 +1,4 @@ -use dora_api::config::{ +use dora_node_api::config::{ CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId, }; use serde::{Deserialize, Serialize}; diff --git a/common/src/descriptor/visualize.rs b/common/src/descriptor/visualize.rs index f06e3ef1..a52835d9 100644 --- a/common/src/descriptor/visualize.rs +++ b/common/src/descriptor/visualize.rs @@ -1,4 +1,4 @@ -use dora_api::config::{DataId, InputMapping, NodeId}; +use dora_node_api::config::{DataId, InputMapping, NodeId}; use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode}; use std::collections::{BTreeMap, HashMap}; diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 9bf7ecc6..8ce99741 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] bincode = "1.3.3" -dora-api = { path = "../api" } +dora-node-api = { path = "../api/node" } eyre = "0.6.7" futures = "0.3.21" serde = { version = "1.0.136", features = ["derive"] } diff --git a/coordinator/examples/example_sink_logger.rs b/coordinator/examples/example_sink_logger.rs index 956feb09..dc56630f 100644 --- a/coordinator/examples/example_sink_logger.rs +++ b/coordinator/examples/example_sink_logger.rs @@ -1,4 +1,4 @@ -use dora_api::{self, DoraNode}; +use dora_node_api::{self, DoraNode}; use eyre::bail; use futures::StreamExt; use std::time::Duration; diff --git a/coordinator/examples/example_source_timer.rs b/coordinator/examples/example_source_timer.rs index 932cbf1f..c71493fa 100644 --- a/coordinator/examples/example_source_timer.rs +++ b/coordinator/examples/example_source_timer.rs @@ -1,4 +1,4 @@ -use dora_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, config::DataId, DoraNode}; use std::time::Duration; use time::OffsetDateTime; diff --git a/coordinator/examples/random_number.rs b/coordinator/examples/random_number.rs index 5fe4adc5..5d0f97a1 100644 --- a/coordinator/examples/random_number.rs +++ b/coordinator/examples/random_number.rs @@ -1,4 +1,4 @@ -use dora_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, config::DataId, DoraNode}; use eyre::bail; use futures::StreamExt; use std::time::Duration; diff --git a/coordinator/examples/rate_limit.rs b/coordinator/examples/rate_limit.rs index 4647771d..4b581f13 100644 --- a/coordinator/examples/rate_limit.rs +++ b/coordinator/examples/rate_limit.rs @@ -1,5 +1,5 @@ use clap::StructOpt; -use dora_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, config::DataId, DoraNode}; use eyre::bail; use futures::StreamExt; use std::time::{Duration, Instant}; diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index fadd3cce..b7869505 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,5 +1,5 @@ -use dora_api::config::NodeId; use dora_common::descriptor::{self, Descriptor}; +use dora_node_api::config::NodeId; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::path::{Path, PathBuf}; @@ -95,7 +95,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<() fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, - communication: &dora_api::config::CommunicationConfig, + communication: &dora_node_api::config::CommunicationConfig, ) -> eyre::Result>> { let mut args = node.run.split_ascii_whitespace(); let cmd = args @@ -130,7 +130,7 @@ fn spawn_runtime_node( runtime: &Path, node_id: NodeId, node: &descriptor::RuntimeNode, - communication: &dora_api::config::CommunicationConfig, + communication: &dora_node_api::config::CommunicationConfig, ) -> eyre::Result>> { let mut command = tokio::process::Command::new(runtime); command_init_common_env(&mut command, &node_id, communication)?; @@ -160,7 +160,7 @@ fn spawn_runtime_node( fn command_init_common_env( command: &mut tokio::process::Command, node_id: &NodeId, - communication: &dora_api::config::CommunicationConfig, + communication: &dora_node_api::config::CommunicationConfig, ) -> Result<(), eyre::Error> { command.env( "DORA_NODE_ID", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 4f4e02bb..86a1034d 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [dependencies] clap = { version = "3.1.12", features = ["derive"] } -dora-api = { path = "../api" } +dora-node-api = { path = "../api/node" } dora-common = { version = "0.1.0", path = "../common" } eyre = "0.6.8" futures = "0.3.21" diff --git a/coordinator/examples/example-operator/Cargo.toml b/runtime/examples/example-operator/Cargo.toml similarity index 79% rename from coordinator/examples/example-operator/Cargo.toml rename to runtime/examples/example-operator/Cargo.toml index 5d4ee761..956da5af 100644 --- a/coordinator/examples/example-operator/Cargo.toml +++ b/runtime/examples/example-operator/Cargo.toml @@ -9,3 +9,4 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] +dora-operator-api = { path = "../../../api/operator" } diff --git a/coordinator/examples/example-operator/src/lib.rs b/runtime/examples/example-operator/src/lib.rs similarity index 75% rename from coordinator/examples/example-operator/src/lib.rs rename to runtime/examples/example-operator/src/lib.rs index ad325d48..82e1929b 100644 --- a/coordinator/examples/example-operator/src/lib.rs +++ b/runtime/examples/example-operator/src/lib.rs @@ -1,5 +1,6 @@ #![warn(unsafe_op_in_unsafe_fn)] +use dora_operator_api::{DoraOperator, DoraOutputSender, OutputFnRaw}; use std::{ffi::c_void, slice}; #[no_mangle] @@ -17,14 +18,6 @@ pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut ()) { unsafe { Box::from_raw(raw) }; } -type OutputFnRaw = unsafe extern "C" fn( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - output_context: *const c_void, -) -> isize; - #[no_mangle] pub unsafe extern "C" fn dora_on_input( id_start: *const u8, @@ -53,36 +46,12 @@ pub unsafe extern "C" fn dora_on_input( } } -struct DoraOutputSender { - output_fn_raw: OutputFnRaw, - output_context: *const c_void, -} - -impl DoraOutputSender { - pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> { - println!("operator sending output.."); - let result = unsafe { - (self.output_fn_raw)( - id.as_ptr(), - id.len(), - data.as_ptr(), - data.len(), - self.output_context, - ) - }; - match result { - 0 => Ok(()), - other => Err(other), - } - } -} - #[derive(Debug, Default)] struct Operator { time: Option, } -impl Operator { +impl DoraOperator for Operator { fn on_input( &mut self, id: &str, diff --git a/runtime/src/main.rs b/runtime/src/main.rs index 6e4d79d1..e5eaa0ed 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -1,12 +1,12 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_api::{ +use dora_common::{descriptor::OperatorConfig, BoxError}; +use dora_node_api::{ self, communication::CommunicationLayer, config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, STOP_TOPIC, }; -use dora_common::{descriptor::OperatorConfig, BoxError}; use eyre::{bail, eyre, Context}; use futures::{stream::FuturesUnordered, StreamExt}; use futures_concurrency::Merge; diff --git a/runtime/src/operator/mod.rs b/runtime/src/operator/mod.rs index 2e54afba..732042fd 100644 --- a/runtime/src/operator/mod.rs +++ b/runtime/src/operator/mod.rs @@ -1,5 +1,5 @@ -use dora_api::config::DataId; use dora_common::descriptor::{OperatorConfig, OperatorSource}; +use dora_node_api::config::DataId; use eyre::{eyre, Context}; use std::any::Any; use tokio::sync::mpsc::{self, Sender};