From 364333b8d075d28e1195964013848a93d58a9599 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Tue, 22 Mar 2022 09:54:59 +0100 Subject: [PATCH] Fixing commentary --- Cargo.lock | 80 +++++++++++++++++---------------------------------- Cargo.toml | 3 -- README.md | 4 +-- src/main.rs | 14 +++++---- src/server.rs | 38 ++++++++++++++++++++---- 5 files changed, 69 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c634ade2..f035e31c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,7 +511,6 @@ dependencies = [ name = "dora-rs" version = "0.1.0" dependencies = [ - "async-std", "env_logger", "eyre", "futures", @@ -2309,45 +2308,21 @@ dependencies = [ [[package]] name = "validated_struct" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85affc53ebb88700a36aefff74b6a4db38dbccf26ec777d40c9e9b471be261f8" -dependencies = [ - "json5", - "serde", - "serde_json", - "validated_struct_macros 0.1.10", -] - -[[package]] -name = "validated_struct" -version = "1.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d16daadad78d0a3aec2938c75864f3f57efed795bd9bb6b794fa2e3c3ffeffc3" +checksum = "feef04c049b4beae3037a2a31b8da40d8cebec0b97456f24c7de0ede4ed9efed" dependencies = [ "json5", "serde", "serde_json", - "validated_struct_macros 1.0.0", + "validated_struct_macros", ] [[package]] name = "validated_struct_macros" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca689f3addec28c7d67ed3139498d0325253c71bf58ef57c2ba76141f7ce576f" -dependencies = [ - "proc-macro2", - "quote", - "syn", - "unzip-n", -] - -[[package]] -name = "validated_struct_macros" -version = "1.0.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bc2d772810cb61e997977e8d0b51c0344321692d2660e1aa2463dd745b4e803f" +checksum = "9d4444a980afa9ef0d29c2a3f4d952ec0495a7a996a9c78b52698b71bc21edb4" dependencies = [ "proc-macro2", "quote", @@ -2586,7 +2561,7 @@ dependencies = [ [[package]] name = "zenoh" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-global-executor", "async-std", @@ -2612,7 +2587,6 @@ dependencies = [ "stop-token", "uhlc", "uuid", - "validated_struct 0.1.11", "vec_map", "zenoh-buffers", "zenoh-cfg-properties", @@ -2632,7 +2606,7 @@ dependencies = [ [[package]] name = "zenoh-buffers" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "bincode", @@ -2647,7 +2621,7 @@ dependencies = [ [[package]] name = "zenoh-cfg-properties" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "zenoh-core", "zenoh-macros", @@ -2656,7 +2630,7 @@ dependencies = [ [[package]] name = "zenoh-collections" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2669,14 +2643,14 @@ dependencies = [ [[package]] name = "zenoh-config" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "flume", "json5", "serde", "serde_json", "serde_yaml", - "validated_struct 1.0.0", + "validated_struct", "zenoh-cfg-properties", "zenoh-core", "zenoh-protocol-core", @@ -2686,7 +2660,7 @@ dependencies = [ [[package]] name = "zenoh-core" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "anyhow", "lazy_static", @@ -2695,7 +2669,7 @@ dependencies = [ [[package]] name = "zenoh-crypto" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "aes", "hmac", @@ -2708,7 +2682,7 @@ dependencies = [ [[package]] name = "zenoh-link" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2727,7 +2701,7 @@ dependencies = [ [[package]] name = "zenoh-link-commons" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2742,7 +2716,7 @@ dependencies = [ [[package]] name = "zenoh-link-quic" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2764,7 +2738,7 @@ dependencies = [ [[package]] name = "zenoh-link-tcp" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2779,7 +2753,7 @@ dependencies = [ [[package]] name = "zenoh-link-tls" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-rustls", "async-std", @@ -2797,7 +2771,7 @@ dependencies = [ [[package]] name = "zenoh-link-udp" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2814,7 +2788,7 @@ dependencies = [ [[package]] name = "zenoh-link-unixsock_stream" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "async-trait", @@ -2830,7 +2804,7 @@ dependencies = [ [[package]] name = "zenoh-macros" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "proc-macro2", "quote", @@ -2842,7 +2816,7 @@ dependencies = [ [[package]] name = "zenoh-plugin-trait" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "libloading", "log", @@ -2855,7 +2829,7 @@ dependencies = [ [[package]] name = "zenoh-protocol" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "log", "uhlc", @@ -2867,7 +2841,7 @@ dependencies = [ [[package]] name = "zenoh-protocol-core" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "hex", "lazy_static", @@ -2880,7 +2854,7 @@ dependencies = [ [[package]] name = "zenoh-sync" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "event-listener", @@ -2893,7 +2867,7 @@ dependencies = [ [[package]] name = "zenoh-transport" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-global-executor", "async-std", @@ -2919,7 +2893,7 @@ dependencies = [ [[package]] name = "zenoh-util" version = "0.6.0-dev.0" -source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545" +source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d" dependencies = [ "async-std", "clap", diff --git a/Cargo.toml b/Cargo.toml index f6751181..9c487bbc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,6 @@ eyre = "0.6.7" serde = { version = "1.0", features = ["derive"] } serde_yaml = "0.8.23" structopt = "0.3.26" -async-std = { version = "1.10.0", default-features = false, features = [ - "attributes", -] } zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" } env_logger = "0.9.0" tokio = { version="1.17.0", features=["full"]} diff --git a/README.md b/README.md index dca24c82..484fa8b3 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ async def function_name(state: DoraState, message: DoraMessage): ``` The philosophy is to use async function as primary instance to: -- Mitigate the risk of running undafe data mutations. +- Mitigate the risk of running unsafe data mutations. - Managing several run at the same time with timeout / deadline capabilities - Using Tokio Spawn to avoid thread locks on CPU bound runs. @@ -19,7 +19,7 @@ The philosophy is to use async function as primary instance to: I have made a simple example that can be run with: ``` -cargo run start-server app:return_1 +cargo run start-python app:return_1 # Running this might required some shared library as: export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/miniconda3/lib diff --git a/src/main.rs b/src/main.rs index 0d7aa1b8..ff2eed02 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use dora_rs::{descriptor::Descriptor, server::start_server}; -use eyre::{Context, ContextCompat}; -use pyo3::prelude::*; +use eyre::{Context, ContextCompat, Result}; +use pyo3::{prelude::*, prepare_freethreaded_python}; use std::{fs::File, path::PathBuf}; use structopt::StructOpt; @@ -13,8 +13,7 @@ enum Command { StartPython { server: String }, } -#[pyo3_asyncio::tokio::main] -async fn main() -> PyResult<()> { +fn main() -> Result<()> { let command = Command::from_args(); match command { Command::Graph { file } => { @@ -39,8 +38,11 @@ async fn main() -> PyResult<()> { let mut server = server.split(":"); let file = server.next().context("Server string is empty.").unwrap(); let app = server.next().context("No app found").unwrap(); - - let _result = start_server(file, app).await; + let rt = tokio::runtime::Runtime::new().unwrap(); + prepare_freethreaded_python(); + rt.block_on(async { + let _result = start_server(file, app).await; + }); } } diff --git a/src/server.rs b/src/server.rs index 20bfb935..0ddde00e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,3 +1,4 @@ +use eyre::{eyre, Context}; use futures::prelude::*; use pyo3::prelude::*; use std::collections::HashMap; @@ -13,6 +14,9 @@ pub async fn start_server(file: &str, app: &str) -> eyre::Result<()> { let mut subscriber = session .subscribe(env::var("SRC_LABELS").unwrap()) .await + .map_err(|err| { + eyre!("Could not subscribe to the given subscription key expression. Error: {err}") + }) .unwrap(); let identity = initialize(file, app).await.unwrap(); @@ -23,24 +27,46 @@ pub async fn start_server(file: &str, app: &str) -> eyre::Result<()> { let result = Python::with_gil(|py| { let args = (binary.into_py(py),); - pyo3_asyncio::tokio::into_future(identity.call(py, args, None).unwrap().as_ref(py)) + pyo3_asyncio::tokio::into_future( + identity + .call(py, args, None) + .wrap_err("The Python function call did not succeed.") + .unwrap() + .as_ref(py), + ) }) + .wrap_err("Could not create future of python function call.") .unwrap() .await + .wrap_err("Could not await the python future.") .unwrap(); - let outputs: HashMap = Python::with_gil(|py| result.extract(py)).unwrap(); + let outputs: HashMap = Python::with_gil(|py| result.extract(py)) + .wrap_err("Could not retrieve the python result.") + .unwrap(); for (key, value) in outputs { - session.put(key, value).await.unwrap(); + session + .put(key, value) + .await + .map_err(|err| { + eyre!("Could not put the output within the chosen key expression topic. Error: {err}") + }) + .unwrap(); } } } -pub async fn initialize(file: &str, app: &str) -> PyResult> { +pub async fn initialize(file: &str, app: &str) -> eyre::Result> { Ok(Python::with_gil(|py| { - let file = py.import(file).unwrap(); + let file = py + .import(file) + .wrap_err("The import file was not found. Check your PYTHONPATH env variable.") + .unwrap(); // convert Function into a PyObject - let identity = file.getattr(app).unwrap(); + let identity = file + .getattr(app) + .wrap_err("The Function was not found in the imported file.") + .unwrap(); identity.to_object(py) })) }