Browse Source

Update C dataflow example to use new API functions

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
75b05131e8
Failed to extract signature
2 changed files with 17 additions and 19 deletions
  1. +13
    -17
      examples/c-dataflow/operator.c
  2. +4
    -2
      examples/c-dataflow/run.rs

+ 13
- 17
examples/c-dataflow/operator.c View File

@@ -27,6 +27,8 @@ OnEventResult_t dora_on_event(
const SendOutput_t *send_output,
void *operator_context)
{
OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};

char *counter = (char *)operator_context;

if (event->input != NULL)
@@ -34,18 +36,17 @@ OnEventResult_t dora_on_event(
// input event
Input_t *input = event->input;

char id[input->id.len + 1];
memcpy(id, input->id.ptr, input->id.len);
id[input->id.len] = 0;
char *id = dora_read_input_id(input);

if (strcmp(id, "message") == 0)
{
char data[input->data.len + 1];
memcpy(data, input->data.ptr, input->data.len);
data[input->data.len] = 0;
printf("message event\n");

Vec_uint8_t data = dora_read_data(input);
assert(data.ptr != NULL);

*counter += 1;
printf("C operator received message `%s`, counter: %i\n", data, *counter);
printf("C operator received message `%.*s`, counter: %i\n", (int)data.len, data.ptr, *counter);

char *out_id = "counter";
char *out_id_heap = strdup(out_id);
@@ -55,27 +56,22 @@ OnEventResult_t dora_on_event(
int count = snprintf(out_data, data_alloc_size, "The current counter value is %d", *counter);
assert(count >= 0 && count < 100);

Output_t output = {.id = {
.ptr = (uint8_t *)out_id_heap,
.len = strlen(out_id_heap),
.cap = strlen(out_id_heap) + 1,
},
.data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}};
DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output);
DoraResult_t res = dora_send_output(send_output, out_id_heap, (uint8_t *)out_data, strlen(out_data));
result.result = res;

OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE};
return result;
dora_free_data(data);
}
else
{
printf("C operator received unexpected input %s, context: %i\n", id, *counter);
}

dora_free_input_id(id);
}
if (event->stop)
{
printf("C operator received stop event\n");
}

OnEventResult_t result = {.status = DORA_STATUS_CONTINUE};
return result;
}

+ 4
- 2
examples/c-dataflow/run.rs View File

@@ -33,7 +33,7 @@ async fn main() -> eyre::Result<()> {
build_c_node(root, "sink.c", "c_sink").await?;

build_package("dora-operator-api-c").await?;
build_c_operator().await?;
build_c_operator(root).await?;

let dataflow = Path::new("dataflow.yml").to_owned();
dora_daemon::Daemon::run_dataflow(&dataflow).await?;
@@ -116,7 +116,7 @@ async fn build_c_node(root: &Path, name: &str, out_name: &str) -> eyre::Result<(
Ok(())
}

async fn build_c_operator() -> eyre::Result<()> {
async fn build_c_operator(root: &Path) -> eyre::Result<()> {
let mut compile = tokio::process::Command::new("clang");
compile.arg("-c").arg("operator.c");
compile.arg("-o").arg("build/operator.o");
@@ -129,6 +129,8 @@ async fn build_c_operator() -> eyre::Result<()> {

let mut link = tokio::process::Command::new("clang");
link.arg("-shared").arg("build/operator.o");
link.arg("-L").arg(root.join("target").join("debug"));
link.arg("-l").arg("dora_operator_api_c");
link.arg("-o")
.arg(Path::new("build").join(format!("{DLL_PREFIX}operator{DLL_SUFFIX}")));
if !link.status().await?.success() {


Loading…
Cancel
Save