From aac5a473ced222dec5696ffc5abc3cfcadf7e7d5 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 24 Feb 2023 11:26:29 +0100 Subject: [PATCH] Re-add C operator example --- examples/c-dataflow/dataflow.yml | 12 ++++- examples/c-dataflow/node.c | 8 ++-- examples/c-dataflow/operator.c | 81 ++++++++++++++++++++++++++++++++ examples/c-dataflow/run.rs | 29 +++++++++++- examples/c-dataflow/sink.c | 3 +- 5 files changed, 125 insertions(+), 8 deletions(-) create mode 100644 examples/c-dataflow/operator.c diff --git a/examples/c-dataflow/dataflow.yml b/examples/c-dataflow/dataflow.yml index 472d7094..1cc144fb 100644 --- a/examples/c-dataflow/dataflow.yml +++ b/examples/c-dataflow/dataflow.yml @@ -9,9 +9,17 @@ nodes: inputs: timer: dora/timer/millis/50 outputs: - - counter + - message + - id: runtime-node + operators: + - id: c_operator + shared-library: build/operator + inputs: + message: c_node/message + outputs: + - counter - id: c_sink custom: source: build/c_sink inputs: - counter: c_node/counter + counter: runtime-node/c_operator/counter diff --git a/examples/c-dataflow/node.c b/examples/c-dataflow/node.c index e858baf1..9c39766a 100644 --- a/examples/c-dataflow/node.c +++ b/examples/c-dataflow/node.c @@ -25,7 +25,6 @@ int main() for (char i = 0; i < 100; i++) { - printf("[c node] waiting for next input\n"); void *event = dora_next_event(dora_context); if (event == NULL) { @@ -43,8 +42,11 @@ int main() assert(data_len == 0); - char out_id[] = "counter"; - dora_send_output(dora_context, out_id, strlen(out_id), &i, 1); + char out_id[] = "message"; + char out_data[50]; + int out_data_len = sprintf(out_data, "loop iteration %d", i); + + dora_send_output(dora_context, out_id, strlen(out_id), out_data, out_data_len); } else if (ty == DoraEventType_Stop) { diff --git a/examples/c-dataflow/operator.c b/examples/c-dataflow/operator.c new file mode 100644 index 00000000..ba64e8e3 --- /dev/null +++ b/examples/c-dataflow/operator.c @@ -0,0 +1,81 @@ +#include "../../apis/c/operator/operator_api.h" +#include +#include +#include +#include + +DoraInitResult_t dora_init_operator(void) +{ + void *context = malloc(1); + char *context_char = (char *)context; + *context_char = 0; + + DoraInitResult_t result = {.operator_context = context}; + return result; +} + +DoraResult_t dora_drop_operator(void *operator_context) +{ + free(operator_context); + + DoraResult_t result = {}; + return result; +} + +OnEventResult_t dora_on_event( + const RawEvent_t *event, + const SendOutput_t *send_output, + void *operator_context) +{ + char *counter = (char *)operator_context; + + if (event->input != NULL) + { + // input event + Input_t *input = event->input; + + char id[input->id.len + 1]; + memcpy(id, input->id.ptr, input->id.len); + id[input->id.len] = 0; + + if (strcmp(id, "message") == 0) + { + char data[input->data.len + 1]; + memcpy(data, input->data.ptr, input->data.len); + data[input->data.len] = 0; + + *counter += 1; + printf("C operator received message `%s`, counter: %i\n", data, *counter); + + char *out_id = "counter"; + char *out_id_heap = strdup(out_id); + + int data_alloc_size = 100; + char *out_data = (char *)malloc(data_alloc_size); + int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter); + assert(count >= 0 && count < 100); + + Output_t output = {.id = { + .ptr = (uint8_t *)out_id_heap, + .len = strlen(out_id_heap), + .cap = strlen(out_id_heap) + 1, + }, + .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}}; + DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + + OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE}; + return result; + } + else + { + printf("C operator received unexpected input %s, context: %i\n", id, *counter); + } + } + if (event->stop) + { + printf("C operator received stop event\n"); + } + + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; + return result; +} diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index 39290ff0..d0bd1ecf 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -17,8 +17,13 @@ async fn main() -> eyre::Result<()> { build_c_node(root, "node.c", "c_node").await?; build_c_node(root, "sink.c", "c_sink").await?; + build_package("dora-operator-api-c").await?; + build_c_operator().await?; + let dataflow = Path::new("dataflow.yml").to_owned(); - dora_daemon::Daemon::run_dataflow(&dataflow, None).await?; + build_package("dora-runtime").await?; + let dora_runtime_path = Some(root.join("target").join("debug").join("dora-runtime")); + dora_daemon::Daemon::run_dataflow(&dataflow, dora_runtime_path).await?; Ok(()) } @@ -97,6 +102,28 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<( Ok(()) } +async fn build_c_operator() -> eyre::Result<()> { + let mut compile = tokio::process::Command::new("clang"); + compile.arg("-c").arg("operator.c"); + compile.arg("-o").arg("build/operator.o"); + compile.arg("-fdeclspec"); + #[cfg(unix)] + compile.arg("-fPIC"); + if !compile.status().await?.success() { + bail!("failed to compile c operator"); + }; + + let mut link = tokio::process::Command::new("clang"); + link.arg("-shared").arg("build/operator.o"); + link.arg("-o") + .arg(Path::new("build").join(library_filename("operator"))); + if !link.status().await?.success() { + bail!("failed to link c operator"); + }; + + Ok(()) +} + // taken from `rust_libloading` crate by Simonas Kazlauskas, licensed under the ISC license ( // see https://github.com/nagisa/rust_libloading/blob/master/LICENSE) pub fn library_filename>(name: S) -> OsString { diff --git a/examples/c-dataflow/sink.c b/examples/c-dataflow/sink.c index 3d40894d..d1b89924 100644 --- a/examples/c-dataflow/sink.c +++ b/examples/c-dataflow/sink.c @@ -18,7 +18,6 @@ int main() while (1) { - printf("[c sink] waiting for next input\n"); void *event = dora_next_event(dora_context); if (event == NULL) { @@ -40,7 +39,7 @@ int main() printf("[c sink] received input `"); fwrite(id, id_len, 1, stdout); - printf("` with data: %d\n", *data); + printf("` with data: %s\n", data); } else if (ty == DoraEventType_InputClosed) {