From 75b05131e80a6c45dfbd1f170dc297443ce38bfd Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 19 Oct 2023 16:45:50 +0200 Subject: [PATCH] Update C dataflow example to use new API functions --- examples/c-dataflow/operator.c | 30 +++++++++++++----------------- examples/c-dataflow/run.rs | 6 ++++-- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/examples/c-dataflow/operator.c b/examples/c-dataflow/operator.c index d3cc01ca..7e739028 100644 --- a/examples/c-dataflow/operator.c +++ b/examples/c-dataflow/operator.c @@ -27,6 +27,8 @@ OnEventResult_t dora_on_event( const SendOutput_t *send_output, void *operator_context) { + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; + char *counter = (char *)operator_context; if (event->input != NULL) @@ -34,18 +36,17 @@ OnEventResult_t dora_on_event( // input event Input_t *input = event->input; - char id[input->id.len + 1]; - memcpy(id, input->id.ptr, input->id.len); - id[input->id.len] = 0; + char *id = dora_read_input_id(input); if (strcmp(id, "message") == 0) { - char data[input->data.len + 1]; - memcpy(data, input->data.ptr, input->data.len); - data[input->data.len] = 0; + printf("message event\n"); + + Vec_uint8_t data = dora_read_data(input); + assert(data.ptr != NULL); *counter += 1; - printf("C operator received message `%s`, counter: %i\n", data, *counter); + printf("C operator received message `%.*s`, counter: %i\n", (int)data.len, data.ptr, *counter); char *out_id = "counter"; char *out_id_heap = strdup(out_id); @@ -55,27 +56,22 @@ OnEventResult_t dora_on_event( int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter); assert(count >= 0 && count < 100); - Output_t output = {.id = { - .ptr = (uint8_t *)out_id_heap, - .len = strlen(out_id_heap), - .cap = strlen(out_id_heap) + 1, - }, - .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}}; - DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + DoraResult_t res = dora_send_output(send_output, out_id_heap, (uint8_t *)out_data, strlen(out_data)); + result.result = res; - OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE}; - return result; + dora_free_data(data); } else { printf("C operator received unexpected input %s, context: %i\n", id, *counter); } + + dora_free_input_id(id); } if (event->stop) { printf("C operator received stop event\n"); } - OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; return result; } diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index 2143dddf..6795dc2f 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -33,7 +33,7 @@ async fn main() -> eyre::Result<()> { build_c_node(root, "sink.c", "c_sink").await?; build_package("dora-operator-api-c").await?; - build_c_operator().await?; + build_c_operator(root).await?; let dataflow = Path::new("dataflow.yml").to_owned(); dora_daemon::Daemon::run_dataflow(&dataflow).await?; @@ -116,7 +116,7 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<( Ok(()) } -async fn build_c_operator() -> eyre::Result<()> { +async fn build_c_operator(root: &Path) -> eyre::Result<()> { let mut compile = tokio::process::Command::new("clang"); compile.arg("-c").arg("operator.c"); compile.arg("-o").arg("build/operator.o"); @@ -129,6 +129,8 @@ async fn build_c_operator() -> eyre::Result<()> { let mut link = tokio::process::Command::new("clang"); link.arg("-shared").arg("build/operator.o"); + link.arg("-L").arg(root.join("target").join("debug")); + link.arg("-l").arg("dora_operator_api_c"); link.arg("-o") .arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}"))); if !link.status().await?.success() {