From 81f5a891c46b621ae87b79c8d77e27614fa2c8f6 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 22 Mar 2022 18:30:25 +0100 Subject: [PATCH] Refactoring the code --- app.py | 5 +++ src/lib.rs | 1 + src/python_binding.rs | 48 ++++++++++++++++++++++++++ src/server.rs | 79 ++++++++++++++----------------------------- 4 files changed, 80 insertions(+), 53 deletions(-) create mode 100644 src/python_binding.rs diff --git a/app.py b/app.py index fd627d05..be2eaf1a 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,11 @@ import asyncio +counter = 0 + async def return_1(x): + global counter + counter += 1 + print(counter) print(x) return {"b": "b", "c": "c"} diff --git a/src/lib.rs b/src/lib.rs index 0c24e539..f14aea1e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,2 +1,3 @@ pub mod descriptor; +pub mod python_binding; pub mod server; diff --git a/src/python_binding.rs b/src/python_binding.rs new file mode 100644 index 00000000..ecaa1c59 --- /dev/null +++ b/src/python_binding.rs @@ -0,0 +1,48 @@ +use eyre::{eyre, Context}; +use pyo3::prelude::*; +use serde::Deserialize; +use std::collections::{BTreeMap, HashMap}; + +#[derive(Deserialize, Debug)] +struct PythonVariables { + app: String, + function: String, +} + +pub fn init() -> eyre::Result> { + let variables = envy::from_env::().unwrap(); + Ok(Python::with_gil(|py| { + let file = py + .import(&variables.app) + .wrap_err("The import file was not found. Check your PYTHONPATH env variable.") + .unwrap(); + // convert Function into a PyObject + let identity = file + .getattr(variables.function) + .wrap_err("The Function was not found in the imported file.") + .unwrap(); + identity.to_object(py) + })) +} + +pub async fn call( + py_function: &PyObject, + states: BTreeMap, +) -> eyre::Result> { + let result = Python::with_gil(|py| { + let args = (states.clone().into_py(py),); + pyo3_asyncio::tokio::into_future( + py_function + .call(py, args, None) + .wrap_err("The Python function call did not succeed.") + .unwrap() + .as_ref(py), + ) + }) + .wrap_err("Could not create future of python function call.") + .unwrap() + .await + .wrap_err("Could not await the python future.") + .unwrap(); + Python::with_gil(|py| result.extract(py)).wrap_err("Could not retrieve the python result.") +} diff --git a/src/server.rs b/src/server.rs index 3e857752..cb773caa 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,9 @@ -use eyre::{eyre, Context}; +use eyre::eyre; +use eyre::WrapErr; use futures::future::join_all; -use futures::stream::Next; -use futures::{join, prelude::*}; +use futures::prelude::*; use pyo3::prelude::*; +use serde::Deserialize; use std::collections::hash_map::DefaultHasher; use std::collections::{BTreeMap, HashMap}; use std::hash::Hash; @@ -11,14 +12,13 @@ use std::time::{Duration, Instant}; use tokio::time::timeout; use zenoh::config::Config; use zenoh::prelude::SplitBuffer; -static DURATION_MILLIS: u64 = 1; -use serde::Deserialize; +use crate::python_binding::{call, init}; + +static DURATION_MILLIS: u64 = 1; #[derive(Deserialize, Debug)] struct ConfigVariables { subscriptions: Vec, - app: String, - function: String, } #[pyo3_asyncio::tokio::main] @@ -46,19 +46,19 @@ pub async fn main() -> PyResult<()> { // Store the latest value of all subscription as well as the output of the function. hash the state to easily check if the state has changed. let mut states = BTreeMap::new(); - let mut hasher = DefaultHasher::new(); - states.hash(&mut hasher); - let mut state_hash = hasher.finish(); + let mut states_hash = hash(&states); - let py_function = initialize(variables.app, variables.function).unwrap(); - let dur = Duration::from_millis(DURATION_MILLIS); + let py_function = init() + .wrap_err("Failed to init the Python Function") + .unwrap(); + let duration = Duration::from_millis(DURATION_MILLIS); let mut futures_put = vec![]; loop { let now = Instant::now(); let mut futures = vec![]; for (_, v) in subscribers.iter_mut() { - futures.push(timeout(dur, v.next())); + futures.push(timeout(duration, v.next())); } let results = join_all(futures).await; @@ -74,56 +74,29 @@ pub async fn main() -> PyResult<()> { } } - let mut hasher = DefaultHasher::new(); - states.hash(&mut hasher); - let new_hash = hasher.finish(); - if state_hash == new_hash { + let new_hash = hash(&states); + + if states_hash == new_hash { continue; } - let result = Python::with_gil(|py| { - let args = (states.clone().into_py(py),); - pyo3_asyncio::tokio::into_future( - py_function - .call(py, args, None) - .wrap_err("The Python function call did not succeed.") - .unwrap() - .as_ref(py), - ) - }) - .wrap_err("Could not create future of python function call.") - .unwrap() - .await - .wrap_err("Could not await the python future.") - .unwrap(); - - let outputs: HashMap = Python::with_gil(|py| result.extract(py)) - .wrap_err("Could not retrieve the python result.") - .unwrap(); + let now = Instant::now(); + let outputs = call(&py_function, states.clone()).await.unwrap(); + println!("call python {:#?}", now.elapsed()); for (key, value) in outputs { states.insert(key.clone(), value.clone()); - futures_put.push(timeout(dur, session.put(key, value))); + futures_put.push(session.put(key, value)); } - let mut hasher = DefaultHasher::new(); - states.hash(&mut hasher); - state_hash = hasher.finish(); + states_hash = hash(&states); + println!("loop {:#?}", now.elapsed()); } } -pub fn initialize(file: String, app: String) -> eyre::Result> { - Ok(Python::with_gil(|py| { - let file = py - .import(&file) - .wrap_err("The import file was not found. Check your PYTHONPATH env variable.") - .unwrap(); - // convert Function into a PyObject - let identity = file - .getattr(app) - .wrap_err("The Function was not found in the imported file.") - .unwrap(); - identity.to_object(py) - })) +fn hash(states: &BTreeMap) -> u64 { + let mut hasher = DefaultHasher::new(); + states.hash(&mut hasher); + hasher.finish() }