Add initial support for calling ROS2 services from Rust nodestags/v0.3.3-rc1
| @@ -174,6 +174,7 @@ jobs: | |||
| QT_QPA_PLATFORM: offscreen | |||
| run: | | |||
| source /opt/ros/humble/setup.bash && ros2 run turtlesim turtlesim_node & | |||
| source /opt/ros/humble/setup.bash && ros2 run examples_rclcpp_minimal_service service_main & | |||
| cargo run --example rust-ros2-dataflow --features="ros2-examples" | |||
| - uses: actions/setup-python@v2 | |||
| if: runner.os != 'Windows' | |||
| @@ -2135,6 +2135,7 @@ dependencies = [ | |||
| "futures-core", | |||
| "futures-task", | |||
| "futures-util", | |||
| "num_cpus", | |||
| ] | |||
| [[package]] | |||
| @@ -2196,9 +2197,9 @@ checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" | |||
| [[package]] | |||
| name = "futures-timer" | |||
| version = "3.0.2" | |||
| version = "3.0.3" | |||
| source = "registry+https://github.com/rust-lang/crates.io-index" | |||
| checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" | |||
| checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" | |||
| [[package]] | |||
| name = "futures-util" | |||
| @@ -4751,6 +4752,7 @@ dependencies = [ | |||
| "dora-ros2-bridge", | |||
| "eyre", | |||
| "futures", | |||
| "futures-timer", | |||
| "rand", | |||
| "serde_json", | |||
| "tokio", | |||
| @@ -9,7 +9,7 @@ use dora_node_api::{ | |||
| use eyre::bail; | |||
| #[cfg(feature = "ros2-bridge")] | |||
| use dora_ros2_bridge::_core; | |||
| use dora_ros2_bridge::{_core, ros2_client}; | |||
| use futures_lite::{stream, Stream, StreamExt}; | |||
| #[cxx::bridge] | |||
| @@ -11,11 +11,14 @@ This examples requires a sourced ROS2 installation. | |||
| - Follow tasks 1 and 2 of the [ROS2 turtlesim tutorial](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Introducing-Turtlesim/Introducing-Turtlesim.html#id3) | |||
| - Install the turtlesim package | |||
| - Start the turtlesim node through `ros2 run turtlesim turtlesim_node` | |||
| - In a separate terminal, start the `/add_two_ints` service: `ros2 run examples_rclcpp_minimal_service service_main` | |||
| ## Running | |||
| After sourcing the ROS2 installation and starting the `turtlesim` node, you can run this example to move the turtle in random directions: | |||
| After sourcing the ROS2 installation and starting both the `turtlesim` node and the `/add_two_ints` service, you can run this example to move the turtle in random directions: | |||
| ``` | |||
| cargo run --example rust-ros2-dataflow --features ros2-examples | |||
| ``` | |||
| You should see a few random requests in the terminal where you started the `examples_rclcpp_minimal_service`. | |||
| @@ -5,5 +5,6 @@ nodes: | |||
| source: ../../target/debug/rust-ros2-dataflow-example-node | |||
| inputs: | |||
| tick: dora/timer/millis/500 | |||
| service_timer: dora/timer/secs/1 | |||
| outputs: | |||
| - pose | |||
| @@ -16,7 +16,8 @@ required-features = ["ros2"] | |||
| [dependencies] | |||
| dora-node-api = { workspace = true, features = ["tracing"] } | |||
| eyre = "0.6.8" | |||
| futures = "0.3.21" | |||
| futures = { version = "0.3.21", features = ["thread-pool"] } | |||
| futures-timer = "3.0.3" | |||
| rand = "0.8.5" | |||
| tokio = { version = "1.24.2", features = ["rt", "macros"] } | |||
| dora-ros2-bridge = { workspace = true } | |||
| @@ -1,3 +1,5 @@ | |||
| use std::time::Duration; | |||
| use dora_node_api::{ | |||
| self, | |||
| dora_core::config::DataId, | |||
| @@ -6,6 +8,7 @@ use dora_node_api::{ | |||
| }; | |||
| use dora_ros2_bridge::{ | |||
| messages::{ | |||
| example_interfaces::service::{AddTwoInts, AddTwoIntsRequest}, | |||
| geometry_msgs::msg::{Twist, Vector3}, | |||
| turtlesim::msg::Pose, | |||
| }, | |||
| @@ -13,12 +16,61 @@ use dora_ros2_bridge::{ | |||
| rustdds::{self, policy}, | |||
| }; | |||
| use eyre::{eyre, Context}; | |||
| use futures::task::SpawnExt; | |||
| fn main() -> eyre::Result<()> { | |||
| let mut ros_node = init_ros_node()?; | |||
| let turtle_vel_publisher = create_vel_publisher(&mut ros_node)?; | |||
| let turtle_pose_reader = create_pose_reader(&mut ros_node)?; | |||
| // spawn a background spinner task that is handles service discovery (and other things) | |||
| let pool = futures::executor::ThreadPool::new()?; | |||
| let spinner = ros_node.spinner(); | |||
| pool.spawn(async { | |||
| if let Err(err) = spinner.spin().await { | |||
| eprintln!("ros2 spinner failed: {err:?}"); | |||
| } | |||
| }) | |||
| .context("failed to spawn ros2 spinner")?; | |||
| // create an example service client | |||
| let service_qos = { | |||
| rustdds::QosPolicyBuilder::new() | |||
| .reliability(policy::Reliability::Reliable { | |||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||
| }) | |||
| .history(policy::History::KeepLast { depth: 1 }) | |||
| .build() | |||
| }; | |||
| let add_client = ros_node.create_client::<AddTwoInts>( | |||
| ros2_client::ServiceMapping::Enhanced, | |||
| &ros2_client::Name::new("/", "add_two_ints").unwrap(), | |||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||
| service_qos.clone(), | |||
| service_qos.clone(), | |||
| )?; | |||
| // wait until the service server is ready | |||
| println!("wait for add_two_ints service"); | |||
| let service_ready = async { | |||
| for _ in 0..10 { | |||
| let ready = add_client.wait_for_service(&ros_node); | |||
| futures::pin_mut!(ready); | |||
| let timeout = futures_timer::Delay::new(Duration::from_secs(2)); | |||
| match futures::future::select(ready, timeout).await { | |||
| futures::future::Either::Left(((), _)) => { | |||
| println!("add_two_ints service is ready"); | |||
| return Ok(()); | |||
| } | |||
| futures::future::Either::Right(_) => { | |||
| println!("timeout while waiting for add_two_ints service, retrying"); | |||
| } | |||
| } | |||
| } | |||
| eyre::bail!("add_two_ints service not available"); | |||
| }; | |||
| futures::executor::block_on(service_ready)?; | |||
| let output = DataId::from("pose".to_owned()); | |||
| let (mut node, dora_events) = DoraNode::init_from_env()?; | |||
| @@ -53,6 +105,16 @@ fn main() -> eyre::Result<()> { | |||
| println!("tick {i}, sending {direction:?}"); | |||
| turtle_vel_publisher.publish(direction).unwrap(); | |||
| } | |||
| "service_timer" => { | |||
| let a = rand::random(); | |||
| let b = rand::random(); | |||
| let service_result = add_two_ints_request(&add_client, a, b); | |||
| let sum = futures::executor::block_on(service_result) | |||
| .context("failed to send service request")?; | |||
| if sum != a.wrapping_add(b) { | |||
| eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) | |||
| } | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| }, | |||
| Event::Stop => println!("Received manual stop"), | |||
| @@ -76,6 +138,31 @@ fn main() -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| async fn add_two_ints_request( | |||
| add_client: &ros2_client::Client<AddTwoInts>, | |||
| a: i64, | |||
| b: i64, | |||
| ) -> eyre::Result<i64> { | |||
| let request = AddTwoIntsRequest { a, b }; | |||
| println!("sending add request {request:?}"); | |||
| let request_id = add_client.async_send_request(request.clone()).await?; | |||
| println!("{request_id:?}"); | |||
| let response = add_client.async_receive_response(request_id); | |||
| futures::pin_mut!(response); | |||
| let timeout = futures_timer::Delay::new(Duration::from_secs(5)); | |||
| match futures::future::select(response, timeout).await { | |||
| futures::future::Either::Left((Ok(response), _)) => { | |||
| println!("received response: {response:?}"); | |||
| Ok(response.sum) | |||
| } | |||
| futures::future::Either::Left((Err(err), _)) => eyre::bail!(err), | |||
| futures::future::Either::Right(_) => { | |||
| eyre::bail!("timeout while waiting for response"); | |||
| } | |||
| } | |||
| } | |||
| fn init_ros_node() -> eyre::Result<ros2_client::Node> { | |||
| let ros_context = ros2_client::Context::new().unwrap(); | |||
| @@ -25,6 +25,8 @@ where | |||
| let mut message_struct_impls = Vec::new(); | |||
| let mut message_topic_defs = Vec::new(); | |||
| let mut message_topic_impls = Vec::new(); | |||
| let mut service_defs = Vec::new(); | |||
| let mut service_impls = Vec::new(); | |||
| let mut aliases = Vec::new(); | |||
| for package in &packages { | |||
| for message in &package.messages { | |||
| @@ -37,6 +39,13 @@ where | |||
| message_topic_impls.push(topic_impl); | |||
| } | |||
| } | |||
| for service in &package.services { | |||
| let (def, imp) = service.struct_token_stream(&package.name, create_cxx_bridge); | |||
| service_defs.push(def); | |||
| service_impls.push(imp); | |||
| } | |||
| aliases.push(package.aliases_token_stream()); | |||
| } | |||
| @@ -212,6 +221,7 @@ where | |||
| } | |||
| #(#shared_type_defs)* | |||
| #(#service_defs)* | |||
| } | |||
| @@ -227,6 +237,8 @@ where | |||
| #(#message_topic_impls)* | |||
| #(#service_impls)* | |||
| #(#aliases)* | |||
| } | |||
| } | |||
| @@ -179,10 +179,20 @@ impl Message { | |||
| (quote! {}, quote! {}) | |||
| }; | |||
| if self.members.is_empty() { | |||
| (quote! {}, quote! {}) | |||
| let def = if self.members.is_empty() { | |||
| quote! { | |||
| #[allow(non_camel_case_types)] | |||
| #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |||
| #attributes | |||
| pub struct #struct_raw_name { | |||
| #[serde(skip)] | |||
| pub(super) _dummy: u8, | |||
| } | |||
| #cxx_consts | |||
| } | |||
| } else { | |||
| let def = quote! { | |||
| quote! { | |||
| #[allow(non_camel_case_types)] | |||
| #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] | |||
| #attributes | |||
| @@ -191,33 +201,46 @@ impl Message { | |||
| } | |||
| #cxx_consts | |||
| }; | |||
| let impls = quote! { | |||
| impl ffi::#struct_raw_name { | |||
| #(#constants_def_inner)* | |||
| } | |||
| }; | |||
| let default = if self.members.is_empty() { | |||
| quote! { | |||
| Self { | |||
| _dummy: 0, | |||
| } | |||
| } | |||
| } else { | |||
| quote! { | |||
| Self { | |||
| #(#rust_type_default_inner)* | |||
| } | |||
| } | |||
| }; | |||
| let impls = quote! { | |||
| impl ffi::#struct_raw_name { | |||
| #(#constants_def_inner)* | |||
| impl crate::_core::InternalDefault for ffi::#struct_raw_name { | |||
| fn _default() -> Self { | |||
| Self { | |||
| #(#rust_type_default_inner)* | |||
| } | |||
| } | |||
| } | |||
| impl crate::_core::InternalDefault for ffi::#struct_raw_name { | |||
| fn _default() -> Self { | |||
| #default | |||
| } | |||
| } | |||
| impl std::default::Default for ffi::#struct_raw_name { | |||
| #[inline] | |||
| fn default() -> Self { | |||
| crate::_core::InternalDefault::_default() | |||
| } | |||
| impl std::default::Default for ffi::#struct_raw_name { | |||
| #[inline] | |||
| fn default() -> Self { | |||
| crate::_core::InternalDefault::_default() | |||
| } | |||
| } | |||
| #(#cxx_const_impl_inner)* | |||
| }; | |||
| impl crate::ros2_client::Message for ffi::#struct_raw_name {} | |||
| (def, impls) | |||
| } | |||
| #(#cxx_const_impl_inner)* | |||
| }; | |||
| (def, impls) | |||
| } | |||
| pub fn topic_def(&self, package_name: &str) -> (impl ToTokens, impl ToTokens) { | |||
| @@ -289,8 +312,8 @@ impl Message { | |||
| impl Ros2Node { | |||
| #[allow(non_snake_case)] | |||
| pub fn #create_topic(&self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result<Box<#topic_name>> { | |||
| let name = ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; | |||
| let type_name = ros2_client::MessageTypeName::new(#package_name, #self_name); | |||
| let name = crate::ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; | |||
| let type_name = crate::ros2_client::MessageTypeName::new(#package_name, #self_name); | |||
| let topic = self.0.create_topic(&name, type_name, &qos.into())?; | |||
| Ok(Box::new(#topic_name(topic))) | |||
| } | |||
| @@ -316,7 +339,7 @@ impl Message { | |||
| } | |||
| #[allow(non_camel_case_types)] | |||
| pub struct #publisher_name(ros2_client::Publisher<ffi::#struct_raw_name>); | |||
| pub struct #publisher_name(crate::ros2_client::Publisher<ffi::#struct_raw_name>); | |||
| impl #publisher_name { | |||
| #[allow(non_snake_case)] | |||
| @@ -345,7 +368,7 @@ impl Message { | |||
| match (*event.event).0 { | |||
| Some(crate::MergedEvent::External(event)) if event.id == self.id => { | |||
| let result = event.event.downcast::<rustdds::dds::result::ReadResult<(ffi::#struct_raw_name, ros2_client::MessageInfo)>>() | |||
| let result = event.event.downcast::<rustdds::dds::result::ReadResult<(ffi::#struct_raw_name, crate::ros2_client::MessageInfo)>>() | |||
| .map_err(|_| eyre::eyre!("downcast to {} failed", #struct_raw_name_str))?; | |||
| let (data, _info) = result.with_context(|| format!("failed to receive {} event", #subscription_name_str))?; | |||
| @@ -372,18 +395,6 @@ impl Message { | |||
| } | |||
| } | |||
| pub fn token_stream_with_mod(&self, gen_cxx_bridge: bool) -> impl ToTokens { | |||
| let mod_name = format_ident!("_{}", self.name.to_snake_case()); | |||
| let inner = self.token_stream_args(gen_cxx_bridge); | |||
| quote! { | |||
| pub use #mod_name::*; | |||
| mod #mod_name { | |||
| #inner | |||
| } | |||
| } | |||
| } | |||
| pub fn token_stream(&self) -> impl ToTokens { | |||
| self.token_stream_args(false) | |||
| } | |||
| @@ -65,20 +65,20 @@ impl Package { | |||
| } | |||
| } | |||
| fn messages_block(&self, gen_cxx_bridge: bool) -> impl ToTokens { | |||
| if self.messages.is_empty() { | |||
| fn service_aliases(&self, package_name: &Ident) -> impl ToTokens { | |||
| if self.services.is_empty() { | |||
| quote! { | |||
| // empty msg | |||
| } | |||
| } else { | |||
| let items = self | |||
| .messages | |||
| .services | |||
| .iter() | |||
| .map(|v| v.token_stream_with_mod(gen_cxx_bridge)); | |||
| .map(|v| v.alias_token_stream(package_name)); | |||
| quote! { | |||
| pub mod msg { | |||
| pub mod service { | |||
| #(#items)* | |||
| } // msg | |||
| } | |||
| } | |||
| } | |||
| } | |||
| @@ -116,23 +116,23 @@ impl Package { | |||
| pub fn aliases_token_stream(&self) -> impl ToTokens { | |||
| let package_name = Ident::new(&self.name, Span::call_site()); | |||
| let aliases = self.message_aliases(&package_name); | |||
| let service_aliases = self.service_aliases(&package_name); | |||
| quote! { | |||
| pub mod #package_name { | |||
| #aliases | |||
| #service_aliases | |||
| } | |||
| } | |||
| } | |||
| pub fn token_stream(&self, gen_cxx_bridge: bool) -> impl ToTokens { | |||
| let name = Ident::new(&self.name, Span::call_site()); | |||
| let messages_block = self.messages_block(gen_cxx_bridge); | |||
| let services_block = self.services_block(); | |||
| let actions_block = self.actions_block(); | |||
| quote! { | |||
| pub mod #name { | |||
| #messages_block | |||
| #services_block | |||
| #actions_block | |||
| } | |||
| @@ -1,5 +1,6 @@ | |||
| use heck::SnakeCase; | |||
| use quote::{format_ident, quote, ToTokens}; | |||
| use syn::Ident; | |||
| use super::Message; | |||
| @@ -17,6 +18,64 @@ pub struct Service { | |||
| } | |||
| impl Service { | |||
| pub fn struct_token_stream( | |||
| &self, | |||
| package_name: &str, | |||
| gen_cxx_bridge: bool, | |||
| ) -> (impl ToTokens, impl ToTokens) { | |||
| let (request_def, request_impl) = self | |||
| .request | |||
| .struct_token_stream(package_name, gen_cxx_bridge); | |||
| let (response_def, response_impl) = self | |||
| .response | |||
| .struct_token_stream(package_name, gen_cxx_bridge); | |||
| let def = quote! { | |||
| #request_def | |||
| #response_def | |||
| }; | |||
| let impls = quote! { | |||
| #request_impl | |||
| #response_impl | |||
| }; | |||
| (def, impls) | |||
| } | |||
| pub fn alias_token_stream(&self, package_name: &Ident) -> impl ToTokens { | |||
| let srv_type = format_ident!("{}", self.name); | |||
| let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); | |||
| let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); | |||
| let req_type = format_ident!("{}Request", self.name); | |||
| let res_type = format_ident!("{}Response", self.name); | |||
| let request_type_name = req_type.to_string(); | |||
| let response_type_name = res_type.to_string(); | |||
| quote! { | |||
| #[allow(non_camel_case_types)] | |||
| #[derive(std::fmt::Debug)] | |||
| pub struct #srv_type; | |||
| impl crate::ros2_client::Service for #srv_type { | |||
| type Request = #req_type; | |||
| type Response = #res_type; | |||
| fn request_type_name(&self) -> &str { | |||
| #request_type_name | |||
| } | |||
| fn response_type_name(&self) -> &str { | |||
| #response_type_name | |||
| } | |||
| } | |||
| pub use super::super::ffi::#req_type_raw as #req_type; | |||
| pub use super::super::ffi::#res_type_raw as #res_type; | |||
| } | |||
| } | |||
| pub fn token_stream_with_mod(&self) -> impl ToTokens { | |||
| let mod_name = format_ident!("_{}", self.name.to_snake_case()); | |||
| let inner = self.token_stream(); | |||
| @@ -6,7 +6,4 @@ pub mod traits; | |||
| pub use sequence::{FFISeq, OwnedFFISeq, RefFFISeq}; | |||
| pub use string::{FFIString, FFIWString, OwnedFFIString, OwnedFFIWString}; | |||
| pub use traits::{ActionT, FFIFromRust, FFIToRust, InternalDefault, MessageT, ServiceT}; | |||
| pub type ServiceRequestRaw<Srv> = <<Srv as ServiceT>::Request as MessageT>::Raw; | |||
| pub type ServiceResponseRaw<Srv> = <<Srv as ServiceT>::Response as MessageT>::Raw; | |||
| pub use traits::{ActionT, FFIFromRust, FFIToRust, InternalDefault, MessageT}; | |||
| @@ -16,17 +16,12 @@ pub trait MessageT: Default + Send + Sync { | |||
| } | |||
| } | |||
| pub trait ServiceT: Send { | |||
| type Request: MessageT; | |||
| type Response: MessageT; | |||
| } | |||
| pub trait ActionT: Send { | |||
| type Goal: MessageT; | |||
| type Result: MessageT; | |||
| type Feedback: MessageT; | |||
| type SendGoal: ServiceT; | |||
| type GetResult: ServiceT; | |||
| type SendGoal; | |||
| type GetResult; | |||
| type FeedbackMessage: MessageT; | |||
| } | |||