Browse Source

Update C dataflow example for new API

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
3c2bc57473
Failed to extract signature
5 changed files with 66 additions and 138 deletions
  1. +3
    -11
      examples/c-dataflow/dataflow.yml
  2. +27
    -12
      examples/c-dataflow/node.c
  3. +0
    -70
      examples/c-dataflow/operator.c
  4. +2
    -30
      examples/c-dataflow/run.rs
  5. +34
    -15
      examples/c-dataflow/sink.c

+ 3
- 11
examples/c-dataflow/dataflow.yml View File

@@ -7,19 +7,11 @@ nodes:
custom:
source: build/c_node
inputs:
timer: dora/timer/secs/1
timer: dora/timer/millis/50
outputs:
- tick
- id: runtime-node
operators:
- id: c_operator
shared-library: build/operator
inputs:
tick: c_node/tick
outputs:
- counter
- counter
- id: c_sink
custom:
source: build/c_sink
inputs:
counter: runtime-node/c_operator/counter
counter: c_node/counter

+ 27
- 12
examples/c-dataflow/node.c View File

@@ -23,29 +23,44 @@ int main()

printf("[c node] dora context initialized\n");

for (char i = 0; i < 10; i++)
for (char i = 0; i < 100; i++)
{
printf("[c node] waiting for next input\n");
void *input = dora_next_input(dora_context);
if (input == NULL)
void *event = dora_next_event(dora_context);
if (event == NULL)
{
printf("[c node] ERROR: unexpected end of input\n");
printf("[c node] ERROR: unexpected end of event\n");
return -1;
}

char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);
enum EventType ty = read_dora_event_type(event);

assert(data_len == 0);
if (ty == Input)
{
char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);

assert(data_len == 0);

char out_id[] = "tick";
dora_send_output(dora_context, out_id, strlen(out_id), &i, 1);
char out_id[] = "counter";
dora_send_output(dora_context, out_id, strlen(out_id), &i, 1);
}
else if (ty == Stop)
{
printf("[c node] received stop event\n");
free_dora_event(event);
break;
}
else
{
printf("[c node] received unexpected event: %d\n", ty);
}

free_dora_input(input);
free_dora_event(event);
}

printf("[c node] received 10 inputs\n");
printf("[c node] received 10 events\n");

free_dora_context(dora_context);



+ 0
- 70
examples/c-dataflow/operator.c View File

@@ -1,70 +0,0 @@
#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

DoraInitResult_t result = {.operator_context = context};
return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
free(operator_context);

DoraResult_t result = {};
return result;
}

OnInputResult_t dora_on_input(
const Input_t *input,
const SendOutput_t *send_output,
void *operator_context)
{
char *counter = (char *)operator_context;

char id[input->id.len + 1];
memcpy(id, input->id.ptr, input->id.len);
id[input->id.len] = 0;

if (strcmp(id, "tick") == 0)
{
char data[input->data.len + 1];
memcpy(data, input->data.ptr, input->data.len);
data[input->data.len] = 0;

*counter += 1;
printf("C operator received tick input with data `%s`, counter: %i\n", data, *counter);

char *out_id = "counter";
char *out_id_heap = strdup(out_id);

int data_alloc_size = 100;
char *out_data = (char *)malloc(data_alloc_size);
int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
assert(count >= 0 && count < 100);

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

OnInputResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *counter);
OnInputResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}
}

+ 2
- 30
examples/c-dataflow/run.rs View File

@@ -13,18 +13,12 @@ async fn main() -> eyre::Result<()> {

tokio::fs::create_dir_all("build").await?;

build_package("dora-runtime").await?;
build_package("dora-node-api-c").await?;
build_package("dora-operator-api-c").await?;
build_c_node(root, "node.c", "c_node").await?;
build_c_node(root, "sink.c", "c_sink").await?;
build_c_operator().await?;

dora_coordinator::run(dora_coordinator::Args {
run_dataflow: Path::new("dataflow.yml").to_owned().into(),
runtime: Some(root.join("target").join("debug").join("dora-runtime")),
})
.await?;
let dataflow = Path::new("dataflow.yml").to_owned();
dora_daemon::Daemon::run_dataflow(&dataflow).await?;

Ok(())
}
@@ -103,28 +97,6 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
Ok(())
}

async fn build_c_operator() -> eyre::Result<()> {
let mut compile = tokio::process::Command::new("clang");
compile.arg("-c").arg("operator.c");
compile.arg("-o").arg("build/operator.o");
compile.arg("-fdeclspec");
#[cfg(unix)]
compile.arg("-fPIC");
if !compile.status().await?.success() {
bail!("failed to compile c operator");
};

let mut link = tokio::process::Command::new("clang");
link.arg("-shared").arg("build/operator.o");
link.arg("-o")
.arg(Path::new("build").join(library_filename("operator")));
if !link.status().await?.success() {
bail!("failed to link c operator");
};

Ok(())
}

// taken from `rust_libloading` crate by Simonas Kazlauskas, licensed under the ISC license (
// see https://github.com/nagisa/rust_libloading/blob/master/LICENSE)
pub fn library_filename<S: AsRef<OsStr>>(name: S) -> OsString {


+ 34
- 15
examples/c-dataflow/sink.c View File

@@ -19,28 +19,47 @@ int main()
while (1)
{
printf("[c sink] waiting for next input\n");
void *input = dora_next_input(dora_context);
if (input == NULL)
void *event = dora_next_event(dora_context);
if (event == NULL)
{
printf("[c sink] end of input\n");
printf("[c sink] end of event\n");
break;
}

char *id;
size_t id_len;
read_dora_input_id(input, &id, &id_len);
enum EventType ty = read_dora_event_type(event);

char *data;
size_t data_len;
read_dora_input_data(input, &data, &data_len);
if (ty == Input)
{
char *id;
size_t id_len;
read_dora_input_id(event, &id, &id_len);

char *data;
size_t data_len;
read_dora_input_data(event, &data, &data_len);

printf("sink received input `");
fwrite(id, id_len, 1, stdout);
printf("` with data: '");
fwrite(data, data_len, 1, stdout);
printf("'\n");
printf("[c sink] received input `");
fwrite(id, id_len, 1, stdout);
printf("` with data: %d\n", *data);
}
else if (ty == InputClosed)
{
printf("[c sink] received InputClosed event\n");
free_dora_event(event);
break;
}
else if (ty == Stop)
{
printf("[c sink] received stop event\n");
free_dora_event(event);
break;
}
else
{
printf("[c sink] received unexpected event: %d\n", ty);
}

free_dora_input(input);
free_dora_event(event);
}

free_dora_context(dora_context);


Loading…
Cancel
Save