Browse Source

Refactor: Move configuration to `core` crate

tags/v0.0.0-test-pr-120
Philipp Oppermann 3 years ago
parent
commit
c4da2cbdad
Failed to extract signature
22 changed files with 78 additions and 71 deletions
  1. +3
    -1
      Cargo.lock
  2. +1
    -1
      apis/python/node/src/lib.rs
  3. +1
    -0
      apis/rust/node/Cargo.toml
  4. +2
    -5
      apis/rust/node/src/communication.rs
  5. +6
    -5
      apis/rust/node/src/lib.rs
  6. +1
    -1
      binaries/cli/src/build.rs
  7. +2
    -1
      binaries/cli/src/check.rs
  8. +8
    -5
      binaries/coordinator/src/lib.rs
  9. +2
    -3
      binaries/coordinator/src/run/custom.rs
  10. +6
    -6
      binaries/coordinator/src/run/mod.rs
  11. +2
    -3
      binaries/coordinator/src/run/runtime.rs
  12. +4
    -2
      binaries/runtime/src/main.rs
  13. +3
    -3
      binaries/runtime/src/operator/mod.rs
  14. +2
    -1
      binaries/runtime/src/operator/python.rs
  15. +2
    -2
      binaries/runtime/src/operator/shared_lib.rs
  16. +1
    -1
      examples/iceoryx/node/src/main.rs
  17. +1
    -1
      examples/rust-dataflow/node/src/main.rs
  18. +2
    -1
      libraries/core/Cargo.toml
  19. +25
    -26
      libraries/core/src/config.rs
  20. +2
    -2
      libraries/core/src/descriptor/mod.rs
  21. +1
    -1
      libraries/core/src/descriptor/visualize.rs
  22. +1
    -0
      libraries/core/src/lib.rs

+ 3
- 1
Cargo.lock View File

@@ -925,10 +925,11 @@ dependencies = [
name = "dora-core"
version = "0.1.0"
dependencies = [
"dora-node-api",
"eyre",
"once_cell",
"serde",
"serde_yaml 0.9.11",
"zenoh-config",
]

[[package]]
@@ -966,6 +967,7 @@ version = "0.1.0"
dependencies = [
"capnp",
"communication-layer-pub-sub",
"dora-core",
"dora-message",
"eyre",
"flume",


+ 1
- 1
apis/python/node/src/lib.rs View File

@@ -1,6 +1,6 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use dora_node_api::{config::NodeId, DoraNode, Input};
use dora_node_api::{core::config::NodeId, DoraNode, Input};
use dora_operator_api_python::{metadata_to_pydict, pydict_to_metadata};
use eyre::{Context, Result};
use flume::Receiver;


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -23,6 +23,7 @@ communication-layer-pub-sub = { path = "../../../libraries/communication-layer",
uuid = { version = "1.1.2", features = ["v4"] }
capnp = "0.14.9"
dora-message = { path = "../../../libraries/message" }
dora-core = { path = "../../../libraries/core" }

[dev-dependencies]
tokio = { version = "1.17.0", features = ["rt"] }

+ 2
- 5
apis/rust/node/src/communication.rs View File

@@ -1,11 +1,8 @@
use crate::BoxError;
use communication_layer_pub_sub::ReceivedSample;
pub use communication_layer_pub_sub::{CommunicationLayer, Publisher, Subscriber};
use dora_core::config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId};
use dora_message::Metadata;

use crate::{
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
BoxError,
};
use eyre::Context;
use std::{
borrow::Cow,


+ 6
- 5
apis/rust/node/src/lib.rs View File

@@ -1,14 +1,13 @@
pub use communication::Input;
pub use dora_message::Metadata;
pub use flume::Receiver;

use communication::STOP_TOPIC;
use communication_layer_pub_sub::CommunicationLayer;
use config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
pub use dora_core as core;
use dora_core::config::{CommunicationConfig, DataId, NodeId, NodeRunConfig};
pub use dora_message::Metadata;
use eyre::WrapErr;
pub use flume::Receiver;

pub mod communication;
pub mod config;

pub struct DoraNode {
id: NodeId,
@@ -144,6 +143,8 @@ fn set_up_tracing() -> eyre::Result<()> {

#[cfg(test)]
mod tests {
use dora_core::config;

use super::*;

#[test]


+ 1
- 1
binaries/cli/src/build.rs View File

@@ -1,5 +1,5 @@
use crate::graph;
use dora_core::descriptor::{OperatorId, SINGLE_OPERATOR_DEFAULT_ID};
use dora_core::{config::OperatorId, descriptor::SINGLE_OPERATOR_DEFAULT_ID};
use eyre::{eyre, Context};
use std::{path::Path, process::Command};



+ 2
- 1
binaries/cli/src/check.rs View File

@@ -1,7 +1,8 @@
use crate::graph::read_descriptor;
use dora_core::{
adjust_shared_library_path,
descriptor::{self, CoreNodeKind, InputMapping, OperatorSource, UserInputMapping},
config::{InputMapping, UserInputMapping},
descriptor::{self, CoreNodeKind, OperatorSource},
};
use eyre::{bail, eyre, Context};
use std::{env::consts::EXE_EXTENSION, path::Path};


+ 8
- 5
binaries/coordinator/src/lib.rs View File

@@ -1,9 +1,12 @@
use crate::run::spawn_dataflow;
use dora_core::topics::{
StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START,
ZENOH_CONTROL_STOP,
use dora_core::{
config::CommunicationConfig,
topics::{
StartDataflowResult, StopDataflowResult, ZENOH_CONTROL_DESTROY, ZENOH_CONTROL_START,
ZENOH_CONTROL_STOP,
},
};
use dora_node_api::{communication, config::CommunicationConfig};
use dora_node_api::communication;
use eyre::{bail, eyre, WrapErr};
use futures::StreamExt;
use futures_concurrency::stream::Merge;
@@ -188,7 +191,7 @@ async fn start_dataflow(
uuid,
communication_config,
tasks,
} = spawn_dataflow(&runtime_path, &path).await?;
} = spawn_dataflow(&runtime_path, path).await?;
let path = path.to_owned();
let task = async move {
let result = await_tasks(tasks)


+ 2
- 3
binaries/coordinator/src/run/custom.rs View File

@@ -1,6 +1,5 @@
use super::command_init_common_env;
use dora_core::descriptor;
use dora_node_api::config::NodeId;
use dora_core::{config::NodeId, descriptor};
use eyre::{eyre, WrapErr};
use std::{env::consts::EXE_EXTENSION, path::Path};

@@ -8,7 +7,7 @@ use std::{env::consts::EXE_EXTENSION, path::Path};
pub(super) fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
communication: &dora_node_api::config::CommunicationConfig,
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();


+ 6
- 6
binaries/coordinator/src/run/mod.rs View File

@@ -1,9 +1,9 @@
use self::{custom::spawn_custom_node, runtime::spawn_runtime_node};
use dora_core::descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor, NodeId};
use dora_node_api::{
communication,
config::{format_duration, CommunicationConfig},
use dora_core::{
config::{format_duration, CommunicationConfig, NodeId},
descriptor::{self, collect_dora_timers, CoreNodeKind, Descriptor},
};
use dora_node_api::communication;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::{env::consts::EXE_EXTENSION, path::Path};
@@ -20,7 +20,7 @@ pub async fn run_dataflow(dataflow_path: &Path, runtime: &Path) -> eyre::Result<

pub async fn spawn_dataflow(runtime: &Path, dataflow_path: &Path) -> eyre::Result<SpawnedDataflow> {
let mut runtime = runtime.with_extension(EXE_EXTENSION);
let descriptor = read_descriptor(&dataflow_path).await.wrap_err_with(|| {
let descriptor = read_descriptor(dataflow_path).await.wrap_err_with(|| {
format!(
"failed to read dataflow descriptor at {}",
dataflow_path.display()
@@ -144,7 +144,7 @@ async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> {
fn command_init_common_env(
command: &mut tokio::process::Command,
node_id: &NodeId,
communication: &dora_node_api::config::CommunicationConfig,
communication: &dora_core::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
command.env(
"DORA_NODE_ID",


+ 2
- 3
binaries/coordinator/src/run/runtime.rs View File

@@ -1,6 +1,5 @@
use super::command_init_common_env;
use dora_core::descriptor;
use dora_node_api::config::NodeId;
use dora_core::{config::NodeId, descriptor};
use eyre::{eyre, WrapErr};
use std::path::Path;

@@ -9,7 +8,7 @@ pub fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
node: &descriptor::RuntimeNode,
communication: &dora_node_api::config::CommunicationConfig,
communication: &dora_core::config::CommunicationConfig,
working_dir: &Path,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);


+ 4
- 2
binaries/runtime/src/main.rs View File

@@ -1,10 +1,12 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_core::descriptor::OperatorDefinition;
use dora_core::{
config::{CommunicationConfig, DataId, NodeId, OperatorId},
descriptor::OperatorDefinition,
};
use dora_node_api::{
self,
communication::{self, CommunicationLayer, Publisher, STOP_TOPIC},
config::{CommunicationConfig, DataId, NodeId, OperatorId},
};
use eyre::{bail, Context};
use futures::{Stream, StreamExt};


+ 3
- 3
binaries/runtime/src/operator/mod.rs View File

@@ -1,8 +1,8 @@
use dora_core::descriptor::{OperatorDefinition, OperatorSource};
use dora_node_api::{
communication::{self, CommunicationLayer},
use dora_core::{
config::NodeId,
descriptor::{OperatorDefinition, OperatorSource},
};
use dora_node_api::communication::{self, CommunicationLayer};
use eyre::Context;
#[cfg(feature = "tracing")]
use opentelemetry::sdk::trace::Tracer;


+ 2
- 1
binaries/runtime/src/operator/python.rs View File

@@ -1,7 +1,8 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use super::{OperatorEvent, Tracer};
use dora_node_api::{communication::Publisher, config::DataId};
use dora_core::config::DataId;
use dora_node_api::communication::Publisher;
use dora_operator_api_python::metadata_to_pydict;
use eyre::{bail, eyre, Context};
use pyo3::{


+ 2
- 2
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,6 +1,6 @@
use super::{OperatorEvent, Tracer};
use dora_core::adjust_shared_library_path;
use dora_node_api::{communication::Publisher, config::DataId};
use dora_core::{adjust_shared_library_path, config::DataId};
use dora_node_api::communication::Publisher;
use dora_operator_api_types::{
safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnInput,
DoraResult, DoraStatus, Metadata, OnInputResult, Output, SendOutput,


+ 1
- 1
examples/iceoryx/node/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_node_api::{self, config::DataId, DoraNode};
use dora_node_api::{self, core::config::DataId, DoraNode};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());


+ 1
- 1
examples/rust-dataflow/node/src/main.rs View File

@@ -1,4 +1,4 @@
use dora_node_api::{self, config::DataId, DoraNode};
use dora_node_api::{self, core::config::DataId, DoraNode};

fn main() -> eyre::Result<()> {
let output = DataId::from("random".to_owned());


+ 2
- 1
libraries/core/Cargo.toml View File

@@ -7,7 +7,8 @@ license = "Apache-2.0"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../../apis/rust/node" }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }
serde_yaml = "0.9.11"
once_cell = "1.13.0"
zenoh-config = { git = "https://github.com/eclipse-zenoh/zenoh.git" }

apis/rust/node/src/config.rs → libraries/core/src/config.rs View File

@@ -1,4 +1,3 @@
use communication_layer_pub_sub::zenoh::zenoh_config;
use once_cell::sync::OnceCell;
use serde::{Deserialize, Serialize};
use std::{
@@ -10,15 +9,6 @@ use std::{
time::Duration,
};

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NodeRunConfig {
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize, Deserialize)]
pub struct NodeId(String);

@@ -160,22 +150,6 @@ impl fmt::Display for InputMapping {
}
}

pub struct FormattedDuration(pub Duration);

impl fmt::Display for FormattedDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.subsec_millis() == 0 {
write!(f, "secs/{}", self.0.as_secs())
} else {
write!(f, "millis/{}", self.0.as_millis())
}
}
}

pub fn format_duration(interval: Duration) -> FormattedDuration {
FormattedDuration(interval)
}

impl Serialize for InputMapping {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -262,6 +236,31 @@ pub struct UserInputMapping {
pub output: DataId,
}

pub struct FormattedDuration(pub Duration);

impl fmt::Display for FormattedDuration {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.0.subsec_millis() == 0 {
write!(f, "secs/{}", self.0.as_secs())
} else {
write!(f, "millis/{}", self.0.as_millis())
}
}
}

pub fn format_duration(interval: Duration) -> FormattedDuration {
FormattedDuration(interval)
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct NodeRunConfig {
#[serde(default)]
pub inputs: BTreeMap<DataId, InputMapping>,
#[serde(default)]
pub outputs: BTreeSet<DataId>,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub enum CommunicationConfig {

+ 2
- 2
libraries/core/src/descriptor/mod.rs View File

@@ -1,5 +1,3 @@
use dora_node_api::config::{CommunicationConfig, NodeRunConfig};
pub use dora_node_api::config::{DataId, InputMapping, NodeId, OperatorId, UserInputMapping};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt;
@@ -9,6 +7,8 @@ use std::{
};
pub use visualize::collect_dora_timers;

use crate::config::{CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId};

mod visualize;

#[derive(Debug, Serialize, Deserialize)]


+ 1
- 1
libraries/core/src/descriptor/visualize.rs View File

@@ -1,5 +1,5 @@
use super::{CoreNodeKind, CustomNode, OperatorDefinition, ResolvedNode, RuntimeNode};
use dora_node_api::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping};
use crate::config::{format_duration, DataId, InputMapping, NodeId, UserInputMapping};
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
fmt::Write as _,


+ 1
- 0
libraries/core/src/lib.rs View File

@@ -4,6 +4,7 @@ use std::{
path::Path,
};

pub mod config;
pub mod descriptor;
pub mod topics;



Loading…
Cancel
Save