diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 70234f29..17b3e994 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -174,6 +174,7 @@ jobs: QT_QPA_PLATFORM: offscreen run: | source /opt/ros/humble/setup.bash && ros2 run turtlesim turtlesim_node & + source /opt/ros/humble/setup.bash && ros2 run examples_rclcpp_minimal_service service_main & cargo run --example rust-ros2-dataflow --features="ros2-examples" - uses: actions/setup-python@v2 if: runner.os != 'Windows' diff --git a/Cargo.lock b/Cargo.lock index 6414a541..092d125f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2135,6 +2135,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -2196,9 +2197,9 @@ checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-timer" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" @@ -4751,6 +4752,7 @@ dependencies = [ "dora-ros2-bridge", "eyre", "futures", + "futures-timer", "rand", "serde_json", "tokio", diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 5c5659a2..e3476de8 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -9,7 +9,7 @@ use dora_node_api::{ use eyre::bail; #[cfg(feature = "ros2-bridge")] -use dora_ros2_bridge::_core; +use dora_ros2_bridge::{_core, ros2_client}; use futures_lite::{stream, Stream, StreamExt}; #[cxx::bridge] diff --git a/examples/rust-ros2-dataflow/README.md b/examples/rust-ros2-dataflow/README.md index 6c8445e9..c30e52e6 100644 --- a/examples/rust-ros2-dataflow/README.md +++ b/examples/rust-ros2-dataflow/README.md @@ -11,11 +11,14 @@ This examples requires a sourced ROS2 installation. - Follow tasks 1 and 2 of the [ROS2 turtlesim tutorial](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Introducing-Turtlesim/Introducing-Turtlesim.html#id3) - Install the turtlesim package - Start the turtlesim node through `ros2 run turtlesim turtlesim_node` +- In a separate terminal, start the `/add_two_ints` service: `ros2 run examples_rclcpp_minimal_service service_main` ## Running -After sourcing the ROS2 installation and starting the `turtlesim` node, you can run this example to move the turtle in random directions: +After sourcing the ROS2 installation and starting both the `turtlesim` node and the `/add_two_ints` service, you can run this example to move the turtle in random directions: ``` cargo run --example rust-ros2-dataflow --features ros2-examples ``` + +You should see a few random requests in the terminal where you started the `examples_rclcpp_minimal_service`. diff --git a/examples/rust-ros2-dataflow/dataflow.yml b/examples/rust-ros2-dataflow/dataflow.yml index 83583f75..080f1f5d 100644 --- a/examples/rust-ros2-dataflow/dataflow.yml +++ b/examples/rust-ros2-dataflow/dataflow.yml @@ -5,5 +5,6 @@ nodes: source: ../../target/debug/rust-ros2-dataflow-example-node inputs: tick: dora/timer/millis/500 + service_timer: dora/timer/secs/1 outputs: - pose diff --git a/examples/rust-ros2-dataflow/node/Cargo.toml b/examples/rust-ros2-dataflow/node/Cargo.toml index 90d3efa6..c984455e 100644 --- a/examples/rust-ros2-dataflow/node/Cargo.toml +++ b/examples/rust-ros2-dataflow/node/Cargo.toml @@ -16,7 +16,8 @@ required-features = ["ros2"] [dependencies] dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" -futures = "0.3.21" +futures = { version = "0.3.21", features = ["thread-pool"] } +futures-timer = "3.0.3" rand = "0.8.5" tokio = { version = "1.24.2", features = ["rt", "macros"] } dora-ros2-bridge = { workspace = true } diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 6d20472f..32ac8cf4 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use dora_node_api::{ self, dora_core::config::DataId, @@ -6,6 +8,7 @@ use dora_node_api::{ }; use dora_ros2_bridge::{ messages::{ + example_interfaces::service::{AddTwoInts, AddTwoIntsRequest}, geometry_msgs::msg::{Twist, Vector3}, turtlesim::msg::Pose, }, @@ -13,12 +16,61 @@ use dora_ros2_bridge::{ rustdds::{self, policy}, }; use eyre::{eyre, Context}; +use futures::task::SpawnExt; fn main() -> eyre::Result<()> { let mut ros_node = init_ros_node()?; let turtle_vel_publisher = create_vel_publisher(&mut ros_node)?; let turtle_pose_reader = create_pose_reader(&mut ros_node)?; + // spawn a background spinner task that is handles service discovery (and other things) + let pool = futures::executor::ThreadPool::new()?; + let spinner = ros_node.spinner(); + pool.spawn(async { + if let Err(err) = spinner.spin().await { + eprintln!("ros2 spinner failed: {err:?}"); + } + }) + .context("failed to spawn ros2 spinner")?; + + // create an example service client + let service_qos = { + rustdds::QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_millis(100), + }) + .history(policy::History::KeepLast { depth: 1 }) + .build() + }; + let add_client = ros_node.create_client::( + ros2_client::ServiceMapping::Enhanced, + &ros2_client::Name::new("/", "add_two_ints").unwrap(), + &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), + service_qos.clone(), + service_qos.clone(), + )?; + + // wait until the service server is ready + println!("wait for add_two_ints service"); + let service_ready = async { + for _ in 0..10 { + let ready = add_client.wait_for_service(&ros_node); + futures::pin_mut!(ready); + let timeout = futures_timer::Delay::new(Duration::from_secs(2)); + match futures::future::select(ready, timeout).await { + futures::future::Either::Left(((), _)) => { + println!("add_two_ints service is ready"); + return Ok(()); + } + futures::future::Either::Right(_) => { + println!("timeout while waiting for add_two_ints service, retrying"); + } + } + } + eyre::bail!("add_two_ints service not available"); + }; + futures::executor::block_on(service_ready)?; + let output = DataId::from("pose".to_owned()); let (mut node, dora_events) = DoraNode::init_from_env()?; @@ -53,6 +105,16 @@ fn main() -> eyre::Result<()> { println!("tick {i}, sending {direction:?}"); turtle_vel_publisher.publish(direction).unwrap(); } + "service_timer" => { + let a = rand::random(); + let b = rand::random(); + let service_result = add_two_ints_request(&add_client, a, b); + let sum = futures::executor::block_on(service_result) + .context("failed to send service request")?; + if sum != a.wrapping_add(b) { + eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) + } + } other => eprintln!("Ignoring unexpected input `{other}`"), }, Event::Stop => println!("Received manual stop"), @@ -76,6 +138,31 @@ fn main() -> eyre::Result<()> { Ok(()) } +async fn add_two_ints_request( + add_client: &ros2_client::Client, + a: i64, + b: i64, +) -> eyre::Result { + let request = AddTwoIntsRequest { a, b }; + println!("sending add request {request:?}"); + let request_id = add_client.async_send_request(request.clone()).await?; + println!("{request_id:?}"); + + let response = add_client.async_receive_response(request_id); + futures::pin_mut!(response); + let timeout = futures_timer::Delay::new(Duration::from_secs(5)); + match futures::future::select(response, timeout).await { + futures::future::Either::Left((Ok(response), _)) => { + println!("received response: {response:?}"); + Ok(response.sum) + } + futures::future::Either::Left((Err(err), _)) => eyre::bail!(err), + futures::future::Either::Right(_) => { + eyre::bail!("timeout while waiting for response"); + } + } +} + fn init_ros_node() -> eyre::Result { let ros_context = ros2_client::Context::new().unwrap(); diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs index 54e7155c..073f985c 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs @@ -25,6 +25,8 @@ where let mut message_struct_impls = Vec::new(); let mut message_topic_defs = Vec::new(); let mut message_topic_impls = Vec::new(); + let mut service_defs = Vec::new(); + let mut service_impls = Vec::new(); let mut aliases = Vec::new(); for package in &packages { for message in &package.messages { @@ -37,6 +39,13 @@ where message_topic_impls.push(topic_impl); } } + + for service in &package.services { + let (def, imp) = service.struct_token_stream(&package.name, create_cxx_bridge); + service_defs.push(def); + service_impls.push(imp); + } + aliases.push(package.aliases_token_stream()); } @@ -212,6 +221,7 @@ where } #(#shared_type_defs)* + #(#service_defs)* } @@ -227,6 +237,8 @@ where #(#message_topic_impls)* + #(#service_impls)* + #(#aliases)* } } diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs index 01bc8327..157d88ce 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs @@ -179,10 +179,20 @@ impl Message { (quote! {}, quote! {}) }; - if self.members.is_empty() { - (quote! {}, quote! {}) + let def = if self.members.is_empty() { + quote! { + #[allow(non_camel_case_types)] + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] + #attributes + pub struct #struct_raw_name { + #[serde(skip)] + pub(super) _dummy: u8, + } + + #cxx_consts + } } else { - let def = quote! { + quote! { #[allow(non_camel_case_types)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #attributes @@ -191,33 +201,46 @@ impl Message { } #cxx_consts - }; - let impls = quote! { - impl ffi::#struct_raw_name { - #(#constants_def_inner)* - + } + }; + let default = if self.members.is_empty() { + quote! { + Self { + _dummy: 0, } + } + } else { + quote! { + Self { + #(#rust_type_default_inner)* + } + } + }; + let impls = quote! { + impl ffi::#struct_raw_name { + #(#constants_def_inner)* - impl crate::_core::InternalDefault for ffi::#struct_raw_name { - fn _default() -> Self { - Self { - #(#rust_type_default_inner)* - } - } + } + + impl crate::_core::InternalDefault for ffi::#struct_raw_name { + fn _default() -> Self { + #default } + } - impl std::default::Default for ffi::#struct_raw_name { - #[inline] - fn default() -> Self { - crate::_core::InternalDefault::_default() - } + impl std::default::Default for ffi::#struct_raw_name { + #[inline] + fn default() -> Self { + crate::_core::InternalDefault::_default() } + } - #(#cxx_const_impl_inner)* - }; + impl crate::ros2_client::Message for ffi::#struct_raw_name {} - (def, impls) - } + #(#cxx_const_impl_inner)* + }; + + (def, impls) } pub fn topic_def(&self, package_name: &str) -> (impl ToTokens, impl ToTokens) { @@ -289,8 +312,8 @@ impl Message { impl Ros2Node { #[allow(non_snake_case)] pub fn #create_topic(&self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result> { - let name = ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; - let type_name = ros2_client::MessageTypeName::new(#package_name, #self_name); + let name = crate::ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; + let type_name = crate::ros2_client::MessageTypeName::new(#package_name, #self_name); let topic = self.0.create_topic(&name, type_name, &qos.into())?; Ok(Box::new(#topic_name(topic))) } @@ -316,7 +339,7 @@ impl Message { } #[allow(non_camel_case_types)] - pub struct #publisher_name(ros2_client::Publisher); + pub struct #publisher_name(crate::ros2_client::Publisher); impl #publisher_name { #[allow(non_snake_case)] @@ -345,7 +368,7 @@ impl Message { match (*event.event).0 { Some(crate::MergedEvent::External(event)) if event.id == self.id => { - let result = event.event.downcast::>() + let result = event.event.downcast::>() .map_err(|_| eyre::eyre!("downcast to {} failed", #struct_raw_name_str))?; let (data, _info) = result.with_context(|| format!("failed to receive {} event", #subscription_name_str))?; @@ -372,18 +395,6 @@ impl Message { } } - pub fn token_stream_with_mod(&self, gen_cxx_bridge: bool) -> impl ToTokens { - let mod_name = format_ident!("_{}", self.name.to_snake_case()); - let inner = self.token_stream_args(gen_cxx_bridge); - - quote! { - pub use #mod_name::*; - mod #mod_name { - #inner - } - } - } - pub fn token_stream(&self) -> impl ToTokens { self.token_stream_args(false) } diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs index ab0ccb3c..b42901ee 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs @@ -65,20 +65,20 @@ impl Package { } } - fn messages_block(&self, gen_cxx_bridge: bool) -> impl ToTokens { - if self.messages.is_empty() { + fn service_aliases(&self, package_name: &Ident) -> impl ToTokens { + if self.services.is_empty() { quote! { // empty msg } } else { let items = self - .messages + .services .iter() - .map(|v| v.token_stream_with_mod(gen_cxx_bridge)); + .map(|v| v.alias_token_stream(package_name)); quote! { - pub mod msg { + pub mod service { #(#items)* - } // msg + } } } } @@ -116,23 +116,23 @@ impl Package { pub fn aliases_token_stream(&self) -> impl ToTokens { let package_name = Ident::new(&self.name, Span::call_site()); let aliases = self.message_aliases(&package_name); + let service_aliases = self.service_aliases(&package_name); quote! { pub mod #package_name { #aliases + #service_aliases } } } pub fn token_stream(&self, gen_cxx_bridge: bool) -> impl ToTokens { let name = Ident::new(&self.name, Span::call_site()); - let messages_block = self.messages_block(gen_cxx_bridge); let services_block = self.services_block(); let actions_block = self.actions_block(); quote! { pub mod #name { - #messages_block #services_block #actions_block } diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs index bc3b8727..f86b4080 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs @@ -1,5 +1,6 @@ use heck::SnakeCase; use quote::{format_ident, quote, ToTokens}; +use syn::Ident; use super::Message; @@ -17,6 +18,64 @@ pub struct Service { } impl Service { + pub fn struct_token_stream( + &self, + package_name: &str, + gen_cxx_bridge: bool, + ) -> (impl ToTokens, impl ToTokens) { + let (request_def, request_impl) = self + .request + .struct_token_stream(package_name, gen_cxx_bridge); + let (response_def, response_impl) = self + .response + .struct_token_stream(package_name, gen_cxx_bridge); + + let def = quote! { + #request_def + #response_def + }; + + let impls = quote! { + #request_impl + #response_impl + }; + + (def, impls) + } + + pub fn alias_token_stream(&self, package_name: &Ident) -> impl ToTokens { + let srv_type = format_ident!("{}", self.name); + let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); + let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); + + let req_type = format_ident!("{}Request", self.name); + let res_type = format_ident!("{}Response", self.name); + + let request_type_name = req_type.to_string(); + let response_type_name = res_type.to_string(); + + quote! { + #[allow(non_camel_case_types)] + #[derive(std::fmt::Debug)] + pub struct #srv_type; + + impl crate::ros2_client::Service for #srv_type { + type Request = #req_type; + type Response = #res_type; + + fn request_type_name(&self) -> &str { + #request_type_name + } + fn response_type_name(&self) -> &str { + #response_type_name + } + } + + pub use super::super::ffi::#req_type_raw as #req_type; + pub use super::super::ffi::#res_type_raw as #res_type; + } + } + pub fn token_stream_with_mod(&self) -> impl ToTokens { let mod_name = format_ident!("_{}", self.name.to_snake_case()); let inner = self.token_stream(); diff --git a/libraries/extensions/ros2-bridge/src/_core/mod.rs b/libraries/extensions/ros2-bridge/src/_core/mod.rs index 167cd9e4..31b88dce 100644 --- a/libraries/extensions/ros2-bridge/src/_core/mod.rs +++ b/libraries/extensions/ros2-bridge/src/_core/mod.rs @@ -6,7 +6,4 @@ pub mod traits; pub use sequence::{FFISeq, OwnedFFISeq, RefFFISeq}; pub use string::{FFIString, FFIWString, OwnedFFIString, OwnedFFIWString}; -pub use traits::{ActionT, FFIFromRust, FFIToRust, InternalDefault, MessageT, ServiceT}; - -pub type ServiceRequestRaw = <::Request as MessageT>::Raw; -pub type ServiceResponseRaw = <::Response as MessageT>::Raw; +pub use traits::{ActionT, FFIFromRust, FFIToRust, InternalDefault, MessageT}; diff --git a/libraries/extensions/ros2-bridge/src/_core/traits.rs b/libraries/extensions/ros2-bridge/src/_core/traits.rs index a40f10c4..709f3407 100644 --- a/libraries/extensions/ros2-bridge/src/_core/traits.rs +++ b/libraries/extensions/ros2-bridge/src/_core/traits.rs @@ -16,17 +16,12 @@ pub trait MessageT: Default + Send + Sync { } } -pub trait ServiceT: Send { - type Request: MessageT; - type Response: MessageT; -} - pub trait ActionT: Send { type Goal: MessageT; type Result: MessageT; type Feedback: MessageT; - type SendGoal: ServiceT; - type GetResult: ServiceT; + type SendGoal; + type GetResult; type FeedbackMessage: MessageT; }