diff --git a/Cargo.lock b/Cargo.lock index 86d21a2d..e3a37e2a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -925,10 +925,11 @@ dependencies = [ name = "dora-core" version = "0.1.0" dependencies = [ - "dora-node-api", "eyre", + "once_cell", "serde", "serde_yaml 0.9.11", + "zenoh-config", ] [[package]] @@ -966,6 +967,7 @@ version = "0.1.0" dependencies = [ "capnp", "communication-layer-pub-sub", + "dora-core", "dora-message", "eyre", "flume", diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 7fec3e78..0c5cc7b7 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,6 +1,6 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] -use dora_node_api::{config::NodeId, DoraNode, Input}; +use dora_node_api::{core::config::NodeId, DoraNode, Input}; use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata}; use eyre::{Context, Result}; use flume::Receiver; diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index bffebcd0..bc7f2f34 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -23,6 +23,7 @@ communication-layer-pub-sub = { path = "../../../libraries/communication-layer", uuid = { version = "1.1.2", features = ["v4"] } capnp = "0.14.9" dora-message = { path = "../../../libraries/message" } +dora-core = { path = "../../../libraries/core" } [dev-dependencies] tokio = { version = "1.17.0", features = ["rt"] } diff --git a/apis/rust/node/src/communication.rs b/apis/rust/node/src/communication.rs index 49022022..c9245b7f 100644 --- a/apis/rust/node/src/communication.rs +++ b/apis/rust/node/src/communication.rs @@ -1,11 +1,8 @@ +use crate::BoxError; use communication_layer_pub_sub::ReceivedSample; pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber}; +use dora_core::config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}; use dora_message::Metadata; - -use crate::{ - config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId}, - BoxError, -}; use eyre::Context; use std::{ borrow::Cow, diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index b75e7306..8fa5350e 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -1,14 +1,13 @@ pub use communication::Input; -pub use dora_message::Metadata; -pub use flume::Receiver; - use communication::STOP_TOPIC; use communication_layer_pub_sub::CommunicationLayer; -use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_core as core; +use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig}; +pub use dora_message::Metadata; use eyre::WrapErr; +pub use flume::Receiver; pub mod communication; -pub mod config; pub struct DoraNode { id: NodeId, @@ -144,6 +143,8 @@ fn set_up_tracing() -> eyre::Result<()> { #[cfg(test)] mod tests { + use dora_core::config; + use super::*; #[test] diff --git a/binaries/cli/src/build.rs b/binaries/cli/src/build.rs index 48867dee..4b308ba6 100644 --- a/binaries/cli/src/build.rs +++ b/binaries/cli/src/build.rs @@ -1,5 +1,5 @@ use crate::graph; -use dora_core::descriptor::{OperatorId, SINGLE_OPERATOR_DEFAULT_ID}; +use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID}; use eyre::{eyre, Context}; use std::{path::Path, process::Command}; diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 6643d483..01ce395c 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,7 +1,8 @@ use crate::graph::read_descriptor; use dora_core::{ adjust_shared_library_path, - descriptor::{self, CoreNodeKind, InputMapping, OperatorSource, UserInputMapping}, + config::{InputMapping, UserInputMapping}, + descriptor::{self, CoreNodeKind, OperatorSource}, }; use eyre::{bail, eyre, Context}; use std::{env::consts::EXE_EXTENSION, path::Path}; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 8e59fe72..84242119 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -1,9 +1,12 @@ use crate::run::spawn_dataflow; -use dora_core::topics::{ - StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, - ZENOH_CONTROL_STOP, +use dora_core::{ + config::CommunicationConfig, + topics::{ + StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START, + ZENOH_CONTROL_STOP, + }, }; -use dora_node_api::{communication, config::CommunicationConfig}; +use dora_node_api::communication; use eyre::{bail, eyre, WrapErr}; use futures::StreamExt; use futures_concurrency::stream::Merge; @@ -188,7 +191,7 @@ async fn start_dataflow( uuid, communication_config, tasks, - } = spawn_dataflow(&runtime_path, &path).await?; + } = spawn_dataflow(&runtime_path, path).await?; let path = path.to_owned(); let task = async move { let result = await_tasks(tasks) diff --git a/binaries/coordinator/src/run/custom.rs b/binaries/coordinator/src/run/custom.rs index 64c11a59..e3411353 100644 --- a/binaries/coordinator/src/run/custom.rs +++ b/binaries/coordinator/src/run/custom.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -8,7 +7,7 @@ use std::{env::consts::EXE_EXTENSION, path::Path}; pub(super) fn spawn_custom_node( node_id: NodeId, node: &descriptor::CustomNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut args = node.run.split_ascii_whitespace(); diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 824a0415..b8e34b83 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,9 +1,9 @@ use self::{custom::spawn_custom_node, runtime::spawn_runtime_node}; -use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId}; -use dora_node_api::{ - communication, - config::{format_duration, CommunicationConfig}, +use dora_core::{ + config::{format_duration, CommunicationConfig, NodeId}, + descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor}, }; +use dora_node_api::communication; use eyre::{bail, eyre, WrapErr}; use futures::{stream::FuturesUnordered, StreamExt}; use std::{env::consts::EXE_EXTENSION, path::Path}; @@ -20,7 +20,7 @@ pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result< pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result { let mut runtime = runtime.with_extension(EXE_EXTENSION); - let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| { + let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| { format!( "failed to read dataflow descriptor at {}", dataflow_path.display() @@ -144,7 +144,7 @@ async fn read_descriptor(file: &Path) -> Result { fn command_init_common_env( command: &mut tokio::process::Command, node_id: &NodeId, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, ) -> Result<(), eyre::Error> { command.env( "DORA_NODE_ID", diff --git a/binaries/coordinator/src/run/runtime.rs b/binaries/coordinator/src/run/runtime.rs index 4f7d7082..a0bf2797 100644 --- a/binaries/coordinator/src/run/runtime.rs +++ b/binaries/coordinator/src/run/runtime.rs @@ -1,6 +1,5 @@ use super::command_init_common_env; -use dora_core::descriptor; -use dora_node_api::config::NodeId; +use dora_core::{config::NodeId, descriptor}; use eyre::{eyre, WrapErr}; use std::path::Path; @@ -9,7 +8,7 @@ pub fn spawn_runtime_node( runtime: &Path, node_id: NodeId, node: &descriptor::RuntimeNode, - communication: &dora_node_api::config::CommunicationConfig, + communication: &dora_core::config::CommunicationConfig, working_dir: &Path, ) -> eyre::Result>> { let mut command = tokio::process::Command::new(runtime); diff --git a/binaries/runtime/src/main.rs b/binaries/runtime/src/main.rs index fda84724..9253e798 100644 --- a/binaries/runtime/src/main.rs +++ b/binaries/runtime/src/main.rs @@ -1,10 +1,12 @@ #![warn(unsafe_op_in_unsafe_fn)] -use dora_core::descriptor::OperatorDefinition; +use dora_core::{ + config::{CommunicationConfig, DataId, NodeId, OperatorId}, + descriptor::OperatorDefinition, +}; use dora_node_api::{ self, communication::{self, CommunicationLayer, Publisher, STOP_TOPIC}, - config::{CommunicationConfig, DataId, NodeId, OperatorId}, }; use eyre::{bail, Context}; use futures::{Stream, StreamExt}; diff --git a/binaries/runtime/src/operator/mod.rs b/binaries/runtime/src/operator/mod.rs index 036ef926..f7801df6 100644 --- a/binaries/runtime/src/operator/mod.rs +++ b/binaries/runtime/src/operator/mod.rs @@ -1,8 +1,8 @@ -use dora_core::descriptor::{OperatorDefinition, OperatorSource}; -use dora_node_api::{ - communication::{self, CommunicationLayer}, +use dora_core::{ config::NodeId, + descriptor::{OperatorDefinition, OperatorSource}, }; +use dora_node_api::communication::{self, CommunicationLayer}; use eyre::Context; #[cfg(feature = "tracing")] use opentelemetry::sdk::trace::Tracer; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index fb90ec86..7c60dd97 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -1,7 +1,8 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] use super::{OperatorEvent, Tracer}; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_core::config::DataId; +use dora_node_api::communication::Publisher; use dora_operator_api_python::metadata_to_pydict; use eyre::{bail, eyre, Context}; use pyo3::{ diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index dd524734..7da7670d 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,6 +1,6 @@ use super::{OperatorEvent, Tracer}; -use dora_core::adjust_shared_library_path; -use dora_node_api::{communication::Publisher, config::DataId}; +use dora_core::{adjust_shared_library_path, config::DataId}; +use dora_node_api::communication::Publisher; use dora_operator_api_types::{ safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput, DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput, diff --git a/examples/iceoryx/node/src/main.rs b/examples/iceoryx/node/src/main.rs index 06437786..7ca69c08 100644 --- a/examples/iceoryx/node/src/main.rs +++ b/examples/iceoryx/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index 06437786..7ca69c08 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, config::DataId, DoraNode}; +use dora_node_api::{self, core::config::DataId, DoraNode}; fn main() -> eyre::Result<()> { let output = DataId::from("random".to_owned()); diff --git a/libraries/core/Cargo.toml b/libraries/core/Cargo.toml index 793bc8ef..af7fe3fe 100644 --- a/libraries/core/Cargo.toml +++ b/libraries/core/Cargo.toml @@ -7,7 +7,8 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -dora-node-api = { version = "0.1.0", path = "../../apis/rust/node" } eyre = "0.6.8" serde = { version = "1.0.136", features = ["derive"] } serde_yaml = "0.9.11" +once_cell = "1.13.0" +zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" } diff --git a/apis/rust/node/src/config.rs b/libraries/core/src/config.rs similarity index 99% rename from apis/rust/node/src/config.rs rename to libraries/core/src/config.rs index 201d6341..2a880802 100644 --- a/apis/rust/node/src/config.rs +++ b/libraries/core/src/config.rs @@ -1,4 +1,3 @@ -use communication_layer_pub_sub::zenoh::zenoh_config; use once_cell::sync::OnceCell; use serde::{Deserialize, Serialize}; use std::{ @@ -10,15 +9,6 @@ use std::{ time::Duration, }; -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(deny_unknown_fields)] -pub struct NodeRunConfig { - #[serde(default)] - pub inputs: BTreeMap, - #[serde(default)] - pub outputs: BTreeSet, -} - #[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)] pub struct NodeId(String); @@ -160,22 +150,6 @@ impl fmt::Display for InputMapping { } } -pub struct FormattedDuration(pub Duration); - -impl fmt::Display for FormattedDuration { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - if self.0.subsec_millis() == 0 { - write!(f, "secs/{}", self.0.as_secs()) - } else { - write!(f, "millis/{}", self.0.as_millis()) - } - } -} - -pub fn format_duration(interval: Duration) -> FormattedDuration { - FormattedDuration(interval) -} - impl Serialize for InputMapping { fn serialize(&self, serializer: S) -> Result where @@ -262,6 +236,31 @@ pub struct UserInputMapping { pub output: DataId, } +pub struct FormattedDuration(pub Duration); + +impl fmt::Display for FormattedDuration { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0.subsec_millis() == 0 { + write!(f, "secs/{}", self.0.as_secs()) + } else { + write!(f, "millis/{}", self.0.as_millis()) + } + } +} + +pub fn format_duration(interval: Duration) -> FormattedDuration { + FormattedDuration(interval) +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct NodeRunConfig { + #[serde(default)] + pub inputs: BTreeMap, + #[serde(default)] + pub outputs: BTreeSet, +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(deny_unknown_fields, rename_all = "lowercase")] pub enum CommunicationConfig { diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index 4d8ffb77..664184db 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -1,5 +1,3 @@ -use dora_node_api::config::{CommunicationConfig, NodeRunConfig}; -pub use dora_node_api::config::{DataId, InputMapping, NodeId, OperatorId, UserInputMapping}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; @@ -9,6 +7,8 @@ use std::{ }; pub use visualize::collect_dora_timers; +use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId}; + mod visualize; #[derive(Debug, Serialize, Deserialize)] diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index a3e25643..fac88464 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,5 +1,5 @@ use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode}; -use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; +use crate::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, fmt::Write as _, diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index 74220733..997a3204 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -4,6 +4,7 @@ use std::{ path::Path, }; +pub mod config; pub mod descriptor; pub mod topics;