From b0cf6fa4f5df2b5be13fce6e3583b465f02f8687 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 25 Aug 2022 15:13:07 +0200 Subject: [PATCH] Rework raw operator API on top of `safer-api` crate Allows a higher-level interface --- Cargo.lock | 334 ++++++++++++++++++++ Cargo.toml | 1 + apis/rust/operator/Cargo.toml | 3 +- apis/rust/operator/macros/Cargo.toml | 1 + apis/rust/operator/macros/src/lib.rs | 35 +- apis/rust/operator/src/lib.rs | 44 +-- apis/rust/operator/src/raw.rs | 57 ++-- apis/rust/operator/types/Cargo.toml | 11 + apis/rust/operator/types/src/lib.rs | 81 +++++ binaries/runtime/Cargo.toml | 1 + binaries/runtime/src/operator/shared_lib.rs | 143 +++------ 11 files changed, 542 insertions(+), 169 deletions(-) create mode 100644 apis/rust/operator/types/Cargo.toml create mode 100644 apis/rust/operator/types/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 154c7ace..80f6cc01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aes" version = "0.7.5" @@ -389,6 +395,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chunked_transfer" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e" + [[package]] name = "cipher" version = "0.3.0" @@ -502,6 +514,15 @@ dependencies = [ "libc", ] +[[package]] +name = "crc32fast" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d" +dependencies = [ + "cfg-if", +] + [[package]] name = "crossbeam-channel" version = "0.5.4" @@ -811,6 +832,7 @@ name = "dora-operator-api" version = "0.1.0" dependencies = [ "dora-operator-api-macros", + "dora-operator-api-types", ] [[package]] @@ -818,11 +840,19 @@ name = "dora-operator-api-macros" version = "0.1.0" dependencies = [ "dora-operator-api", + "dora-operator-api-types", "proc-macro2", "quote", "syn", ] +[[package]] +name = "dora-operator-api-types" +version = "0.1.0" +dependencies = [ + "safer-ffi", +] + [[package]] name = "dora-runtime" version = "0.1.0" @@ -830,6 +860,7 @@ dependencies = [ "clap 3.1.12", "dora-core", "dora-node-api", + "dora-operator-api-types", "eyre", "fern", "futures", @@ -918,6 +949,16 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e" +[[package]] +name = "flate2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6" +dependencies = [ + "crc32fast", + "miniz_oxide", +] + [[package]] name = "flume" version = "0.10.12" @@ -937,6 +978,16 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "form_urlencoded" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +dependencies = [ + "matches", + "percent-encoding", +] + [[package]] name = "futures" version = "0.3.21" @@ -1295,6 +1346,17 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "idna" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +dependencies = [ + "matches", + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indenter" version = "0.3.3" @@ -1459,12 +1521,34 @@ dependencies = [ "value-bag", ] +[[package]] +name = "macro_rules_attribute" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "258c86475e1616d6f2d8f5227cfaabd3dae1f6d5388b9597df8a199d4497aba7" +dependencies = [ + "macro_rules_attribute-proc_macro", + "paste", +] + +[[package]] +name = "macro_rules_attribute-proc_macro" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26a8d2502d5aa4d411ef494ba7470eb299f05725179ce3b5de77aa01a9ffdea" + [[package]] name = "maplit" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d" +[[package]] +name = "matches" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" + [[package]] name = "memchr" version = "2.4.1" @@ -1480,6 +1564,30 @@ dependencies = [ "autocfg 1.1.0", ] +[[package]] +name = "mini_paste" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2499b7bd9834270bf24cfc4dd96be59020ba6fd7f3276b772aee2de66e82b63" +dependencies = [ + "mini_paste-proc_macro", +] + +[[package]] +name = "mini_paste-proc_macro" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c5f1f52e39b728e73af4b454f1b29173d4544607bd395dafe1918fd149db67" + +[[package]] +name = "miniz_oxide" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc" +dependencies = [ + "adler", +] + [[package]] name = "mio" version = "0.8.2" @@ -1518,6 +1626,94 @@ dependencies = [ "getrandom", ] +[[package]] +name = "napi" +version = "1.3.2" +source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +dependencies = [ + "napi-build", + "napi-sys", + "once_cell", + "tokio", + "winapi", +] + +[[package]] +name = "napi-build" +version = "1.0.1" +source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +dependencies = [ + "cfg-if", + "ureq", +] + +[[package]] +name = "napi-derive" +version = "1.0.0" +source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "napi-dispatcher" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "napi", + "napi-dispatcher-nodejs-derive", + "napi-dispatcher-wasm", +] + +[[package]] +name = "napi-dispatcher-nodejs-derive" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "napi-derive", + "napi-dispatcher-nodejs-derive-proc_macros", +] + +[[package]] +name = "napi-dispatcher-nodejs-derive-proc_macros" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "napi-dispatcher-wasm" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "js-sys", + "napi-dispatcher-wasm-proc_macros", + "ref-cast", + "serde", + "wasm-bindgen", + "wasm-bindgen-futures", +] + +[[package]] +name = "napi-dispatcher-wasm-proc_macros" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "napi-sys" +version = "1.0.0" +source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249" + [[package]] name = "nix" version = "0.22.3" @@ -2011,6 +2207,16 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872" +[[package]] +name = "prettyplease" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "697ae720ee02011f439e0701db107ffe2916d83f718342d65d7f8bf7b8a5fee9" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -2300,6 +2506,26 @@ dependencies = [ "thiserror", ] +[[package]] +name = "ref-cast" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed13bcd201494ab44900a96490291651d200730904221832b9547d24a87d332b" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5234cd6063258a5e32903b53b1b6ac043a0541c8adc1f610f67b0326c7a578fa" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regex" version = "1.5.5" @@ -2468,6 +2694,33 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f" +[[package]] +name = "safer-ffi" +version = "0.1.0" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "libc", + "macro_rules_attribute", + "mini_paste", + "napi-dispatcher", + "safer_ffi-proc_macros", + "scopeguard", + "unwind_safe", + "with_builtin_macros", +] + +[[package]] +name = "safer_ffi-proc_macros" +version = "0.1.0-rc1" +source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a" +dependencies = [ + "macro_rules_attribute", + "prettyplease", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "schannel" version = "0.1.19" @@ -3103,12 +3356,27 @@ dependencies = [ "uuid", ] +[[package]] +name = "unicode-bidi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992" + [[package]] name = "unicode-ident" version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf" +[[package]] +name = "unicode-normalization" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-segmentation" version = "1.9.0" @@ -3139,6 +3407,12 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "unwind_safe" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0976c77def3f1f75c4ef892a292c31c0bbe9e3d0702c63044d7c76db298171a3" + [[package]] name = "unzip-n" version = "0.1.2" @@ -3150,6 +3424,35 @@ dependencies = [ "syn", ] +[[package]] +name = "ureq" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97acb4c28a254fd7a4aeec976c46a7fa404eac4d7c134b30c75144846d7cb8f" +dependencies = [ + "base64", + "chunked_transfer", + "flate2", + "log", + "once_cell", + "rustls 0.20.6", + "url", + "webpki 0.22.0", + "webpki-roots", +] + +[[package]] +name = "url" +version = "2.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +dependencies = [ + "form_urlencoded", + "idna", + "matches", + "percent-encoding", +] + [[package]] name = "uuid" version = "0.8.2" @@ -3240,6 +3543,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" dependencies = [ "cfg-if", + "serde", + "serde_json", "wasm-bindgen-macro", ] @@ -3329,6 +3634,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-roots" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" +dependencies = [ + "webpki 0.22.0", +] + [[package]] name = "wepoll-ffi" version = "0.1.2" @@ -3423,6 +3737,26 @@ version = "0.32.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316" +[[package]] +name = "with_builtin_macros" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a59d55032495429b87f9d69954c6c8602e4d3f3e0a747a12dea6b0b23de685da" +dependencies = [ + "with_builtin_macros-proc_macros", +] + +[[package]] +name = "with_builtin_macros-proc_macros" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15bd7679c15e22924f53aee34d4e448c45b674feb6129689af88593e129f8f42" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/Cargo.toml b/Cargo.toml index 75912b0b..9077db7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "apis/python/node", "apis/rust/*", "apis/rust/operator/macros", + "apis/rust/operator/types", "binaries/*", "examples/rust-dataflow/*", "examples/c++-dataflow/*-rust-*", diff --git a/apis/rust/operator/Cargo.toml b/apis/rust/operator/Cargo.toml index 7cdcf315..08c74a9b 100644 --- a/apis/rust/operator/Cargo.toml +++ b/apis/rust/operator/Cargo.toml @@ -3,9 +3,10 @@ name = "dora-operator-api" version = "0.1.0" edition = "2021" license = "Apache-2.0" -description = "Rust API implemetation for Dora Operator" +description = "Rust API implemetation for Dora Operator" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] dora-operator-api-macros = { path = "macros" } +dora-operator-api-types = { path = "types" } diff --git a/apis/rust/operator/macros/Cargo.toml b/apis/rust/operator/macros/Cargo.toml index 0c23768d..2565aa4e 100644 --- a/apis/rust/operator/macros/Cargo.toml +++ b/apis/rust/operator/macros/Cargo.toml @@ -17,3 +17,4 @@ proc-macro2 = "1.0.32" [dev-dependencies] dora-operator-api = { path = ".." } +dora-operator-api-types = { path = "../types" } diff --git a/apis/rust/operator/macros/src/lib.rs b/apis/rust/operator/macros/src/lib.rs index 3ee7a60c..622d57f7 100644 --- a/apis/rust/operator/macros/src/lib.rs +++ b/apis/rust/operator/macros/src/lib.rs @@ -27,39 +27,40 @@ fn register_operator_impl(item: &TokenStream2) -> syn::Result { let init = quote! { #[no_mangle] - pub unsafe extern "C" fn dora_init_operator(operator_context: *mut *mut std::ffi::c_void) -> isize { - dora_operator_api::raw::dora_init_operator::<#operator_ty>(operator_context) + pub unsafe extern "C" fn dora_init_operator() -> dora_operator_api::types::InitResult { + dora_operator_api::raw::dora_init_operator::<#operator_ty>() } + + const _DORA_INIT_OPERATOR: dora_operator_api::types::DoraInitOperator = + dora_operator_api::types::DoraInitOperator(dora_init_operator); }; let drop = quote! { #[no_mangle] - pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut std::ffi::c_void) { + pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut std::ffi::c_void) + -> dora_operator_api::types::DoraResult + { dora_operator_api::raw::dora_drop_operator::<#operator_ty>(operator_context) } + + const _DORA_DROP_OPERATOR: dora_operator_api::types::DoraDropOperator = + dora_operator_api::types::DoraDropOperator(dora_drop_operator); }; let on_input = quote! { #[no_mangle] pub unsafe extern "C" fn dora_on_input( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - output_fn_raw: dora_operator_api::raw::OutputFnRaw, - output_context: *const std::ffi::c_void, + input: &dora_operator_api::types::Input, + send_output: dora_operator_api::types::SendOutput<'_>, operator_context: *mut std::ffi::c_void, - ) -> isize { + ) -> dora_operator_api::types::OnInputResult { dora_operator_api::raw::dora_on_input::<#operator_ty>( - id_start, - id_len, - data_start, - data_len, - output_fn_raw, - output_context, - operator_context, + input, send_output, operator_context ) } + + const _DORA_ON_INPUT: dora_operator_api::types::DoraOnInput = + dora_operator_api::types::DoraOnInput(dora_on_input); }; Ok(quote! { diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index 0284f9ea..2cf78b60 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -2,8 +2,9 @@ #![allow(clippy::missing_safety_doc)] pub use dora_operator_api_macros::register_operator; -use raw::OutputFnRaw; -use std::ffi::c_void; +pub use dora_operator_api_types as types; +pub use types::DoraStatus; +use types::{Metadata, Output, SendOutput}; pub mod raw; @@ -14,34 +15,23 @@ pub trait DoraOperator: Default { id: &str, data: &[u8], output_sender: &mut DoraOutputSender, - ) -> Result; + ) -> Result; } -#[repr(isize)] -pub enum DoraStatus { - Continue = 0, - Stop = 1, -} - -pub struct DoraOutputSender { - output_fn_raw: OutputFnRaw, - output_context: *const c_void, -} +pub struct DoraOutputSender<'a>(SendOutput<'a>); -impl DoraOutputSender { - pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> { - let result = unsafe { - (self.output_fn_raw)( - id.as_ptr(), - id.len(), - data.as_ptr(), - data.len(), - self.output_context, - ) - }; - match result { - 0 => Ok(()), - other => Err(other), +impl<'a> DoraOutputSender<'a> { + pub fn send(&mut self, id: String, data: Vec) -> Result<(), String> { + let result = self.0 .0.call(Output { + id: id.into(), + data: data.into(), + metadata: Metadata { + open_telemetry_context: String::new().into(), // TODO + }, + }); + match result.error { + None => Ok(()), + Some(error) => Err(error.into()), } } } diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index c270537f..55add995 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -1,6 +1,6 @@ -use std::{ffi::c_void, slice}; - -use crate::{DoraOperator, DoraOutputSender}; +use crate::{DoraOperator, DoraOutputSender, DoraStatus}; +use dora_operator_api_types::{DoraResult, InitResult, Input, OnInputResult, SendOutput}; +use std::ffi::c_void; pub type OutputFnRaw = unsafe extern "C" fn( id_start: *const u8, @@ -10,42 +10,41 @@ pub type OutputFnRaw = unsafe extern "C" fn( output_context: *const c_void, ) -> isize; -pub unsafe fn dora_init_operator(operator_context: *mut *mut c_void) -> isize { +pub unsafe fn dora_init_operator() -> InitResult { let operator: O = Default::default(); let ptr: *mut O = Box::leak(Box::new(operator)); - let type_erased: *mut c_void = ptr.cast(); - unsafe { *operator_context = type_erased }; - 0 + let operator_context: *mut c_void = ptr.cast(); + InitResult { + result: DoraResult { error: None }, + operator_context, + } } -pub unsafe fn dora_drop_operator(operator_context: *mut c_void) { +pub unsafe fn dora_drop_operator(operator_context: *mut c_void) -> DoraResult { let raw: *mut O = operator_context.cast(); unsafe { Box::from_raw(raw) }; + DoraResult { error: None } } pub unsafe fn dora_on_input( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - output_fn_raw: OutputFnRaw, - output_context: *const c_void, - operator_context: *mut c_void, -) -> isize { - let id = match std::str::from_utf8(unsafe { slice::from_raw_parts(id_start, id_len) }) { - Ok(id) => id, - Err(_) => return -1, - }; - let data = unsafe { slice::from_raw_parts(data_start, data_len) }; - let mut output_sender = DoraOutputSender { - output_fn_raw, - output_context, - }; + input: &Input, + send_output: SendOutput<'_>, + operator_context: *mut std::ffi::c_void, +) -> OnInputResult { + let mut output_sender = DoraOutputSender(send_output); let operator: &mut O = unsafe { &mut *operator_context.cast() }; - - match operator.on_input(id, data, &mut output_sender) { - Ok(status) => status as isize, - Err(_) => -1, + let data = input.data.as_ref().as_slice(); + match operator.on_input(&input.id, &data, &mut output_sender) { + Ok(status) => OnInputResult { + result: DoraResult { error: None }, + status, + }, + Err(error) => OnInputResult { + result: DoraResult { + error: Some(error.into()), + }, + status: DoraStatus::Stop, + }, } } diff --git a/apis/rust/operator/types/Cargo.toml b/apis/rust/operator/types/Cargo.toml new file mode 100644 index 00000000..e6789698 --- /dev/null +++ b/apis/rust/operator/types/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "dora-operator-api-types" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies.safer-ffi] +git = "https://github.com/phil-opp/safer_ffi.git" +branch = "from-instead-of-into" +features = ["proc_macros"] diff --git a/apis/rust/operator/types/src/lib.rs b/apis/rust/operator/types/src/lib.rs new file mode 100644 index 00000000..2c660342 --- /dev/null +++ b/apis/rust/operator/types/src/lib.rs @@ -0,0 +1,81 @@ +pub use safer_ffi; +use safer_ffi::{closure::RefDynFnMut1, derive_ReprC}; + +#[derive_ReprC] +#[repr(transparent)] +pub struct DoraInitOperator(pub unsafe extern "C" fn() -> InitResult); + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct InitResult { + pub result: DoraResult, + pub operator_context: *mut std::ffi::c_void, +} +#[derive_ReprC] +#[repr(transparent)] +pub struct DoraDropOperator( + pub unsafe extern "C" fn(operator_context: *mut std::ffi::c_void) -> DoraResult, +); + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct DoraResult { + pub error: Option, +} + +#[derive_ReprC] +#[repr(transparent)] +pub struct DoraOnInput( + pub unsafe extern "C" fn( + input: &Input, + send_output: SendOutput<'_>, + operator_context: *mut std::ffi::c_void, + ) -> OnInputResult, +); + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct Input { + pub id: safer_ffi::String, + pub data: safer_ffi::Vec, + pub metadata: Metadata, +} + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct Metadata { + pub open_telemetry_context: safer_ffi::String, +} + +#[derive_ReprC] +#[repr(transparent)] +pub struct SendOutput<'a>(pub RefDynFnMut1<'a, DoraResult, Output>); + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct Output { + pub id: safer_ffi::String, + pub data: safer_ffi::Vec, + pub metadata: Metadata, +} + +#[derive_ReprC] +#[repr(C)] +#[derive(Debug)] +pub struct OnInputResult { + pub result: DoraResult, + pub status: DoraStatus, +} + +#[derive_ReprC] +#[derive(Debug)] +#[repr(u8)] +pub enum DoraStatus { + Continue = 0, + Stop = 1, +} diff --git a/binaries/runtime/Cargo.toml b/binaries/runtime/Cargo.toml index e3e3258b..b94b9d57 100644 --- a/binaries/runtime/Cargo.toml +++ b/binaries/runtime/Cargo.toml @@ -9,6 +9,7 @@ license = "Apache-2.0" [dependencies] clap = { version = "3.1.12", features = ["derive"] } dora-node-api = { path = "../../apis/rust/node" } +dora-operator-api-types = { path = "../../apis/rust/operator/types" } dora-core = { version = "0.1.0", path = "../../libraries/core" } eyre = "0.6.8" futures = "0.3.21" diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 1c84d248..ccf8ea2b 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -1,11 +1,15 @@ use super::{OperatorEvent, OperatorInput}; +use dora_operator_api_types::{ + safer_ffi::closure::RefDynFnMut1, DoraDropOperator, DoraInitOperator, DoraOnInput, DoraResult, + DoraStatus, InitResult, Metadata, OnInputResult, Output, SendOutput, +}; use eyre::{bail, Context}; use libloading::Symbol; use std::{ ffi::c_void, panic::{catch_unwind, AssertUnwindSafe}, path::Path, - ptr, slice, thread, + thread, }; use tokio::sync::mpsc::{Receiver, Sender}; @@ -57,11 +61,14 @@ struct SharedLibraryOperator<'lib> { impl<'lib> SharedLibraryOperator<'lib> { fn run(mut self) -> eyre::Result<()> { let operator_context = { - let mut raw = ptr::null_mut(); - let result = unsafe { (self.bindings.init_operator)(&mut raw) }; - if result != 0 { - bail!("init_operator failed with error code {result}"); - } + let InitResult { + result, + operator_context, + } = unsafe { (self.bindings.init_operator)() }; + let raw = match result.error { + Some(error) => bail!("init_operator failed: {}", String::from(error)), + None => operator_context, + }; OperatorContext { raw, drop_fn: self.bindings.drop_operator.clone(), @@ -69,39 +76,41 @@ impl<'lib> SharedLibraryOperator<'lib> { }; while let Some(input) = self.inputs.blocking_recv() { - let id_start = input.id.as_bytes().as_ptr(); - let id_len = input.id.as_bytes().len(); - let data_start = input.value.as_slice().as_ptr(); - let data_len = input.value.len(); + let operator_input = dora_operator_api_types::Input { + id: String::from(input.id).into(), + data: input.value.into(), + metadata: Metadata { + open_telemetry_context: String::new().into(), + }, + }; - let output = |id: &str, data: &[u8]| -> isize { + let mut send_output_closure = |output: Output| { + let id: String = output.id.into(); let result = self.events_tx.blocking_send(OperatorEvent::Output { - id: id.to_owned().into(), - value: data.to_owned(), + id: id.into(), + value: output.data.into(), }); - match result { - Ok(()) => 0, - Err(_) => -1, - } + + let error = match result { + Ok(()) => None, + Err(_) => Some(String::from("runtime process closed unexpectedly").into()), + }; + + DoraResult { error } }; - let (output_fn, output_ctx) = wrap_closure(&output); - - let result = unsafe { - (self.bindings.on_input)( - id_start, - id_len, - data_start, - data_len, - output_fn, - output_ctx, - operator_context.raw, - ) + let send_output = SendOutput(RefDynFnMut1::new(&mut send_output_closure)); + let OnInputResult { + result: DoraResult { error }, + status, + } = unsafe { + (self.bindings.on_input)(&operator_input, send_output, operator_context.raw) }; - match result { - 0 => {} // DoraStatus::Continue - 1 => break, // DoraStatus::Stop - -1 => bail!("on_input failed"), - other => bail!("on_input finished with unexpected exit code {other}"), + match error { + Some(error) => bail!("on_input failed: {}", String::from(error)), + None => match status { + DoraStatus::Continue => {} + DoraStatus::Stop => break, + }, } } Ok(()) @@ -109,8 +118,8 @@ impl<'lib> SharedLibraryOperator<'lib> { } struct OperatorContext<'lib> { - raw: *mut (), - drop_fn: Symbol<'lib, OperatorContextDropFn>, + raw: *mut c_void, + drop_fn: Symbol<'lib, DoraDropOperator>, } impl<'lib> Drop for OperatorContext<'lib> { @@ -119,45 +128,10 @@ impl<'lib> Drop for OperatorContext<'lib> { } } -/// Wrap a closure with an FFI-compatible trampoline function. -/// -/// Returns a C compatible trampoline function and a data pointer that -/// must be passed as when invoking the trampoline function. -fn wrap_closure(closure: &F) -> (OutputFn, *const c_void) -where - F: Fn(&str, &[u8]) -> isize, -{ - /// Rust closures are just compiler-generated structs with a `call` method. This - /// trampoline function is generic over the closure type, which means that the - /// compiler's monomorphization step creates a different copy of that function - /// for each closure type. - /// - /// The trampoline function expects the pointer to the corresponding closure - /// struct as `context` argument. It casts that pointer back to a closure - /// struct pointer and invokes its call method. - unsafe extern "C" fn trampoline isize>( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - context: *const c_void, - ) -> isize { - let id_raw = unsafe { slice::from_raw_parts(id_start, id_len) }; - let data = unsafe { slice::from_raw_parts(data_start, data_len) }; - let id = match std::str::from_utf8(id_raw) { - Ok(s) => s, - Err(_) => return -1, - }; - unsafe { (*(context as *const F))(id, data) } - } - - (trampoline::, closure as *const F as *const c_void) -} - struct Bindings<'lib> { - init_operator: Symbol<'lib, InitFn>, - drop_operator: Symbol<'lib, OperatorContextDropFn>, - on_input: Symbol<'lib, OnInputFn>, + init_operator: Symbol<'lib, DoraInitOperator>, + drop_operator: Symbol<'lib, DoraDropOperator>, + on_input: Symbol<'lib, DoraOnInput>, } impl<'lib> Bindings<'lib> { @@ -178,24 +152,3 @@ impl<'lib> Bindings<'lib> { Ok(bindings) } } - -type InitFn = unsafe extern "C" fn(operator_context: *mut *mut ()) -> isize; -type OperatorContextDropFn = unsafe extern "C" fn(operator_context: *mut ()); - -type OnInputFn = unsafe extern "C" fn( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - output: OutputFn, - output_context: *const c_void, - operator_context: *mut (), -) -> isize; - -type OutputFn = unsafe extern "C" fn( - id_start: *const u8, - id_len: usize, - data_start: *const u8, - data_len: usize, - output_context: *const c_void, -) -> isize;