| @@ -1,28 +1,95 @@ | |||||
| # C API | # C API | ||||
| ## Operator | |||||
| ## Custom Node | |||||
| The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | |||||
| The custom node API allow you to integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. | |||||
| #### `init_dora_context_from_env` | |||||
| `init_dora_context_from_env` initiate a node from environment variables set by `dora-coordinator` | |||||
| ```c | |||||
| void *dora_context = init_dora_context_from_env(); | |||||
| ``` | |||||
| #### `dora_next_input` and `read_dora_input_data` | |||||
| `dora_next_input` and `read_dora_input_data` gives you the next input received. | |||||
| ```c | |||||
| void *input = dora_next_input(dora_context); | |||||
| char *data; | |||||
| size_t data_len; | |||||
| read_dora_input_data(input, &data, &data_len); | |||||
| ``` | |||||
| #### `dora_send_output` | |||||
| `dora_send_output` send data from the node. | |||||
| ```c | |||||
| char out_id[] = "tick"; | |||||
| dora_send_output(dora_context, out_id, strlen(out_id), &i, 1); | |||||
| ``` | |||||
| ### Try it out! | ### Try it out! | ||||
| - Create an `operator.c` file: | |||||
| - Create an `node.c` file: | |||||
| ```c | ```c | ||||
| {{#include ../../examples/c-operator/operator.c}} | |||||
| {{#include ../../examples/c-dataflow/node.c}} | |||||
| ``` | ``` | ||||
| - Copy `operator.h` header file: | |||||
| {{#include ../../examples/c-dataflow/README.md:26:35}} | |||||
| ## Operator | |||||
| The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | |||||
| The operator definition is composed of 3 functions, `dora_init_operator` that initialise the operator and its context. `dora_drop_operator` that free the memory, and `dora_on_input` that action the logic of the operator on receiving an input. | |||||
| ```c | |||||
| int dora_init_operator(void **operator_context) | |||||
| { | |||||
| void *context = malloc(1); | |||||
| char *context_char = (char *)context; | |||||
| *context_char = 0; | |||||
| *operator_context = context; | |||||
| return 0; | |||||
| } | |||||
| void dora_drop_operator(void *operator_context) | |||||
| { | |||||
| free(operator_context); | |||||
| } | |||||
| ```bash | |||||
| cp apis/c/operator/api.h . | |||||
| int dora_on_input( | |||||
| const char *id_start, | |||||
| size_t id_len, | |||||
| const char *data_start, | |||||
| size_t data_len, | |||||
| const int (*output_fn_raw)(const char *id_start, | |||||
| size_t id_len, | |||||
| const char *data_start, | |||||
| size_t data_len, | |||||
| const void *output_context), | |||||
| void *output_context, | |||||
| const void *operator_context) | |||||
| { | |||||
| ... | |||||
| } | |||||
| ``` | ``` | ||||
| ### Try it out! | |||||
| - And compile your C operator: | |||||
| ```bash | |||||
| clang -c operator.c | |||||
| clang -shared -v operator.o -o operator.so -fPIC | |||||
| - Create an `operator.c` file: | |||||
| ```c | |||||
| {{#include ../../examples/c-dataflow/operator.c}} | |||||
| ``` | ``` | ||||
| {{#include ../../examples/c-dataflow/README.md:40:46}} | |||||
| - Link it in your graph as: | - Link it in your graph as: | ||||
| ```yaml | ```yaml | ||||
| {{#include ../../binaries/coordinator/examples/mini-dataflow.yml:47:52}} | {{#include ../../binaries/coordinator/examples/mini-dataflow.yml:47:52}} | ||||
| @@ -2,7 +2,37 @@ | |||||
| ## Cutom Node | ## Cutom Node | ||||
| The custom node API allow you to integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. | |||||
| The custom node API allow you to integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. | |||||
| #### `Node()` | |||||
| `Node()` initiate a node from environment variables set by `dora-coordinator` | |||||
| ```python | |||||
| from dora import Node() | |||||
| node = Node() | |||||
| ``` | |||||
| #### `.next()` or `__next__()` as an iterator | |||||
| `.next()` gives you the next input that the node has received. | |||||
| ```python | |||||
| input_id, value = node.next() | |||||
| # or | |||||
| for input_id, value in node: | |||||
| ``` | |||||
| #### `.send_output(output_id, data)` | |||||
| `send_output` send data from the node. | |||||
| ```python | |||||
| node.send_output("string", b"string") | |||||
| ``` | |||||
| ### Try it out! | ### Try it out! | ||||
| @@ -29,6 +59,19 @@ maturin develop | |||||
| The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | ||||
| An operator requires an `on_input` method and requires to return a `DoraStatus` of 0 or 1, depending of it needs to continue or stop. | |||||
| ```python | |||||
| class Operator: | |||||
| def on_input( | |||||
| self, | |||||
| input_id: str, | |||||
| value: bytes, | |||||
| send_output: Callable[[str, bytes], None], | |||||
| ) -> DoraStatus: | |||||
| ``` | |||||
| > For Python, we recommend to allocate the operator on a single runtime. A runtime will share the same GIL with several operators making those operators run almost sequentially. See: [https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks](https://docs.rs/pyo3/latest/pyo3/marker/struct.Python.html#deadlocks) | |||||
| ### Try it out! | ### Try it out! | ||||
| - Create an operator python file called `op.py`: | - Create an operator python file called `op.py`: | ||||
| @@ -3,6 +3,29 @@ | |||||
| ## Node | ## Node | ||||
| The custom node API allow you to integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. | The custom node API allow you to integrate `dora` into your application. It allows you to retrieve input and send output in any fashion you want. | ||||
| #### `DoraNode::init_from_env()` | |||||
| `DoraNode::init_from_env()` initiate a node from environment variables set by `dora-coordinator` | |||||
| ```rust | |||||
| let node = DoraNode::init_from_env().await?; | |||||
| ``` | |||||
| #### `.inputs()` | |||||
| `.inputs()` gives you a stream of input that you can access using `next()` on the input stream. | |||||
| ```rust | |||||
| let mut inputs = node.inputs().await?; | |||||
| ``` | |||||
| #### `.send_output(output_id, data)` | |||||
| `send_output` send data from the node. | |||||
| ```rust | |||||
| node.send_output(&data_id, data.as_bytes()).await?; | |||||
| ``` | |||||
| ### Try it out! | ### Try it out! | ||||
| @@ -42,6 +65,27 @@ time = "0.3.9" | |||||
| The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | The operator API gives you a framework for operator that is going to be managed by `dora`. This framework enable us to make optimisation and provide advanced features. | ||||
| An operator requires to be registered and implement the `DoraOperator` trait, which is composed of an `on_input` method that defines the behaviour of the operator when there is an input. | |||||
| ```rust | |||||
| use dora_operator_api::{register_operator, DoraOperator, DoraOutputSender, DoraStatus}; | |||||
| register_operator!(ExampleOperator); | |||||
| #[derive(Debug, Default)] | |||||
| struct ExampleOperator { | |||||
| time: Option<String>, | |||||
| } | |||||
| impl DoraOperator for ExampleOperator { | |||||
| fn on_input( | |||||
| &mut self, | |||||
| id: &str, | |||||
| data: &[u8], | |||||
| output_sender: &mut DoraOutputSender, | |||||
| ) -> Result<DoraStatus, ()> { | |||||
| ``` | |||||
| ### Try it out! | ### Try it out! | ||||
| - Generate a new Rust library | - Generate a new Rust library | ||||