| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
e9f102caa7
|
Fix unused import warning | 1 year ago |
|
|
d4ed69fe97
|
Add some debug output | 1 year ago |
|
|
64bde8cd00
|
Enable debug logging on CI | 1 year ago |
|
|
a072a7e3e4
|
Create C++ functions for constructing service servers | 1 year ago |
|
|
db03eedc7b
|
Add example for ROS2 service server using `ros2-client` crate | 1 year ago |
| @@ -8,7 +8,7 @@ on: | |||||
| workflow_dispatch: | workflow_dispatch: | ||||
| env: | env: | ||||
| RUST_LOG: INFO | |||||
| RUST_LOG: DEBUG | |||||
| jobs: | jobs: | ||||
| test: | test: | ||||
| @@ -316,6 +316,7 @@ pub async fn spawn_node( | |||||
| let (log_finish_tx, log_finish_rx) = oneshot::channel(); | let (log_finish_tx, log_finish_rx) = oneshot::channel(); | ||||
| tokio::spawn(async move { | tokio::spawn(async move { | ||||
| let exit_status = NodeExitStatus::from(child.wait().await); | let exit_status = NodeExitStatus::from(child.wait().await); | ||||
| tracing::debug!("node exited with status: {exit_status:?}"); | |||||
| let _ = log_finish_rx.await; | let _ = log_finish_rx.await; | ||||
| let event = DoraEvent::SpawnedNodeResult { | let event = DoraEvent::SpawnedNodeResult { | ||||
| dataflow_id, | dataflow_id, | ||||
| @@ -8,3 +8,10 @@ nodes: | |||||
| service_timer: dora/timer/secs/1 | service_timer: dora/timer/secs/1 | ||||
| outputs: | outputs: | ||||
| - pose | - pose | ||||
| - finished | |||||
| - id: service-server | |||||
| custom: | |||||
| build: cargo build -p rust-ros2-dataflow-example-node --features ros2 | |||||
| source: ../../target/debug/rust-ros2-dataflow-service-server | |||||
| inputs: | |||||
| exit: rust-node/finished | |||||
| @@ -13,6 +13,11 @@ ros2 = [] | |||||
| name = "rust-ros2-dataflow-example-node" | name = "rust-ros2-dataflow-example-node" | ||||
| required-features = ["ros2"] | required-features = ["ros2"] | ||||
| [[bin]] | |||||
| path = "src/bin/service-server.rs" | |||||
| name = "rust-ros2-dataflow-service-server" | |||||
| required-features = ["ros2"] | |||||
| [dependencies] | [dependencies] | ||||
| dora-node-api = { workspace = true, features = ["tracing"] } | dora-node-api = { workspace = true, features = ["tracing"] } | ||||
| eyre = "0.6.8" | eyre = "0.6.8" | ||||
| @@ -0,0 +1,97 @@ | |||||
| use std::time::Duration; | |||||
| use dora_node_api::{ | |||||
| self, | |||||
| dora_core::config::DataId, | |||||
| merged::{MergeExternal, MergedEvent}, | |||||
| DoraNode, Event, | |||||
| }; | |||||
| use dora_ros2_bridge::{ | |||||
| messages::{ | |||||
| example_interfaces::service::{AddTwoInts, AddTwoIntsRequest, AddTwoIntsResponse}, | |||||
| geometry_msgs::msg::{Twist, Vector3}, | |||||
| turtlesim::msg::Pose, | |||||
| }, | |||||
| ros2_client::{self, ros2, NodeOptions}, | |||||
| rustdds::{self, policy}, | |||||
| }; | |||||
| use eyre::{eyre, Context}; | |||||
| use futures::task::SpawnExt; | |||||
| fn main() -> eyre::Result<()> { | |||||
| let mut ros_node = init_ros_node()?; | |||||
| let service_qos = { | |||||
| rustdds::QosPolicyBuilder::new() | |||||
| .reliability(policy::Reliability::Reliable { | |||||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||||
| }) | |||||
| .history(policy::History::KeepLast { depth: 10 }) | |||||
| .build() | |||||
| }; | |||||
| let server = ros_node | |||||
| .create_server::<AddTwoInts>( | |||||
| ros2_client::ServiceMapping::Enhanced, | |||||
| &ros2_client::Name::new("/", "add_two_ints_custom").unwrap(), | |||||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||||
| service_qos.clone(), | |||||
| service_qos.clone(), | |||||
| ) | |||||
| .context("failed to create service server")?; | |||||
| let (node, dora_events) = DoraNode::init_from_env()?; | |||||
| let merged = dora_events.merge_external(Box::pin(server.receive_request_stream())); | |||||
| let mut events = futures::executor::block_on_stream(merged); | |||||
| loop { | |||||
| let event = match events.next() { | |||||
| Some(input) => input, | |||||
| None => break, | |||||
| }; | |||||
| match event { | |||||
| MergedEvent::Dora(event) => match event { | |||||
| Event::Input { | |||||
| id, | |||||
| metadata: _, | |||||
| data: _, | |||||
| } => match id.as_str() { | |||||
| "exit" => { | |||||
| println!("received exit signal"); | |||||
| break; | |||||
| } | |||||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||||
| }, | |||||
| Event::Stop => println!("Received manual stop"), | |||||
| other => eprintln!("Received unexpected input: {other:?}"), | |||||
| }, | |||||
| MergedEvent::External(request) => match request { | |||||
| Ok((id, request)) => { | |||||
| let response = AddTwoIntsResponse { | |||||
| sum: request.a.wrapping_add(request.b), | |||||
| }; | |||||
| println!("replying to incoming request {id:?} {request:?} with response {response:?}"); | |||||
| server | |||||
| .send_response(id, response) | |||||
| .context("failed to send response")?; | |||||
| } | |||||
| Err(err) => eprintln!("error while receiving incoming request: {err:?}"), | |||||
| }, | |||||
| } | |||||
| } | |||||
| Ok(()) | |||||
| } | |||||
| fn init_ros_node() -> eyre::Result<ros2_client::Node> { | |||||
| let ros_context = ros2_client::Context::new().unwrap(); | |||||
| ros_context | |||||
| .new_node( | |||||
| ros2_client::NodeName::new("/ros2_demo", "service_server_example") | |||||
| .map_err(|e| eyre!("failed to create ROS2 node name: {e}"))?, | |||||
| NodeOptions::new().enable_rosout(true), | |||||
| ) | |||||
| .context("failed to create ros2 node") | |||||
| } | |||||
| @@ -4,7 +4,7 @@ use dora_node_api::{ | |||||
| self, | self, | ||||
| dora_core::config::DataId, | dora_core::config::DataId, | ||||
| merged::{MergeExternal, MergedEvent}, | merged::{MergeExternal, MergedEvent}, | ||||
| DoraNode, Event, | |||||
| DoraNode, Event, IntoArrow, | |||||
| }; | }; | ||||
| use dora_ros2_bridge::{ | use dora_ros2_bridge::{ | ||||
| messages::{ | messages::{ | ||||
| @@ -12,7 +12,7 @@ use dora_ros2_bridge::{ | |||||
| geometry_msgs::msg::{Twist, Vector3}, | geometry_msgs::msg::{Twist, Vector3}, | ||||
| turtlesim::msg::Pose, | turtlesim::msg::Pose, | ||||
| }, | }, | ||||
| ros2_client::{self, ros2, NodeOptions}, | |||||
| ros2_client::{self, ros2, Client, NodeOptions}, | |||||
| rustdds::{self, policy}, | rustdds::{self, policy}, | ||||
| }; | }; | ||||
| use eyre::{eyre, Context}; | use eyre::{eyre, Context}; | ||||
| @@ -33,43 +33,11 @@ fn main() -> eyre::Result<()> { | |||||
| }) | }) | ||||
| .context("failed to spawn ros2 spinner")?; | .context("failed to spawn ros2 spinner")?; | ||||
| // create an example service client | |||||
| let service_qos = { | |||||
| rustdds::QosPolicyBuilder::new() | |||||
| .reliability(policy::Reliability::Reliable { | |||||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||||
| }) | |||||
| .history(policy::History::KeepLast { depth: 1 }) | |||||
| .build() | |||||
| }; | |||||
| let add_client = ros_node.create_client::<AddTwoInts>( | |||||
| ros2_client::ServiceMapping::Enhanced, | |||||
| &ros2_client::Name::new("/", "add_two_ints").unwrap(), | |||||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||||
| service_qos.clone(), | |||||
| service_qos.clone(), | |||||
| )?; | |||||
| let add_client = create_add_client(&mut ros_node, "add_two_ints")?; | |||||
| let add_client_custom = create_add_client(&mut ros_node, "add_two_ints_custom")?; | |||||
| // wait until the service server is ready | |||||
| println!("wait for add_two_ints service"); | |||||
| let service_ready = async { | |||||
| for _ in 0..10 { | |||||
| let ready = add_client.wait_for_service(&ros_node); | |||||
| futures::pin_mut!(ready); | |||||
| let timeout = futures_timer::Delay::new(Duration::from_secs(2)); | |||||
| match futures::future::select(ready, timeout).await { | |||||
| futures::future::Either::Left(((), _)) => { | |||||
| println!("add_two_ints service is ready"); | |||||
| return Ok(()); | |||||
| } | |||||
| futures::future::Either::Right(_) => { | |||||
| println!("timeout while waiting for add_two_ints service, retrying"); | |||||
| } | |||||
| } | |||||
| } | |||||
| eyre::bail!("add_two_ints service not available"); | |||||
| }; | |||||
| futures::executor::block_on(service_ready)?; | |||||
| wait_for_service(&add_client, &mut ros_node, "add_two_ints")?; | |||||
| wait_for_service(&add_client_custom, &mut ros_node, "add_two_ints_custom")?; | |||||
| let output = DataId::from("pose".to_owned()); | let output = DataId::from("pose".to_owned()); | ||||
| @@ -114,6 +82,16 @@ fn main() -> eyre::Result<()> { | |||||
| if sum != a.wrapping_add(b) { | if sum != a.wrapping_add(b) { | ||||
| eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) | eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) | ||||
| } | } | ||||
| let service_result_custom = add_two_ints_request(&add_client_custom, a, b); | |||||
| let sum = futures::executor::block_on(service_result_custom) | |||||
| .context("failed to send custom service request")?; | |||||
| if sum != a.wrapping_add(b) { | |||||
| eyre::bail!( | |||||
| "unexpected addition result from custom service: expected {}, got {sum}", | |||||
| a + b | |||||
| ) | |||||
| } | |||||
| } | } | ||||
| other => eprintln!("Ignoring unexpected input `{other}`"), | other => eprintln!("Ignoring unexpected input `{other}`"), | ||||
| }, | }, | ||||
| @@ -135,9 +113,66 @@ fn main() -> eyre::Result<()> { | |||||
| } | } | ||||
| } | } | ||||
| node.send_output( | |||||
| DataId::from("finished".to_owned()), | |||||
| Default::default(), | |||||
| true.into_arrow(), | |||||
| ) | |||||
| .context("failed to send `finished` output")?; | |||||
| Ok(()) | |||||
| } | |||||
| fn wait_for_service( | |||||
| client: &Client<AddTwoInts>, | |||||
| ros_node: &mut ros2_client::Node, | |||||
| service_name: &str, | |||||
| ) -> Result<(), eyre::Error> { | |||||
| println!("wait for {service_name} service"); | |||||
| let service_ready = async { | |||||
| for _ in 0..10 { | |||||
| let ready = client.wait_for_service(&ros_node); | |||||
| futures::pin_mut!(ready); | |||||
| let timeout = futures_timer::Delay::new(Duration::from_secs(2)); | |||||
| match futures::future::select(ready, timeout).await { | |||||
| futures::future::Either::Left(((), _)) => { | |||||
| println!("add_two_ints service is ready"); | |||||
| return Ok(()); | |||||
| } | |||||
| futures::future::Either::Right(_) => { | |||||
| println!("timeout while waiting for {service_name} service, retrying"); | |||||
| } | |||||
| } | |||||
| } | |||||
| eyre::bail!("{service_name} service not available"); | |||||
| }; | |||||
| futures::executor::block_on(service_ready)?; | |||||
| Ok(()) | Ok(()) | ||||
| } | } | ||||
| fn create_add_client( | |||||
| ros_node: &mut ros2_client::Node, | |||||
| service_name: &str, | |||||
| ) -> Result<ros2_client::Client<AddTwoInts>, eyre::Error> { | |||||
| let service_qos = { | |||||
| rustdds::QosPolicyBuilder::new() | |||||
| .reliability(policy::Reliability::Reliable { | |||||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||||
| }) | |||||
| .history(policy::History::KeepLast { depth: 1 }) | |||||
| .build() | |||||
| }; | |||||
| let add_client = ros_node.create_client::<AddTwoInts>( | |||||
| ros2_client::ServiceMapping::Enhanced, | |||||
| &ros2_client::Name::new("/", service_name).unwrap(), | |||||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||||
| service_qos.clone(), | |||||
| service_qos.clone(), | |||||
| )?; | |||||
| Ok(add_client) | |||||
| } | |||||
| async fn add_two_ints_request( | async fn add_two_ints_request( | ||||
| add_client: &ros2_client::Client<AddTwoInts>, | add_client: &ros2_client::Client<AddTwoInts>, | ||||
| a: i64, | a: i64, | ||||
| @@ -140,6 +140,12 @@ where | |||||
| } | } | ||||
| } | } | ||||
| impl Drop for Ros2Context { | |||||
| fn drop(&mut self) { | |||||
| tracing::debug!("dropping Ros2Context"); | |||||
| } | |||||
| } | |||||
| struct Ros2Node { | struct Ros2Node { | ||||
| node : ros2_client::Node, | node : ros2_client::Node, | ||||
| executor: std::sync::Arc<futures::executor::ThreadPool>, | executor: std::sync::Arc<futures::executor::ThreadPool>, | ||||
| @@ -85,6 +85,11 @@ impl Service { | |||||
| let create_client = format_ident!("new_Client__{package_name}__{}", self.name); | let create_client = format_ident!("new_Client__{package_name}__{}", self.name); | ||||
| let cxx_create_client = format!("create_client_{package_name}_{}", self.name); | let cxx_create_client = format!("create_client_{package_name}_{}", self.name); | ||||
| let server_name = format_ident!("Server__{package_name}__{}", self.name); | |||||
| let cxx_server_name = format_ident!("Server_{}", self.name); | |||||
| let create_server = format_ident!("new_Server__{package_name}__{}", self.name); | |||||
| let cxx_create_server = format!("create_server_{package_name}_{}", self.name); | |||||
| let package = format_ident!("{package_name}"); | let package = format_ident!("{package_name}"); | ||||
| let self_name = format_ident!("{}", self.name); | let self_name = format_ident!("{}", self.name); | ||||
| let self_name_str = &self.name; | let self_name_str = &self.name; | ||||
| @@ -94,6 +99,7 @@ impl Service { | |||||
| let send_request = format_ident!("send_request__{package_name}__{}", self.name); | let send_request = format_ident!("send_request__{package_name}__{}", self.name); | ||||
| let cxx_send_request = format_ident!("send_request"); | let cxx_send_request = format_ident!("send_request"); | ||||
| let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); | let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); | ||||
| let req_type_raw_str = req_type_raw.to_string(); | |||||
| let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); | let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); | ||||
| let res_type_raw_str = res_type_raw.to_string(); | let res_type_raw_str = res_type_raw.to_string(); | ||||
| @@ -106,10 +112,15 @@ impl Service { | |||||
| #[namespace = #package_name] | #[namespace = #package_name] | ||||
| #[cxx_name = #cxx_client_name] | #[cxx_name = #cxx_client_name] | ||||
| type #client_name; | type #client_name; | ||||
| // TODO: add `merged_streams` argument (for sending replies) | |||||
| #[cxx_name = #cxx_create_client] | #[cxx_name = #cxx_create_client] | ||||
| fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#client_name>>; | fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#client_name>>; | ||||
| #[namespace = #package_name] | |||||
| #[cxx_name = #cxx_server_name] | |||||
| type #server_name; | |||||
| #[cxx_name = #cxx_create_server] | |||||
| fn #create_server(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#server_name>>; | |||||
| #[namespace = #package_name] | #[namespace = #package_name] | ||||
| #[cxx_name = #cxx_wait_for_service] | #[cxx_name = #cxx_wait_for_service] | ||||
| fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> Result<()>; | fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> Result<()>; | ||||
| @@ -147,6 +158,29 @@ impl Service { | |||||
| stream_id: id, | stream_id: id, | ||||
| })) | })) | ||||
| } | } | ||||
| #[allow(non_snake_case)] | |||||
| pub fn #create_server(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result<Box<#server_name>> { | |||||
| let server = self.node.create_server::< #package :: service :: #self_name >( | |||||
| ros2_client::ServiceMapping::Enhanced, | |||||
| &ros2_client::Name::new(name_space, base_name).unwrap(), | |||||
| &ros2_client::ServiceTypeName::new(#package_name, #self_name_str), | |||||
| qos.clone().into(), | |||||
| qos.into(), | |||||
| )?; | |||||
| let server = std::sync::Arc::new(server); | |||||
| let stream = futures::stream::unfold(server.clone(), |server| async move { | |||||
| let result = server.async_receive_request().await; | |||||
| Some((Box::new(result) as Box<dyn std::any::Any + 'static>, server)) | |||||
| }); | |||||
| let id = events.events.merge(Box::pin(stream)); | |||||
| Ok(Box::new(#server_name { | |||||
| server, | |||||
| stream_id: id, | |||||
| })) | |||||
| } | |||||
| } | } | ||||
| #[allow(non_camel_case_types)] | #[allow(non_camel_case_types)] | ||||
| @@ -221,6 +255,38 @@ impl Service { | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| #[allow(non_camel_case_types)] | |||||
| pub struct #server_name { | |||||
| server: std::sync::Arc<ros2_client::service::Server< #package :: service :: #self_name >>, | |||||
| stream_id: u32, | |||||
| } | |||||
| impl #server_name { | |||||
| #[allow(non_snake_case)] | |||||
| fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool { | |||||
| match &event.event.as_ref().0 { | |||||
| Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true, | |||||
| _ => false | |||||
| } | |||||
| } | |||||
| #[allow(non_snake_case)] | |||||
| fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result<ffi::#req_type_raw> { | |||||
| use eyre::WrapErr; | |||||
| match (*event.event).0 { | |||||
| Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => { | |||||
| let result = event.event.downcast::<eyre::Result<ffi::#req_type_raw>>() | |||||
| .map_err(|_| eyre::eyre!("downcast to {} failed", #req_type_raw_str))?; | |||||
| let data = result.with_context(|| format!("failed to receive {} request", #self_name_str))?; | |||||
| Ok(data) | |||||
| }, | |||||
| _ => eyre::bail!("not a {} request event", #self_name_str), | |||||
| } | |||||
| } | |||||
| } | |||||
| }; | }; | ||||
| (def, imp) | (def, imp) | ||||
| } | } | ||||