Browse Source

Create a C-API for operators and an example operator

Adds an `api.h` header file that defines the dora functions in a C-compatible way. To showcase that C-operators can interact with Rust operators and custom nodes, we create a simple C operator and add it to our `mini-dataflow`. The operator reads the periodic timestamps, increments a counter, and sends the counter value out. The counter is kept across calls by storing it behind the context pointer.
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
38bc8a8284
5 changed files with 92 additions and 0 deletions
  1. +3
    -0
      coordinator/examples/example_sink_logger.rs
  2. +7
    -0
      coordinator/examples/mini-dataflow.yml
  3. +2
    -0
      runtime/examples/c-operator/.gitignore
  4. +16
    -0
      runtime/examples/c-operator/api.h
  5. +64
    -0
      runtime/examples/c-operator/operator.c

+ 3
- 0
coordinator/examples/example_sink_logger.rs View File

@@ -40,6 +40,9 @@ async fn main() -> eyre::Result<()> {
let data = String::from_utf8(input.data)?;
println!("received timestamped random value: {data}");
}
"counter" => {
println!("received counter value: {:?}", input.data);
}

other => eprintln!("Ignoring unexpected input `{other}`"),
}


+ 7
- 0
coordinator/examples/mini-dataflow.yml View File

@@ -32,6 +32,7 @@ nodes:
random: random/number
time: timer/time
timestamped-random: runtime-node/op-1/timestamped-random
counter: runtime-node/op-2/counter

- id: runtime-node
operators:
@@ -42,3 +43,9 @@ nodes:
time: timer/time
outputs:
- timestamped-random
- id: op-2
shared-library: ../runtime/examples/c-operator/operator.so
inputs:
time: timer/time
outputs:
- counter

+ 2
- 0
runtime/examples/c-operator/.gitignore View File

@@ -0,0 +1,2 @@
operator.o
operator.so

+ 16
- 0
runtime/examples/c-operator/api.h View File

@@ -0,0 +1,16 @@
int dora_init_operator(void **operator_context);

void dora_drop_operator(void *operator_context);

int dora_on_input(
const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const int (*output_fn_raw)(const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const void *output_context),
void *output_context,
const void *operator_context);

+ 64
- 0
runtime/examples/c-operator/operator.c View File

@@ -0,0 +1,64 @@
#include "api.h"
#include <string.h>
#include <stdio.h>
#include <stdlib.h>

int dora_init_operator(void **operator_context)
{
void *context = malloc(1);
char *context_char = (char *)context;
*context_char = 0;

*operator_context = context;

return 0;
}

void dora_drop_operator(void *operator_context)
{
free(operator_context);
}

int dora_on_input(
const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const int (*output_fn_raw)(const char *id_start,
unsigned int id_len,
const char *data_start,
unsigned int data_len,
const void *output_context),
void *output_context,
const void *operator_context)
{
char *context = (char *)operator_context;

char id[id_len + 1];
memcpy(id, id_start, id_len);
id[id_len] = 0;

if (strcmp(id, "time") == 0)
{
char time[data_len + 1];
memcpy(time, data_start, data_len);
time[data_len] = 0;

printf("C operator received time input %s, context: %i\n", time, *context);
*context += 1;

char *out_id = "counter";

int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context);
if (res != 0)
{
printf("C operator failed to send output\n");
}
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *context);
}

return 0;
}

Loading…
Cancel
Save