diff --git a/binaries/cli/src/template/c/dataflow-template.yml b/binaries/cli/src/template/c/dataflow-template.yml index 9f50172b..ab5e71a7 100644 --- a/binaries/cli/src/template/c/dataflow-template.yml +++ b/binaries/cli/src/template/c/dataflow-template.yml @@ -3,25 +3,26 @@ communication: prefix: ___name___ nodes: - - id: runtime-node_1 - operators: - - id: op_1 - shared-library: build/op_1 - inputs: - tick: dora/timer/millis/100 - outputs: - - some-output - - id: op_2 - shared-library: build/op_2 - inputs: - tick: dora/timer/secs/2 - outputs: - - some-output + - id: op_1 + operator: + shared-library: build/op_1 + inputs: + foo: dora/timer/millis/100 + outputs: + - bar + - id: op_2 + operator: + shared-library: build/op_2 + inputs: + foo: dora/timer/secs/2 + outputs: + - bar - id: custom-node_1 custom: source: build/node_1 inputs: - tick: dora/timer/secs/1 - input-1: op_1/some-output - input-2: op_2/some-output + input-1: op_1/bar + input-2: op_2/bar + outputs: + - foo diff --git a/binaries/cli/src/template/c/mod.rs b/binaries/cli/src/template/c/mod.rs index 909f7999..5dffaff5 100644 --- a/binaries/cli/src/template/c/mod.rs +++ b/binaries/cli/src/template/c/mod.rs @@ -53,7 +53,8 @@ fn create_dataflow(name: String, path: Option) -> Result<(), eyre::ErrR fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrReport> { const OPERATOR: &str = include_str!("operator/operator-template.c"); - const HEADER: &str = include_str!("../../../../../apis/c/operator/operator_api.h"); + const HEADER_API: &str = include_str!("../../../../../apis/c/operator/operator_api.h"); + const HEADER_TYPE: &str = include_str!("../../../../../apis/c/operator/operator_types.h"); if name.contains('/') { bail!("operator name must not contain `/` separators"); @@ -73,9 +74,12 @@ fn create_operator(name: String, path: Option) -> Result<(), eyre::ErrR let operator_path = root.join("operator.c"); fs::write(&operator_path, OPERATOR) .with_context(|| format!("failed to write `{}`", operator_path.display()))?; - let header_path = root.join("operator_api.h"); - fs::write(&header_path, HEADER) - .with_context(|| format!("failed to write `{}`", header_path.display()))?; + let header_api_path = root.join("operator_api.h"); + let header_type_path = root.join("operator_types.h"); + fs::write(&header_api_path, HEADER_API) + .with_context(|| format!("failed to write `{}`", header_api_path.display()))?; + fs::write(&header_type_path, HEADER_TYPE) + .with_context(|| format!("failed to write `{}`", header_type_path.display()))?; // TODO: Makefile? diff --git a/binaries/cli/src/template/c/node/node-template.c b/binaries/cli/src/template/c/node/node-template.c index 6e9af2ab..48f40918 100644 --- a/binaries/cli/src/template/c/node/node-template.c +++ b/binaries/cli/src/template/c/node/node-template.c @@ -21,26 +21,30 @@ int main() while (1) { - void *input = dora_next_input(dora_context); - if (input == NULL) + void *event = dora_next_event(dora_context); + if (event == NULL) { - // end of input - break; + printf("[c node] ERROR: unexpected end of event\n"); + return -1; } - char *id; - size_t id_len; - read_dora_input_id(input, &id, &id_len); + enum DoraEventType ty = read_dora_event_type(event); + if (ty == DoraEventType_Input) + { + char *id; + size_t id_len; + read_dora_input_id(event, &id, &id_len); - char *data; - size_t data_len; - read_dora_input_data(input, &data, &data_len); + char *data; + size_t data_len; + read_dora_input_data(event, &data, &data_len); - char out_id[] = "foo"; - char out_data[] = "bar"; - dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data)); + char out_id[] = "foo"; + char out_data[] = "bar"; + dora_send_output(dora_context, out_id, strlen(out_id), out_data, strlen(out_data)); - free_dora_input(input); // do not use `id` or `data` pointer after freeing + free_dora_event(event); // do not use `id` or `data` pointer after freeing + } } free_dora_context(dora_context); diff --git a/binaries/cli/src/template/c/operator/operator-template.c b/binaries/cli/src/template/c/operator/operator-template.c index a1a41293..3ba7fd5d 100644 --- a/binaries/cli/src/template/c/operator/operator-template.c +++ b/binaries/cli/src/template/c/operator/operator-template.c @@ -22,39 +22,43 @@ DoraResult_t dora_drop_operator(void *operator_context) return result; } -OnInputResult_t dora_on_input( - const Input_t *input, +OnEventResult_t dora_on_event( + const RawEvent_t *event, const SendOutput_t *send_output, void *operator_context) { - char id[input->id.len + 1]; - memcpy(id, input->id.ptr, input->id.len); - id[input->id.len] = 0; - - // example for matching on input name - if (strcmp(id, "foo") == 0) + if (event->input != NULL) { - char *out_id = "bar"; - char *out_id_heap = strdup(out_id); - - int data_alloc_size = 10; - void *out_data = malloc(data_alloc_size); - // TODO intialize out_data - - Output_t output = {.id = { - .ptr = (uint8_t *)out_id_heap, - .len = strlen(out_id_heap), - .cap = strlen(out_id_heap) + 1, - }, - .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}}; - DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output); - - OnInputResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE}; - return result; + char id[event->input->id.len + 1]; + memcpy(id, event->input->id.ptr, event->input->id.len); + id[event->input->id.len] = 0; + + // example for matching on input name + if (strcmp(id, "foo") == 0) + { + char *out_id = "bar"; + char *out_id_heap = strdup(out_id); + + int data_alloc_size = 10; + void *out_data = malloc(data_alloc_size); + // TODO intialize out_data + + Output_t output = {.id = { + .ptr = (uint8_t *)out_id_heap, + .len = strlen(out_id_heap), + .cap = strlen(out_id_heap) + 1, + }, + .data = {.ptr = (uint8_t *)out_data, .len = strlen(out_data), .cap = data_alloc_size}}; + DoraResult_t res = (send_output->send_output.call)(send_output->send_output.env_ptr, output); + + OnEventResult_t result = {.result = res, .status = DORA_STATUS_CONTINUE}; + return result; + } } - else + if (event->stop) { - OnInputResult_t result = {.status = DORA_STATUS_CONTINUE}; - return result; + printf("C operator received stop event\n"); } + OnEventResult_t result = {.status = DORA_STATUS_CONTINUE}; + return result; } diff --git a/binaries/cli/src/template/python/node/node-template.py b/binaries/cli/src/template/python/node/node-template.py index ff28dbdd..296a36ec 100644 --- a/binaries/cli/src/template/python/node/node-template.py +++ b/binaries/cli/src/template/python/node/node-template.py @@ -5,6 +5,11 @@ from dora import Node node = Node() -input_id, value, metadata = node.next() - -print(f"id: {input_id}, value: {value}, metadata: {metadata}") +event = node.next() +if event["type"] == "INPUT": + print( + f"""Node received: + id: {event["id"]}, + value: {event["data"]}, + metadata: {event["metadata"]}""" + ) diff --git a/binaries/cli/src/template/rust/dataflow-template.yml b/binaries/cli/src/template/rust/dataflow-template.yml index 572d7ef2..48c4bccd 100644 --- a/binaries/cli/src/template/rust/dataflow-template.yml +++ b/binaries/cli/src/template/rust/dataflow-template.yml @@ -3,16 +3,16 @@ communication: prefix: ___name___ nodes: - - id: runtime-node_1 - operators: - - id: op_1 + - id: op_1 + operator: build: cargo build -p op_1 shared-library: target/debug/op_1 inputs: tick: dora/timer/millis/100 outputs: - some-output - - id: op_2 + - id: op_2 + operator: build: cargo build -p op_2 shared-library: target/debug/op_2 inputs: