Browse Source

Add examples for C++ nodes and operators based on C-API again

tags/v0.2.0-candidate
Philipp Oppermann 2 years ago
parent
commit
9c7ce16966
Failed to extract signature
4 changed files with 159 additions and 63 deletions
  1. +17
    -15
      examples/c++-dataflow/dataflow.yml
  2. +85
    -0
      examples/c++-dataflow/node-c-api/main.cc
  3. +39
    -28
      examples/c++-dataflow/operator-c-api/operator.cc
  4. +18
    -20
      examples/c++-dataflow/run.rs

+ 17
- 15
examples/c++-dataflow/dataflow.yml View File

@@ -10,26 +10,28 @@ nodes:
tick: dora/timer/millis/300
outputs:
- counter
# - id: cxx-node-c-api
# custom:
# source: build/node_c_api
# inputs:
# tick: dora/timer/millis/300
# outputs:
# - counter
- id: cxx-node-c-api
custom:
source: build/node_c_api
inputs:
tick: dora/timer/millis/300
outputs:
- counter

- id: runtime-node
- id: runtime-node-1
operators:
- id: operator-rust-api
shared-library: build/operator_rust_api
inputs:
# counter_1: cxx-node-c-api/counter
counter_1: cxx-node-c-api/counter
counter_2: cxx-node-rust-api/counter
outputs:
- status
# - id: operator-c-api
# shared-library: build/operator_c_api
# inputs:
# op_status: runtime-node/operator-rust-api/status
# outputs:
# - half-status
- id: runtime-node-2
operators:
- id: operator-c-api
shared-library: build/operator_c_api
inputs:
op_status: runtime-node-1/operator-rust-api/status
outputs:
- half-status

+ 85
- 0
examples/c++-dataflow/node-c-api/main.cc View File

@@ -0,0 +1,85 @@
extern "C"
{
#include "../../../apis/c/node/node_api.h"
}

#include <iostream>
#include <vector>

int run(void *dora_context)
{
unsigned char counter = 0;

for (int i = 0; i < 20; i++)
{
void *event = dora_next_event(dora_context);
if (event == NULL)
{
printf("[c node] ERROR: unexpected end of event\n");
return -1;
}

enum DoraEventType ty = read_dora_event_type(event);

if (ty == DoraEventType_Input)
{
counter += 1;

char *id_ptr;
size_t id_len;
read_dora_input_id(event, &id_ptr, &id_len);
std::string id(id_ptr, id_len);

char *data_ptr;
size_t data_len;
read_dora_input_data(event, &data_ptr, &data_len);
std::vector<unsigned char> data;
for (size_t i = 0; i < data_len; i++)
{
data.push_back(*(data_ptr + i));
}

std::cout
<< "Received input "
<< " (counter: " << (unsigned int)counter << ") data: [";
for (unsigned char &v : data)
{
std::cout << (unsigned int)v << ", ";
}
std::cout << "]" << std::endl;

std::vector<unsigned char> out_vec{counter};
std::string out_id = "counter";
int result = dora_send_output(dora_context, &out_id[0], out_id.length(), (char *)&counter, 1);
if (result != 0)
{
std::cerr << "failed to send output" << std::endl;
return 1;
}
}
else if (ty == DoraEventType_Stop)
{
printf("[c node] received stop event\n");
}
else
{
printf("[c node] received unexpected event: %d\n", ty);
}

free_dora_event(event);
}
return 0;
}

int main()
{
std::cout << "HELLO FROM C++ (using C API)" << std::endl;

auto dora_context = init_dora_context_from_env();
auto ret = run(dora_context);
free_dora_context(dora_context);

std::cout << "GOODBYE FROM C++ node (using C API)" << std::endl;

return ret;
}

+ 39
- 28
examples/c++-dataflow/operator-c-api/operator.cc View File

@@ -30,44 +30,55 @@ extern "C" DoraResult_t dora_drop_operator(void *operator_context)
return {};
}

extern "C" OnInputResult_t dora_on_input(
const Input_t *input,
extern "C" OnEventResult_t dora_on_event(
const RawEvent_t *event,
const SendOutput_t *send_output,
void *operator_context)
{
if (event->input != NULL)
{
// input event
Input_t *input = event->input;
std::string id((char *)input->id.ptr, input->id.len);

std::string id((char *)input->id.ptr, input->id.len);
std::vector<unsigned char> data;
for (size_t i = 0; i < input->data.len; i++)
{
data.push_back(*(input->data.ptr + i));
}

std::vector<unsigned char> data;
for (size_t i = 0; i < input->data.len; i++)
{
data.push_back(*(input->data.ptr + i));
}
std::cout
<< "C++ Operator (C-API) received input `" << id << "` with data: [";
for (unsigned char &v : data)
{
std::cout << (unsigned int)v << ", ";
}
std::cout << "]" << std::endl;

std::cout
<< "C++ Operator (C-API) received input `" << id << "` with data: [";
for (unsigned char &v : data)
{
std::cout << (unsigned int)v << ", ";
}
std::cout << "]" << std::endl;
const char *out_id = "half-status";
char *out_id_heap = strdup(out_id);

const char *out_id = "half-status";
char *out_id_heap = strdup(out_id);
size_t out_data_len = 1;
uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len);
*out_data_heap = data[0] / 2;

size_t out_data_len = 1;
uint8_t *out_data_heap = (uint8_t *)malloc(out_data_len);
*out_data_heap = data[0] / 2;
Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = out_data_heap, .len = out_data_len, .cap = out_data_len}};

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = out_data_heap, .len = out_data_len, .cap = out_data_len}};
DoraResult_t send_result = (send_output->send_output.call)(send_output->send_output.env_ptr, output);

DoraResult_t send_result = (send_output->send_output.call)(send_output->send_output.env_ptr, output);
OnEventResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE};
return result;
}
if (event->stop)
{
printf("C operator received stop event\n");
}

OnInputResult_t result = {.result = send_result, .status = DORA_STATUS_CONTINUE};
OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}

+ 18
- 20
examples/c++-dataflow/run.rs View File

@@ -58,7 +58,7 @@ async fn main() -> eyre::Result<()> {
.await?;

build_package("dora-node-api-c").await?;
// build_package("dora-operator-api-c").await?;
build_package("dora-operator-api-c").await?;
build_cxx_node(
root,
&[
@@ -69,15 +69,15 @@ async fn main() -> eyre::Result<()> {
&["-l", "dora_node_api_cxx"],
)
.await?;
// build_cxx_node(
// root,
// &[&dunce::canonicalize(
// Path::new("node-c-api").join("main.cc"),
// )?],
// "node_c_api",
// &["-l", "dora_node_api_c"],
// )
// .await?;
build_cxx_node(
root,
&[&dunce::canonicalize(
Path::new("node-c-api").join("main.cc"),
)?],
"node_c_api",
&["-l", "dora_node_api_c"],
)
.await?;
build_cxx_operator(
&[
&dunce::canonicalize(Path::new("operator-rust-api").join("operator.cc"))?,
@@ -92,16 +92,14 @@ async fn main() -> eyre::Result<()> {
],
)
.await?;
// build_cxx_operator(
// &[&dunce::canonicalize(
// Path::new("operator-c-api").join("operator.cc"),
// )?],
// "operator_c_api",
// &[],
// )
// .await?;

// build_package("dora-runtime").await?;
build_cxx_operator(
&[&dunce::canonicalize(
Path::new("operator-c-api").join("operator.cc"),
)?],
"operator_c_api",
&[],
)
.await?;

let dataflow = Path::new("dataflow.yml").to_owned();
build_package("dora-runtime").await?;


Loading…
Cancel
Save