Browse Source

Fixing commentary

tags/v0.0.0-test.4
haixuanTao 3 years ago
parent
commit
364333b8d0
5 changed files with 69 additions and 70 deletions
  1. +27
    -53
      Cargo.lock
  2. +0
    -3
      Cargo.toml
  3. +2
    -2
      README.md
  4. +8
    -6
      src/main.rs
  5. +32
    -6
      src/server.rs

+ 27
- 53
Cargo.lock View File

@@ -511,7 +511,6 @@ dependencies = [
name = "dora-rs"
version = "0.1.0"
dependencies = [
"async-std",
"env_logger",
"eyre",
"futures",
@@ -2309,45 +2308,21 @@ dependencies = [

[[package]]
name = "validated_struct"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85affc53ebb88700a36aefff74b6a4db38dbccf26ec777d40c9e9b471be261f8"
dependencies = [
"json5",
"serde",
"serde_json",
"validated_struct_macros 0.1.10",
]

[[package]]
name = "validated_struct"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16daadad78d0a3aec2938c75864f3f57efed795bd9bb6b794fa2e3c3ffeffc3"
checksum = "feef04c049b4beae3037a2a31b8da40d8cebec0b97456f24c7de0ede4ed9efed"
dependencies = [
"json5",
"serde",
"serde_json",
"validated_struct_macros 1.0.0",
"validated_struct_macros",
]

[[package]]
name = "validated_struct_macros"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca689f3addec28c7d67ed3139498d0325253c71bf58ef57c2ba76141f7ce576f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"unzip-n",
]

[[package]]
name = "validated_struct_macros"
version = "1.0.0"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc2d772810cb61e997977e8d0b51c0344321692d2660e1aa2463dd745b4e803f"
checksum = "9d4444a980afa9ef0d29c2a3f4d952ec0495a7a996a9c78b52698b71bc21edb4"
dependencies = [
"proc-macro2",
"quote",
@@ -2586,7 +2561,7 @@ dependencies = [
[[package]]
name = "zenoh"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-global-executor",
"async-std",
@@ -2612,7 +2587,6 @@ dependencies = [
"stop-token",
"uhlc",
"uuid",
"validated_struct 0.1.11",
"vec_map",
"zenoh-buffers",
"zenoh-cfg-properties",
@@ -2632,7 +2606,7 @@ dependencies = [
[[package]]
name = "zenoh-buffers"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"bincode",
@@ -2647,7 +2621,7 @@ dependencies = [
[[package]]
name = "zenoh-cfg-properties"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"zenoh-core",
"zenoh-macros",
@@ -2656,7 +2630,7 @@ dependencies = [
[[package]]
name = "zenoh-collections"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2669,14 +2643,14 @@ dependencies = [
[[package]]
name = "zenoh-config"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"flume",
"json5",
"serde",
"serde_json",
"serde_yaml",
"validated_struct 1.0.0",
"validated_struct",
"zenoh-cfg-properties",
"zenoh-core",
"zenoh-protocol-core",
@@ -2686,7 +2660,7 @@ dependencies = [
[[package]]
name = "zenoh-core"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"anyhow",
"lazy_static",
@@ -2695,7 +2669,7 @@ dependencies = [
[[package]]
name = "zenoh-crypto"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"aes",
"hmac",
@@ -2708,7 +2682,7 @@ dependencies = [
[[package]]
name = "zenoh-link"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2727,7 +2701,7 @@ dependencies = [
[[package]]
name = "zenoh-link-commons"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2742,7 +2716,7 @@ dependencies = [
[[package]]
name = "zenoh-link-quic"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2764,7 +2738,7 @@ dependencies = [
[[package]]
name = "zenoh-link-tcp"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2779,7 +2753,7 @@ dependencies = [
[[package]]
name = "zenoh-link-tls"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-rustls",
"async-std",
@@ -2797,7 +2771,7 @@ dependencies = [
[[package]]
name = "zenoh-link-udp"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2814,7 +2788,7 @@ dependencies = [
[[package]]
name = "zenoh-link-unixsock_stream"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"async-trait",
@@ -2830,7 +2804,7 @@ dependencies = [
[[package]]
name = "zenoh-macros"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"proc-macro2",
"quote",
@@ -2842,7 +2816,7 @@ dependencies = [
[[package]]
name = "zenoh-plugin-trait"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"libloading",
"log",
@@ -2855,7 +2829,7 @@ dependencies = [
[[package]]
name = "zenoh-protocol"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"log",
"uhlc",
@@ -2867,7 +2841,7 @@ dependencies = [
[[package]]
name = "zenoh-protocol-core"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"hex",
"lazy_static",
@@ -2880,7 +2854,7 @@ dependencies = [
[[package]]
name = "zenoh-sync"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"event-listener",
@@ -2893,7 +2867,7 @@ dependencies = [
[[package]]
name = "zenoh-transport"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-global-executor",
"async-std",
@@ -2919,7 +2893,7 @@ dependencies = [
[[package]]
name = "zenoh-util"
version = "0.6.0-dev.0"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#71228023fbec3e6051160f05165ec2ef66338545"
source = "git+https://github.com/eclipse-zenoh/zenoh.git#43666dd04433a5836f0cbadb610ca23a58d7ed3d"
dependencies = [
"async-std",
"clap",


+ 0
- 3
Cargo.toml View File

@@ -10,9 +10,6 @@ eyre = "0.6.7"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8.23"
structopt = "0.3.26"
async-std = { version = "1.10.0", default-features = false, features = [
"attributes",
] }
zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" }
env_logger = "0.9.0"
tokio = { version="1.17.0", features=["full"]}


+ 2
- 2
README.md View File

@@ -11,7 +11,7 @@ async def function_name(state: DoraState, message: DoraMessage):
```

The philosophy is to use async function as primary instance to:
- Mitigate the risk of running undafe data mutations.
- Mitigate the risk of running unsafe data mutations.
- Managing several run at the same time with timeout / deadline capabilities
- Using Tokio Spawn to avoid thread locks on CPU bound runs.

@@ -19,7 +19,7 @@ The philosophy is to use async function as primary instance to:

I have made a simple example that can be run with:
```
cargo run start-server app:return_1
cargo run start-python app:return_1

# Running this might required some shared library as:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/miniconda3/lib


+ 8
- 6
src/main.rs View File

@@ -1,6 +1,6 @@
use dora_rs::{descriptor::Descriptor, server::start_server};
use eyre::{Context, ContextCompat};
use pyo3::prelude::*;
use eyre::{Context, ContextCompat, Result};
use pyo3::{prelude::*, prepare_freethreaded_python};
use std::{fs::File, path::PathBuf};
use structopt::StructOpt;

@@ -13,8 +13,7 @@ enum Command {
StartPython { server: String },
}

#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
fn main() -> Result<()> {
let command = Command::from_args();
match command {
Command::Graph { file } => {
@@ -39,8 +38,11 @@ async fn main() -> PyResult<()> {
let mut server = server.split(":");
let file = server.next().context("Server string is empty.").unwrap();
let app = server.next().context("No app found").unwrap();

let _result = start_server(file, app).await;
let rt = tokio::runtime::Runtime::new().unwrap();
prepare_freethreaded_python();
rt.block_on(async {
let _result = start_server(file, app).await;
});
}
}



+ 32
- 6
src/server.rs View File

@@ -1,3 +1,4 @@
use eyre::{eyre, Context};
use futures::prelude::*;
use pyo3::prelude::*;
use std::collections::HashMap;
@@ -13,6 +14,9 @@ pub async fn start_server(file: &str, app: &str) -> eyre::Result<()> {
let mut subscriber = session
.subscribe(env::var("SRC_LABELS").unwrap())
.await
.map_err(|err| {
eyre!("Could not subscribe to the given subscription key expression. Error: {err}")
})
.unwrap();
let identity = initialize(file, app).await.unwrap();

@@ -23,24 +27,46 @@ pub async fn start_server(file: &str, app: &str) -> eyre::Result<()> {

let result = Python::with_gil(|py| {
let args = (binary.into_py(py),);
pyo3_asyncio::tokio::into_future(identity.call(py, args, None).unwrap().as_ref(py))
pyo3_asyncio::tokio::into_future(
identity
.call(py, args, None)
.wrap_err("The Python function call did not succeed.")
.unwrap()
.as_ref(py),
)
})
.wrap_err("Could not create future of python function call.")
.unwrap()
.await
.wrap_err("Could not await the python future.")
.unwrap();

let outputs: HashMap<String, String> = Python::with_gil(|py| result.extract(py)).unwrap();
let outputs: HashMap<String, String> = Python::with_gil(|py| result.extract(py))
.wrap_err("Could not retrieve the python result.")
.unwrap();
for (key, value) in outputs {
session.put(key, value).await.unwrap();
session
.put(key, value)
.await
.map_err(|err| {
eyre!("Could not put the output within the chosen key expression topic. Error: {err}")
})
.unwrap();
}
}
}

pub async fn initialize(file: &str, app: &str) -> PyResult<Py<PyAny>> {
pub async fn initialize(file: &str, app: &str) -> eyre::Result<Py<PyAny>> {
Ok(Python::with_gil(|py| {
let file = py.import(file).unwrap();
let file = py
.import(file)
.wrap_err("The import file was not found. Check your PYTHONPATH env variable.")
.unwrap();
// convert Function into a PyObject
let identity = file.getattr(app).unwrap();
let identity = file
.getattr(app)
.wrap_err("The Function was not found in the imported file.")
.unwrap();
identity.to_object(py)
}))
}

Loading…
Cancel
Save