diff --git a/Cargo.lock b/Cargo.lock index 19f5f4d7..9b6e26a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2097,6 +2097,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -2158,9 +2159,9 @@ checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" [[package]] name = "futures-timer" -version = "3.0.2" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" @@ -4662,6 +4663,7 @@ dependencies = [ "dora-ros2-bridge", "eyre", "futures", + "futures-timer", "rand", "serde_json", "tokio", diff --git a/examples/rust-ros2-dataflow/README.md b/examples/rust-ros2-dataflow/README.md index 6c8445e9..c30e52e6 100644 --- a/examples/rust-ros2-dataflow/README.md +++ b/examples/rust-ros2-dataflow/README.md @@ -11,11 +11,14 @@ This examples requires a sourced ROS2 installation. - Follow tasks 1 and 2 of the [ROS2 turtlesim tutorial](https://docs.ros.org/en/iron/Tutorials/Beginner-CLI-Tools/Introducing-Turtlesim/Introducing-Turtlesim.html#id3) - Install the turtlesim package - Start the turtlesim node through `ros2 run turtlesim turtlesim_node` +- In a separate terminal, start the `/add_two_ints` service: `ros2 run examples_rclcpp_minimal_service service_main` ## Running -After sourcing the ROS2 installation and starting the `turtlesim` node, you can run this example to move the turtle in random directions: +After sourcing the ROS2 installation and starting both the `turtlesim` node and the `/add_two_ints` service, you can run this example to move the turtle in random directions: ``` cargo run --example rust-ros2-dataflow --features ros2-examples ``` + +You should see a few random requests in the terminal where you started the `examples_rclcpp_minimal_service`. diff --git a/examples/rust-ros2-dataflow/dataflow.yml b/examples/rust-ros2-dataflow/dataflow.yml index 83583f75..080f1f5d 100644 --- a/examples/rust-ros2-dataflow/dataflow.yml +++ b/examples/rust-ros2-dataflow/dataflow.yml @@ -5,5 +5,6 @@ nodes: source: ../../target/debug/rust-ros2-dataflow-example-node inputs: tick: dora/timer/millis/500 + service_timer: dora/timer/secs/1 outputs: - pose diff --git a/examples/rust-ros2-dataflow/node/Cargo.toml b/examples/rust-ros2-dataflow/node/Cargo.toml index 90d3efa6..c984455e 100644 --- a/examples/rust-ros2-dataflow/node/Cargo.toml +++ b/examples/rust-ros2-dataflow/node/Cargo.toml @@ -16,7 +16,8 @@ required-features = ["ros2"] [dependencies] dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" -futures = "0.3.21" +futures = { version = "0.3.21", features = ["thread-pool"] } +futures-timer = "3.0.3" rand = "0.8.5" tokio = { version = "1.24.2", features = ["rt", "macros"] } dora-ros2-bridge = { workspace = true } diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 6d20472f..454b51d9 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use dora_node_api::{ self, dora_core::config::DataId, @@ -6,19 +8,70 @@ use dora_node_api::{ }; use dora_ros2_bridge::{ messages::{ + example_interfaces::service::{AddTwoInts, AddTwoIntsRequest}, geometry_msgs::msg::{Twist, Vector3}, - turtlesim::msg::Pose, + turtlesim::{ + msg::Pose, + service::{Spawn, SpawnRequest}, + }, }, ros2_client::{self, ros2, NodeOptions}, rustdds::{self, policy}, }; use eyre::{eyre, Context}; +use futures::task::SpawnExt; fn main() -> eyre::Result<()> { let mut ros_node = init_ros_node()?; let turtle_vel_publisher = create_vel_publisher(&mut ros_node)?; let turtle_pose_reader = create_pose_reader(&mut ros_node)?; + let service_qos = { + rustdds::QosPolicyBuilder::new() + .reliability(policy::Reliability::Reliable { + max_blocking_time: rustdds::Duration::from_millis(100), + }) + .history(policy::History::KeepLast { depth: 1 }) + .build() + }; + let add_client = ros_node.create_client::( + ros2_client::ServiceMapping::Enhanced, + &ros2_client::Name::new("/", "add_two_ints").unwrap(), + &ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"), + service_qos.clone(), + service_qos.clone(), + )?; + + let pool = futures::executor::ThreadPool::new()?; + + let spinner = ros_node.spinner(); + pool.spawn(async { + if let Err(err) = spinner.spin().await { + eprintln!("ros2 spinner failed: {err:?}"); + } + }) + .context("failed to spawn ros2 spinner")?; + + println!("wait for add_two_ints service"); + let service_ready = async { + for _ in 0..10 { + let ready = add_client.wait_for_service(&ros_node); + futures::pin_mut!(ready); + let timeout = futures_timer::Delay::new(Duration::from_secs(2)); + match futures::future::select(ready, timeout).await { + futures::future::Either::Left(((), _)) => { + println!("add_two_ints service is ready"); + return Ok(()); + } + futures::future::Either::Right(_) => { + println!("timeout while waiting for add_two_ints service, retrying"); + } + } + } + eyre::bail!("add_to_ints service not available"); + }; + futures::executor::block_on(service_ready)?; + let output = DataId::from("pose".to_owned()); let (mut node, dora_events) = DoraNode::init_from_env()?; @@ -53,6 +106,16 @@ fn main() -> eyre::Result<()> { println!("tick {i}, sending {direction:?}"); turtle_vel_publisher.publish(direction).unwrap(); } + "service_timer" => { + let a = rand::random(); + let b = rand::random(); + let service_result = add_two_ints_request(&add_client, a, b); + let sum = futures::executor::block_on(service_result) + .context("failed to send service request")?; + if sum != a.wrapping_add(b) { + eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b) + } + } other => eprintln!("Ignoring unexpected input `{other}`"), }, Event::Stop => println!("Received manual stop"), @@ -76,6 +139,31 @@ fn main() -> eyre::Result<()> { Ok(()) } +async fn add_two_ints_request( + add_client: &ros2_client::Client, + a: i64, + b: i64, +) -> eyre::Result { + let request = AddTwoIntsRequest { a, b }; + println!("sending add request {request:?}"); + let request_id = add_client.async_send_request(request.clone()).await?; + println!("{request_id:?}"); + + let response = add_client.async_receive_response(request_id); + futures::pin_mut!(response); + let timeout = futures_timer::Delay::new(Duration::from_secs(5)); + match futures::future::select(response, timeout).await { + futures::future::Either::Left((Ok(response), _)) => { + println!("received response: {response:?}"); + Ok(response.sum) + } + futures::future::Either::Left((Err(err), _)) => eyre::bail!(err), + futures::future::Either::Right(_) => { + eyre::bail!("timeout while waiting for response"); + } + } +} + fn init_ros_node() -> eyre::Result { let ros_context = ros2_client::Context::new().unwrap();