Browse Source

Add example for ROS2 service server using `ros2-client` crate

ros2-service-server
Philipp Oppermann 1 year ago
parent
commit
db03eedc7b
Failed to extract signature
4 changed files with 182 additions and 38 deletions
  1. +7
    -0
      examples/rust-ros2-dataflow/dataflow.yml
  2. +5
    -0
      examples/rust-ros2-dataflow/node/Cargo.toml
  3. +97
    -0
      examples/rust-ros2-dataflow/node/src/bin/service-server.rs
  4. +73
    -38
      examples/rust-ros2-dataflow/node/src/main.rs

+ 7
- 0
examples/rust-ros2-dataflow/dataflow.yml View File

@@ -8,3 +8,10 @@ nodes:
service_timer: dora/timer/secs/1
outputs:
- pose
- finished
- id: service-server
custom:
build: cargo build -p rust-ros2-dataflow-example-node --features ros2
source: ../../target/debug/rust-ros2-dataflow-service-server
inputs:
exit: rust-node/finished

+ 5
- 0
examples/rust-ros2-dataflow/node/Cargo.toml View File

@@ -13,6 +13,11 @@ ros2 = []
name = "rust-ros2-dataflow-example-node"
required-features = ["ros2"]

[[bin]]
path = "src/bin/service-server.rs"
name = "rust-ros2-dataflow-service-server"
required-features = ["ros2"]

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"


+ 97
- 0
examples/rust-ros2-dataflow/node/src/bin/service-server.rs View File

@@ -0,0 +1,97 @@
use std::time::Duration;

use dora_node_api::{
self,
dora_core::config::DataId,
merged::{MergeExternal, MergedEvent},
DoraNode, Event,
};
use dora_ros2_bridge::{
messages::{
example_interfaces::service::{AddTwoInts, AddTwoIntsRequest, AddTwoIntsResponse},
geometry_msgs::msg::{Twist, Vector3},
turtlesim::msg::Pose,
},
ros2_client::{self, ros2, NodeOptions},
rustdds::{self, policy},
};
use eyre::{eyre, Context};
use futures::task::SpawnExt;

fn main() -> eyre::Result<()> {
let mut ros_node = init_ros_node()?;

let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 10 })
.build()
};
let server = ros_node
.create_server::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", "add_two_ints_custom").unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)
.context("failed to create service server")?;

let (node, dora_events) = DoraNode::init_from_env()?;

let merged = dora_events.merge_external(Box::pin(server.receive_request_stream()));
let mut events = futures::executor::block_on_stream(merged);

loop {
let event = match events.next() {
Some(input) => input,
None => break,
};

match event {
MergedEvent::Dora(event) => match event {
Event::Input {
id,
metadata: _,
data: _,
} => match id.as_str() {
"exit" => {
println!("received exit signal");
break;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
other => eprintln!("Received unexpected input: {other:?}"),
},
MergedEvent::External(request) => match request {
Ok((id, request)) => {
let response = AddTwoIntsResponse {
sum: request.a.wrapping_add(request.b),
};
println!("replying to incoming request {id:?} {request:?} with response {response:?}");
server
.send_response(id, response)
.context("failed to send response")?;
}
Err(err) => eprintln!("error while receiving incoming request: {err:?}"),
},
}
}

Ok(())
}

fn init_ros_node() -> eyre::Result<ros2_client::Node> {
let ros_context = ros2_client::Context::new().unwrap();

ros_context
.new_node(
ros2_client::NodeName::new("/ros2_demo", "service_server_example")
.map_err(|e| eyre!("failed to create ROS2 node name: {e}"))?,
NodeOptions::new().enable_rosout(true),
)
.context("failed to create ros2 node")
}

+ 73
- 38
examples/rust-ros2-dataflow/node/src/main.rs View File

@@ -4,7 +4,7 @@ use dora_node_api::{
self,
dora_core::config::DataId,
merged::{MergeExternal, MergedEvent},
DoraNode, Event,
DoraNode, Event, IntoArrow,
};
use dora_ros2_bridge::{
messages::{
@@ -12,7 +12,7 @@ use dora_ros2_bridge::{
geometry_msgs::msg::{Twist, Vector3},
turtlesim::msg::Pose,
},
ros2_client::{self, ros2, NodeOptions},
ros2_client::{self, ros2, Client, NodeOptions},
rustdds::{self, policy},
};
use eyre::{eyre, Context};
@@ -33,43 +33,11 @@ fn main() -> eyre::Result<()> {
})
.context("failed to spawn ros2 spinner")?;

// create an example service client
let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 1 })
.build()
};
let add_client = ros_node.create_client::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", "add_two_ints").unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)?;
let add_client = create_add_client(&mut ros_node, "add_two_ints")?;
let add_client_custom = create_add_client(&mut ros_node, "add_two_ints_custom")?;

// wait until the service server is ready
println!("wait for add_two_ints service");
let service_ready = async {
for _ in 0..10 {
let ready = add_client.wait_for_service(&ros_node);
futures::pin_mut!(ready);
let timeout = futures_timer::Delay::new(Duration::from_secs(2));
match futures::future::select(ready, timeout).await {
futures::future::Either::Left(((), _)) => {
println!("add_two_ints service is ready");
return Ok(());
}
futures::future::Either::Right(_) => {
println!("timeout while waiting for add_two_ints service, retrying");
}
}
}
eyre::bail!("add_two_ints service not available");
};
futures::executor::block_on(service_ready)?;
wait_for_service(&add_client, &mut ros_node, "add_two_ints")?;
wait_for_service(&add_client_custom, &mut ros_node, "add_two_ints_custom")?;

let output = DataId::from("pose".to_owned());

@@ -114,6 +82,16 @@ fn main() -> eyre::Result<()> {
if sum != a.wrapping_add(b) {
eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b)
}

let service_result_custom = add_two_ints_request(&add_client_custom, a, b);
let sum = futures::executor::block_on(service_result_custom)
.context("failed to send custom service request")?;
if sum != a.wrapping_add(b) {
eyre::bail!(
"unexpected addition result from custom service: expected {}, got {sum}",
a + b
)
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
@@ -135,9 +113,66 @@ fn main() -> eyre::Result<()> {
}
}

node.send_output(
DataId::from("finished".to_owned()),
Default::default(),
true.into_arrow(),
)
.context("failed to send `finished` output")?;

Ok(())
}

fn wait_for_service(
client: &Client<AddTwoInts>,
ros_node: &mut ros2_client::Node,
service_name: &str,
) -> Result<(), eyre::Error> {
println!("wait for {service_name} service");
let service_ready = async {
for _ in 0..10 {
let ready = client.wait_for_service(&ros_node);
futures::pin_mut!(ready);
let timeout = futures_timer::Delay::new(Duration::from_secs(2));
match futures::future::select(ready, timeout).await {
futures::future::Either::Left(((), _)) => {
println!("add_two_ints service is ready");
return Ok(());
}
futures::future::Either::Right(_) => {
println!("timeout while waiting for {service_name} service, retrying");
}
}
}
eyre::bail!("{service_name} service not available");
};
futures::executor::block_on(service_ready)?;
Ok(())
}

fn create_add_client(
ros_node: &mut ros2_client::Node,
service_name: &str,
) -> Result<ros2_client::Client<AddTwoInts>, eyre::Error> {
let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 1 })
.build()
};
let add_client = ros_node.create_client::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", service_name).unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)?;

Ok(add_client)
}

async fn add_two_ints_request(
add_client: &ros2_client::Client<AddTwoInts>,
a: i64,


Loading…
Cancel
Save