Browse Source

Re-add C operator example

tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
aac5a473ce
Failed to extract signature
5 changed files with 125 additions and 8 deletions
  1. +10
    -2
      examples/c-dataflow/dataflow.yml
  2. +5
    -3
      examples/c-dataflow/node.c
  3. +81
    -0
      examples/c-dataflow/operator.c
  4. +28
    -1
      examples/c-dataflow/run.rs
  5. +1
    -2
      examples/c-dataflow/sink.c

+ 10
- 2
examples/c-dataflow/dataflow.yml View File

@@ -9,9 +9,17 @@ nodes:
inputs:
timer: dora/timer/millis/50
outputs:
- counter
- message
- id: runtime-node
operators:
- id: c_operator
shared-library: build/operator
inputs:
message: c_node/message
outputs:
- counter
- id: c_sink
custom:
source: build/c_sink
inputs:
counter: c_node/counter
counter: runtime-node/c_operator/counter

+ 5
- 3
examples/c-dataflow/node.c View File

@@ -25,7 +25,6 @@ int main()

for (char i = 0; i < 100; i++)
{
printf("[c node] waiting for next input\n");
void *event = dora_next_event(dora_context);
if (event == NULL)
{
@@ -43,8 +42,11 @@ int main()

assert(data_len == 0);

char out_id[] = "counter";
dora_send_output(dora_context, out_id, strlen(out_id), &i, 1);
char out_id[] = "message";
char out_data[50];
int out_data_len = sprintf(out_data, "loop iteration %d", i);

dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len);
}
else if (ty == DoraEventType_Stop)
{


+ 81
- 0
examples/c-dataflow/operator.c View File

@@ -0,0 +1,81 @@
#include "../../apis/c/operator/operator_api.h"
#include <assert.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

DoraInitResult_t dora_init_operator(void)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

DoraInitResult_t result = {.operator_context = context};
return result;
}

DoraResult_t dora_drop_operator(void *operator_context)
{
free(operator_context);

DoraResult_t result = {};
return result;
}

OnEventResult_t dora_on_event(
const RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{
char *counter = (char *)operator_context;

if (event->input != NULL)
{
// input event
Input_t *input = event->input;

char id[input->id.len + 1];
memcpy(id, input->id.ptr, input->id.len);
id[input->id.len] = 0;

if (strcmp(id, "message") == 0)
{
char data[input->data.len + 1];
memcpy(data, input->data.ptr, input->data.len);
data[input->data.len] = 0;

*counter += 1;
printf("C operator received message `%s`, counter: %i\n", data, *counter);

char *out_id = "counter";
char *out_id_heap = strdup(out_id);

int data_alloc_size = 100;
char *out_data = (char *)malloc(data_alloc_size);
int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
assert(count >= 0 && count < 100);

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *counter);
}
}
if (event->stop)
{
printf("C operator received stop event\n");
}

OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}

+ 28
- 1
examples/c-dataflow/run.rs View File

@@ -17,8 +17,13 @@ async fn main() -> eyre::Result<()> {
build_c_node(root, "node.c", "c_node").await?;
build_c_node(root, "sink.c", "c_sink").await?;

build_package("dora-operator-api-c").await?;
build_c_operator().await?;

let dataflow = Path::new("dataflow.yml").to_owned();
dora_daemon::Daemon::run_dataflow(&dataflow, None).await?;
build_package("dora-runtime").await?;
let dora_runtime_path = Some(root.join("target").join("debug").join("dora-runtime"));
dora_daemon::Daemon::run_dataflow(&dataflow, dora_runtime_path).await?;

Ok(())
}
@@ -97,6 +102,28 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
Ok(())
}

async fn build_c_operator() -> eyre::Result<()> {
let mut compile = tokio::process::Command::new("clang");
compile.arg("-c").arg("operator.c");
compile.arg("-o").arg("build/operator.o");
compile.arg("-fdeclspec");
#[cfg(unix)]
compile.arg("-fPIC");
if !compile.status().await?.success() {
bail!("failed to compile c operator");
};

let mut link = tokio::process::Command::new("clang");
link.arg("-shared").arg("build/operator.o");
link.arg("-o")
.arg(Path::new("build").join(library_filename("operator")));
if !link.status().await?.success() {
bail!("failed to link c operator");
};

Ok(())
}

// taken from `rust_libloading` crate by Simonas Kazlauskas, licensed under the ISC license (
// see https://github.com/nagisa/rust_libloading/blob/master/LICENSE)
pub fn library_filename<S: AsRef<OsStr>>(name: S) -> OsString {


+ 1
- 2
examples/c-dataflow/sink.c View File

@@ -18,7 +18,6 @@ int main()

while (1)
{
printf("[c sink] waiting for next input\n");
void *event = dora_next_event(dora_context);
if (event == NULL)
{
@@ -40,7 +39,7 @@ int main()

printf("[c sink] received input `");
fwrite(id, id_len, 1, stdout);
printf("` with data: %d\n", *data);
printf("` with data: %s\n", data);
}
else if (ty == DoraEventType_InputClosed)
{


Loading…
Cancel
Save