Browse Source

Merge branch 'main' into rust-typed-input

tags/v0.3.0-rc
Philipp Oppermann 2 years ago
parent
commit
b38c112bc8
Failed to extract signature
13 changed files with 781 additions and 565 deletions
  1. +52
    -3
      .github/workflows/ci.yml
  2. +501
    -460
      Cargo.lock
  3. +1
    -0
      Cargo.toml
  4. +2
    -2
      README.md
  5. +89
    -0
      apis/python/operator/src/lib.rs
  6. +59
    -24
      examples/python-dataflow/run.rs
  7. +0
    -11
      examples/python-dataflow/run.sh
  8. +59
    -24
      examples/python-operator-dataflow/run.rs
  9. +0
    -11
      examples/python-operator-dataflow/run.sh
  10. +1
    -1
      examples/python-ros2-dataflow/random_turtle.py
  11. +9
    -9
      libraries/extensions/ros2-bridge/python/src/lib.rs
  12. +5
    -17
      libraries/extensions/ros2-bridge/python/src/typed/deserialize.rs
  13. +3
    -3
      libraries/extensions/ros2-bridge/python/src/typed/mod.rs

+ 52
- 3
.github/workflows/ci.yml View File

@@ -18,11 +18,32 @@ jobs:
platform: [ubuntu-latest, macos-latest, windows-latest]
fail-fast: false
runs-on: ${{ matrix.platform }}
timeout-minutes: 30
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
if: runner.os == 'Linux'
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: false
- name: Free disk Space (Windows)
if: runner.os == 'Windows'
run: |
docker system prune --all -f
Remove-Item "C:\Android" -Force -Recurse
- uses: Swatinem/rust-cache@v2
with:
cache-provider: buildjet
@@ -35,7 +56,13 @@ jobs:
- name: "Build"
run: cargo build --all
- name: "Test"
# Remove Windows as there is `pdb` linker issue.
# See: https://github.com/dora-rs/dora/pull/359#discussion_r1360268497
if: runner.os == 'Linux' || runner.os == 'macOS'
run: cargo test --all
- name: "Test"
if: runner.os == 'Windows'
run: cargo test --all --lib

# Run examples as separate job because otherwise we will exhaust the disk
# space of the GitHub action runners.
@@ -51,6 +78,27 @@ jobs:
- uses: actions/checkout@v3
- uses: r7kamura/rust-problem-matchers@v1.1.0
- run: cargo --version --verbose
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
if: runner.os == 'Linux'
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
android: true
dotnet: true
haskell: true
large-packages: false
docker-images: true
swap-storage: false
- name: Free disk Space (Windows)
if: runner.os == 'Windows'
run: |
docker system prune --all -f
Remove-Item "C:\Android" -Force -Recurse
- uses: Swatinem/rust-cache@v2
with:
cache-provider: buildjet
@@ -122,7 +170,8 @@ jobs:
env:
QT_QPA_PLATFORM: offscreen
run: |
source /opt/ros/humble/setup.bash && ros2 run turtlesim turtlesim_node &
# Reset only the turtlesim instance as it is not destroyed at the end of the previous job
source /opt/ros/humble/setup.bash && ros2 service call /reset std_srvs/srv/Empty &
cargo run --example python-ros2-dataflow --features="ros2-examples"

bench:
@@ -155,7 +204,7 @@ jobs:
platform: [ubuntu-latest, macos-latest, windows-latest]
fail-fast: false
runs-on: ${{ matrix.platform }}
timeout-minutes: 30
timeout-minutes: 60
steps:
- uses: actions/checkout@v3
- uses: r7kamura/rust-problem-matchers@v1.1.0


+ 501
- 460
Cargo.lock
File diff suppressed because it is too large
View File


+ 1
- 0
Cargo.toml View File

@@ -96,6 +96,7 @@ name = "rust-ros2-dataflow"
path = "examples/rust-ros2-dataflow/run.rs"
required-features = ["ros2-examples"]

# TODO: Fix example #192
[[example]]
name = "rust-dataflow-url"
path = "examples/rust-dataflow-url/run.rs"


+ 2
- 2
README.md View File

@@ -13,7 +13,7 @@
|
<a href="https://dora.carsmos.ai/docs/guides/">Guide</a>
|
<a href="https://discord.gg/ucY3AMeu">Discord</a>
<a href="https://discord.gg/6eMGGutkfE">Discord</a>
</h2>

<div align="center">
@@ -104,7 +104,7 @@ The full documentation is available on our website: https://dora.carsmos.ai
## Discussions

Our main communication channels are:
- [Our Discord server](https://discord.gg/ucY3AMeu)
- [Our Discord server](https://discord.gg/6eMGGutkfE)
- [Our Github Project Discussion](https://github.com/orgs/dora-rs/discussions)

Feel free to reach out on any topic, issues or ideas.


+ 89
- 0
apis/python/operator/src/lib.rs View File

@@ -146,3 +146,92 @@ pub fn metadata_to_pydict<'a>(metadata: &'a Metadata, py: Python<'a>) -> &'a PyD
.unwrap();
dict
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use arrow::{
array::{
ArrayData, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, Int8Array,
ListArray, StructArray,
},
buffer::Buffer,
};

use arrow_schema::{DataType, Field};
use dora_node_api::Data;
use eyre::{Context, Result};

use crate::{copy_array_into_sample, required_data_size};

fn assert_roundtrip(arrow_array: &ArrayData) -> Result<()> {
let size = required_data_size(&arrow_array);
let mut sample: Vec<u8> = vec![0; size];

let info = copy_array_into_sample(&mut sample, &arrow_array)?;

let serialized_deserialized_arrow_array = Arc::new(Data::Vec(sample))
.into_arrow_array(&info)
.context("Could not create arrow array")?;
assert_eq!(arrow_array, &serialized_deserialized_arrow_array);

Ok(())
}

#[test]
fn serialize_deserialize_arrow() -> Result<()> {
// Int8
let arrow_array = Int8Array::from(vec![1, -2, 3, 4]).into();
assert_roundtrip(&arrow_array)?;

// Int64
let arrow_array = Int64Array::from(vec![1, -2, 3, 4]).into();
assert_roundtrip(&arrow_array)?;

// Float64
let arrow_array = Float64Array::from(vec![1., -2., 3., 4.]).into();
assert_roundtrip(&arrow_array)?;

// Struct
let boolean = Arc::new(BooleanArray::from(vec![false, false, true, true]));
let int = Arc::new(Int32Array::from(vec![42, 28, 19, 31]));

let struct_array = StructArray::from(vec![
(
Arc::new(Field::new("b", DataType::Boolean, false)),
boolean.clone() as ArrayRef,
),
(
Arc::new(Field::new("c", DataType::Int32, false)),
int.clone() as ArrayRef,
),
])
.into();
assert_roundtrip(&struct_array)?;

// List
let value_data = ArrayData::builder(DataType::Int32)
.len(8)
.add_buffer(Buffer::from_slice_ref([0, 1, 2, 3, 4, 5, 6, 7]))
.build()
.unwrap();

// Construct a buffer for value offsets, for the nested array:
// [[0, 1, 2], [3, 4, 5], [6, 7]]
let value_offsets = Buffer::from_slice_ref([0, 3, 6, 8]);

// Construct a list array from the above two
let list_data_type = DataType::List(Arc::new(Field::new("item", DataType::Int32, false)));
let list_data = ArrayData::builder(list_data_type.clone())
.len(3)
.add_buffer(value_offsets.clone())
.add_child_data(value_data.clone())
.build()
.unwrap();
let list_array = ListArray::from(list_data).into();
assert_roundtrip(&list_array)?;

Ok(())
}
}

+ 59
- 24
examples/python-dataflow/run.rs View File

@@ -1,46 +1,81 @@
use eyre::{bail, Context};
use std::{env, path::Path};
use eyre::{ContextCompat, WrapErr};
use std::path::Path;
use tracing_subscriber::{
filter::{FilterExt, LevelFilter},
prelude::*,
EnvFilter, Registry,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().wrap_err("failed to set up tracing subscriber")?;
set_up_tracing()?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("dora-daemon").await?;
run(&["python3", "-m", "venv", "../.env"], None)
.await
.context("failed to create venv")?;
let venv = &root.join("examples").join(".env");
std::env::set_var(
"VIRTUAL_ENV",
venv.to_str()
.context("venv path not valid unicode")?
.to_owned(),
);
let orig_path = std::env::var("PATH")?;
let venv_bin = venv.join("bin");
std::env::set_var(
"PATH",
format!(
"{}:{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);

run(root).await?;
run(&["pip", "install", "--upgrade", "pip"], None)
.await
.context("failed to install pip")?;
run(&["pip", "install", "-r", "requirements.txt"], None)
.await
.context("pip install failed")?;

Ok(())
}
run(
&["maturin", "develop"],
Some(&root.join("apis").join("python").join("node")),
)
.await
.context("maturin develop failed")?;

let dataflow = Path::new("dataflow.yml");
dora_daemon::Daemon::run_dataflow(dataflow).await?;

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

async fn run(_root: &Path) -> eyre::Result<()> {
let mut run = tokio::process::Command::new("sh");
run.arg("./run.sh");
async fn run(cmd: &[&str], pwd: Option<&Path>) -> eyre::Result<()> {
let mut run = tokio::process::Command::new(cmd[0]);
run.args(&cmd[1..]);

if let Some(pwd) = pwd {
run.current_dir(pwd);
}
if !run.status().await?.success() {
bail!("failed to run python example.");
eyre::bail!("failed to run {cmd:?}");
};
Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
pub fn set_up_tracing() -> eyre::Result<()> {
// Filter log using `RUST_LOG`. More useful for CLI.
let filter = EnvFilter::from_default_env().or(LevelFilter::DEBUG);
let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(filter);

let registry = Registry::default().with(stdout_log);

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
tracing::subscriber::set_global_default(registry)
.context("failed to set tracing global subscriber")
}

+ 0
- 11
examples/python-dataflow/run.sh View File

@@ -1,11 +0,0 @@
set -e

python3 -m venv ../.env
. $(pwd)/../.env/bin/activate

# Dependencies
pip install --upgrade pip
pip install -r requirements.txt
maturin develop -m ../../apis/python/node/Cargo.toml

cargo run -p dora-daemon -- --run-dataflow dataflow.yml

+ 59
- 24
examples/python-operator-dataflow/run.rs View File

@@ -1,46 +1,81 @@
use eyre::{bail, Context};
use std::{env, path::Path};
use eyre::{ContextCompat, WrapErr};
use std::path::Path;
use tracing_subscriber::{
filter::{FilterExt, LevelFilter},
prelude::*,
EnvFilter, Registry,
};

#[tokio::main]
async fn main() -> eyre::Result<()> {
set_up_tracing().wrap_err("failed to set up tracing subscriber")?;
set_up_tracing()?;

let root = Path::new(env!("CARGO_MANIFEST_DIR"));
std::env::set_current_dir(root.join(file!()).parent().unwrap())
.wrap_err("failed to set working dir")?;

build_package("dora-daemon").await?;
run(&["python3", "-m", "venv", "../.env"], None)
.await
.context("failed to create venv")?;
let venv = &root.join("examples").join(".env");
std::env::set_var(
"VIRTUAL_ENV",
venv.to_str()
.context("venv path not valid unicode")?
.to_owned(),
);
let orig_path = std::env::var("PATH")?;
let venv_bin = venv.join("bin");
std::env::set_var(
"PATH",
format!(
"{}:{orig_path}",
venv_bin.to_str().context("venv path not valid unicode")?
),
);

run(root).await?;
run(&["pip", "install", "--upgrade", "pip"], None)
.await
.context("failed to install pip")?;
run(&["pip", "install", "-r", "requirements.txt"], None)
.await
.context("pip install failed")?;

Ok(())
}
run(
&["maturin", "develop"],
Some(&root.join("apis").join("python").join("node")),
)
.await
.context("maturin develop failed")?;

let dataflow = Path::new("dataflow.yml");
dora_daemon::Daemon::run_dataflow(dataflow).await?;

async fn build_package(package: &str) -> eyre::Result<()> {
let cargo = std::env::var("CARGO").unwrap();
let mut cmd = tokio::process::Command::new(&cargo);
cmd.arg("build");
cmd.arg("--package").arg(package);
if !cmd.status().await?.success() {
bail!("failed to build {package}");
};
Ok(())
}

async fn run(_root: &Path) -> eyre::Result<()> {
let mut run = tokio::process::Command::new("sh");
run.arg("./run.sh");
async fn run(cmd: &[&str], pwd: Option<&Path>) -> eyre::Result<()> {
let mut run = tokio::process::Command::new(cmd[0]);
run.args(&cmd[1..]);

if let Some(pwd) = pwd {
run.current_dir(pwd);
}
if !run.status().await?.success() {
bail!("failed to run python example.");
eyre::bail!("failed to run {cmd:?}");
};
Ok(())
}

fn set_up_tracing() -> eyre::Result<()> {
use tracing_subscriber::prelude::__tracing_subscriber_SubscriberExt;
pub fn set_up_tracing() -> eyre::Result<()> {
// Filter log using `RUST_LOG`. More useful for CLI.
let filter = EnvFilter::from_default_env().or(LevelFilter::DEBUG);
let stdout_log = tracing_subscriber::fmt::layer()
.pretty()
.with_filter(filter);

let registry = Registry::default().with(stdout_log);

let stdout_log = tracing_subscriber::fmt::layer().pretty();
let subscriber = tracing_subscriber::Registry::default().with(stdout_log);
tracing::subscriber::set_global_default(subscriber)
tracing::subscriber::set_global_default(registry)
.context("failed to set tracing global subscriber")
}

+ 0
- 11
examples/python-operator-dataflow/run.sh View File

@@ -1,11 +0,0 @@
set -e

python3 -m venv .env
. $(pwd)/.env/bin/activate

# Dependencies
pip install --upgrade pip
pip install -r requirements.txt
maturin develop -m ../../apis/python/node/Cargo.toml

cargo run -p dora-daemon -- --run-dataflow dataflow.yml

+ 1
- 1
examples/python-ros2-dataflow/random_turtle.py View File

@@ -7,7 +7,7 @@ import dora
from dora import Node
import pyarrow as pa

CHECK_TICK = 20
CHECK_TICK = 50

ros2_context = dora.experimental.ros2_bridge.Ros2Context()
ros2_node = ros2_context.new_node(


+ 9
- 9
libraries/extensions/ros2-bridge/python/src/lib.rs View File

@@ -5,7 +5,10 @@ use std::{
};

use ::dora_ros2_bridge::{ros2_client, rustdds};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::{
array::ArrayData,
pyarrow::{FromPyArrow, ToPyArrow},
};
use dora_ros2_bridge_msg_gen::types::Message;
use eyre::{eyre, Context, ContextCompat};
use futures::{Stream, StreamExt};
@@ -14,10 +17,7 @@ use pyo3::{
types::{PyDict, PyList, PyModule},
PyAny, PyObject, PyResult, Python,
};
use typed::{
deserialize::{Ros2Value, TypedDeserializer},
for_message, TypeInfo, TypedValue,
};
use typed::{deserialize::TypedDeserializer, for_message, TypeInfo, TypedValue};

pub mod qos;
pub mod typed;
@@ -219,7 +219,7 @@ impl Ros2Publisher {
#[non_exhaustive]
pub struct Ros2Subscription {
deserializer: TypedDeserializer,
subscription: Option<ros2_client::Subscription<Ros2Value>>,
subscription: Option<ros2_client::Subscription<ArrayData>>,
}

#[pymethods]
@@ -258,13 +258,13 @@ impl Ros2Subscription {

pub struct Ros2SubscriptionStream {
deserializer: TypedDeserializer,
subscription: ros2_client::Subscription<Ros2Value>,
subscription: ros2_client::Subscription<ArrayData>,
}

impl Ros2SubscriptionStream {
pub fn as_stream(
&self,
) -> impl Stream<Item = Result<(Ros2Value, ros2_client::MessageInfo), rustdds::dds::ReadError>> + '_
) -> impl Stream<Item = Result<(ArrayData, ros2_client::MessageInfo), rustdds::dds::ReadError>> + '_
{
self.subscription
.async_stream_seed(self.deserializer.clone())
@@ -272,7 +272,7 @@ impl Ros2SubscriptionStream {
}

impl Stream for Ros2SubscriptionStream {
type Item = Result<(Ros2Value, ros2_client::MessageInfo), rustdds::dds::ReadError>;
type Item = Result<(ArrayData, ros2_client::MessageInfo), rustdds::dds::ReadError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,


+ 5
- 17
libraries/extensions/ros2-bridge/python/src/typed/deserialize.rs View File

@@ -10,19 +10,7 @@ use arrow::{
datatypes::{DataType, Field, Fields},
};
use core::fmt;
use std::{ops::Deref, sync::Arc};

#[derive(Debug, Clone, PartialEq)]
pub struct Ros2Value(ArrayData);

impl Deref for Ros2Value {
type Target = ArrayData;

fn deref(&self) -> &Self::Target {
&self.0
}
}

use std::sync::Arc;
#[derive(Debug, Clone, PartialEq)]
pub struct TypedDeserializer {
type_info: TypeInfo,
@@ -35,7 +23,7 @@ impl TypedDeserializer {
}

impl<'de> serde::de::DeserializeSeed<'de> for TypedDeserializer {
type Value = Ros2Value;
type Value = ArrayData;

fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
where
@@ -87,7 +75,7 @@ impl<'de> serde::de::DeserializeSeed<'de> for TypedDeserializer {
"Datatype does not correspond to default data type.\n Expected: {:#?} \n but got: {:#?}, with value: {:#?}", data_type, value.data_type(), value
);

Ok(Ros2Value(value))
Ok(value)
}
}

@@ -264,7 +252,7 @@ impl<'de> serde::de::Visitor<'de> for StructVisitor {
defaults: default.to_data(),
},
})? {
Some(value) => make_array(value.0),
Some(value) => make_array(value),
None => default,
};
fields.push((
@@ -382,7 +370,7 @@ impl<'de> serde::de::Visitor<'de> for ListVisitor {
defaults: self.defaults.clone(),
},
})? {
let element = make_array(value.0);
let element = make_array(value);
buffer.push(element);
}



+ 3
- 3
libraries/extensions/ros2-bridge/python/src/typed/mod.rs View File

@@ -1,8 +1,8 @@
use arrow::{
array::{
make_array, Array, ArrayData, ArrayRef, BooleanArray, Float32Array, Float64Array,
Int16Array, Int32Array, Int64Array, Int8Array, StringArray, StructArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
make_array, Array, ArrayData, BooleanArray, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, Int8Array, StringArray, StructArray, UInt16Array, UInt32Array,
UInt64Array, UInt8Array,
},
buffer::Buffer,
compute::concat,


Loading…
Cancel
Save