| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
e9f102caa7
|
Fix unused import warning | 1 year ago |
|
|
d4ed69fe97
|
Add some debug output | 1 year ago |
|
|
64bde8cd00
|
Enable debug logging on CI | 1 year ago |
|
|
a072a7e3e4
|
Create C++ functions for constructing service servers | 1 year ago |
|
|
db03eedc7b
|
Add example for ROS2 service server using `ros2-client` crate | 1 year ago |
| @@ -8,7 +8,7 @@ on: | |||
| workflow_dispatch: | |||
| env: | |||
| RUST_LOG: INFO | |||
| RUST_LOG: DEBUG | |||
| jobs: | |||
| test: | |||
| @@ -316,6 +316,7 @@ pub async fn spawn_node( | |||
| let (log_finish_tx, log_finish_rx) = oneshot::channel(); | |||
| tokio::spawn(async move { | |||
| let exit_status = NodeExitStatus::from(child.wait().await); | |||
| tracing::debug!("node exited with status: {exit_status:?}"); | |||
| let _ = log_finish_rx.await; | |||
| let event = DoraEvent::SpawnedNodeResult { | |||
| dataflow_id, | |||
| @@ -8,3 +8,10 @@ nodes: | |||
| service_timer: dora/timer/secs/1 | |||
| outputs: | |||
| - pose | |||
| - finished | |||
| - id: service-server | |||
| custom: | |||
| build: cargo build -p rust-ros2-dataflow-example-node --features ros2 | |||
| source: ../../target/debug/rust-ros2-dataflow-service-server | |||
| inputs: | |||
| exit: rust-node/finished | |||
| @@ -13,6 +13,11 @@ ros2 = [] | |||
| name = "rust-ros2-dataflow-example-node" | |||
| required-features = ["ros2"] | |||
| [[bin]] | |||
| path = "src/bin/service-server.rs" | |||
| name = "rust-ros2-dataflow-service-server" | |||
| required-features = ["ros2"] | |||
| [dependencies] | |||
| dora-node-api = { workspace = true, features = ["tracing"] } | |||
| eyre = "0.6.8" | |||
| @@ -0,0 +1,97 @@ | |||
| use std::time::Duration; | |||
| use dora_node_api::{ | |||
| self, | |||
| dora_core::config::DataId, | |||
| merged::{MergeExternal, MergedEvent}, | |||
| DoraNode, Event, | |||
| }; | |||
| use dora_ros2_bridge::{ | |||
| messages::{ | |||
| example_interfaces::service::{AddTwoInts, AddTwoIntsRequest, AddTwoIntsResponse}, | |||
| geometry_msgs::msg::{Twist, Vector3}, | |||
| turtlesim::msg::Pose, | |||
| }, | |||
| ros2_client::{self, ros2, NodeOptions}, | |||
| rustdds::{self, policy}, | |||
| }; | |||
| use eyre::{eyre, Context}; | |||
| use futures::task::SpawnExt; | |||
| fn main() -> eyre::Result<()> { | |||
| let mut ros_node = init_ros_node()?; | |||
| let service_qos = { | |||
| rustdds::QosPolicyBuilder::new() | |||
| .reliability(policy::Reliability::Reliable { | |||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||
| }) | |||
| .history(policy::History::KeepLast { depth: 10 }) | |||
| .build() | |||
| }; | |||
| let server = ros_node | |||
| .create_server::<AddTwoInts>( | |||
| ros2_client::ServiceMapping::Enhanced, | |||
| &ros2_client::Name::new("/", "add_two_ints_custom").unwrap(), | |||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||
| service_qos.clone(), | |||
| service_qos.clone(), | |||
| ) | |||
| .context("failed to create service server")?; | |||
| let (node, dora_events) = DoraNode::init_from_env()?; | |||
| let merged = dora_events.merge_external(Box::pin(server.receive_request_stream())); | |||
| let mut events = futures::executor::block_on_stream(merged); | |||
| loop { | |||
| let event = match events.next() { | |||
| Some(input) => input, | |||
| None => break, | |||
| }; | |||
| match event { | |||
| MergedEvent::Dora(event) => match event { | |||
| Event::Input { | |||
| id, | |||
| metadata: _, | |||
| data: _, | |||
| } => match id.as_str() { | |||
| "exit" => { | |||
| println!("received exit signal"); | |||
| break; | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| }, | |||
| Event::Stop => println!("Received manual stop"), | |||
| other => eprintln!("Received unexpected input: {other:?}"), | |||
| }, | |||
| MergedEvent::External(request) => match request { | |||
| Ok((id, request)) => { | |||
| let response = AddTwoIntsResponse { | |||
| sum: request.a.wrapping_add(request.b), | |||
| }; | |||
| println!("replying to incoming request {id:?} {request:?} with response {response:?}"); | |||
| server | |||
| .send_response(id, response) | |||
| .context("failed to send response")?; | |||
| } | |||
| Err(err) => eprintln!("error while receiving incoming request: {err:?}"), | |||
| }, | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| fn init_ros_node() -> eyre::Result<ros2_client::Node> { | |||
| let ros_context = ros2_client::Context::new().unwrap(); | |||
| ros_context | |||
| .new_node( | |||
| ros2_client::NodeName::new("/ros2_demo", "service_server_example") | |||
| .map_err(|e| eyre!("failed to create ROS2 node name: {e}"))?, | |||
| NodeOptions::new().enable_rosout(true), | |||
| ) | |||
| .context("failed to create ros2 node") | |||
| } | |||
| @@ -4,7 +4,7 @@ use dora_node_api::{ | |||
| self, | |||
| dora_core::config::DataId, | |||
| merged::{MergeExternal, MergedEvent}, | |||
| DoraNode, Event, | |||
| DoraNode, Event, IntoArrow, | |||
| }; | |||
| use dora_ros2_bridge::{ | |||
| messages::{ | |||
| @@ -12,7 +12,7 @@ use dora_ros2_bridge::{ | |||
| geometry_msgs::msg::{Twist, Vector3}, | |||
| turtlesim::msg::Pose, | |||
| }, | |||
| ros2_client::{self, ros2, NodeOptions}, | |||
| ros2_client::{self, ros2, Client, NodeOptions}, | |||
| rustdds::{self, policy}, | |||
| }; | |||
| use eyre::{eyre, Context}; | |||
| @@ -33,43 +33,11 @@ fn main() -> eyre::Result<()> { | |||
| }) | |||
| .context("failed to spawn ros2 spinner")?; | |||
| // create an example service client | |||
| let service_qos = { | |||
| rustdds::QosPolicyBuilder::new() | |||
| .reliability(policy::Reliability::Reliable { | |||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||
| }) | |||
| .history(policy::History::KeepLast { depth: 1 }) | |||
| .build() | |||
| }; | |||
| let add_client = ros_node.create_client::<AddTwoInts>( | |||
| ros2_client::ServiceMapping::Enhanced, | |||
| &ros2_client::Name::new("/", "add_two_ints").unwrap(), | |||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||
| service_qos.clone(), | |||
| service_qos.clone(), | |||
| )?; | |||
| let add_client = create_add_client(&mut ros_node, "add_two_ints")?; | |||
| let add_client_custom = create_add_client(&mut ros_node, "add_two_ints_custom")?; | |||
| // wait until the service server is ready | |||
| println!("wait for add_two_ints service"); | |||
| let service_ready = async { | |||
| for _ in 0..10 { | |||
| let ready = add_client.wait_for_service(&ros_node); | |||
| futures::pin_mut!(ready); | |||
| let timeout = futures_timer::Delay::new(Duration::from_secs(2)); | |||
| match futures::future::select(ready, timeout).await { | |||
| futures::future::Either::Left(((), _)) => { | |||
| println!("add_two_ints service is ready"); | |||
| return Ok(()); | |||
| } | |||
| futures::future::Either::Right(_) => { | |||
| println!("timeout while waiting for add_two_ints service, retrying"); | |||
| } | |||
| } | |||
| } | |||
| eyre::bail!("add_two_ints service not available"); | |||
| }; | |||
| futures::executor::block_on(service_ready)?; | |||
| wait_for_service(&add_client, &mut ros_node, "add_two_ints")?; | |||
| wait_for_service(&add_client_custom, &mut ros_node, "add_two_ints_custom")?; | |||
| let output = DataId::from("pose".to_owned()); | |||
| @@ -114,6 +82,16 @@ fn main() -> eyre::Result<()> { | |||
| if sum != a.wrapping_add(b) { | |||
| eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) | |||
| } | |||
| let service_result_custom = add_two_ints_request(&add_client_custom, a, b); | |||
| let sum = futures::executor::block_on(service_result_custom) | |||
| .context("failed to send custom service request")?; | |||
| if sum != a.wrapping_add(b) { | |||
| eyre::bail!( | |||
| "unexpected addition result from custom service: expected {}, got {sum}", | |||
| a + b | |||
| ) | |||
| } | |||
| } | |||
| other => eprintln!("Ignoring unexpected input `{other}`"), | |||
| }, | |||
| @@ -135,9 +113,66 @@ fn main() -> eyre::Result<()> { | |||
| } | |||
| } | |||
| node.send_output( | |||
| DataId::from("finished".to_owned()), | |||
| Default::default(), | |||
| true.into_arrow(), | |||
| ) | |||
| .context("failed to send `finished` output")?; | |||
| Ok(()) | |||
| } | |||
| fn wait_for_service( | |||
| client: &Client<AddTwoInts>, | |||
| ros_node: &mut ros2_client::Node, | |||
| service_name: &str, | |||
| ) -> Result<(), eyre::Error> { | |||
| println!("wait for {service_name} service"); | |||
| let service_ready = async { | |||
| for _ in 0..10 { | |||
| let ready = client.wait_for_service(&ros_node); | |||
| futures::pin_mut!(ready); | |||
| let timeout = futures_timer::Delay::new(Duration::from_secs(2)); | |||
| match futures::future::select(ready, timeout).await { | |||
| futures::future::Either::Left(((), _)) => { | |||
| println!("add_two_ints service is ready"); | |||
| return Ok(()); | |||
| } | |||
| futures::future::Either::Right(_) => { | |||
| println!("timeout while waiting for {service_name} service, retrying"); | |||
| } | |||
| } | |||
| } | |||
| eyre::bail!("{service_name} service not available"); | |||
| }; | |||
| futures::executor::block_on(service_ready)?; | |||
| Ok(()) | |||
| } | |||
| fn create_add_client( | |||
| ros_node: &mut ros2_client::Node, | |||
| service_name: &str, | |||
| ) -> Result<ros2_client::Client<AddTwoInts>, eyre::Error> { | |||
| let service_qos = { | |||
| rustdds::QosPolicyBuilder::new() | |||
| .reliability(policy::Reliability::Reliable { | |||
| max_blocking_time: rustdds::Duration::from_millis(100), | |||
| }) | |||
| .history(policy::History::KeepLast { depth: 1 }) | |||
| .build() | |||
| }; | |||
| let add_client = ros_node.create_client::<AddTwoInts>( | |||
| ros2_client::ServiceMapping::Enhanced, | |||
| &ros2_client::Name::new("/", service_name).unwrap(), | |||
| &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), | |||
| service_qos.clone(), | |||
| service_qos.clone(), | |||
| )?; | |||
| Ok(add_client) | |||
| } | |||
| async fn add_two_ints_request( | |||
| add_client: &ros2_client::Client<AddTwoInts>, | |||
| a: i64, | |||
| @@ -140,6 +140,12 @@ where | |||
| } | |||
| } | |||
| impl Drop for Ros2Context { | |||
| fn drop(&mut self) { | |||
| tracing::debug!("dropping Ros2Context"); | |||
| } | |||
| } | |||
| struct Ros2Node { | |||
| node : ros2_client::Node, | |||
| executor: std::sync::Arc<futures::executor::ThreadPool>, | |||
| @@ -85,6 +85,11 @@ impl Service { | |||
| let create_client = format_ident!("new_Client__{package_name}__{}", self.name); | |||
| let cxx_create_client = format!("create_client_{package_name}_{}", self.name); | |||
| let server_name = format_ident!("Server__{package_name}__{}", self.name); | |||
| let cxx_server_name = format_ident!("Server_{}", self.name); | |||
| let create_server = format_ident!("new_Server__{package_name}__{}", self.name); | |||
| let cxx_create_server = format!("create_server_{package_name}_{}", self.name); | |||
| let package = format_ident!("{package_name}"); | |||
| let self_name = format_ident!("{}", self.name); | |||
| let self_name_str = &self.name; | |||
| @@ -94,6 +99,7 @@ impl Service { | |||
| let send_request = format_ident!("send_request__{package_name}__{}", self.name); | |||
| let cxx_send_request = format_ident!("send_request"); | |||
| let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); | |||
| let req_type_raw_str = req_type_raw.to_string(); | |||
| let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); | |||
| let res_type_raw_str = res_type_raw.to_string(); | |||
| @@ -106,10 +112,15 @@ impl Service { | |||
| #[namespace = #package_name] | |||
| #[cxx_name = #cxx_client_name] | |||
| type #client_name; | |||
| // TODO: add `merged_streams` argument (for sending replies) | |||
| #[cxx_name = #cxx_create_client] | |||
| fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#client_name>>; | |||
| #[namespace = #package_name] | |||
| #[cxx_name = #cxx_server_name] | |||
| type #server_name; | |||
| #[cxx_name = #cxx_create_server] | |||
| fn #create_server(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#server_name>>; | |||
| #[namespace = #package_name] | |||
| #[cxx_name = #cxx_wait_for_service] | |||
| fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> Result<()>; | |||
| @@ -147,6 +158,29 @@ impl Service { | |||
| stream_id: id, | |||
| })) | |||
| } | |||
| #[allow(non_snake_case)] | |||
| pub fn #create_server(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result<Box<#server_name>> { | |||
| let server = self.node.create_server::< #package :: service :: #self_name >( | |||
| ros2_client::ServiceMapping::Enhanced, | |||
| &ros2_client::Name::new(name_space, base_name).unwrap(), | |||
| &ros2_client::ServiceTypeName::new(#package_name, #self_name_str), | |||
| qos.clone().into(), | |||
| qos.into(), | |||
| )?; | |||
| let server = std::sync::Arc::new(server); | |||
| let stream = futures::stream::unfold(server.clone(), |server| async move { | |||
| let result = server.async_receive_request().await; | |||
| Some((Box::new(result) as Box<dyn std::any::Any + 'static>, server)) | |||
| }); | |||
| let id = events.events.merge(Box::pin(stream)); | |||
| Ok(Box::new(#server_name { | |||
| server, | |||
| stream_id: id, | |||
| })) | |||
| } | |||
| } | |||
| #[allow(non_camel_case_types)] | |||
| @@ -221,6 +255,38 @@ impl Service { | |||
| } | |||
| } | |||
| } | |||
| #[allow(non_camel_case_types)] | |||
| pub struct #server_name { | |||
| server: std::sync::Arc<ros2_client::service::Server< #package :: service :: #self_name >>, | |||
| stream_id: u32, | |||
| } | |||
| impl #server_name { | |||
| #[allow(non_snake_case)] | |||
| fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool { | |||
| match &event.event.as_ref().0 { | |||
| Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true, | |||
| _ => false | |||
| } | |||
| } | |||
| #[allow(non_snake_case)] | |||
| fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result<ffi::#req_type_raw> { | |||
| use eyre::WrapErr; | |||
| match (*event.event).0 { | |||
| Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => { | |||
| let result = event.event.downcast::<eyre::Result<ffi::#req_type_raw>>() | |||
| .map_err(|_| eyre::eyre!("downcast to {} failed", #req_type_raw_str))?; | |||
| let data = result.with_context(|| format!("failed to receive {} request", #self_name_str))?; | |||
| Ok(data) | |||
| }, | |||
| _ => eyre::bail!("not a {} request event", #self_name_str), | |||
| } | |||
| } | |||
| } | |||
| }; | |||
| (def, imp) | |||
| } | |||