Browse Source

Multithreaded Python API and Pylot Example (#18)

* Refactoring for multithreading

* Refactoring code in order to use MemoryView

* Resolving multi output by casting Python output type

* Adding Python example runner

* Adding Rayon ThreadPool for CPU bound multithreading

* Adding benches

* Small Refactoring of Python Binding

* Adding documentation to Pylot Demo

* Removing cloning states data using RwLock

* Refactoring Servers to pass messages through tokio channels

* Removing unwrap when possible

* Splitting Zenoh function into separate module

* Refactoring Zenoh into a struct

* Adding several Python fix

* Fix eyre issue

* Adding docker for ease of build

* Fixing docker problem

* Reduce the frequency of source

* Adding better Python Operator

* Improving carla visualisation capabilities

* Enabling better visualisation

* adding object trajectory

* Improving planning

* Refactoring Python

* Adding control operator

* Improving planning operator

* Better Control Operator

* Fixing Planning Errors linked to applying Speed Factor

* Fixing Docker Image Build issues

* Adding a timestamp to messages

* Fixing PID mutlithread errors

* Drop Push Send after Pull period

* Limiting the latency

* Adding InfluxDB

* Fixing Influxdb Naming and quota

* Adding positional data

* Making launching container command faster

* Removing Dora-Pylot

* Refactor Error Handling

* Refactoring Error dubgging function

Co-authored-by: haixuanTao <hai-xuan.tao@student.ecp.fr>
tags/v0.0.0-test.4
Xavier Tao GitHub 3 years ago
parent
commit
b3ab72b393
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 358 additions and 182 deletions
  1. +3
    -1
      .gitignore
  2. +103
    -35
      Cargo.lock
  3. +3
    -1
      Cargo.toml
  4. +1
    -29
      README.md
  5. +0
    -13
      app.py
  6. +1
    -0
      src/lib.rs
  7. +87
    -16
      src/python/binding.rs
  8. +23
    -87
      src/python/server.rs
  9. +137
    -0
      src/zenoh_client/mod.rs

+ 3
- 1
.gitignore View File

@@ -5,7 +5,9 @@
# These are backup files generated by rustfmt
**/*.rs.bk


# Removing images.
*.jpg
*.png





+ 103
- 35
Cargo.lock View File

@@ -192,9 +192,9 @@ checksum = "30696a84d817107fc028e049980e09d5e140e8da8f1caeb17e8e950658a3cea9"

[[package]]
name = "async-trait"
version = "0.1.52"
version = "0.1.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
checksum = "ed6aa3524a2dfcf9fe180c51eae2b58738348d819517ceadf95789c51fff7600"
dependencies = [
"proc-macro2",
"quote",
@@ -417,6 +417,41 @@ dependencies = [
"libc",
]

[[package]]
name = "crossbeam-channel"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [
"cfg-if",
"crossbeam-utils",
]

[[package]]
name = "crossbeam-deque"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]

[[package]]
name = "crossbeam-epoch"
version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
"autocfg 1.1.0",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]

[[package]]
name = "crossbeam-utils"
version = "0.8.8"
@@ -515,7 +550,9 @@ dependencies = [
"envy",
"eyre",
"futures",
"log",
"pyo3",
"rayon",
"serde",
"serde_yaml",
"structopt",
@@ -523,6 +560,12 @@ dependencies = [
"zenoh",
]

[[package]]
name = "either"
version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"

[[package]]
name = "env_logger"
version = "0.9.0"
@@ -729,9 +772,9 @@ dependencies = [

[[package]]
name = "getrandom"
version = "0.2.5"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d39cd93900197114fa1fcb7ae84ca742095eed9442088988ae74fa744e930e77"
checksum = "9be70c98951c83b8d2f8f60d7065fa6d5146873094452a1008da8c2f1e4205ad"
dependencies = [
"cfg-if",
"js-sys",
@@ -967,9 +1010,9 @@ dependencies = [

[[package]]
name = "log"
version = "0.4.14"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710"
checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [
"cfg-if",
"value-bag",
@@ -1224,9 +1267,9 @@ dependencies = [

[[package]]
name = "paste"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0744126afe1a6dd7f394cb50a716dbe086cb06e255e53d8d0185d82828358fb5"
checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc"

[[package]]
name = "pem-rfc7468"
@@ -1607,9 +1650,9 @@ dependencies = [

[[package]]
name = "quote"
version = "1.0.16"
version = "1.0.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
checksum = "632d02bff7f874a36f33ea8bb416cd484b90cc66c1194b1a1110d067a7013f58"
dependencies = [
"proc-macro2",
]
@@ -1644,11 +1687,36 @@ dependencies = [
"getrandom",
]

[[package]]
name = "rayon"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c06aca804d41dbc8ba42dfd964f0d01334eceb64314b9ecf7c5fad5188a06d90"
dependencies = [
"autocfg 1.1.0",
"crossbeam-deque",
"either",
"rayon-core",
]

[[package]]
name = "rayon-core"
version = "1.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d78120e2c850279833f1dd3582f730c4ab53ed95aeaaaa862a2a5c71b1656d8e"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"lazy_static",
"num_cpus",
]

[[package]]
name = "redox_syscall"
version = "0.2.11"
version = "0.2.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8380fe0152551244f0747b1bf41737e0f8a74f97a14ccefd1148187271634f3c"
checksum = "8ae183fc1b06c149f0c1793e1eb447c8b04bfe46d48e9e48bfb8d2d7ed64ecf0"
dependencies = [
"bitflags",
]
@@ -1847,9 +1915,9 @@ dependencies = [

[[package]]
name = "semver"
version = "1.0.6"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4a3381e03edd24287172047536f20cabde766e2cd3e65e6b00fb3af51c4f38d"
checksum = "d65bd28f48be7196d222d95b9243287f48d27aca604e08497513019ff0502cc4"

[[package]]
name = "serde"
@@ -2523,7 +2591,7 @@ dependencies = [
[[package]]
name = "zenoh"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-global-executor",
"async-std",
@@ -2568,7 +2636,7 @@ dependencies = [
[[package]]
name = "zenoh-buffers"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"bincode",
@@ -2583,7 +2651,7 @@ dependencies = [
[[package]]
name = "zenoh-cfg-properties"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"zenoh-core",
"zenoh-macros",
@@ -2592,7 +2660,7 @@ dependencies = [
[[package]]
name = "zenoh-collections"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2605,7 +2673,7 @@ dependencies = [
[[package]]
name = "zenoh-config"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"flume",
"json5",
@@ -2622,7 +2690,7 @@ dependencies = [
[[package]]
name = "zenoh-core"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"anyhow",
"lazy_static",
@@ -2631,7 +2699,7 @@ dependencies = [
[[package]]
name = "zenoh-crypto"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"aes",
"hmac",
@@ -2644,7 +2712,7 @@ dependencies = [
[[package]]
name = "zenoh-link"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2663,7 +2731,7 @@ dependencies = [
[[package]]
name = "zenoh-link-commons"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2678,7 +2746,7 @@ dependencies = [
[[package]]
name = "zenoh-link-quic"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2700,7 +2768,7 @@ dependencies = [
[[package]]
name = "zenoh-link-tcp"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2715,7 +2783,7 @@ dependencies = [
[[package]]
name = "zenoh-link-tls"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-rustls",
"async-std",
@@ -2733,7 +2801,7 @@ dependencies = [
[[package]]
name = "zenoh-link-udp"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2750,7 +2818,7 @@ dependencies = [
[[package]]
name = "zenoh-link-unixsock_stream"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"async-trait",
@@ -2766,7 +2834,7 @@ dependencies = [
[[package]]
name = "zenoh-macros"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"proc-macro2",
"quote",
@@ -2778,7 +2846,7 @@ dependencies = [
[[package]]
name = "zenoh-plugin-trait"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"libloading",
"log",
@@ -2791,7 +2859,7 @@ dependencies = [
[[package]]
name = "zenoh-protocol"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"log",
"uhlc",
@@ -2803,7 +2871,7 @@ dependencies = [
[[package]]
name = "zenoh-protocol-core"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"hex",
"lazy_static",
@@ -2816,7 +2884,7 @@ dependencies = [
[[package]]
name = "zenoh-sync"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"event-listener",
@@ -2829,7 +2897,7 @@ dependencies = [
[[package]]
name = "zenoh-transport"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-global-executor",
"async-std",
@@ -2855,7 +2923,7 @@ dependencies = [
[[package]]
name = "zenoh-util"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#c0d746f54f3f366db2c759116b84eb6825a68252"
dependencies = [
"async-std",
"clap",


+ 3
- 1
Cargo.toml View File

@@ -15,4 +15,6 @@ env_logger = "0.9.0"
tokio = { version="1.17.0", features=["full"]}
pyo3 = "0.16.1"
futures = "0.3.12"
envy = "0.4.2"
envy = "0.4.2"
rayon = "1.5.1"
log = "0.4.16"

+ 1
- 29
README.md View File

@@ -1,30 +1,2 @@
# dora-rs
Dataflow Oriented Robotic Architecture

## Python API Design

The Python API is probably going to look as follows:
```python
@register
async def function_name(state: DoraState, message: DoraMessage):
return outputs
```

The philosophy is to use async function as primary instance to:
- Mitigate the risk of running unsafe data mutations.
- Managing several run at the same time with timeout / deadline capabilities
- Using Tokio Spawn to avoid thread locks on CPU bound runs.

## Getting started

I have made a simple example that can be run with:
```
cargo run start-python app:return_1

# Running this might required some shared library as:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/miniconda3/lib
export PYTHONPATH=$PYTHONPATH:$(pwd)
```
That is going to listen to the key_expr "a" and run the `return_1` function within the `app.py` python async.

This is still very experimental.
Dataflow Oriented Robotic Architecture

+ 0
- 13
app.py View File

@@ -1,13 +0,0 @@
import asyncio

counter = 0

import logging


def return_1(x):
global counter
counter += 1
print(counter)
logging.info(x)
return {"b": "b", "c": "c"}

+ 1
- 0
src/lib.rs View File

@@ -1,2 +1,3 @@
pub mod descriptor;
pub mod python;
pub mod zenoh_client;

+ 87
- 16
src/python/binding.rs View File

@@ -1,32 +1,103 @@
use eyre::Context;
use pyo3::prelude::*;
use std::collections::{BTreeMap, HashMap};
use log::{debug, warn};
use pyo3::{
buffer::PyBuffer,
prelude::*,
types::{PyByteArray, PyDict, PyString},
};
use std::{collections::BTreeMap, sync::Arc};

pub fn init(app: &str, function: &str) -> eyre::Result<Py<PyAny>> {
use super::server::PythonCommand;
use super::server::Workload;

fn init(app: &str, function: &str) -> eyre::Result<Py<PyAny>> {
pyo3::prepare_freethreaded_python();
Python::with_gil(|py| {
let file = py
.import(app)
.wrap_err("The import file was not found. Check your PYTHONPATH env variable.")?;
.wrap_err(format!("Importing '{app}' did not succeed."))?;
// convert Function into a PyObject
let identity = file
.getattr(function)
.wrap_err("The Function was not found in the imported file.")?;
.wrap_err(format!("'{function}' was not found in '{app}'."))?;
Ok(identity.to_object(py))
})
}

pub async fn call(
py_function: &PyObject,
states: BTreeMap<String, String>,
) -> eyre::Result<HashMap<String, String>> {
fn call(
py_function: Arc<PyObject>,
function_name: &str,
states: &BTreeMap<String, Vec<u8>>,
pulled_states: &Option<BTreeMap<String, Vec<u8>>>,
) -> eyre::Result<BTreeMap<String, Vec<u8>>> {
Python::with_gil(|py| {
let args = (states.into_py(py),);
let result = py_function
.call(py, args, None)
.wrap_err("The Python function call did not succeed.")?;
result
.extract(py)
.wrap_err("The Python function returned an error.")
let py_inputs = PyDict::new(py);
for (k, v) in states.iter() {
py_inputs.set_item(k, PyByteArray::new(py, v))?;
}
if let Some(pulled_states) = pulled_states {
for (k, v) in pulled_states.iter() {
py_inputs.set_item(k, PyByteArray::new(py, v))?;
}
}

let results = py_function
.call(py, (py_inputs,), None)
.wrap_err(format!("'{function_name}' call did not succeed."))?;

let py_outputs = results.cast_as::<PyDict>(py).unwrap();
let mut outputs = BTreeMap::new();
for (k, v) in py_outputs.into_iter() {
let values = PyBuffer::get(v)
.wrap_err("Reading from Python Buffer failed")?
.to_vec(py)?;
let key = k
.cast_as::<PyString>()
.or_else(|e| eyre::bail!("{e}"))?
.to_string();
outputs.insert(key, values);
}

Ok(outputs)
})
}

pub fn python_compute_event_loop(
mut input_receiver: tokio::sync::mpsc::Receiver<Workload>,
output_sender: tokio::sync::mpsc::Sender<
std::collections::BTreeMap<std::string::String, std::vec::Vec<u8>>,
>,
variables: PythonCommand,
) {
tokio::spawn(async move {
let app = &variables.app;
let function_name = &variables.function;

let py_function = Arc::new(
init(app, function_name)
.context(format!(
"Failed to init '{app}' with function '{function_name}'"
))
.unwrap(),
);

while let Some(workload) = input_receiver.recv().await {
let pyfunc = py_function.clone();
let push_tx = output_sender.clone();
let states = workload.states.read().await.clone(); // This is probably expensive.
push_tx
.send(
call(pyfunc, function_name, &states, &workload.pulled_states).unwrap_or_else(
|err| {
warn!("App: '{app}', Function: '{function_name}', Error: {err}");
states
},
),
)
.await
.unwrap_or_else(|err| {
debug!("App: '{app}', Function: '{function_name}', Sending Error: {err}")
});
}
});
}

+ 23
- 87
src/python/server.rs View File

@@ -1,100 +1,36 @@
use super::binding;
use eyre::eyre;
use eyre::WrapErr;
use futures::future::join_all;
use futures::prelude::*;
use pyo3::prelude::*;
use crate::zenoh_client::ZenohClient;

use super::binding::python_compute_event_loop;
use eyre::Result;
use serde::Deserialize;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::hash::Hasher;
use std::time::{Duration, Instant};
use std::collections::BTreeMap;
use std::sync::Arc;
use structopt::StructOpt;
use tokio::time::timeout;
use zenoh::config::Config;
use zenoh::prelude::SplitBuffer;

static DURATION_MILLIS: u64 = 5;
use tokio::sync::mpsc;
use tokio::sync::RwLock;

#[derive(Deserialize, Debug, Clone, StructOpt)]
pub struct PythonCommand {
pub subscriptions: Vec<String>,
pub app: String,
pub function: String,
subscriptions: Vec<String>,
}

#[tokio::main]
pub async fn run(variables: PythonCommand) -> PyResult<()> {
// Subscribe
let session = zenoh::open(Config::default()).await.unwrap();

// Create a hashmap of all subscriptions.
let mut subscribers = HashMap::new();
for subscription in &variables.subscriptions {
subscribers.insert(subscription.clone(), session
.subscribe(subscription)
.await
.map_err(|err| {
eyre!("Could not subscribe to the given subscription key expression. Error: {err}")
})
.unwrap());
}

// Store the latest value of all subscription as well as the output of the function. hash the state to easily check if the state has changed.
let mut states = BTreeMap::new();
let mut states_hash = hash(&states);

let py_function = binding::init(&variables.app, &variables.function)
.wrap_err("Failed to init the Python Function")
.unwrap();
let duration = Duration::from_millis(DURATION_MILLIS);
let mut futures_put = vec![];

loop {
let loop_start = Instant::now();

let mut futures = vec![];
for (_, v) in subscribers.iter_mut() {
futures.push(timeout(duration, v.next()));
}

let results = join_all(futures).await;

for (result, subscription) in results.into_iter().zip(&variables.subscriptions.clone()) {
if let Ok(Some(data)) = result {
let value = data.value.payload;
let binary = value.contiguous();
states.insert(
subscription.clone().to_string(),
String::from_utf8(binary.to_vec()).unwrap(),
);
}
}

let new_hash = hash(&states);

if states_hash == new_hash {
continue;
}

let now = Instant::now();

let outputs = binding::call(&py_function, states.clone()).await.unwrap();
println!("call python {:#?}", now.elapsed());
#[derive(Debug, Clone)]
pub struct Workload {
pub states: Arc<RwLock<BTreeMap<String, Vec<u8>>>>,
pub pulled_states: Option<BTreeMap<String, Vec<u8>>>,
}

for (key, value) in outputs {
states.insert(key.clone(), value.clone());
futures_put.push(session.put(key, value));
}
#[tokio::main]
pub async fn run(variables: PythonCommand) -> Result<()> {
let (push_sender, push_receiver) = mpsc::channel::<BTreeMap<String, Vec<u8>>>(1);
let (python_sender, python_receiver) = mpsc::channel::<Workload>(1);

states_hash = hash(&states);
println!("loop {:#?}", loop_start.elapsed());
}
}
let context = format!("App: {}, Function: {}", &variables.app, &variables.function);

fn hash(states: &BTreeMap<String, String>) -> u64 {
let mut hasher = DefaultHasher::new();
states.hash(&mut hasher);
hasher.finish()
let zenoh_client = ZenohClient::try_new(variables.subscriptions.clone(), context).await?;
zenoh_client.clone().push_event_loop(push_receiver);
python_compute_event_loop(python_receiver, push_sender, variables);
zenoh_client.pull_event_loop(python_sender).await
}

+ 137
- 0
src/zenoh_client/mod.rs View File

@@ -0,0 +1,137 @@
use std::{collections::BTreeMap, sync::Arc, time::Duration};

use eyre::Result;
use futures::{future::join_all, prelude::*};
use log::debug;
use tokio::{
sync::{
mpsc::{Receiver, Sender},
RwLock,
},
time::timeout,
};
use zenoh::{config::Config, prelude::SplitBuffer};
use zenoh::{subscriber::SampleReceiver, Session};

use crate::python::server::Workload;

static PULL_WAIT_PERIOD: std::time::Duration = Duration::from_millis(100);
static PUSH_WAIT_PERIOD: std::time::Duration = Duration::from_millis(100);

#[derive(Clone, Debug)]
pub struct ZenohClient {
session: Arc<Session>,
context: String,
states: Arc<RwLock<BTreeMap<String, Vec<u8>>>>,
subscriptions: Vec<String>,
}

impl ZenohClient {
pub async fn try_new(subscriptions: Vec<String>, context: String) -> Result<Self> {
let session = Arc::new(
zenoh::open(Config::default())
.await
.or_else(|e| eyre::bail!("{context}, Error: {e}"))?,
);
let states = Arc::new(RwLock::new(BTreeMap::new()));

Ok(ZenohClient {
session,
context,
states,
subscriptions,
})
}

pub async fn push(&self, outputs: BTreeMap<String, Vec<u8>>) {
join_all(
outputs
.iter()
.map(|(key, value)| self.session.put(key, value.clone())),
)
.await;
self.states.write().await.extend(outputs);
}

pub async fn pull(
&self,
receivers: &mut Vec<&mut SampleReceiver>,
) -> Option<BTreeMap<String, Vec<u8>>> {
let fetched_data = join_all(
receivers
.iter_mut()
.map(|reciever| timeout(PULL_WAIT_PERIOD, reciever.next())),
)
.await;

let mut pulled_states = BTreeMap::new();
for (result, subscription) in fetched_data.into_iter().zip(&self.subscriptions) {
if let Ok(Some(data)) = result {
let value = data.value.payload;
let binary = value.contiguous();
pulled_states.insert(subscription.clone().to_string(), binary.to_vec());
}
}
if pulled_states.is_empty() {
None
} else {
Some(pulled_states)
}
}
pub fn push_event_loop(self, mut receiver: Receiver<BTreeMap<String, Vec<u8>>>) {
tokio::spawn(async move {
while let Some(outputs) = receiver.recv().await {
self.push(outputs).await;
}
});
}

pub async fn pull_event_loop(self, sender: Sender<Workload>) -> eyre::Result<()> {
let mut subscribers = Vec::new();
for subscription in self.subscriptions.iter() {
subscribers.push(self.session.subscribe(subscription).await.or_else(|err| {
eyre::bail!(
"Could not subscribe to the given subscription key expression. Error: {err}"
)
})?);
}
let mut receivers: Vec<_> = subscribers.iter_mut().map(|sub| sub.receiver()).collect();
let is_source = self.subscriptions.is_empty();
if is_source {
loop {
let states = self.states.clone();
if let Err(err) = timeout(
PULL_WAIT_PERIOD,
sender.send(Workload {
states,
pulled_states: None,
}),
)
.await
{
let context = &self.context;
debug!("{context}, Sending Error: {err}");
}
tokio::time::sleep(PUSH_WAIT_PERIOD).await;
}
} else {
loop {
if let Some(pulled_states) = self.pull(&mut receivers).await {
let states = self.states.clone();
if let Err(err) = timeout(
PULL_WAIT_PERIOD,
sender.send(Workload {
states,
pulled_states: Some(pulled_states),
}),
)
.await
{
let context = &self.context;
debug!("{context}, Sending Error: {err}");
}
}
}
}
}
}

Loading…
Cancel
Save