From db03eedc7b17956aa5d9c73b53e135938fbc0372 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 27 Mar 2024 18:03:46 +0100 Subject: [PATCH] Add example for ROS2 service server using `ros2-client` crate --- examples/rust-ros2-dataflow/dataflow.yml | 7 ++ examples/rust-ros2-dataflow/node/Cargo.toml | 5 + .../node/src/bin/service-server.rs | 97 +++++++++++++++ examples/rust-ros2-dataflow/node/src/main.rs | 111 ++++++++++++------ 4 files changed, 182 insertions(+), 38 deletions(-) create mode 100644 examples/rust-ros2-dataflow/node/src/bin/service-server.rs diff --git a/examples/rust-ros2-dataflow/dataflow.yml b/examples/rust-ros2-dataflow/dataflow.yml index 080f1f5d..81d3ad06 100644 --- a/examples/rust-ros2-dataflow/dataflow.yml +++ b/examples/rust-ros2-dataflow/dataflow.yml @@ -8,3 +8,10 @@ nodes: service_timer: dora/timer/secs/1 outputs: - pose + - finished + - id: service-server + custom: + build: cargo build -p rust-ros2-dataflow-example-node --features ros2 + source: ../../target/debug/rust-ros2-dataflow-service-server + inputs: + exit: rust-node/finished diff --git a/examples/rust-ros2-dataflow/node/Cargo.toml b/examples/rust-ros2-dataflow/node/Cargo.toml index c984455e..2d704000 100644 --- a/examples/rust-ros2-dataflow/node/Cargo.toml +++ b/examples/rust-ros2-dataflow/node/Cargo.toml @@ -13,6 +13,11 @@ ros2 = [] name = "rust-ros2-dataflow-example-node" required-features = ["ros2"] +[[bin]] +path = "src/bin/service-server.rs" +name = "rust-ros2-dataflow-service-server" +required-features = ["ros2"] + [dependencies] dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" diff --git a/examples/rust-ros2-dataflow/node/src/bin/service-server.rs b/examples/rust-ros2-dataflow/node/src/bin/service-server.rs new file mode 100644 index 00000000..ae109b07 --- /dev/null +++ b/examples/rust-ros2-dataflow/node/src/bin/service-server.rs @@ -0,0 +1,97 @@ +use std::time::Duration; + +use dora_node_api::{ + self, + dora_core::config::DataId, + merged::{MergeExternal, MergedEvent}, + DoraNode, Event, +}; +use dora_ros2_bridge::{ + messages::{ + example_interfaces::service::{AddTwoInts, AddTwoIntsRequest, AddTwoIntsResponse}, + geometry_msgs::msg::{Twist, Vector3}, + turtlesim::msg::Pose, + }, + ros2_client::{self, ros2, NodeOptions}, + rustdds::{self, policy}, +}; +use eyre::{eyre, Context}; +use futures::task::SpawnExt; + +fn main() -> eyre::Result<()> { + let mut ros_node = init_ros_node()?; + + let service_qos = { + rustdds::QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_millis(100), + }) + .history(policy::History::KeepLast { depth: 10 }) + .build() + }; + let server = ros_node + .create_server::( + ros2_client::ServiceMapping::Enhanced, + &ros2_client::Name::new("/", "add_two_ints_custom").unwrap(), + &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), + service_qos.clone(), + service_qos.clone(), + ) + .context("failed to create service server")?; + + let (node, dora_events) = DoraNode::init_from_env()?; + + let merged = dora_events.merge_external(Box::pin(server.receive_request_stream())); + let mut events = futures::executor::block_on_stream(merged); + + loop { + let event = match events.next() { + Some(input) => input, + None => break, + }; + + match event { + MergedEvent::Dora(event) => match event { + Event::Input { + id, + metadata: _, + data: _, + } => match id.as_str() { + "exit" => { + println!("received exit signal"); + break; + } + other => eprintln!("Ignoring unexpected input `{other}`"), + }, + Event::Stop => println!("Received manual stop"), + other => eprintln!("Received unexpected input: {other:?}"), + }, + MergedEvent::External(request) => match request { + Ok((id, request)) => { + let response = AddTwoIntsResponse { + sum: request.a.wrapping_add(request.b), + }; + println!("replying to incoming request {id:?} {request:?} with response {response:?}"); + server + .send_response(id, response) + .context("failed to send response")?; + } + Err(err) => eprintln!("error while receiving incoming request: {err:?}"), + }, + } + } + + Ok(()) +} + +fn init_ros_node() -> eyre::Result { + let ros_context = ros2_client::Context::new().unwrap(); + + ros_context + .new_node( + ros2_client::NodeName::new("/ros2_demo", "service_server_example") + .map_err(|e| eyre!("failed to create ROS2 node name: {e}"))?, + NodeOptions::new().enable_rosout(true), + ) + .context("failed to create ros2 node") +} diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 32ac8cf4..8e1b8a46 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -4,7 +4,7 @@ use dora_node_api::{ self, dora_core::config::DataId, merged::{MergeExternal, MergedEvent}, - DoraNode, Event, + DoraNode, Event, IntoArrow, }; use dora_ros2_bridge::{ messages::{ @@ -12,7 +12,7 @@ use dora_ros2_bridge::{ geometry_msgs::msg::{Twist, Vector3}, turtlesim::msg::Pose, }, - ros2_client::{self, ros2, NodeOptions}, + ros2_client::{self, ros2, Client, NodeOptions}, rustdds::{self, policy}, }; use eyre::{eyre, Context}; @@ -33,43 +33,11 @@ fn main() -> eyre::Result<()> { }) .context("failed to spawn ros2 spinner")?; - // create an example service client - let service_qos = { - rustdds::QosPolicyBuilder::new() - .reliability(policy::Reliability::Reliable { - max_blocking_time: rustdds::Duration::from_millis(100), - }) - .history(policy::History::KeepLast { depth: 1 }) - .build() - }; - let add_client = ros_node.create_client::( - ros2_client::ServiceMapping::Enhanced, - &ros2_client::Name::new("/", "add_two_ints").unwrap(), - &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), - service_qos.clone(), - service_qos.clone(), - )?; + let add_client = create_add_client(&mut ros_node, "add_two_ints")?; + let add_client_custom = create_add_client(&mut ros_node, "add_two_ints_custom")?; - // wait until the service server is ready - println!("wait for add_two_ints service"); - let service_ready = async { - for _ in 0..10 { - let ready = add_client.wait_for_service(&ros_node); - futures::pin_mut!(ready); - let timeout = futures_timer::Delay::new(Duration::from_secs(2)); - match futures::future::select(ready, timeout).await { - futures::future::Either::Left(((), _)) => { - println!("add_two_ints service is ready"); - return Ok(()); - } - futures::future::Either::Right(_) => { - println!("timeout while waiting for add_two_ints service, retrying"); - } - } - } - eyre::bail!("add_two_ints service not available"); - }; - futures::executor::block_on(service_ready)?; + wait_for_service(&add_client, &mut ros_node, "add_two_ints")?; + wait_for_service(&add_client_custom, &mut ros_node, "add_two_ints_custom")?; let output = DataId::from("pose".to_owned()); @@ -114,6 +82,16 @@ fn main() -> eyre::Result<()> { if sum != a.wrapping_add(b) { eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) } + + let service_result_custom = add_two_ints_request(&add_client_custom, a, b); + let sum = futures::executor::block_on(service_result_custom) + .context("failed to send custom service request")?; + if sum != a.wrapping_add(b) { + eyre::bail!( + "unexpected addition result from custom service: expected {}, got {sum}", + a + b + ) + } } other => eprintln!("Ignoring unexpected input `{other}`"), }, @@ -135,9 +113,66 @@ fn main() -> eyre::Result<()> { } } + node.send_output( + DataId::from("finished".to_owned()), + Default::default(), + true.into_arrow(), + ) + .context("failed to send `finished` output")?; + + Ok(()) +} + +fn wait_for_service( + client: &Client, + ros_node: &mut ros2_client::Node, + service_name: &str, +) -> Result<(), eyre::Error> { + println!("wait for {service_name} service"); + let service_ready = async { + for _ in 0..10 { + let ready = client.wait_for_service(&ros_node); + futures::pin_mut!(ready); + let timeout = futures_timer::Delay::new(Duration::from_secs(2)); + match futures::future::select(ready, timeout).await { + futures::future::Either::Left(((), _)) => { + println!("add_two_ints service is ready"); + return Ok(()); + } + futures::future::Either::Right(_) => { + println!("timeout while waiting for {service_name} service, retrying"); + } + } + } + eyre::bail!("{service_name} service not available"); + }; + futures::executor::block_on(service_ready)?; Ok(()) } +fn create_add_client( + ros_node: &mut ros2_client::Node, + service_name: &str, +) -> Result, eyre::Error> { + let service_qos = { + rustdds::QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_millis(100), + }) + .history(policy::History::KeepLast { depth: 1 }) + .build() + }; + let add_client = ros_node.create_client::( + ros2_client::ServiceMapping::Enhanced, + &ros2_client::Name::new("/", service_name).unwrap(), + &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), + service_qos.clone(), + service_qos.clone(), + )?; + + Ok(add_client) +} + async fn add_two_ints_request( add_client: &ros2_client::Client, a: i64,