Browse Source

Create a `dora-operator-api` crate

Rename the previous `dora-api` crate to `dora-node-api`.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
97b79b9680
22 changed files with 102 additions and 74 deletions
  1. +29
    -22
      Cargo.lock
  2. +3
    -2
      Cargo.toml
  3. +1
    -1
      api/node/Cargo.toml
  4. +0
    -0
      api/node/src/communication.rs
  5. +0
    -0
      api/node/src/config.rs
  6. +0
    -0
      api/node/src/lib.rs
  7. +8
    -0
      api/operator/Cargo.toml
  8. +42
    -0
      api/operator/src/lib.rs
  9. +1
    -1
      common/Cargo.toml
  10. +1
    -1
      common/src/descriptor/mod.rs
  11. +1
    -1
      common/src/descriptor/visualize.rs
  12. +1
    -1
      coordinator/Cargo.toml
  13. +1
    -1
      coordinator/examples/example_sink_logger.rs
  14. +1
    -1
      coordinator/examples/example_source_timer.rs
  15. +1
    -1
      coordinator/examples/random_number.rs
  16. +1
    -1
      coordinator/examples/rate_limit.rs
  17. +4
    -4
      coordinator/src/main.rs
  18. +1
    -1
      runtime/Cargo.toml
  19. +1
    -0
      runtime/examples/example-operator/Cargo.toml
  20. +2
    -33
      runtime/examples/example-operator/src/lib.rs
  21. +2
    -2
      runtime/src/main.rs
  22. +1
    -1
      runtime/src/operator/mod.rs

+ 29
- 22
Cargo.lock View File

@@ -546,30 +546,11 @@ dependencies = [
"winapi",
]

[[package]]
name = "dora-api"
version = "0.1.0"
dependencies = [
"async-trait",
"eyre",
"futures",
"futures-concurrency",
"serde",
"serde_yaml",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"zenoh",
"zenoh-config",
]

[[package]]
name = "dora-common"
version = "0.1.0"
dependencies = [
"dora-api",
"dora-node-api",
"eyre",
"serde",
]
@@ -580,8 +561,8 @@ version = "0.1.0"
dependencies = [
"bincode",
"clap 3.1.12",
"dora-api",
"dora-common",
"dora-node-api",
"eyre",
"futures",
"futures-concurrency",
@@ -595,6 +576,29 @@ dependencies = [
"uuid",
]

[[package]]
name = "dora-node-api"
version = "0.1.0"
dependencies = [
"async-trait",
"eyre",
"futures",
"futures-concurrency",
"serde",
"serde_yaml",
"thiserror",
"tokio",
"tokio-stream",
"tracing",
"uuid",
"zenoh",
"zenoh-config",
]

[[package]]
name = "dora-operator-api"
version = "0.1.0"

[[package]]
name = "dora-rs"
version = "0.1.0"
@@ -621,8 +625,8 @@ name = "dora-runtime"
version = "0.1.0"
dependencies = [
"clap 3.1.12",
"dora-api",
"dora-common",
"dora-node-api",
"eyre",
"futures",
"futures-concurrency",
@@ -665,6 +669,9 @@ checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "example-operator"
version = "0.1.0"
dependencies = [
"dora-operator-api",
]

[[package]]
name = "eyre"


+ 3
- 2
Cargo.toml View File

@@ -7,11 +7,12 @@ edition = "2021"

[workspace]
members = [
"api",
"api/node",
"api/operator",
"coordinator",
"common",
"runtime",
"coordinator/examples/example-operator",
"runtime/examples/example-operator",
]

[dependencies]


api/Cargo.toml → api/node/Cargo.toml View File

@@ -1,5 +1,5 @@
[package]
name = "dora-api"
name = "dora-node-api"
version = "0.1.0"
edition = "2021"


api/src/communication.rs → api/node/src/communication.rs View File


api/src/config.rs → api/node/src/config.rs View File


api/src/lib.rs → api/node/src/lib.rs View File


+ 8
- 0
api/operator/Cargo.toml View File

@@ -0,0 +1,8 @@
[package]
name = "dora-operator-api"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

+ 42
- 0
api/operator/src/lib.rs View File

@@ -0,0 +1,42 @@
use std::ffi::c_void;

pub trait DoraOperator {
fn on_input(
&mut self,
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<(), ()>;
}

pub struct DoraOutputSender {
pub output_fn_raw: OutputFnRaw,
pub output_context: *const c_void,
}

impl DoraOutputSender {
pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> {
println!("operator sending output..");
let result = unsafe {
(self.output_fn_raw)(
id.as_ptr(),
id.len(),
data.as_ptr(),
data.len(),
self.output_context,
)
};
match result {
0 => Ok(()),
other => Err(other),
}
}
}

pub type OutputFnRaw = unsafe extern "C" fn(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output_context: *const c_void,
) -> isize;

+ 1
- 1
common/Cargo.toml View File

@@ -6,6 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-api = { version = "0.1.0", path = "../api" }
dora-node-api = { version = "0.1.0", path = "../api/node" }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }

+ 1
- 1
common/src/descriptor/mod.rs View File

@@ -1,4 +1,4 @@
use dora_api::config::{
use dora_node_api::config::{
CommunicationConfig, DataId, InputMapping, NodeId, NodeRunConfig, OperatorId,
};
use serde::{Deserialize, Serialize};


+ 1
- 1
common/src/descriptor/visualize.rs View File

@@ -1,4 +1,4 @@
use dora_api::config::{DataId, InputMapping, NodeId};
use dora_node_api::config::{DataId, InputMapping, NodeId};

use super::{CustomNode, Node, NodeKind, OperatorConfig, RuntimeNode};
use std::collections::{BTreeMap, HashMap};


+ 1
- 1
coordinator/Cargo.toml View File

@@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
bincode = "1.3.3"
dora-api = { path = "../api" }
dora-node-api = { path = "../api/node" }
eyre = "0.6.7"
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }


+ 1
- 1
coordinator/examples/example_sink_logger.rs View File

@@ -1,4 +1,4 @@
use dora_api::{self, DoraNode};
use dora_node_api::{self, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;


+ 1
- 1
coordinator/examples/example_source_timer.rs View File

@@ -1,4 +1,4 @@
use dora_api::{self, config::DataId, DoraNode};
use dora_node_api::{self, config::DataId, DoraNode};
use std::time::Duration;
use time::OffsetDateTime;



+ 1
- 1
coordinator/examples/random_number.rs View File

@@ -1,4 +1,4 @@
use dora_api::{self, config::DataId, DoraNode};
use dora_node_api::{self, config::DataId, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::Duration;


+ 1
- 1
coordinator/examples/rate_limit.rs View File

@@ -1,5 +1,5 @@
use clap::StructOpt;
use dora_api::{self, config::DataId, DoraNode};
use dora_node_api::{self, config::DataId, DoraNode};
use eyre::bail;
use futures::StreamExt;
use std::time::{Duration, Instant};


+ 4
- 4
coordinator/src/main.rs View File

@@ -1,5 +1,5 @@
use dora_api::config::NodeId;
use dora_common::descriptor::{self, Descriptor};
use dora_node_api::config::NodeId;
use eyre::{bail, eyre, WrapErr};
use futures::{stream::FuturesUnordered, StreamExt};
use std::path::{Path, PathBuf};
@@ -95,7 +95,7 @@ async fn run_dataflow(dataflow_path: PathBuf, runtime: &Path) -> eyre::Result<()
fn spawn_custom_node(
node_id: NodeId,
node: &descriptor::CustomNode,
communication: &dora_api::config::CommunicationConfig,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut args = node.run.split_ascii_whitespace();
let cmd = args
@@ -130,7 +130,7 @@ fn spawn_runtime_node(
runtime: &Path,
node_id: NodeId,
node: &descriptor::RuntimeNode,
communication: &dora_api::config::CommunicationConfig,
communication: &dora_node_api::config::CommunicationConfig,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<(), eyre::Error>>> {
let mut command = tokio::process::Command::new(runtime);
command_init_common_env(&mut command, &node_id, communication)?;
@@ -160,7 +160,7 @@ fn spawn_runtime_node(
fn command_init_common_env(
command: &mut tokio::process::Command,
node_id: &NodeId,
communication: &dora_api::config::CommunicationConfig,
communication: &dora_node_api::config::CommunicationConfig,
) -> Result<(), eyre::Error> {
command.env(
"DORA_NODE_ID",


+ 1
- 1
runtime/Cargo.toml View File

@@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-api = { path = "../api" }
dora-node-api = { path = "../api/node" }
dora-common = { version = "0.1.0", path = "../common" }
eyre = "0.6.8"
futures = "0.3.21"


coordinator/examples/example-operator/Cargo.toml → runtime/examples/example-operator/Cargo.toml View File

@@ -9,3 +9,4 @@ edition = "2021"
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { path = "../../../api/operator" }

coordinator/examples/example-operator/src/lib.rs → runtime/examples/example-operator/src/lib.rs View File

@@ -1,5 +1,6 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_operator_api::{DoraOperator, DoraOutputSender, OutputFnRaw};
use std::{ffi::c_void, slice};

#[no_mangle]
@@ -17,14 +18,6 @@ pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut ()) {
unsafe { Box::from_raw(raw) };
}

type OutputFnRaw = unsafe extern "C" fn(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output_context: *const c_void,
) -> isize;

#[no_mangle]
pub unsafe extern "C" fn dora_on_input(
id_start: *const u8,
@@ -53,36 +46,12 @@ pub unsafe extern "C" fn dora_on_input(
}
}

struct DoraOutputSender {
output_fn_raw: OutputFnRaw,
output_context: *const c_void,
}

impl DoraOutputSender {
pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> {
println!("operator sending output..");
let result = unsafe {
(self.output_fn_raw)(
id.as_ptr(),
id.len(),
data.as_ptr(),
data.len(),
self.output_context,
)
};
match result {
0 => Ok(()),
other => Err(other),
}
}
}

#[derive(Debug, Default)]
struct Operator {
time: Option<String>,
}

impl Operator {
impl DoraOperator for Operator {
fn on_input(
&mut self,
id: &str,

+ 2
- 2
runtime/src/main.rs View File

@@ -1,12 +1,12 @@
#![warn(unsafe_op_in_unsafe_fn)]

use dora_api::{
use dora_common::{descriptor::OperatorConfig, BoxError};
use dora_node_api::{
self,
communication::CommunicationLayer,
config::{CommunicationConfig, DataId, InputMapping, NodeId, OperatorId},
STOP_TOPIC,
};
use dora_common::{descriptor::OperatorConfig, BoxError};
use eyre::{bail, eyre, Context};
use futures::{stream::FuturesUnordered, StreamExt};
use futures_concurrency::Merge;


+ 1
- 1
runtime/src/operator/mod.rs View File

@@ -1,5 +1,5 @@
use dora_api::config::DataId;
use dora_common::descriptor::{OperatorConfig, OperatorSource};
use dora_node_api::config::DataId;
use eyre::{eyre, Context};
use std::any::Any;
use tokio::sync::mpsc::{self, Sender};


Loading…
Cancel
Save