Browse Source

Enable service clients in C++ ROS2 bridge

tags/v0.3.3-rc1
Philipp Oppermann 1 year ago
parent
commit
1a0992a9b7
Failed to extract signature
6 changed files with 177 additions and 28 deletions
  1. +26
    -12
      Cargo.lock
  2. +3
    -0
      libraries/extensions/ros2-bridge/Cargo.toml
  3. +26
    -5
      libraries/extensions/ros2-bridge/msg-gen/src/lib.rs
  4. +3
    -4
      libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs
  5. +115
    -7
      libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs
  6. +4
    -0
      libraries/extensions/ros2-bridge/src/lib.rs

+ 26
- 12
Cargo.lock View File

@@ -1015,7 +1015,7 @@ dependencies = [
name = "communication-layer-pub-sub"
version = "0.3.2"
dependencies = [
"flume",
"flume 0.10.14",
"zenoh",
]

@@ -1488,7 +1488,7 @@ dependencies = [
"dora-node-api",
"dora-tracing",
"eyre",
"flume",
"flume 0.10.14",
"futures",
"futures-concurrency",
"serde_json",
@@ -1562,7 +1562,7 @@ dependencies = [
"dora-core",
"dora-tracing",
"eyre",
"flume",
"flume 0.10.14",
"futures",
"futures-concurrency",
"futures-timer",
@@ -1610,7 +1610,7 @@ dependencies = [
"dora-ros2-bridge-python",
"dora-runtime",
"eyre",
"flume",
"flume 0.10.14",
"futures",
"pyo3",
"pythonize",
@@ -1660,7 +1660,7 @@ dependencies = [
"arrow-schema",
"dora-node-api",
"eyre",
"flume",
"flume 0.10.14",
"pyo3",
"serde_yaml 0.8.26",
]
@@ -1692,7 +1692,9 @@ dependencies = [
"dora-daemon",
"dora-ros2-bridge-msg-gen",
"eyre",
"flume 0.11.0",
"futures",
"futures-timer",
"rand",
"ros2-client",
"rust-format",
@@ -1747,7 +1749,7 @@ dependencies = [
"dora-operator-api-types",
"dora-tracing",
"eyre",
"flume",
"flume 0.10.14",
"futures",
"futures-concurrency",
"libloading",
@@ -1998,6 +2000,18 @@ dependencies = [
"spin 0.9.8",
]

[[package]]
name = "flume"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181"
dependencies = [
"futures-core",
"futures-sink",
"nanorand",
"spin 0.9.8",
]

[[package]]
name = "fnv"
version = "1.0.7"
@@ -6530,7 +6544,7 @@ dependencies = [
"base64 0.13.1",
"env_logger",
"event-listener 2.5.3",
"flume",
"flume 0.10.14",
"form_urlencoded",
"futures",
"git-version",
@@ -6594,7 +6608,7 @@ checksum = "1e256d7aff2c9af765d77efbfae7fcb708d2d7f4e179aa201bff2f81ad7a3845"
dependencies = [
"async-std",
"async-trait",
"flume",
"flume 0.10.14",
"log",
"zenoh-core",
"zenoh-sync",
@@ -6606,7 +6620,7 @@ version = "0.7.0-rc"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bad1ff61abf28c57e8879ec4286fa29becf7e9bf12555df9a7faddff3bc9ea1b"
dependencies = [
"flume",
"flume 0.10.14",
"json5",
"num_cpus",
"serde",
@@ -6673,7 +6687,7 @@ checksum = "21aab9eeb2aba53e37aae57467ffca1268d209811c5e2f39761aab4c1343bce3"
dependencies = [
"async-std",
"async-trait",
"flume",
"flume 0.10.14",
"serde",
"zenoh-buffers",
"zenoh-cfg-properties",
@@ -6845,7 +6859,7 @@ checksum = "821070b62a55d4c8a22e1e06c939c1f2d94767e660df9fcbea377781f72f59bf"
dependencies = [
"async-std",
"event-listener 2.5.3",
"flume",
"flume 0.10.14",
"futures",
"tokio",
"zenoh-core",
@@ -6861,7 +6875,7 @@ dependencies = [
"async-global-executor",
"async-std",
"async-trait",
"flume",
"flume 0.10.14",
"log",
"paste",
"rand",


+ 3
- 0
libraries/extensions/ros2-bridge/Cargo.toml View File

@@ -24,6 +24,9 @@ tokio = { version = "1.29.1", features = ["full"], optional = true }
dora-daemon = { path = "../../../binaries/daemon", optional = true }
tracing = "0.1.37"
tracing-subscriber = "0.3.17"
flume = "0.11.0"
futures = { version = "0.3.21", features = ["thread-pool"] }
futures-timer = "3.0.3"

[dev-dependencies]
rand = "0.8.5"


+ 26
- 5
libraries/extensions/ros2-bridge/msg-gen/src/lib.rs View File

@@ -107,22 +107,43 @@ where
}
},
quote! {
struct Ros2Context(ros2_client::Context);
struct Ros2Context{
context: ros2_client::Context,
executor: std::sync::Arc<futures::executor::ThreadPool>,
}

fn init_ros2_context() -> eyre::Result<Box<Ros2Context>> {
Ok(Box::new(Ros2Context(ros2_client::Context::new()?)))
Ok(Box::new(Ros2Context{
context: ros2_client::Context::new()?,
executor: std::sync::Arc::new(futures::executor::ThreadPool::new()?),
}))
}

impl Ros2Context {
fn new_node(&self, name_space: &str, base_name: &str) -> eyre::Result<Box<Ros2Node>> {
use futures::task::SpawnExt as _;
use eyre::WrapErr as _;

let name = ros2_client::NodeName::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?;
let options = ros2_client::NodeOptions::new().enable_rosout(true);
let node = self.0.new_node(name, options)?;
Ok(Box::new(Ros2Node(node)))
let mut node = self.context.new_node(name, options)?;

let spinner = node.spinner();
self.executor.spawn(async {
if let Err(err) = spinner.spin().await {
eprintln!("ros2 spinner failed: {err:?}");
}
})
.context("failed to spawn ros2 spinner")?;

Ok(Box::new(Ros2Node{ node, executor: self.executor.clone(), }))
}
}

struct Ros2Node(ros2_client::Node);
struct Ros2Node {
node : ros2_client::Node,
executor: std::sync::Arc<futures::executor::ThreadPool>,
}

fn qos_default() -> ffi::Ros2QosPolicies {
ffi::Ros2QosPolicies::new(None, None, None, None, None, None, None)


+ 3
- 4
libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs View File

@@ -1,4 +1,3 @@
use heck::SnakeCase;
use quote::{format_ident, quote, ToTokens};
use syn::Ident;

@@ -314,19 +313,19 @@ impl Message {
pub fn #create_topic(&self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result<Box<#topic_name>> {
let name = crate::ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?;
let type_name = crate::ros2_client::MessageTypeName::new(#package_name, #self_name);
let topic = self.0.create_topic(&name, type_name, &qos.into())?;
let topic = self.node.create_topic(&name, type_name, &qos.into())?;
Ok(Box::new(#topic_name(topic)))
}

#[allow(non_snake_case)]
pub fn #create_publisher(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies) -> eyre::Result<Box<#publisher_name>> {
let publisher = self.0.create_publisher(&topic.0, Some(qos.into()))?;
let publisher = self.node.create_publisher(&topic.0, Some(qos.into()))?;
Ok(Box::new(#publisher_name(publisher)))
}

#[allow(non_snake_case)]
pub fn #create_subscription(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result<Box<#subscription_name>> {
let subscription = self.0.create_subscription::<ffi::#struct_raw_name>(&topic.0, Some(qos.into()))?;
let subscription = self.node.create_subscription::<ffi::#struct_raw_name>(&topic.0, Some(qos.into()))?;
let stream = futures_lite::stream::unfold(subscription, |sub| async {
let item = sub.async_take().await;
let item_boxed: Box<dyn std::any::Any + 'static> = Box::new(item);


+ 115
- 7
libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs View File

@@ -89,28 +89,136 @@ impl Service {
let self_name = format_ident!("{}", self.name);
let self_name_str = &self.name;

let wait_for_service = format_ident!("wait_for_service__{package_name}__{}", self.name);
let cxx_wait_for_service = format_ident!("wait_for_service");
let send_request = format_ident!("send_request__{package_name}__{}", self.name);
let cxx_send_request = format_ident!("send_request");
let req_type_raw = format_ident!("{package_name}__{}_Request", self.name);
let res_type_raw = format_ident!("{package_name}__{}_Response", self.name);
let res_type_raw_str = res_type_raw.to_string();

let matches = format_ident!("matches__{package_name}__{}", self.name);
let cxx_matches = format_ident!("matches");
let downcast = format_ident!("downcast__{package_name}__{}", self.name);
let cxx_downcast = format_ident!("downcast");

let def = quote! {
#[namespace = #package_name]
#[cxx_name = #cxx_client_name]
type #client_name;
// TODO: add `merged_streams` argument (for sending replies)
#[cxx_name = #cxx_create_client]
fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies) -> Result<Box<#client_name>>;
fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#client_name>>;

#[namespace = #package_name]
#[cxx_name = #cxx_wait_for_service]
fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> Result<()>;
#[namespace = #package_name]
#[cxx_name = #cxx_send_request]
fn #send_request(self: &mut #client_name, request: #req_type_raw) -> Result<()>;
#[namespace = #package_name]
#[cxx_name = #cxx_matches]
fn #matches(self: &#client_name, event: &CombinedEvent) -> bool;
#[namespace = #package_name]
#[cxx_name = #cxx_downcast]
fn #downcast(self: &#client_name, event: CombinedEvent) -> Result<#res_type_raw>;
};
let imp = quote! {
#[allow(non_camel_case_types)]
pub struct #client_name(ros2_client::service::Client< #package :: service :: #self_name >);

impl Ros2Node {
#[allow(non_snake_case)]
pub fn #create_client(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result<Box<#client_name>> {
let client = self.0.create_client::< #package :: service :: #self_name >(
pub fn #create_client(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result<Box<#client_name>> {
use futures::StreamExt as _;

let client = self.node.create_client::< #package :: service :: #self_name >(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new(name_space, base_name).unwrap(),
&ros2_client::ServiceTypeName::new(#package_name, #self_name_str),
qos.clone().into(),
qos.into(),
)?;
Ok(Box::new(#client_name(client)))
let (response_tx, response_rx) = flume::bounded(1);
let stream = response_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box<dyn std::any::Any + 'static>);
let id = events.events.merge(Box::pin(stream));

Ok(Box::new(#client_name {
client: std::sync::Arc::new(client),
response_tx: std::sync::Arc::new(response_tx),
executor: self.executor.clone(),
stream_id: id,
}))
}
}

#[allow(non_camel_case_types)]
pub struct #client_name {
client: std::sync::Arc<ros2_client::service::Client< #package :: service :: #self_name >>,
response_tx: std::sync::Arc<flume::Sender<eyre::Result<ffi::#res_type_raw>>>,
executor: std::sync::Arc<futures::executor::ThreadPool>,
stream_id: u32,
}

impl #client_name {
#[allow(non_snake_case)]
fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> eyre::Result<()> {
let service_ready = async {
for _ in 0..10 {
let ready = self.client.wait_for_service(&node.node);
futures::pin_mut!(ready);
let timeout = futures_timer::Delay::new(std::time::Duration::from_secs(2));
match futures::future::select(ready, timeout).await {
futures::future::Either::Left(((), _)) => {
return Ok(());
}
futures::future::Either::Right(_) => {
eprintln!("timeout while waiting for service, retrying");
}
}
}
eyre::bail!("service not available");
};
futures::executor::block_on(service_ready)?;
Ok(())
}

#[allow(non_snake_case)]
fn #send_request(&mut self, request: ffi::#req_type_raw) -> eyre::Result<()> {
use eyre::WrapErr;
use futures::task::SpawnExt as _;

let request_id = futures::executor::block_on(self.client.async_send_request(request.clone())).context("failed to send request")?;
let client = self.client.clone();
let response_tx = self.response_tx.clone();
let send_result = async move {
let response = client.async_receive_response(request_id).await.with_context(|| format!("failed to receive response for request {request_id:?}"));
if response_tx.send_async(response).await.is_err() {
tracing::warn!("failed to send service response");
}
};
self.executor.spawn(send_result).context("failed to spawn response task")?;
Ok(())
}

#[allow(non_snake_case)]
fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
match &event.event.as_ref().0 {
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true,
_ => false
}
}
#[allow(non_snake_case)]
fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result<ffi::#res_type_raw> {
use eyre::WrapErr;

match (*event.event).0 {
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => {
let result = event.event.downcast::<eyre::Result<ffi::#res_type_raw>>()
.map_err(|_| eyre::eyre!("downcast to {} failed", #res_type_raw_str))?;

let data = result.with_context(|| format!("failed to receive {} response", #self_name_str))?;
Ok(data)
},
_ => eyre::bail!("not a {} response event", #self_name_str),
}
}
}
};


+ 4
- 0
libraries/extensions/ros2-bridge/src/lib.rs View File

@@ -1,5 +1,9 @@
pub use flume;
pub use futures;
pub use futures_timer;
pub use ros2_client;
pub use rustdds;
pub use tracing;

#[cfg(feature = "generate-messages")]
pub mod messages {


Loading…
Cancel
Save