diff --git a/coordinator/examples/example_sink_logger.rs b/coordinator/examples/example_sink_logger.rs index dc56630f..8fbeed01 100644 --- a/coordinator/examples/example_sink_logger.rs +++ b/coordinator/examples/example_sink_logger.rs @@ -40,6 +40,9 @@ async fn main() -> eyre::Result<()> { let data = String::from_utf8(input.data)?; println!("received timestamped random value: {data}"); } + "counter" => { + println!("received counter value: {:?}", input.data); + } other => eprintln!("Ignoring unexpected input `{other}`"), } diff --git a/coordinator/examples/mini-dataflow.yml b/coordinator/examples/mini-dataflow.yml index c47a1cb0..838d8dc3 100644 --- a/coordinator/examples/mini-dataflow.yml +++ b/coordinator/examples/mini-dataflow.yml @@ -32,6 +32,7 @@ nodes: random: random/number time: timer/time timestamped-random: runtime-node/op-1/timestamped-random + counter: runtime-node/op-2/counter - id: runtime-node operators: @@ -42,3 +43,9 @@ nodes: time: timer/time outputs: - timestamped-random + - id: op-2 + shared-library: ../runtime/examples/c-operator/operator.so + inputs: + time: timer/time + outputs: + - counter diff --git a/runtime/examples/c-operator/.gitignore b/runtime/examples/c-operator/.gitignore new file mode 100644 index 00000000..7c547269 --- /dev/null +++ b/runtime/examples/c-operator/.gitignore @@ -0,0 +1,2 @@ +operator.o +operator.so diff --git a/runtime/examples/c-operator/api.h b/runtime/examples/c-operator/api.h new file mode 100644 index 00000000..5ba25954 --- /dev/null +++ b/runtime/examples/c-operator/api.h @@ -0,0 +1,16 @@ +int dora_init_operator(void **operator_context); + +void dora_drop_operator(void *operator_context); + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context); diff --git a/runtime/examples/c-operator/operator.c b/runtime/examples/c-operator/operator.c new file mode 100644 index 00000000..2e3584a6 --- /dev/null +++ b/runtime/examples/c-operator/operator.c @@ -0,0 +1,64 @@ +#include "api.h" +#include +#include +#include + +int dora_init_operator(void **operator_context) +{ + void *context = malloc(1); + char *context_char = (char *)context; + *context_char = 0; + + *operator_context = context; + + return 0; +} + +void dora_drop_operator(void *operator_context) +{ + free(operator_context); +} + +int dora_on_input( + const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const int (*output_fn_raw)(const char *id_start, + unsigned int id_len, + const char *data_start, + unsigned int data_len, + const void *output_context), + void *output_context, + const void *operator_context) +{ + char *context = (char *)operator_context; + + char id[id_len + 1]; + memcpy(id, id_start, id_len); + id[id_len] = 0; + + if (strcmp(id, "time") == 0) + { + char time[data_len + 1]; + memcpy(time, data_start, data_len); + time[data_len] = 0; + + printf("C operator received time input %s, context: %i\n", time, *context); + *context += 1; + + char *out_id = "counter"; + + int res = (output_fn_raw)(out_id, strlen(out_id), context, 1, output_context); + if (res != 0) + { + printf("C operator failed to send output\n"); + } + } + else + { + printf("C operator received unexpected input %s, context: %i\n", id, *context); + } + + return 0; +}