diff --git a/.gitignore b/.gitignore index 7e646758..a7f72ae0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,9 @@ # These are backup files generated by rustfmt **/*.rs.bk - +# Removing images. +*.jpg +*.png diff --git a/Cargo.lock b/Cargo.lock index 96ae2434..5a1dba04 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -192,9 +192,9 @@ checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9" [[package]] name = "async-trait" -version = "0.1.52" +version = "0.1.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600" dependencies = [ "proc-macro2", "quote", @@ -417,6 +417,41 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" +dependencies = [ + "autocfg 1.1.0", + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.8" @@ -515,7 +550,9 @@ dependencies = [ "envy", "eyre", "futures", + "log", "pyo3", + "rayon", "serde", "serde_yaml", "structopt", @@ -523,6 +560,12 @@ dependencies = [ "zenoh", ] +[[package]] +name = "either" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" + [[package]] name = "env_logger" version = "0.9.0" @@ -729,9 +772,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77" +checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad" dependencies = [ "cfg-if", "js-sys", @@ -967,9 +1010,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.14" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" +checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8" dependencies = [ "cfg-if", "value-bag", @@ -1224,9 +1267,9 @@ dependencies = [ [[package]] name = "paste" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0744126afe1a6dd7f394cb50a716dbe086cb06e255e53d8d0185d82828358fb5" +checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" [[package]] name = "pem-rfc7468" @@ -1607,9 +1650,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.16" +version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57" +checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58" dependencies = [ "proc-macro2", ] @@ -1644,11 +1687,36 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90" +dependencies = [ + "autocfg 1.1.0", + "crossbeam-deque", + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "lazy_static", + "num_cpus", +] + [[package]] name = "redox_syscall" -version = "0.2.11" +version = "0.2.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c" +checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0" dependencies = [ "bitflags", ] @@ -1847,9 +1915,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.6" +version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d" +checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4" [[package]] name = "serde" @@ -2523,7 +2591,7 @@ dependencies = [ [[package]] name = "zenoh" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-global-executor", "async-std", @@ -2568,7 +2636,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "bincode", @@ -2583,7 +2651,7 @@ dependencies = [ [[package]] name = "zenoh-cfg-properties" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "zenoh-core", "zenoh-macros", @@ -2592,7 +2660,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2605,7 +2673,7 @@ dependencies = [ [[package]] name = "zenoh-config" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "flume", "json5", @@ -2622,7 +2690,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "anyhow", "lazy_static", @@ -2631,7 +2699,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "aes", "hmac", @@ -2644,7 +2712,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2663,7 +2731,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2678,7 +2746,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2700,7 +2768,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2715,7 +2783,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-rustls", "async-std", @@ -2733,7 +2801,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2750,7 +2818,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "async-trait", @@ -2766,7 +2834,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "proc-macro2", "quote", @@ -2778,7 +2846,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "libloading", "log", @@ -2791,7 +2859,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "log", "uhlc", @@ -2803,7 +2871,7 @@ dependencies = [ [[package]] name = "zenoh-protocol-core" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "hex", "lazy_static", @@ -2816,7 +2884,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "event-listener", @@ -2829,7 +2897,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-global-executor", "async-std", @@ -2855,7 +2923,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252" dependencies = [ "async-std", "clap", diff --git a/Cargo.toml b/Cargo.toml index d3c42e4b..b324b135 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,4 +15,6 @@ env_logger = "0.9.0" tokio = { version="1.17.0", features=["full"]} pyo3 = "0.16.1" futures = "0.3.12" -envy = "0.4.2" \ No newline at end of file +envy = "0.4.2" +rayon = "1.5.1" +log = "0.4.16" \ No newline at end of file diff --git a/README.md b/README.md index 484fa8b3..787975c7 100644 --- a/README.md +++ b/README.md @@ -1,30 +1,2 @@ # dora-rs -Dataflow Oriented Robotic Architecture - -## Python API Design - -The Python API is probably going to look as follows: -```python -@register -async def function_name(state: DoraState, message: DoraMessage): - return outputs -``` - -The philosophy is to use async function as primary instance to: -- Mitigate the risk of running unsafe data mutations. -- Managing several run at the same time with timeout / deadline capabilities -- Using Tokio Spawn to avoid thread locks on CPU bound runs. - -## Getting started - -I have made a simple example that can be run with: -``` -cargo run start-python app:return_1 - -# Running this might required some shared library as: -export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/miniconda3/lib -export PYTHONPATH=$PYTHONPATH:$(pwd) -``` -That is going to listen to the key_expr "a" and run the `return_1` function within the `app.py` python async. - -This is still very experimental. \ No newline at end of file +Dataflow Oriented Robotic Architecture \ No newline at end of file diff --git a/app.py b/app.py deleted file mode 100644 index 6535b44f..00000000 --- a/app.py +++ /dev/null @@ -1,13 +0,0 @@ -import asyncio - -counter = 0 - -import logging - - -def return_1(x): - global counter - counter += 1 - print(counter) - logging.info(x) - return {"b": "b", "c": "c"} diff --git a/src/lib.rs b/src/lib.rs index 76efc867..e2568713 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ pub mod descriptor; pub mod python; +pub mod zenoh_client; diff --git a/src/python/binding.rs b/src/python/binding.rs index 1b0a1612..4c2f9fd5 100644 --- a/src/python/binding.rs +++ b/src/python/binding.rs @@ -1,32 +1,103 @@ use eyre::Context; -use pyo3::prelude::*; -use std::collections::{BTreeMap, HashMap}; +use log::{debug, warn}; +use pyo3::{ + buffer::PyBuffer, + prelude::*, + types::{PyByteArray, PyDict, PyString}, +}; +use std::{collections::BTreeMap, sync::Arc}; -pub fn init(app: &str, function: &str) -> eyre::Result> { +use super::server::PythonCommand; +use super::server::Workload; + +fn init(app: &str, function: &str) -> eyre::Result> { pyo3::prepare_freethreaded_python(); Python::with_gil(|py| { let file = py .import(app) - .wrap_err("The import file was not found. Check your PYTHONPATH env variable.")?; + .wrap_err(format!("Importing '{app}' did not succeed."))?; // convert Function into a PyObject let identity = file .getattr(function) - .wrap_err("The Function was not found in the imported file.")?; + .wrap_err(format!("'{function}' was not found in '{app}'."))?; Ok(identity.to_object(py)) }) } -pub async fn call( - py_function: &PyObject, - states: BTreeMap, -) -> eyre::Result> { +fn call( + py_function: Arc, + function_name: &str, + states: &BTreeMap>, + pulled_states: &Option>>, +) -> eyre::Result>> { Python::with_gil(|py| { - let args = (states.into_py(py),); - let result = py_function - .call(py, args, None) - .wrap_err("The Python function call did not succeed.")?; - result - .extract(py) - .wrap_err("The Python function returned an error.") + let py_inputs = PyDict::new(py); + for (k, v) in states.iter() { + py_inputs.set_item(k, PyByteArray::new(py, v))?; + } + if let Some(pulled_states) = pulled_states { + for (k, v) in pulled_states.iter() { + py_inputs.set_item(k, PyByteArray::new(py, v))?; + } + } + + let results = py_function + .call(py, (py_inputs,), None) + .wrap_err(format!("'{function_name}' call did not succeed."))?; + + let py_outputs = results.cast_as::(py).unwrap(); + let mut outputs = BTreeMap::new(); + for (k, v) in py_outputs.into_iter() { + let values = PyBuffer::get(v) + .wrap_err("Reading from Python Buffer failed")? + .to_vec(py)?; + let key = k + .cast_as::() + .or_else(|e| eyre::bail!("{e}"))? + .to_string(); + outputs.insert(key, values); + } + + Ok(outputs) }) } + +pub fn python_compute_event_loop( + mut input_receiver: tokio::sync::mpsc::Receiver, + output_sender: tokio::sync::mpsc::Sender< + std::collections::BTreeMap>, + >, + variables: PythonCommand, +) { + tokio::spawn(async move { + let app = &variables.app; + let function_name = &variables.function; + + let py_function = Arc::new( + init(app, function_name) + .context(format!( + "Failed to init '{app}' with function '{function_name}'" + )) + .unwrap(), + ); + + while let Some(workload) = input_receiver.recv().await { + let pyfunc = py_function.clone(); + let push_tx = output_sender.clone(); + let states = workload.states.read().await.clone(); // This is probably expensive. + push_tx + .send( + call(pyfunc, function_name, &states, &workload.pulled_states).unwrap_or_else( + |err| { + warn!("App: '{app}', Function: '{function_name}', Error: {err}"); + states + }, + ), + ) + .await + .unwrap_or_else(|err| { + debug!("App: '{app}', Function: '{function_name}', Sending Error: {err}") + }); + } + }); +} diff --git a/src/python/server.rs b/src/python/server.rs index 5b764808..3b076b1e 100644 --- a/src/python/server.rs +++ b/src/python/server.rs @@ -1,100 +1,36 @@ -use super::binding; -use eyre::eyre; -use eyre::WrapErr; -use futures::future::join_all; -use futures::prelude::*; -use pyo3::prelude::*; +use crate::zenoh_client::ZenohClient; + +use super::binding::python_compute_event_loop; +use eyre::Result; use serde::Deserialize; -use std::collections::hash_map::DefaultHasher; -use std::collections::{BTreeMap, HashMap}; -use std::hash::Hash; -use std::hash::Hasher; -use std::time::{Duration, Instant}; +use std::collections::BTreeMap; +use std::sync::Arc; use structopt::StructOpt; -use tokio::time::timeout; -use zenoh::config::Config; -use zenoh::prelude::SplitBuffer; - -static DURATION_MILLIS: u64 = 5; +use tokio::sync::mpsc; +use tokio::sync::RwLock; #[derive(Deserialize, Debug, Clone, StructOpt)] pub struct PythonCommand { - pub subscriptions: Vec, pub app: String, pub function: String, + subscriptions: Vec, } -#[tokio::main] -pub async fn run(variables: PythonCommand) -> PyResult<()> { - // Subscribe - let session = zenoh::open(Config::default()).await.unwrap(); - - // Create a hashmap of all subscriptions. - let mut subscribers = HashMap::new(); - for subscription in &variables.subscriptions { - subscribers.insert(subscription.clone(), session - .subscribe(subscription) - .await - .map_err(|err| { - eyre!("Could not subscribe to the given subscription key expression. Error: {err}") - }) - .unwrap()); - } - - // Store the latest value of all subscription as well as the output of the function. hash the state to easily check if the state has changed. - let mut states = BTreeMap::new(); - let mut states_hash = hash(&states); - - let py_function = binding::init(&variables.app, &variables.function) - .wrap_err("Failed to init the Python Function") - .unwrap(); - let duration = Duration::from_millis(DURATION_MILLIS); - let mut futures_put = vec![]; - - loop { - let loop_start = Instant::now(); - - let mut futures = vec![]; - for (_, v) in subscribers.iter_mut() { - futures.push(timeout(duration, v.next())); - } - - let results = join_all(futures).await; - - for (result, subscription) in results.into_iter().zip(&variables.subscriptions.clone()) { - if let Ok(Some(data)) = result { - let value = data.value.payload; - let binary = value.contiguous(); - states.insert( - subscription.clone().to_string(), - String::from_utf8(binary.to_vec()).unwrap(), - ); - } - } - - let new_hash = hash(&states); - - if states_hash == new_hash { - continue; - } - - let now = Instant::now(); - - let outputs = binding::call(&py_function, states.clone()).await.unwrap(); - println!("call python {:#?}", now.elapsed()); +#[derive(Debug, Clone)] +pub struct Workload { + pub states: Arc>>>, + pub pulled_states: Option>>, +} - for (key, value) in outputs { - states.insert(key.clone(), value.clone()); - futures_put.push(session.put(key, value)); - } +#[tokio::main] +pub async fn run(variables: PythonCommand) -> Result<()> { + let (push_sender, push_receiver) = mpsc::channel::>>(1); + let (python_sender, python_receiver) = mpsc::channel::(1); - states_hash = hash(&states); - println!("loop {:#?}", loop_start.elapsed()); - } -} + let context = format!("App: {}, Function: {}", &variables.app, &variables.function); -fn hash(states: &BTreeMap) -> u64 { - let mut hasher = DefaultHasher::new(); - states.hash(&mut hasher); - hasher.finish() + let zenoh_client = ZenohClient::try_new(variables.subscriptions.clone(), context).await?; + zenoh_client.clone().push_event_loop(push_receiver); + python_compute_event_loop(python_receiver, push_sender, variables); + zenoh_client.pull_event_loop(python_sender).await } diff --git a/src/zenoh_client/mod.rs b/src/zenoh_client/mod.rs new file mode 100644 index 00000000..e4288bee --- /dev/null +++ b/src/zenoh_client/mod.rs @@ -0,0 +1,137 @@ +use std::{collections::BTreeMap, sync::Arc, time::Duration}; + +use eyre::Result; +use futures::{future::join_all, prelude::*}; +use log::debug; +use tokio::{ + sync::{ + mpsc::{Receiver, Sender}, + RwLock, + }, + time::timeout, +}; +use zenoh::{config::Config, prelude::SplitBuffer}; +use zenoh::{subscriber::SampleReceiver, Session}; + +use crate::python::server::Workload; + +static PULL_WAIT_PERIOD: std::time::Duration = Duration::from_millis(100); +static PUSH_WAIT_PERIOD: std::time::Duration = Duration::from_millis(100); + +#[derive(Clone, Debug)] +pub struct ZenohClient { + session: Arc, + context: String, + states: Arc>>>, + subscriptions: Vec, +} + +impl ZenohClient { + pub async fn try_new(subscriptions: Vec, context: String) -> Result { + let session = Arc::new( + zenoh::open(Config::default()) + .await + .or_else(|e| eyre::bail!("{context}, Error: {e}"))?, + ); + let states = Arc::new(RwLock::new(BTreeMap::new())); + + Ok(ZenohClient { + session, + context, + states, + subscriptions, + }) + } + + pub async fn push(&self, outputs: BTreeMap>) { + join_all( + outputs + .iter() + .map(|(key, value)| self.session.put(key, value.clone())), + ) + .await; + self.states.write().await.extend(outputs); + } + + pub async fn pull( + &self, + receivers: &mut Vec<&mut SampleReceiver>, + ) -> Option>> { + let fetched_data = join_all( + receivers + .iter_mut() + .map(|reciever| timeout(PULL_WAIT_PERIOD, reciever.next())), + ) + .await; + + let mut pulled_states = BTreeMap::new(); + for (result, subscription) in fetched_data.into_iter().zip(&self.subscriptions) { + if let Ok(Some(data)) = result { + let value = data.value.payload; + let binary = value.contiguous(); + pulled_states.insert(subscription.clone().to_string(), binary.to_vec()); + } + } + if pulled_states.is_empty() { + None + } else { + Some(pulled_states) + } + } + pub fn push_event_loop(self, mut receiver: Receiver>>) { + tokio::spawn(async move { + while let Some(outputs) = receiver.recv().await { + self.push(outputs).await; + } + }); + } + + pub async fn pull_event_loop(self, sender: Sender) -> eyre::Result<()> { + let mut subscribers = Vec::new(); + for subscription in self.subscriptions.iter() { + subscribers.push(self.session.subscribe(subscription).await.or_else(|err| { + eyre::bail!( + "Could not subscribe to the given subscription key expression. Error: {err}" + ) + })?); + } + let mut receivers: Vec<_> = subscribers.iter_mut().map(|sub| sub.receiver()).collect(); + let is_source = self.subscriptions.is_empty(); + if is_source { + loop { + let states = self.states.clone(); + if let Err(err) = timeout( + PULL_WAIT_PERIOD, + sender.send(Workload { + states, + pulled_states: None, + }), + ) + .await + { + let context = &self.context; + debug!("{context}, Sending Error: {err}"); + } + tokio::time::sleep(PUSH_WAIT_PERIOD).await; + } + } else { + loop { + if let Some(pulled_states) = self.pull(&mut receivers).await { + let states = self.states.clone(); + if let Err(err) = timeout( + PULL_WAIT_PERIOD, + sender.send(Workload { + states, + pulled_states: Some(pulled_states), + }), + ) + .await + { + let context = &self.context; + debug!("{context}, Sending Error: {err}"); + } + } + } + } + } +}