Browse Source

Merge pull request #16 from futurewei-tech/rust-python-binding

Rust python binding
tags/v0.0.0-test.4
Xavier Tao GitHub 4 years ago
parent
commit
6ca35fd0c9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 3095 additions and 179 deletions
  1. +150
    -0
      .gitignore
  2. +2738
    -164
      Cargo.lock
  3. +6
    -2
      Cargo.toml
  4. +28
    -0
      README.md
  5. +13
    -0
      app.py
  6. +1
    -0
      src/lib.rs
  7. +25
    -13
      src/main.rs
  8. +32
    -0
      src/python/binding.rs
  9. +2
    -0
      src/python/mod.rs
  10. +100
    -0
      src/python/server.rs

+ 150
- 0
.gitignore View File

@@ -4,3 +4,153 @@

# These are backup files generated by rustfmt
**/*.rs.bk





# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

+ 2738
- 164
Cargo.lock
File diff suppressed because it is too large
View File


+ 6
- 2
Cargo.toml View File

@@ -5,10 +5,14 @@ edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace]

[dependencies]
eyre = "0.6.7"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.8.23"
structopt = "0.3.26"
zenoh = { git="https://github.com/eclipse-zenoh/zenoh.git" }
env_logger = "0.9.0"
tokio = { version="1.17.0", features=["full"]}
pyo3 = "0.16.1"
futures = "0.3.12"
envy = "0.4.2"

+ 28
- 0
README.md View File

@@ -1,2 +1,30 @@
# dora-rs
Dataflow Oriented Robotic Architecture

## Python API Design

The Python API is probably going to look as follows:
```python
@register
async def function_name(state: DoraState, message: DoraMessage):
return outputs
```

The philosophy is to use async functions as the primary execution unit in order to:
- Mitigate the risk of running unsafe data mutations.
- Manage several runs at the same time with timeout / deadline capabilities.
- Use Tokio spawn to avoid thread locks on CPU-bound runs.

## Getting started

I have made a simple example that can be run with:
```
cargo run start-python app:return_1

# Running this might require some shared libraries, e.g.:
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:~/miniconda3/lib
export PYTHONPATH=$PYTHONPATH:$(pwd)
```
This listens to the key expression "a" and asynchronously runs the `return_1` function from the `app.py` Python module.

This is still very experimental.

+ 13
- 0
app.py View File

@@ -0,0 +1,13 @@
import asyncio

counter = 0

import logging


def return_1(x):
    """Count invocations, log the incoming state, and return a fixed output map.

    Increments the module-level ``counter`` on every call, prints the new
    count, logs the received state ``x``, and always returns the same
    two-entry dict (keys "b" and "c").
    """
    global counter
    counter = counter + 1
    print(counter)
    logging.info(x)
    return {"b": "b", "c": "c"}

+ 1
- 0
src/lib.rs View File

@@ -1 +1,2 @@
pub mod descriptor;
pub mod python;

+ 25
- 13
src/main.rs View File

@@ -4,25 +4,37 @@ use std::{fs::File, path::PathBuf};
use structopt::StructOpt;

#[derive(Debug, Clone, StructOpt)]
struct Args {
file: PathBuf,
#[structopt(about = "Dora control")]
enum Command {
#[structopt(about = "Print Graph")]
Graph { file: PathBuf },
#[structopt(about = "Run Python server")]
StartPython(dora_rs::python::server::PythonCommand),
}

fn main() -> eyre::Result<()> {
let args = Args::from_args();
let descriptor_file = File::open(&args.file).context("failed to open given file")?;
env_logger::init();

let descriptor: Descriptor =
serde_yaml::from_reader(descriptor_file).context("failed to parse given descriptor")?;
let command = Command::from_args();
match command {
Command::Graph { file } => {
let descriptor_file = File::open(&file).context("failed to open given file")?;

let visualized = descriptor
.visualize_as_mermaid()
.context("failed to visualize descriptor")?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
let descriptor: Descriptor = serde_yaml::from_reader(descriptor_file)
.context("failed to parse given descriptor")?;
let visualized = descriptor
.visualize_as_mermaid()
.context("failed to visualize descriptor")?;
println!("{visualized}");
println!(
"Paste the above output on https://mermaid.live/ or in a \
```mermaid code block on GitHub to display it."
);
);
}
Command::StartPython(command) => {
dora_rs::python::server::run(command).context("python server failed")?;
}
}

Ok(())
}

+ 32
- 0
src/python/binding.rs View File

@@ -0,0 +1,32 @@
use eyre::Context;
use pyo3::prelude::*;
use std::collections::{BTreeMap, HashMap};

/// Load `function` from the Python module `app` and return it as a
/// GIL-independent handle that can be called later from any thread.
///
/// Initializes the embedded interpreter first, so this is safe to call
/// before any other pyo3 work. Fails if the module cannot be imported
/// (check `PYTHONPATH`) or if the attribute is missing.
pub fn init(app: &str, function: &str) -> eyre::Result<Py<PyAny>> {
    // Must run once per process before touching the interpreter.
    pyo3::prepare_freethreaded_python();
    Python::with_gil(|py| {
        let module = py
            .import(app)
            .wrap_err("The import file was not found. Check your PYTHONPATH env variable.")?;
        // Look up the requested attribute and detach it from this GIL scope
        // so it can outlive the closure.
        let callable = module
            .getattr(function)
            .wrap_err("The Function was not found in the imported file.")?;
        Ok(callable.to_object(py))
    })
}

/// Invoke the previously-loaded Python callable with `states` as its single
/// positional argument and convert the returned mapping into a `HashMap`.
///
/// NOTE(review): although marked `async` to fit the server's calling
/// convention, the body runs synchronously while holding the GIL.
pub async fn call(
    py_function: &PyObject,
    states: BTreeMap<String, String>,
) -> eyre::Result<HashMap<String, String>> {
    Python::with_gil(|py| {
        // Pack the state map into a 1-tuple of positional arguments.
        let call_args = (states.into_py(py),);
        py_function
            .call(py, call_args, None)
            .wrap_err("The Python function call did not succeed.")?
            .extract(py)
            .wrap_err("The Python function returned an error.")
    })
}

+ 2
- 0
src/python/mod.rs View File

@@ -0,0 +1,2 @@
pub mod binding;
pub mod server;

+ 100
- 0
src/python/server.rs View File

@@ -0,0 +1,100 @@
use super::binding;
use eyre::eyre;
use eyre::WrapErr;
use futures::future::join_all;
use futures::prelude::*;
use pyo3::prelude::*;
use serde::Deserialize;
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::Hash;
use std::hash::Hasher;
use std::time::{Duration, Instant};
use structopt::StructOpt;
use tokio::time::timeout;
use zenoh::config::Config;
use zenoh::prelude::SplitBuffer;

static DURATION_MILLIS: u64 = 5;

/// Configuration for the Python server, parsed either from CLI arguments
/// (via StructOpt) or from environment variables (via serde/envy).
#[derive(Deserialize, Debug, Clone, StructOpt)]
pub struct PythonCommand {
    /// Zenoh key expressions to subscribe to; their latest values form the
    /// input state passed to the Python function.
    pub subscriptions: Vec<String>,
    /// Importable Python module name (e.g. "app") that contains the function.
    pub app: String,
    /// Name of the function inside `app` to invoke when the state changes.
    pub function: String,
}

/// Run the Python-bridging server: subscribe to every configured zenoh key,
/// poll for samples, and whenever the accumulated state changes, invoke the
/// configured Python function and publish its outputs back to zenoh.
///
/// This loop never returns under normal operation; the `PyResult<()>` return
/// type exists only to satisfy the caller's signature.
#[tokio::main]
pub async fn run(variables: PythonCommand) -> PyResult<()> {
    let session = zenoh::open(Config::default()).await.unwrap();

    // Keep subscribers in a Vec so their order matches `variables.subscriptions`.
    // The previous HashMap-based version zipped poll results (in HashMap
    // iteration order) with the subscriptions Vec, which could attribute a
    // received value to the WRONG key.
    let mut subscribers = Vec::with_capacity(variables.subscriptions.len());
    for subscription in &variables.subscriptions {
        let subscriber = session
            .subscribe(subscription)
            .await
            .map_err(|err| {
                eyre!("Could not subscribe to the given subscription key expression. Error: {err}")
            })
            .unwrap();
        subscribers.push(subscriber);
    }

    // Latest value per subscription plus the function outputs; the hash is
    // used to cheaply detect whether anything changed since the last call.
    let mut states = BTreeMap::new();
    let mut states_hash = hash(&states);

    let py_function = binding::init(&variables.app, &variables.function)
        .wrap_err("Failed to init the Python Function")
        .unwrap();
    let duration = Duration::from_millis(DURATION_MILLIS);

    loop {
        let loop_start = Instant::now();

        // Poll every subscriber, giving each at most `duration` to yield a sample.
        let mut futures = Vec::with_capacity(subscribers.len());
        for subscriber in subscribers.iter_mut() {
            futures.push(timeout(duration, subscriber.next()));
        }
        let results = join_all(futures).await;

        // `subscribers` was built in `subscriptions` order, so zipping by
        // position associates each result with the right key.
        for (result, subscription) in results.into_iter().zip(&variables.subscriptions) {
            if let Ok(Some(data)) = result {
                let binary = data.value.payload.contiguous();
                states.insert(
                    subscription.clone(),
                    String::from_utf8(binary.to_vec()).unwrap(),
                );
            }
        }

        // Skip the Python call entirely when nothing changed.
        let new_hash = hash(&states);
        if states_hash == new_hash {
            continue;
        }

        let now = Instant::now();
        let outputs = binding::call(&py_function, states.clone()).await.unwrap();
        println!("call python {:#?}", now.elapsed());

        // Publish the outputs. The put futures are lazy, so they MUST be
        // awaited: the previous version accumulated them in a Vec that was
        // never polled, meaning nothing was ever actually published (and the
        // Vec grew without bound).
        let mut futures_put = Vec::with_capacity(outputs.len());
        for (key, value) in outputs {
            states.insert(key.clone(), value.clone());
            futures_put.push(session.put(key, value));
        }
        join_all(futures_put).await;

        states_hash = hash(&states);
        println!("loop {:#?}", loop_start.elapsed());
    }
}

/// Digest of the current state map, used to detect changes between loop
/// iterations without comparing the maps element by element.
fn hash(states: &BTreeMap<String, String>) -> u64 {
    let mut digest = DefaultHasher::new();
    states.hash(&mut digest);
    digest.finish()
}

Loading…
Cancel
Save