Browse Source

Merge pull request #17 from futurewei-tech/rust-python-binding-improvements

Some improvements to new Python bindings
tags/v0.0.0-test.4
Xavier Tao GitHub 4 years ago
parent
commit
5d57726015
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 62 additions and 77 deletions
  1. +1
    -2
      src/lib.rs
  2. +10
    -15
      src/main.rs
  3. +32
    -0
      src/python/binding.rs
  4. +2
    -0
      src/python/mod.rs
  5. +17
    -19
      src/python/server.rs
  6. +0
    -41
      src/python_binding.rs

+ 1
- 2
src/lib.rs View File

@@ -1,3 +1,2 @@
pub mod descriptor;
pub mod python_binding;
pub mod server;
pub mod python;

+ 10
- 15
src/main.rs View File

@@ -1,6 +1,5 @@
use dora_rs::descriptor::Descriptor;
use eyre::{Context, Result};
use pyo3::prepare_freethreaded_python;
use eyre::Context;
use std::{fs::File, path::PathBuf};
use structopt::StructOpt;

@@ -10,34 +9,30 @@ enum Command {
#[structopt(about = "Print Graph")]
Graph { file: PathBuf },
#[structopt(about = "Run Python server")]
StartPython,
StartPython(dora_rs::python::server::PythonCommand),
}

fn main() -> Result<()> {
fn main() -> eyre::Result<()> {
env_logger::init();

let command = Command::from_args();
match command {
Command::Graph { file } => {
let descriptor_file = File::open(&file)
.context("failed to open given file")
.unwrap();
let descriptor_file = File::open(&file).context("failed to open given file")?;

let descriptor: Descriptor = serde_yaml::from_reader(descriptor_file)
.context("failed to parse given descriptor")
.unwrap();
.context("failed to parse given descriptor")?;
let visualized = descriptor
.visualize_as_mermaid()
.context("failed to visualize descriptor")
.unwrap();
.context("failed to visualize descriptor")?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
}
Command::StartPython => {
prepare_freethreaded_python();

dora_rs::server::main();
Command::StartPython(command) => {
dora_rs::python::server::run(command).context("python server failed")?;
}
}



+ 32
- 0
src/python/binding.rs View File

@@ -0,0 +1,32 @@
use eyre::Context;
use pyo3::prelude::*;
use std::collections::{BTreeMap, HashMap};

pub fn init(app: &str, function: &str) -> eyre::Result<Py<PyAny>> {
pyo3::prepare_freethreaded_python();
Python::with_gil(|py| {
let file = py
.import(app)
.wrap_err("The import file was not found. Check your PYTHONPATH env variable.")?;
// convert Function into a PyObject
let identity = file
.getattr(function)
.wrap_err("The Function was not found in the imported file.")?;
Ok(identity.to_object(py))
})
}

pub async fn call(
py_function: &PyObject,
states: BTreeMap<String, String>,
) -> eyre::Result<HashMap<String, String>> {
Python::with_gil(|py| {
let args = (states.into_py(py),);
let result = py_function
.call(py, args, None)
.wrap_err("The Python function call did not succeed.")?;
result
.extract(py)
.wrap_err("The Python function returned an error.")
})
}

+ 2
- 0
src/python/mod.rs View File

@@ -0,0 +1,2 @@
pub mod binding;
pub mod server;

src/server.rs → src/python/server.rs View File

@@ -1,3 +1,4 @@
use super::binding;
use eyre::eyre;
use eyre::WrapErr;
use futures::future::join_all;
@@ -9,32 +10,28 @@ use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::hash::Hasher;
use std::time::{Duration, Instant};
use structopt::StructOpt;
use tokio::time::timeout;
use zenoh::config::Config;
use zenoh::prelude::SplitBuffer;

use crate::python_binding::{call, init};

static DURATION_MILLIS: u64 = 5;
#[derive(Deserialize, Debug)]
struct ConfigVariables {
subscriptions: Vec<String>,

#[derive(Deserialize, Debug, Clone, StructOpt)]
pub struct PythonCommand {
pub subscriptions: Vec<String>,
pub app: String,
pub function: String,
}

#[tokio::main]
pub async fn main() -> PyResult<()> {
pub async fn run(variables: PythonCommand) -> PyResult<()> {
// Subscribe
let variables = envy::from_env::<ConfigVariables>().unwrap();

env_logger::init();
let config = Config::default();
let session = zenoh::open(config).await.unwrap();
let session = zenoh::open(Config::default()).await.unwrap();

// Create a hashmap of all subscriptions.
let mut subscribers = HashMap::new();
let subs = variables.subscriptions.clone();

for subscription in &subs {
for subscription in &variables.subscriptions {
subscribers.insert(subscription.clone(), session
.subscribe(subscription)
.await
@@ -48,14 +45,15 @@ pub async fn main() -> PyResult<()> {
let mut states = BTreeMap::new();
let mut states_hash = hash(&states);

let py_function = init()
let py_function = binding::init(&variables.app, &variables.function)
.wrap_err("Failed to init the Python Function")
.unwrap();
let duration = Duration::from_millis(DURATION_MILLIS);
let mut futures_put = vec![];

loop {
let now = Instant::now();
let loop_start = Instant::now();

let mut futures = vec![];
for (_, v) in subscribers.iter_mut() {
futures.push(timeout(duration, v.next()));
@@ -63,7 +61,7 @@ pub async fn main() -> PyResult<()> {

let results = join_all(futures).await;

for (result, subscription) in results.into_iter().zip(&subs) {
for (result, subscription) in results.into_iter().zip(&variables.subscriptions.clone()) {
if let Ok(Some(data)) = result {
let value = data.value.payload;
let binary = value.contiguous();
@@ -82,7 +80,7 @@ pub async fn main() -> PyResult<()> {

let now = Instant::now();

let outputs = call(&py_function, states.clone()).await.unwrap();
let outputs = binding::call(&py_function, states.clone()).await.unwrap();
println!("call python {:#?}", now.elapsed());

for (key, value) in outputs {
@@ -91,7 +89,7 @@ pub async fn main() -> PyResult<()> {
}

states_hash = hash(&states);
println!("loop {:#?}", now.elapsed());
println!("loop {:#?}", loop_start.elapsed());
}
}


+ 0
- 41
src/python_binding.rs View File

@@ -1,41 +0,0 @@
use eyre::{eyre, Context};
use pyo3::prelude::*;
use serde::Deserialize;
use std::collections::{BTreeMap, HashMap};

#[derive(Deserialize, Debug)]
struct PythonVariables {
app: String,
function: String,
}

pub fn init() -> eyre::Result<Py<PyAny>> {
let variables = envy::from_env::<PythonVariables>().unwrap();
Ok(Python::with_gil(|py| {
let file = py
.import(&variables.app)
.wrap_err("The import file was not found. Check your PYTHONPATH env variable.")
.unwrap();
// convert Function into a PyObject
let identity = file
.getattr(variables.function)
.wrap_err("The Function was not found in the imported file.")
.unwrap();
identity.to_object(py)
}))
}

pub async fn call(
py_function: &PyObject,
states: BTreeMap<String, String>,
) -> eyre::Result<HashMap<String, String>> {
Python::with_gil(|py| {
let args = (states.clone().into_py(py),);
let result = py_function
.call(py, args, None)
.wrap_err("The Python function call did not succeed.")
.unwrap();
result.extract(py)
})
.wrap_err("")
}

Loading…
Cancel
Save