Browse Source

Add a `c-dataflow` example

tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
3fa0c4bf81
Failed to extract signature
9 changed files with 252 additions and 16 deletions
  1. +12
    -2
      Cargo.lock
  2. +15
    -0
      Cargo.toml
  3. +1
    -0
      examples/c-dataflow/.gitignore
  4. +25
    -0
      examples/c-dataflow/dataflow.yml
  5. +41
    -0
      examples/c-dataflow/node.c
  6. +19
    -14
      examples/c-dataflow/operator.c
  7. +84
    -0
      examples/c-dataflow/run.rs
  8. +43
    -0
      examples/c-dataflow/sink.c
  9. +12
    -0
      libraries/core/src/descriptor/mod.rs

+ 12
- 2
Cargo.lock View File

@@ -657,6 +657,15 @@ dependencies = [
"serde",
]

[[package]]
name = "dora-examples"
version = "0.0.0"
dependencies = [
"dora-coordinator",
"eyre",
"tokio",
]

[[package]]
name = "dora-message"
version = "0.1.0"
@@ -2749,10 +2758,11 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"

[[package]]
name = "tokio"
version = "1.17.0"
version = "1.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
checksum = "7a8325f63a7d4774dd041e363b2409ed1c5cbbd0f867795e661df066b2b0a581"
dependencies = [
"autocfg 1.1.0",
"bytes",
"libc",
"memchr",


+ 15
- 0
Cargo.toml View File

@@ -12,3 +12,18 @@ members = [
"libraries/extensions/telemetry/*",
"libraries/extensions/zenoh-logger",
]

[package]
name = "dora-examples"
version = "0.0.0"
edition = "2021"
license = "Apache-2.0"

[dev-dependencies]
eyre = "0.6.8"
tokio = "1.20.1"
dora-coordinator = { path = "binaries/coordinator" }

[[example]]
name = "c-dataflow"
path = "examples/c-dataflow/run.rs"

+ 1
- 0
examples/c-dataflow/.gitignore View File

@@ -0,0 +1 @@
build

+ 25
- 0
examples/c-dataflow/dataflow.yml View File

@@ -0,0 +1,25 @@
communication:
zenoh:
prefix: /example-c-dataflow

nodes:
- id: c_node
custom:
run: build/c_node
inputs:
timer: dora/timer/secs/1
outputs:
- tick
- id: runtime-node
operators:
- id: c_operator
shared-library: build/operator.so
inputs:
tick: c_node/tick
outputs:
- counter
- id: c_sink
custom:
run: build/c_sink
inputs:
counter: runtime-node/c_operator/counter

+ 41
- 0
examples/c-dataflow/node.c View File

@@ -0,0 +1,41 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "build/node_api.h"

// sleep
#ifdef _WIN32
#include <Windows.h>
#else
#include <unistd.h>
#endif

int main()
{
void *dora_context = init_dora_context_from_env();
if (dora_context == NULL)
{
fprintf(stderr, "failed to init dora context\n");
return -1;
}

for (char i = 0; i < 10; i++)
{
void *input = dora_next_input(dora_context);

char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);

assert(data_len == 0);

char out_id[] = "tick";
dora_send_output(dora_context, out_id, strlen(out_id), &i, 1);

free_dora_input(input);
}

free_dora_context(dora_context);

return 0;
}

examples/c-operator/operator.c → examples/c-dataflow/operator.c View File

@@ -1,4 +1,5 @@
#include "operator_api.h"
#include "build/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
@@ -21,35 +22,39 @@ void dora_drop_operator(void *operator_context)

int dora_on_input(
const char *id_start,
unsigned int id_len,
size_t id_len,
const char *data_start,
unsigned int data_len,
size_t data_len,
const int (*output_fn_raw)(const char *id_start,
unsigned int id_len,
size_t id_len,
const char *data_start,
unsigned int data_len,
size_t data_len,
const void *output_context),
void *output_context,
const void *operator_context)
{
char *context = (char *)operator_context;
char *counter = (char *)operator_context;

char id[id_len + 1];
memcpy(id, id_start, id_len);
id[id_len] = 0;

if (strcmp(id, "time") == 0)
if (strcmp(id, "tick") == 0)
{
char time[data_len + 1];
memcpy(time, data_start, data_len);
time[data_len] = 0;
char data[data_len + 1];
memcpy(data, data_start, data_len);
data[data_len] = 0;

printf("C operator received time input %s, context: %i\n", time, *context);
*context += 1;
*counter += 1;
printf("C operator received tick input with data `%s`, counter: %i\n", data, *counter);

char *out_id = "counter";

int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context);
char out_data[100];
int count = snprintf(out_data, sizeof(out_data), "The current counter value is %d", *counter);
assert(count >= 0 && count < 100);

int res = (output_fn_raw)(out_id, strlen(out_id), out_data, strlen(out_data), output_context);
if (res != 0)
{
printf("C operator failed to send output\n");
@@ -57,7 +62,7 @@ int dora_on_input(
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *context);
printf("C operator received unexpected input %s, context: %i\n", id, *counter);
}

return 0;

+ 84
- 0
examples/c-dataflow/run.rs View File

@@ -0,0 +1,84 @@
use eyre::{bail, Context};
use std::path::Path;

#[tokio::main]
async fn main() -> eyre::Result<()> {
std::env::set_current_dir(Path::new(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

tokio::fs::create_dir_all("build").await?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
build_package("dora-runtime").await?;
build_package("dora-node-api-c").await?;
build_c_node(root, "node.c", "c_node").await?;
build_c_node(root, "sink.c", "c_sink").await?;
build_c_operator(root).await?;

dora_coordinator::run(dora_coordinator::Command::Run {
dataflow: Path::new("dataflow.yml").to_owned(),
runtime: Some(root.join("target").join("release").join("dora-runtime")),
})
.await?;

Ok(())
}

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build").arg("--release");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<()> {
// copy header file
tokio::fs::copy(
root.join("apis").join("c").join("node").join("node_api.h"),
Path::new("build").join("node_api.h"),
)
.await?;

let mut clang = tokio::process::Command::new("clang");
clang.arg(name);
clang.arg("-l").arg("dora_node_api_c");
clang.arg("-l").arg("m");
clang.arg("-L").arg(root.join("target").join("release"));
clang.arg("--output").arg(Path::new("build").join(out_name));
if !clang.status().await?.success() {
bail!("failed to compile c node");
};
Ok(())
}

async fn build_c_operator(root: &Path) -> eyre::Result<()> {
// copy header file
tokio::fs::copy(
root.join("apis")
.join("c")
.join("operator")
.join("operator_api.h"),
Path::new("build").join("operator_api.h"),
)
.await?;

let mut compile = tokio::process::Command::new("clang");
compile.arg("-c").arg("operator.c");
compile.arg("-o").arg("build/operator.o");
if !compile.status().await?.success() {
bail!("failed to compile c operator");
};

let mut link = tokio::process::Command::new("clang");
link.arg("-shared").arg("build/operator.o");
link.arg("-o").arg("build/operator.so");
if !link.status().await?.success() {
bail!("failed to link c operator");
};

Ok(())
}

+ 43
- 0
examples/c-dataflow/sink.c View File

@@ -0,0 +1,43 @@
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "build/node_api.h"

int main()
{
void *dora_context = init_dora_context_from_env();
if (dora_context == NULL)
{
fprintf(stderr, "failed to init dora context\n");
return -1;
}

while (1)
{
void *input = dora_next_input(dora_context);
if (input == NULL)
{
break;
}

char *id = (char *)0xdeadbeafcafebabe;
size_t id_len;
read_dora_input_id(input, &id, &id_len);

char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);

printf("sink received input `");
fwrite(id, id_len, 1, stdout);
printf("` with data: '");
fwrite(data, data_len, 1, stdout);
printf("'\n");

free_dora_input(input);
}

free_dora_context(dora_context);

return 0;
}

+ 12
- 0
libraries/core/src/descriptor/mod.rs View File

@@ -166,6 +166,18 @@ pub enum OperatorSource {
Wasm(PathBuf),
}

impl OperatorSource {
pub fn canonicalize(&mut self) -> std::io::Result<()> {
let path = match self {
OperatorSource::SharedLibrary(path) => path,
OperatorSource::Python(path) => path,
OperatorSource::Wasm(path) => path,
};
*path = path.canonicalize()?;
Ok(())
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PythonOperatorConfig {
pub path: PathBuf,


Loading…
Cancel
Save