Browse Source

Rework raw operator API on top of `safer-api` crate

Allows a higher-level interface
tags/v0.0.0-test.4
Philipp Oppermann 3 years ago
parent
commit
b0cf6fa4f5
Failed to extract signature
11 changed files with 542 additions and 169 deletions
  1. +334
    -0
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +2
    -1
      apis/rust/operator/Cargo.toml
  4. +1
    -0
      apis/rust/operator/macros/Cargo.toml
  5. +18
    -17
      apis/rust/operator/macros/src/lib.rs
  6. +17
    -27
      apis/rust/operator/src/lib.rs
  7. +28
    -29
      apis/rust/operator/src/raw.rs
  8. +11
    -0
      apis/rust/operator/types/Cargo.toml
  9. +81
    -0
      apis/rust/operator/types/src/lib.rs
  10. +1
    -0
      binaries/runtime/Cargo.toml
  11. +48
    -95
      binaries/runtime/src/operator/shared_lib.rs

+ 334
- 0
Cargo.lock View File

@@ -2,6 +2,12 @@
# It is not intended for manual editing.
version = 3

[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"

[[package]]
name = "aes"
version = "0.7.5"
@@ -389,6 +395,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"

[[package]]
name = "chunked_transfer"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fff857943da45f546682664a79488be82e69e43c1a7a2307679ab9afb3a66d2e"

[[package]]
name = "cipher"
version = "0.3.0"
@@ -502,6 +514,15 @@ dependencies = [
"libc",
]

[[package]]
name = "crc32fast"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
dependencies = [
"cfg-if",
]

[[package]]
name = "crossbeam-channel"
version = "0.5.4"
@@ -811,6 +832,7 @@ name = "dora-operator-api"
version = "0.1.0"
dependencies = [
"dora-operator-api-macros",
"dora-operator-api-types",
]

[[package]]
@@ -818,11 +840,19 @@ name = "dora-operator-api-macros"
version = "0.1.0"
dependencies = [
"dora-operator-api",
"dora-operator-api-types",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "dora-operator-api-types"
version = "0.1.0"
dependencies = [
"safer-ffi",
]

[[package]]
name = "dora-runtime"
version = "0.1.0"
@@ -830,6 +860,7 @@ dependencies = [
"clap 3.1.12",
"dora-core",
"dora-node-api",
"dora-operator-api-types",
"eyre",
"fern",
"futures",
@@ -918,6 +949,16 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"

[[package]]
name = "flate2"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f82b0f4c27ad9f8bfd1f3208d882da2b09c301bc1c828fd3a00d0216d2fbbff6"
dependencies = [
"crc32fast",
"miniz_oxide",
]

[[package]]
name = "flume"
version = "0.10.12"
@@ -937,6 +978,16 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"

[[package]]
name = "form_urlencoded"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
dependencies = [
"matches",
"percent-encoding",
]

[[package]]
name = "futures"
version = "0.3.21"
@@ -1295,6 +1346,17 @@ dependencies = [
"tokio-io-timeout",
]

[[package]]
name = "idna"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]

[[package]]
name = "indenter"
version = "0.3.3"
@@ -1459,12 +1521,34 @@ dependencies = [
"value-bag",
]

[[package]]
name = "macro_rules_attribute"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "258c86475e1616d6f2d8f5227cfaabd3dae1f6d5388b9597df8a199d4497aba7"
dependencies = [
"macro_rules_attribute-proc_macro",
"paste",
]

[[package]]
name = "macro_rules_attribute-proc_macro"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26a8d2502d5aa4d411ef494ba7470eb299f05725179ce3b5de77aa01a9ffdea"

[[package]]
name = "maplit"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e2e65a1a2e43cfcb47a895c4c8b10d1f4a61097f9f254f183aee60cad9c651d"

[[package]]
name = "matches"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"

[[package]]
name = "memchr"
version = "2.4.1"
@@ -1480,6 +1564,30 @@ dependencies = [
"autocfg 1.1.0",
]

[[package]]
name = "mini_paste"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2499b7bd9834270bf24cfc4dd96be59020ba6fd7f3276b772aee2de66e82b63"
dependencies = [
"mini_paste-proc_macro",
]

[[package]]
name = "mini_paste-proc_macro"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6c5f1f52e39b728e73af4b454f1b29173d4544607bd395dafe1918fd149db67"

[[package]]
name = "miniz_oxide"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f5c75688da582b8ffc1f1799e9db273f32133c49e048f614d22ec3256773ccc"
dependencies = [
"adler",
]

[[package]]
name = "mio"
version = "0.8.2"
@@ -1518,6 +1626,94 @@ dependencies = [
"getrandom",
]

[[package]]
name = "napi"
version = "1.3.2"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"
dependencies = [
"napi-build",
"napi-sys",
"once_cell",
"tokio",
"winapi",
]

[[package]]
name = "napi-build"
version = "1.0.1"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"
dependencies = [
"cfg-if",
"ureq",
]

[[package]]
name = "napi-derive"
version = "1.0.0"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "napi-dispatcher"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"napi",
"napi-dispatcher-nodejs-derive",
"napi-dispatcher-wasm",
]

[[package]]
name = "napi-dispatcher-nodejs-derive"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"napi-derive",
"napi-dispatcher-nodejs-derive-proc_macros",
]

[[package]]
name = "napi-dispatcher-nodejs-derive-proc_macros"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "napi-dispatcher-wasm"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"js-sys",
"napi-dispatcher-wasm-proc_macros",
"ref-cast",
"serde",
"wasm-bindgen",
"wasm-bindgen-futures",
]

[[package]]
name = "napi-dispatcher-wasm-proc_macros"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "napi-sys"
version = "1.0.0"
source = "git+https://github.com/getditto/napi-rs?branch=ditto/closure-into-jsfunction#da095cc3f1af133344083b525d7e9763b347e249"

[[package]]
name = "nix"
version = "0.22.3"
@@ -2011,6 +2207,16 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"

[[package]]
name = "prettyplease"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "697ae720ee02011f439e0701db107ffe2916d83f718342d65d7f8bf7b8a5fee9"
dependencies = [
"proc-macro2",
"syn",
]

[[package]]
name = "proc-macro-error"
version = "1.0.4"
@@ -2300,6 +2506,26 @@ dependencies = [
"thiserror",
]

[[package]]
name = "ref-cast"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed13bcd201494ab44900a96490291651d200730904221832b9547d24a87d332b"
dependencies = [
"ref-cast-impl",
]

[[package]]
name = "ref-cast-impl"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5234cd6063258a5e32903b53b1b6ac043a0541c8adc1f610f67b0326c7a578fa"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "regex"
version = "1.5.5"
@@ -2468,6 +2694,33 @@ version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b4b750c782965c211b42f022f59af1fbceabdd026623714f104152f1ec149f"

[[package]]
name = "safer-ffi"
version = "0.1.0"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"libc",
"macro_rules_attribute",
"mini_paste",
"napi-dispatcher",
"safer_ffi-proc_macros",
"scopeguard",
"unwind_safe",
"with_builtin_macros",
]

[[package]]
name = "safer_ffi-proc_macros"
version = "0.1.0-rc1"
source = "git+https://github.com/phil-opp/safer_ffi.git?branch=from-instead-of-into#22a005fdf64517bec012f8a62878b864ad20906a"
dependencies = [
"macro_rules_attribute",
"prettyplease",
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "schannel"
version = "0.1.19"
@@ -3103,12 +3356,27 @@ dependencies = [
"uuid",
]

[[package]]
name = "unicode-bidi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099b7128301d285f79ddd55b9a83d5e6b9e97c92e0ea0daebee7263e932de992"

[[package]]
name = "unicode-ident"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4f5b37a154999a8f3f98cc23a628d850e154479cd94decf3414696e12e31aaf"

[[package]]
name = "unicode-normalization"
version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6"
dependencies = [
"tinyvec",
]

[[package]]
name = "unicode-segmentation"
version = "1.9.0"
@@ -3139,6 +3407,12 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"

[[package]]
name = "unwind_safe"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0976c77def3f1f75c4ef892a292c31c0bbe9e3d0702c63044d7c76db298171a3"

[[package]]
name = "unzip-n"
version = "0.1.2"
@@ -3150,6 +3424,35 @@ dependencies = [
"syn",
]

[[package]]
name = "ureq"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97acb4c28a254fd7a4aeec976c46a7fa404eac4d7c134b30c75144846d7cb8f"
dependencies = [
"base64",
"chunked_transfer",
"flate2",
"log",
"once_cell",
"rustls 0.20.6",
"url",
"webpki 0.22.0",
"webpki-roots",
]

[[package]]
name = "url"
version = "2.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c"
dependencies = [
"form_urlencoded",
"idna",
"matches",
"percent-encoding",
]

[[package]]
name = "uuid"
version = "0.8.2"
@@ -3240,6 +3543,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]

@@ -3329,6 +3634,15 @@ dependencies = [
"untrusted",
]

[[package]]
name = "webpki-roots"
version = "0.22.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf"
dependencies = [
"webpki 0.22.0",
]

[[package]]
name = "wepoll-ffi"
version = "0.1.2"
@@ -3423,6 +3737,26 @@ version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "504a2476202769977a040c6364301a3f65d0cc9e3fb08600b2bda150a0488316"

[[package]]
name = "with_builtin_macros"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a59d55032495429b87f9d69954c6c8602e4d3f3e0a747a12dea6b0b23de685da"
dependencies = [
"with_builtin_macros-proc_macros",
]

[[package]]
name = "with_builtin_macros-proc_macros"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15bd7679c15e22924f53aee34d4e448c45b674feb6129689af88593e129f8f42"
dependencies = [
"proc-macro2",
"quote",
"syn",
]

[[package]]
name = "yaml-rust"
version = "0.4.5"


+ 1
- 0
Cargo.toml View File

@@ -4,6 +4,7 @@ members = [
"apis/python/node",
"apis/rust/*",
"apis/rust/operator/macros",
"apis/rust/operator/types",
"binaries/*",
"examples/rust-dataflow/*",
"examples/c++-dataflow/*-rust-*",


+ 2
- 1
apis/rust/operator/Cargo.toml View File

@@ -3,9 +3,10 @@ name = "dora-operator-api"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
description = "Rust API implemetation for Dora Operator"
description = "Rust API implemetation for Dora Operator"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dora-operator-api-macros = { path = "macros" }
dora-operator-api-types = { path = "types" }

+ 1
- 0
apis/rust/operator/macros/Cargo.toml View File

@@ -17,3 +17,4 @@ proc-macro2 = "1.0.32"

[dev-dependencies]
dora-operator-api = { path = ".." }
dora-operator-api-types = { path = "../types" }

+ 18
- 17
apis/rust/operator/macros/src/lib.rs View File

@@ -27,39 +27,40 @@ fn register_operator_impl(item: &TokenStream2) -> syn::Result<TokenStream2> {

let init = quote! {
#[no_mangle]
pub unsafe extern "C" fn dora_init_operator(operator_context: *mut *mut std::ffi::c_void) -> isize {
dora_operator_api::raw::dora_init_operator::<#operator_ty>(operator_context)
pub unsafe extern "C" fn dora_init_operator() -> dora_operator_api::types::InitResult {
dora_operator_api::raw::dora_init_operator::<#operator_ty>()
}

const _DORA_INIT_OPERATOR: dora_operator_api::types::DoraInitOperator =
dora_operator_api::types::DoraInitOperator(dora_init_operator);
};

let drop = quote! {
#[no_mangle]
pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut std::ffi::c_void) {
pub unsafe extern "C" fn dora_drop_operator(operator_context: *mut std::ffi::c_void)
-> dora_operator_api::types::DoraResult
{
dora_operator_api::raw::dora_drop_operator::<#operator_ty>(operator_context)
}

const _DORA_DROP_OPERATOR: dora_operator_api::types::DoraDropOperator =
dora_operator_api::types::DoraDropOperator(dora_drop_operator);
};

let on_input = quote! {
#[no_mangle]
pub unsafe extern "C" fn dora_on_input(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output_fn_raw: dora_operator_api::raw::OutputFnRaw,
output_context: *const std::ffi::c_void,
input: &dora_operator_api::types::Input,
send_output: dora_operator_api::types::SendOutput<'_>,
operator_context: *mut std::ffi::c_void,
) -> isize {
) -> dora_operator_api::types::OnInputResult {
dora_operator_api::raw::dora_on_input::<#operator_ty>(
id_start,
id_len,
data_start,
data_len,
output_fn_raw,
output_context,
operator_context,
input, send_output, operator_context
)
}

const _DORA_ON_INPUT: dora_operator_api::types::DoraOnInput =
dora_operator_api::types::DoraOnInput(dora_on_input);
};

Ok(quote! {


+ 17
- 27
apis/rust/operator/src/lib.rs View File

@@ -2,8 +2,9 @@
#![allow(clippy::missing_safety_doc)]

pub use dora_operator_api_macros::register_operator;
use raw::OutputFnRaw;
use std::ffi::c_void;
pub use dora_operator_api_types as types;
pub use types::DoraStatus;
use types::{Metadata, Output, SendOutput};

pub mod raw;

@@ -14,34 +15,23 @@ pub trait DoraOperator: Default {
id: &str,
data: &[u8],
output_sender: &mut DoraOutputSender,
) -> Result<DoraStatus, ()>;
) -> Result<DoraStatus, String>;
}

#[repr(isize)]
pub enum DoraStatus {
Continue = 0,
Stop = 1,
}

pub struct DoraOutputSender {
output_fn_raw: OutputFnRaw,
output_context: *const c_void,
}
pub struct DoraOutputSender<'a>(SendOutput<'a>);

impl DoraOutputSender {
pub fn send(&mut self, id: &str, data: &[u8]) -> Result<(), isize> {
let result = unsafe {
(self.output_fn_raw)(
id.as_ptr(),
id.len(),
data.as_ptr(),
data.len(),
self.output_context,
)
};
match result {
0 => Ok(()),
other => Err(other),
impl<'a> DoraOutputSender<'a> {
pub fn send(&mut self, id: String, data: Vec<u8>) -> Result<(), String> {
let result = self.0 .0.call(Output {
id: id.into(),
data: data.into(),
metadata: Metadata {
open_telemetry_context: String::new().into(), // TODO
},
});
match result.error {
None => Ok(()),
Some(error) => Err(error.into()),
}
}
}

+ 28
- 29
apis/rust/operator/src/raw.rs View File

@@ -1,6 +1,6 @@
use std::{ffi::c_void, slice};
use crate::{DoraOperator, DoraOutputSender};
use crate::{DoraOperator, DoraOutputSender, DoraStatus};
use dora_operator_api_types::{DoraResult, InitResult, Input, OnInputResult, SendOutput};
use std::ffi::c_void;

pub type OutputFnRaw = unsafe extern "C" fn(
id_start: *const u8,
@@ -10,42 +10,41 @@ pub type OutputFnRaw = unsafe extern "C" fn(
output_context: *const c_void,
) -> isize;

pub unsafe fn dora_init_operator<O: DoraOperator>(operator_context: *mut *mut c_void) -> isize {
pub unsafe fn dora_init_operator<O: DoraOperator>() -> InitResult {
let operator: O = Default::default();
let ptr: *mut O = Box::leak(Box::new(operator));
let type_erased: *mut c_void = ptr.cast();
unsafe { *operator_context = type_erased };
0
let operator_context: *mut c_void = ptr.cast();
InitResult {
result: DoraResult { error: None },
operator_context,
}
}

pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) {
pub unsafe fn dora_drop_operator<O>(operator_context: *mut c_void) -> DoraResult {
let raw: *mut O = operator_context.cast();
unsafe { Box::from_raw(raw) };
DoraResult { error: None }
}

pub unsafe fn dora_on_input<O: DoraOperator>(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output_fn_raw: OutputFnRaw,
output_context: *const c_void,
operator_context: *mut c_void,
) -> isize {
let id = match std::str::from_utf8(unsafe { slice::from_raw_parts(id_start, id_len) }) {
Ok(id) => id,
Err(_) => return -1,
};
let data = unsafe { slice::from_raw_parts(data_start, data_len) };
let mut output_sender = DoraOutputSender {
output_fn_raw,
output_context,
};
input: &Input,
send_output: SendOutput<'_>,
operator_context: *mut std::ffi::c_void,
) -> OnInputResult {
let mut output_sender = DoraOutputSender(send_output);

let operator: &mut O = unsafe { &mut *operator_context.cast() };

match operator.on_input(id, data, &mut output_sender) {
Ok(status) => status as isize,
Err(_) => -1,
let data = input.data.as_ref().as_slice();
match operator.on_input(&input.id, &data, &mut output_sender) {
Ok(status) => OnInputResult {
result: DoraResult { error: None },
status,
},
Err(error) => OnInputResult {
result: DoraResult {
error: Some(error.into()),
},
status: DoraStatus::Stop,
},
}
}

+ 11
- 0
apis/rust/operator/types/Cargo.toml View File

@@ -0,0 +1,11 @@
[package]
name = "dora-operator-api-types"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies.safer-ffi]
git = "https://github.com/phil-opp/safer_ffi.git"
branch = "from-instead-of-into"
features = ["proc_macros"]

+ 81
- 0
apis/rust/operator/types/src/lib.rs View File

@@ -0,0 +1,81 @@
pub use safer_ffi;
use safer_ffi::{closure::RefDynFnMut1, derive_ReprC};

#[derive_ReprC]
#[repr(transparent)]
pub struct DoraInitOperator(pub unsafe extern "C" fn() -> InitResult);

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct InitResult {
pub result: DoraResult,
pub operator_context: *mut std::ffi::c_void,
}
#[derive_ReprC]
#[repr(transparent)]
pub struct DoraDropOperator(
pub unsafe extern "C" fn(operator_context: *mut std::ffi::c_void) -> DoraResult,
);

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct DoraResult {
pub error: Option<safer_ffi::String>,
}

#[derive_ReprC]
#[repr(transparent)]
pub struct DoraOnInput(
pub unsafe extern "C" fn(
input: &Input,
send_output: SendOutput<'_>,
operator_context: *mut std::ffi::c_void,
) -> OnInputResult,
);

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct Input {
pub id: safer_ffi::String,
pub data: safer_ffi::Vec<u8>,
pub metadata: Metadata,
}

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct Metadata {
pub open_telemetry_context: safer_ffi::String,
}

#[derive_ReprC]
#[repr(transparent)]
pub struct SendOutput<'a>(pub RefDynFnMut1<'a, DoraResult, Output>);

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct Output {
pub id: safer_ffi::String,
pub data: safer_ffi::Vec<u8>,
pub metadata: Metadata,
}

#[derive_ReprC]
#[repr(C)]
#[derive(Debug)]
pub struct OnInputResult {
pub result: DoraResult,
pub status: DoraStatus,
}

#[derive_ReprC]
#[derive(Debug)]
#[repr(u8)]
pub enum DoraStatus {
Continue = 0,
Stop = 1,
}

+ 1
- 0
binaries/runtime/Cargo.toml View File

@@ -9,6 +9,7 @@ license = "Apache-2.0"
[dependencies]
clap = { version = "3.1.12", features = ["derive"] }
dora-node-api = { path = "../../apis/rust/node" }
dora-operator-api-types = { path = "../../apis/rust/operator/types" }
dora-core = { version = "0.1.0", path = "../../libraries/core" }
eyre = "0.6.8"
futures = "0.3.21"


+ 48
- 95
binaries/runtime/src/operator/shared_lib.rs View File

@@ -1,11 +1,15 @@
use super::{OperatorEvent, OperatorInput};
use dora_operator_api_types::{
safer_ffi::closure::RefDynFnMut1, DoraDropOperator, DoraInitOperator, DoraOnInput, DoraResult,
DoraStatus, InitResult, Metadata, OnInputResult, Output, SendOutput,
};
use eyre::{bail, Context};
use libloading::Symbol;
use std::{
ffi::c_void,
panic::{catch_unwind, AssertUnwindSafe},
path::Path,
ptr, slice, thread,
thread,
};
use tokio::sync::mpsc::{Receiver, Sender};

@@ -57,11 +61,14 @@ struct SharedLibraryOperator<'lib> {
impl<'lib> SharedLibraryOperator<'lib> {
fn run(mut self) -> eyre::Result<()> {
let operator_context = {
let mut raw = ptr::null_mut();
let result = unsafe { (self.bindings.init_operator)(&mut raw) };
if result != 0 {
bail!("init_operator failed with error code {result}");
}
let InitResult {
result,
operator_context,
} = unsafe { (self.bindings.init_operator)() };
let raw = match result.error {
Some(error) => bail!("init_operator failed: {}", String::from(error)),
None => operator_context,
};
OperatorContext {
raw,
drop_fn: self.bindings.drop_operator.clone(),
@@ -69,39 +76,41 @@ impl<'lib> SharedLibraryOperator<'lib> {
};

while let Some(input) = self.inputs.blocking_recv() {
let id_start = input.id.as_bytes().as_ptr();
let id_len = input.id.as_bytes().len();
let data_start = input.value.as_slice().as_ptr();
let data_len = input.value.len();
let operator_input = dora_operator_api_types::Input {
id: String::from(input.id).into(),
data: input.value.into(),
metadata: Metadata {
open_telemetry_context: String::new().into(),
},
};

let output = |id: &str, data: &[u8]| -> isize {
let mut send_output_closure = |output: Output| {
let id: String = output.id.into();
let result = self.events_tx.blocking_send(OperatorEvent::Output {
id: id.to_owned().into(),
value: data.to_owned(),
id: id.into(),
value: output.data.into(),
});
match result {
Ok(()) => 0,
Err(_) => -1,
}

let error = match result {
Ok(()) => None,
Err(_) => Some(String::from("runtime process closed unexpectedly").into()),
};

DoraResult { error }
};
let (output_fn, output_ctx) = wrap_closure(&output);

let result = unsafe {
(self.bindings.on_input)(
id_start,
id_len,
data_start,
data_len,
output_fn,
output_ctx,
operator_context.raw,
)
let send_output = SendOutput(RefDynFnMut1::new(&mut send_output_closure));
let OnInputResult {
result: DoraResult { error },
status,
} = unsafe {
(self.bindings.on_input)(&operator_input, send_output, operator_context.raw)
};
match result {
0 => {} // DoraStatus::Continue
1 => break, // DoraStatus::Stop
-1 => bail!("on_input failed"),
other => bail!("on_input finished with unexpected exit code {other}"),
match error {
Some(error) => bail!("on_input failed: {}", String::from(error)),
None => match status {
DoraStatus::Continue => {}
DoraStatus::Stop => break,
},
}
}
Ok(())
@@ -109,8 +118,8 @@ impl<'lib> SharedLibraryOperator<'lib> {
}

struct OperatorContext<'lib> {
raw: *mut (),
drop_fn: Symbol<'lib, OperatorContextDropFn>,
raw: *mut c_void,
drop_fn: Symbol<'lib, DoraDropOperator>,
}

impl<'lib> Drop for OperatorContext<'lib> {
@@ -119,45 +128,10 @@ impl<'lib> Drop for OperatorContext<'lib> {
}
}

/// Wrap a closure with an FFI-compatible trampoline function.
///
/// Returns a C compatible trampoline function and a data pointer that
/// must be passed as when invoking the trampoline function.
fn wrap_closure<F>(closure: &F) -> (OutputFn, *const c_void)
where
F: Fn(&str, &[u8]) -> isize,
{
/// Rust closures are just compiler-generated structs with a `call` method. This
/// trampoline function is generic over the closure type, which means that the
/// compiler's monomorphization step creates a different copy of that function
/// for each closure type.
///
/// The trampoline function expects the pointer to the corresponding closure
/// struct as `context` argument. It casts that pointer back to a closure
/// struct pointer and invokes its call method.
unsafe extern "C" fn trampoline<F: Fn(&str, &[u8]) -> isize>(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
context: *const c_void,
) -> isize {
let id_raw = unsafe { slice::from_raw_parts(id_start, id_len) };
let data = unsafe { slice::from_raw_parts(data_start, data_len) };
let id = match std::str::from_utf8(id_raw) {
Ok(s) => s,
Err(_) => return -1,
};
unsafe { (*(context as *const F))(id, data) }
}

(trampoline::<F>, closure as *const F as *const c_void)
}

struct Bindings<'lib> {
init_operator: Symbol<'lib, InitFn>,
drop_operator: Symbol<'lib, OperatorContextDropFn>,
on_input: Symbol<'lib, OnInputFn>,
init_operator: Symbol<'lib, DoraInitOperator>,
drop_operator: Symbol<'lib, DoraDropOperator>,
on_input: Symbol<'lib, DoraOnInput>,
}

impl<'lib> Bindings<'lib> {
@@ -178,24 +152,3 @@ impl<'lib> Bindings<'lib> {
Ok(bindings)
}
}

type InitFn = unsafe extern "C" fn(operator_context: *mut *mut ()) -> isize;
type OperatorContextDropFn = unsafe extern "C" fn(operator_context: *mut ());

type OnInputFn = unsafe extern "C" fn(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output: OutputFn,
output_context: *const c_void,
operator_context: *mut (),
) -> isize;

type OutputFn = unsafe extern "C" fn(
id_start: *const u8,
id_len: usize,
data_start: *const u8,
data_len: usize,
output_context: *const c_void,
) -> isize;

Loading…
Cancel
Save