Browse Source

Merge pull request #29 from futurewei-tech/c-api

Create a C-API for operators and an example operator
tags/v0.0.0-test.4
Philipp Oppermann GitHub 3 years ago
parent
commit
989b4780e5
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 166 additions and 44 deletions
  1. +3
    -3
      Cargo.toml
  2. +16
    -0
      api/c/operator/api.h
  3. +0
    -0
      api/rust/node/Cargo.toml
  4. +0
    -0
      api/rust/node/src/communication.rs
  5. +0
    -0
      api/rust/node/src/config.rs
  6. +0
    -0
      api/rust/node/src/lib.rs
  7. +0
    -0
      api/rust/operator/Cargo.toml
  8. +0
    -0
      api/rust/operator/macros/Cargo.toml
  9. +0
    -0
      api/rust/operator/macros/src/lib.rs
  10. +0
    -0
      api/rust/operator/src/lib.rs
  11. +0
    -0
      api/rust/operator/src/raw.rs
  12. +1
    -1
      common/Cargo.toml
  13. +1
    -1
      coordinator/Cargo.toml
  14. +15
    -3
      coordinator/README.md
  15. +3
    -0
      coordinator/examples/example_sink_logger.rs
  16. +12
    -5
      coordinator/examples/mini-dataflow.yml
  17. +1
    -1
      runtime/Cargo.toml
  18. +3
    -0
      runtime/examples/c-operator/.gitignore
  19. +9
    -0
      runtime/examples/c-operator/README.md
  20. +64
    -0
      runtime/examples/c-operator/operator.c
  21. +1
    -1
      runtime/examples/example-operator/Cargo.toml
  22. +36
    -29
      runtime/src/main.rs
  23. +1
    -0
      runtime/src/operator/mod.rs

+ 3
- 3
Cargo.toml View File

@@ -7,9 +7,9 @@ edition = "2021"

[workspace]
members = [
"api/node",
"api/operator",
"api/operator/macros",
"api/rust/node",
"api/rust/operator",
"api/rust/operator/macros",
"coordinator",
"message",
"metrics",


+ 16
- 0
api/c/operator/api.h View File

@@ -0,0 +1,16 @@
int dora_init_operator(void **operator_context);

void dora_drop_operator(void *operator_context);

int dora_on_input(
const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const int (*output_fn_raw)(const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const void *output_context),
void *output_context,
const void *operator_context);

api/node/Cargo.toml → api/rust/node/Cargo.toml View File


api/node/src/communication.rs → api/rust/node/src/communication.rs View File


api/node/src/config.rs → api/rust/node/src/config.rs View File


api/node/src/lib.rs → api/rust/node/src/lib.rs View File


api/operator/Cargo.toml → api/rust/operator/Cargo.toml View File


api/operator/macros/Cargo.toml → api/rust/operator/macros/Cargo.toml View File


api/operator/macros/src/lib.rs → api/rust/operator/macros/src/lib.rs View File


api/operator/src/lib.rs → api/rust/operator/src/lib.rs View File


api/operator/src/raw.rs → api/rust/operator/src/raw.rs View File


+ 1
- 1
common/Cargo.toml View File

@@ -6,6 +6,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-node-api = { version = "0.1.0", path = "../api/node" }
dora-node-api = { version = "0.1.0", path = "../api/rust/node" }
eyre = "0.6.8"
serde = { version = "1.0.136", features = ["derive"] }

+ 1
- 1
coordinator/Cargo.toml View File

@@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
bincode = "1.3.3"
dora-node-api = { path = "../api/node" }
dora-node-api = { path = "../api/rust/node" }
eyre = "0.6.7"
futures = "0.3.21"
serde = { version = "1.0.136", features = ["derive"] }


+ 15
- 3
coordinator/README.md View File

@@ -27,8 +27,20 @@ There are drawbacks too, for example:

## Try it out

- Compile the `examples` using `cargo build -p dora-rs --examples`
- Run the `mini-dataflow` example using `cargo run -- run examples/mini-dataflow.yml`
- Compile: the dora runtime, the nodes in `examples`, and the Rust example operator through:
```bash
cargo build -p dora-runtime --release
cargo build -p dora-coordinator --examples --release
cargo build --manifest-path ../runtime/examples/example-operator/Cargo.toml --release
```
- Compile the C example operator through:
```bash
cd ../runtime/examples/c-operator
cp ../../../api/c/operator/api.h .
clang -c operator.c
clang -shared -v operator.o -o operator.so
```
- Run the `mini-dataflow` example using `cargo run --release -- run examples/mini-dataflow.yml`
- This spawns a `timer` source, which sends the current time periodically, and a `logger` sink, which prints the incoming data.
- The `timer` will exit after 100 iterations. The `logger` will then exit with a timeout error.
- The `timer` will exit after 100 iterations. The other nodes/operators will then exit as well because all sources closed.


+ 3
- 0
coordinator/examples/example_sink_logger.rs View File

@@ -40,6 +40,9 @@ async fn main() -> eyre::Result<()> {
let data = String::from_utf8(input.data)?;
println!("received timestamped random value: {data}");
}
"counter" => {
println!("received counter value: {:?}", input.data);
}

other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 12
- 5
coordinator/examples/mini-dataflow.yml View File

@@ -5,13 +5,13 @@ communication:
nodes:
- id: timer
custom:
run: ../target/debug/examples/example_source_timer
run: ../target/release/examples/example_source_timer
outputs:
- time

- id: rate-limited-timer
custom:
run: ../target/debug/examples/rate_limit --seconds 0.5
run: ../target/release/examples/rate_limit --seconds 0.5
inputs:
data: timer/time
outputs:
@@ -19,7 +19,7 @@ nodes:

- id: random
custom:
run: ../target/debug/examples/random_number
run: ../target/release/examples/random_number
inputs:
timestamp: rate-limited-timer/rate_limited
outputs:
@@ -27,18 +27,25 @@ nodes:

- id: logger
custom:
run: ../target/debug/examples/example_sink_logger
run: ../target/release/examples/example_sink_logger
inputs:
random: random/number
time: timer/time
timestamped-random: runtime-node/op-1/timestamped-random
counter: runtime-node/op-2/counter

- id: runtime-node
operators:
- id: op-1
shared-library: ../target/debug/libexample_operator.so
shared-library: ../target/release/libexample_operator.so
inputs:
random: random/number
time: timer/time
outputs:
- timestamped-random
- id: op-2
shared-library: ../runtime/examples/c-operator/operator.so
inputs:
time: timer/time
outputs:
- counter

+ 1
- 1
runtime/Cargo.toml View File

@@ -7,7 +7,7 @@ edition = "2021"

[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-node-api = { path = "../api/node" }
dora-node-api = { path = "../api/rust/node" }
dora-common = { version = "0.1.0", path = "../common" }
eyre = "0.6.8"
futures = "0.3.21"


+ 3
- 0
runtime/examples/c-operator/.gitignore View File

@@ -0,0 +1,3 @@
operator.o
operator.so
api.h

+ 9
- 0
runtime/examples/c-operator/README.md View File

@@ -0,0 +1,9 @@
# C-operator Example

Build with these steps:

```bash
cp ../../../api/c/operator/api.h .
clang -c operator.c
clang -shared -v operator.o -o operator.so
```

+ 64
- 0
runtime/examples/c-operator/operator.c View File

@@ -0,0 +1,64 @@
#include "api.h"
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

int dora_init_operator(void **operator_context)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

*operator_context = context;

return 0;
}

void dora_drop_operator(void *operator_context)
{
free(operator_context);
}

int dora_on_input(
const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const int (*output_fn_raw)(const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const void *output_context),
void *output_context,
const void *operator_context)
{
char *context = (char *)operator_context;

char id[id_len + 1];
memcpy(id, id_start, id_len);
id[id_len] = 0;

if (strcmp(id, "time") == 0)
{
char time[data_len + 1];
memcpy(time, data_start, data_len);
time[data_len] = 0;

printf("C operator received time input %s, context: %i\n", time, *context);
*context += 1;

char *out_id = "counter";

int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context);
if (res != 0)
{
printf("C operator failed to send output\n");
}
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *context);
}

return 0;
}

+ 1
- 1
runtime/examples/example-operator/Cargo.toml View File

@@ -9,4 +9,4 @@ edition = "2021"
crate-type = ["cdylib"]

[dependencies]
dora-operator-api = { path = "../../../api/operator" }
dora-operator-api = { path = "../../../api/rust/operator" }

+ 36
- 29
runtime/src/main.rs View File

@@ -14,7 +14,10 @@ use futures::{
};
use futures_concurrency::Merge;
use operator::{Operator, OperatorEvent};
use std::{collections::BTreeMap, mem};
use std::{
collections::{BTreeMap, HashMap},
mem,
};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamMap};

@@ -42,6 +45,7 @@ async fn main() -> eyre::Result<()> {

let mut operator_map = BTreeMap::new();
let mut operator_events = StreamMap::new();
let mut operator_events_tx = HashMap::new();
for operator_config in &operators {
let (events_tx, events) = mpsc::channel(1);
let operator = Operator::init(operator_config.clone(), events_tx.clone())
@@ -49,6 +53,7 @@ async fn main() -> eyre::Result<()> {
.wrap_err_with(|| format!("failed to init operator {}", operator_config.id))?;
operator_map.insert(&operator_config.id, operator);
operator_events.insert(operator_config.id.clone(), ReceiverStream::new(events));
operator_events_tx.insert(operator_config.id.clone(), events_tx);
}

let communication: Box<dyn CommunicationLayer> =
@@ -86,35 +91,14 @@ async fn main() -> eyre::Result<()> {
})?;
}
SubscribeEvent::InputsStopped { target_operator } => {
// --------------------------------------------------------
// TODO FIXME: For some reason, these zenoh publish calls
// (and also subsequent ones) are not visible to other
// nodes. This includes the stop command, so the input
// streams of dependent nodes are not closed properly.
// --------------------------------------------------------

communication
.publish("/HHH", &[])
.await
.wrap_err("failed to send on /HHH")?;
if operator_map.remove(&target_operator).is_some() {
println!("operator {node_id}/{target_operator} finished");
// send stopped message
publish(
&node_id,
target_operator.clone(),
STOP_TOPIC.to_owned().into(),
&[],
communication.as_ref(),
)
.await.with_context(|| {
format!("failed to send stop message for operator `{node_id}/{target_operator}`")
})?;
}
let events_tx = operator_events_tx.get(&target_operator).ok_or_else(|| {
eyre!("failed to get events_tx for operator {target_operator}")
})?;

if operator_map.is_empty() {
break;
}
let events_tx = events_tx.clone();
tokio::spawn(async move {
let _ = events_tx.send(OperatorEvent::EndOfInput).await;
});
}
},
Event::Operator { id, event } => {
@@ -134,6 +118,29 @@ async fn main() -> eyre::Result<()> {
bail!(err.wrap_err(format!("operator {id} failed")))
}
OperatorEvent::Panic(payload) => std::panic::resume_unwind(payload),
OperatorEvent::EndOfInput => {
if operator_map.remove(&id).is_some() {
println!("operator {node_id}/{id} finished");
// send stopped message
publish(
&node_id,
id.clone(),
STOP_TOPIC.to_owned().into(),
&[],
communication.as_ref(),
)
.await
.with_context(|| {
format!("failed to send stop message for operator `{node_id}/{id}`")
})?;

operator_events_tx.remove(&id);
}

if operator_map.is_empty() {
break;
}
}
}
}
}


+ 1
- 0
runtime/src/operator/mod.rs View File

@@ -60,6 +60,7 @@ pub enum OperatorEvent {
Output { id: DataId, value: Vec<u8> },
Error(eyre::Error),
Panic(Box<dyn Any + Send>),
EndOfInput,
}

pub struct OperatorInput {


Loading…
Cancel
Save