From 9c7ce169669ed8a403d46676421da16a81b9329e Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 24 Feb 2023 11:51:55 +0100 Subject: [PATCH] Add examples for C++ nodes and operators based on C-API again --- examples/c++-dataflow/dataflow.yml | 32 +++---- examples/c++-dataflow/node-c-api/main.cc | 85 +++++++++++++++++++ .../c++-dataflow/operator-c-api/operator.cc | 67 +++++++++------ examples/c++-dataflow/run.rs | 38 ++++----- 4 files changed, 159 insertions(+), 63 deletions(-) create mode 100644 examples/c++-dataflow/node-c-api/main.cc diff --git a/examples/c++-dataflow/dataflow.yml b/examples/c++-dataflow/dataflow.yml index 21bee9a1..7b52ae6d 100644 --- a/examples/c++-dataflow/dataflow.yml +++ b/examples/c++-dataflow/dataflow.yml @@ -10,26 +10,28 @@ nodes: tick: dora/timer/millis/300 outputs: - counter - # - id: cxx-node-c-api - # custom: - # source: build/node_c_api - # inputs: - # tick: dora/timer/millis/300 - # outputs: - # - counter + - id: cxx-node-c-api + custom: + source: build/node_c_api + inputs: + tick: dora/timer/millis/300 + outputs: + - counter - - id: runtime-node + - id: runtime-node-1 operators: - id: operator-rust-api shared-library: build/operator_rust_api inputs: - # counter_1: cxx-node-c-api/counter + counter_1: cxx-node-c-api/counter counter_2: cxx-node-rust-api/counter outputs: - status - # - id: operator-c-api - # shared-library: build/operator_c_api - # inputs: - # op_status: runtime-node/operator-rust-api/status - # outputs: - # - half-status + - id: runtime-node-2 + operators: + - id: operator-c-api + shared-library: build/operator_c_api + inputs: + op_status: runtime-node-1/operator-rust-api/status + outputs: + - half-status diff --git a/examples/c++-dataflow/node-c-api/main.cc b/examples/c++-dataflow/node-c-api/main.cc new file mode 100644 index 00000000..8148bf19 --- /dev/null +++ b/examples/c++-dataflow/node-c-api/main.cc @@ -0,0 +1,85 @@ +extern "C" +{ +#include "../../../apis/c/node/node_api.h" +} + +#include +#include + +int run(void *dora_context) +{ + unsigned char counter = 0; + + for (int i = 0; i < 20; i++) + { + void *event = dora_next_event(dora_context); + if (event == NULL) + { + printf("[c node] ERROR: unexpected end of event\n"); + return -1; + } + + enum DoraEventType ty = read_dora_event_type(event); + + if (ty == DoraEventType_Input) + { + counter += 1; + + char *id_ptr; + size_t id_len; + read_dora_input_id(event, &id_ptr, &id_len); + std::string id(id_ptr, id_len); + + char *data_ptr; + size_t data_len; + read_dora_input_data(event, &data_ptr, &data_len); + std::vector data; + for (size_t i = 0; i < data_len; i++) + { + data.push_back(*(data_ptr + i)); + } + + std::cout + << "Received input " + << " (counter: " << (unsigned int)counter << ") data: ["; + for (unsigned char &v : data) + { + std::cout << (unsigned int)v << ", "; + } + std::cout << "]" << std::endl; + + std::vector out_vec{counter}; + std::string out_id = "counter"; + int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1); + if (result != 0) + { + std::cerr << "failed to send output" << std::endl; + return 1; + } + } + else if (ty == DoraEventType_Stop) + { + printf("[c node] received stop event\n"); + } + else + { + printf("[c node] received unexpected event: %d\n", ty); + } + + free_dora_event(event); + } + return 0; +} + +int main() +{ + std::cout << "HELLO FROM C++ (using C API)" << std::endl; + + auto dora_context = init_dora_context_from_env(); + auto ret = run(dora_context); + free_dora_context(dora_context); + + std::cout << "GOODBYE FROM C++ node (using C API)" << std::endl; + + return ret; +} diff --git a/examples/c++-dataflow/operator-c-api/operator.cc b/examples/c++-dataflow/operator-c-api/operator.cc index 33fd17d5..7c9fd299 100644 --- a/examples/c++-dataflow/operator-c-api/operator.cc +++ b/examples/c++-dataflow/operator-c-api/operator.cc @@ -30,44 +30,55 @@ extern "C" DoraResult_t dora_drop_operator(void *operator_context) return {}; } -extern "C" OnInputResult_t dora_on_input( - const Input_t *input, +extern "C" OnEventResult_t dora_on_event( + const RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { + if (event->input != NULL) + { + // input event + Input_t *input = event->input; + std::string id((char *)input->id.ptr, input->id.len); - std::string id((char *)input->id.ptr, input->id.len); + std::vector data; + for (size_t i = 0; i < input->data.len; i++) + { + data.push_back(*(input->data.ptr + i)); + } - std::vector data; - for (size_t i = 0; i < input->data.len; i++) - { - data.push_back(*(input->data.ptr + i)); - } + std::cout + << "C++ Operator (C-API) received input `" << id << "` with data: ["; + for (unsigned char &v : data) + { + std::cout << (unsigned int)v << ", "; + } + std::cout << "]" << std::endl; - std::cout - << "C++ Operator (C-API) received input `" << id << "` with data: ["; - for (unsigned char &v : data) - { - std::cout << (unsigned int)v << ", "; - } - std::cout << "]" << std::endl; + const char *out_id = "half-status"; + char *out_id_heap = strdup(out_id); - const char *out_id = "half-status"; - char *out_id_heap = strdup(out_id); + size_t out_data_len = 1; + uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); + *out_data_heap = data[0] / 2; - size_t out_data_len = 1; - uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len); - *out_data_heap = data[0] / 2; + Output_t output = {.id = { + .ptr = (uint8_t *)out_id_heap, + .len = strlen(out_id_heap), + .cap = strlen(out_id_heap) + 1, + }, + .data = {.ptr = out_data_heap, .len = out_data_len, .cap = out_data_len}}; - Output_t output = {.id = { - .ptr = (uint8_t *)out_id_heap, - .len = strlen(out_id_heap), - .cap = strlen(out_id_heap) + 1, - }, - .data = {.ptr = out_data_heap, .len = out_data_len, .cap = out_data_len}}; + DoraResult_t send_result = (send_output->send_output.call)(send_output->send_output.env_ptr, output); - DoraResult_t send_result = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; + return result; + } + if (event->stop) + { + printf("C operator received stop event\n"); + } - OnInputResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE}; + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; return result; } diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index 01435939..2698b6d9 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -58,7 +58,7 @@ async fn main() -> eyre::Result<()> { .await?; build_package("dora-node-api-c").await?; - // build_package("dora-operator-api-c").await?; + build_package("dora-operator-api-c").await?; build_cxx_node( root, &[ @@ -69,15 +69,15 @@ async fn main() -> eyre::Result<()> { &["-l", "dora_node_api_cxx"], ) .await?; - // build_cxx_node( - // root, - // &[&dunce::canonicalize( - // Path::new("node-c-api").join("main.cc"), - // )?], - // "node_c_api", - // &["-l", "dora_node_api_c"], - // ) - // .await?; + build_cxx_node( + root, + &[&dunce::canonicalize( + Path::new("node-c-api").join("main.cc"), + )?], + "node_c_api", + &["-l", "dora_node_api_c"], + ) + .await?; build_cxx_operator( &[ &dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?, @@ -92,16 +92,14 @@ async fn main() -> eyre::Result<()> { ], ) .await?; - // build_cxx_operator( - // &[&dunce::canonicalize( - // Path::new("operator-c-api").join("operator.cc"), - // )?], - // "operator_c_api", - // &[], - // ) - // .await?; - - // build_package("dora-runtime").await?; + build_cxx_operator( + &[&dunce::canonicalize( + Path::new("operator-c-api").join("operator.cc"), + )?], + "operator_c_api", + &[], + ) + .await?; let dataflow = Path::new("dataflow.yml").to_owned(); build_package("dora-runtime").await?;