Compare commits

...

5 Commits

Author SHA1 Message Date
  Philipp Oppermann e9f102caa7
Fix unused import warning 1 year ago
  Philipp Oppermann d4ed69fe97
Add some debug output 1 year ago
  Philipp Oppermann 64bde8cd00
Enable debug logging on CI 1 year ago
  Philipp Oppermann a072a7e3e4
Create C++ functions for constructing service servers 1 year ago
  Philipp Oppermann db03eedc7b
Add example for ROS2 service server using `ros2-client` crate 1 year ago
8 changed files with 257 additions and 40 deletions
Split View
  1. +1
    -1
      .github/workflows/ci.yml
  2. +1
    -0
      binaries/daemon/src/spawn.rs
  3. +7
    -0
      examples/rust-ros2-dataflow/dataflow.yml
  4. +5
    -0
      examples/rust-ros2-dataflow/node/Cargo.toml
  5. +97
    -0
      examples/rust-ros2-dataflow/node/src/bin/service-server.rs
  6. +73
    -38
      examples/rust-ros2-dataflow/node/src/main.rs
  7. +6
    -0
      libraries/extensions/ros2-bridge/msg-gen/src/lib.rs
  8. +67
    -1
      libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs

+ 1
- 1
.github/workflows/ci.yml View File

@@ -8,7 +8,7 @@ on:
workflow_dispatch:

env:
RUST_LOG: INFO
RUST_LOG: DEBUG

jobs:
test:


+ 1
- 0
binaries/daemon/src/spawn.rs View File

@@ -316,6 +316,7 @@ pub async fn spawn_node(
let (log_finish_tx, log_finish_rx) = oneshot::channel();
tokio::spawn(async move {
let exit_status = NodeExitStatus::from(child.wait().await);
tracing::debug!("node exited with status: {exit_status:?}");
let _ = log_finish_rx.await;
let event = DoraEvent::SpawnedNodeResult {
dataflow_id,


+ 7
- 0
examples/rust-ros2-dataflow/dataflow.yml View File

@@ -8,3 +8,10 @@ nodes:
service_timer: dora/timer/secs/1
outputs:
- pose
- finished
- id: service-server
custom:
build: cargo build -p rust-ros2-dataflow-example-node --features ros2
source: ../../target/debug/rust-ros2-dataflow-service-server
inputs:
exit: rust-node/finished

+ 5
- 0
examples/rust-ros2-dataflow/node/Cargo.toml View File

@@ -13,6 +13,11 @@ ros2 = []
name = "rust-ros2-dataflow-example-node"
required-features = ["ros2"]

[[bin]]
path = "src/bin/service-server.rs"
name = "rust-ros2-dataflow-service-server"
required-features = ["ros2"]

[dependencies]
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"


+ 97
- 0
examples/rust-ros2-dataflow/node/src/bin/service-server.rs View File

@@ -0,0 +1,97 @@
use std::time::Duration;

use dora_node_api::{
self,
dora_core::config::DataId,
merged::{MergeExternal, MergedEvent},
DoraNode, Event,
};
use dora_ros2_bridge::{
messages::{
example_interfaces::service::{AddTwoInts, AddTwoIntsRequest, AddTwoIntsResponse},
geometry_msgs::msg::{Twist, Vector3},
turtlesim::msg::Pose,
},
ros2_client::{self, ros2, NodeOptions},
rustdds::{self, policy},
};
use eyre::{eyre, Context};
use futures::task::SpawnExt;

fn main() -> eyre::Result<()> {
let mut ros_node = init_ros_node()?;

let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 10 })
.build()
};
let server = ros_node
.create_server::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", "add_two_ints_custom").unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)
.context("failed to create service server")?;

let (node, dora_events) = DoraNode::init_from_env()?;

let merged = dora_events.merge_external(Box::pin(server.receive_request_stream()));
let mut events = futures::executor::block_on_stream(merged);

loop {
let event = match events.next() {
Some(input) => input,
None => break,
};

match event {
MergedEvent::Dora(event) => match event {
Event::Input {
id,
metadata: _,
data: _,
} => match id.as_str() {
"exit" => {
println!("received exit signal");
break;
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
Event::Stop => println!("Received manual stop"),
other => eprintln!("Received unexpected input: {other:?}"),
},
MergedEvent::External(request) => match request {
Ok((id, request)) => {
let response = AddTwoIntsResponse {
sum: request.a.wrapping_add(request.b),
};
println!("replying to incoming request {id:?} {request:?} with response {response:?}");
server
.send_response(id, response)
.context("failed to send response")?;
}
Err(err) => eprintln!("error while receiving incoming request: {err:?}"),
},
}
}

Ok(())
}

fn init_ros_node() -> eyre::Result<ros2_client::Node> {
let ros_context = ros2_client::Context::new().unwrap();

ros_context
.new_node(
ros2_client::NodeName::new("/ros2_demo", "service_server_example")
.map_err(|e| eyre!("failed to create ROS2 node name: {e}"))?,
NodeOptions::new().enable_rosout(true),
)
.context("failed to create ros2 node")
}

+ 73
- 38
examples/rust-ros2-dataflow/node/src/main.rs View File

@@ -4,7 +4,7 @@ use dora_node_api::{
self,
dora_core::config::DataId,
merged::{MergeExternal, MergedEvent},
DoraNode, Event,
DoraNode, Event, IntoArrow,
};
use dora_ros2_bridge::{
messages::{
@@ -12,7 +12,7 @@ use dora_ros2_bridge::{
geometry_msgs::msg::{Twist, Vector3},
turtlesim::msg::Pose,
},
ros2_client::{self, ros2, NodeOptions},
ros2_client::{self, ros2, Client, NodeOptions},
rustdds::{self, policy},
};
use eyre::{eyre, Context};
@@ -33,43 +33,11 @@ fn main() -> eyre::Result<()> {
})
.context("failed to spawn ros2 spinner")?;

// create an example service client
let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 1 })
.build()
};
let add_client = ros_node.create_client::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", "add_two_ints").unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)?;
let add_client = create_add_client(&mut ros_node, "add_two_ints")?;
let add_client_custom = create_add_client(&mut ros_node, "add_two_ints_custom")?;

// wait until the service server is ready
println!("wait for add_two_ints service");
let service_ready = async {
for _ in 0..10 {
let ready = add_client.wait_for_service(&ros_node);
futures::pin_mut!(ready);
let timeout = futures_timer::Delay::new(Duration::from_secs(2));
match futures::future::select(ready, timeout).await {
futures::future::Either::Left(((), _)) => {
println!("add_two_ints service is ready");
return Ok(());
}
futures::future::Either::Right(_) => {
println!("timeout while waiting for add_two_ints service, retrying");
}
}
}
eyre::bail!("add_two_ints service not available");
};
futures::executor::block_on(service_ready)?;
wait_for_service(&add_client, &mut ros_node, "add_two_ints")?;
wait_for_service(&add_client_custom, &mut ros_node, "add_two_ints_custom")?;

let output = DataId::from("pose".to_owned());

@@ -114,6 +82,16 @@ fn main() -> eyre::Result<()> {
if sum != a.wrapping_add(b) {
eyre::bail!("unexpected addition result: expected {}, got {sum}", a + b)
}

let service_result_custom = add_two_ints_request(&add_client_custom, a, b);
let sum = futures::executor::block_on(service_result_custom)
.context("failed to send custom service request")?;
if sum != a.wrapping_add(b) {
eyre::bail!(
"unexpected addition result from custom service: expected {}, got {sum}",
a + b
)
}
}
other => eprintln!("Ignoring unexpected input `{other}`"),
},
@@ -135,9 +113,66 @@ fn main() -> eyre::Result<()> {
}
}

node.send_output(
DataId::from("finished".to_owned()),
Default::default(),
true.into_arrow(),
)
.context("failed to send `finished` output")?;

Ok(())
}

fn wait_for_service(
client: &Client<AddTwoInts>,
ros_node: &mut ros2_client::Node,
service_name: &str,
) -> Result<(), eyre::Error> {
println!("wait for {service_name} service");
let service_ready = async {
for _ in 0..10 {
let ready = client.wait_for_service(&ros_node);
futures::pin_mut!(ready);
let timeout = futures_timer::Delay::new(Duration::from_secs(2));
match futures::future::select(ready, timeout).await {
futures::future::Either::Left(((), _)) => {
println!("add_two_ints service is ready");
return Ok(());
}
futures::future::Either::Right(_) => {
println!("timeout while waiting for {service_name} service, retrying");
}
}
}
eyre::bail!("{service_name} service not available");
};
futures::executor::block_on(service_ready)?;
Ok(())
}

fn create_add_client(
ros_node: &mut ros2_client::Node,
service_name: &str,
) -> Result<ros2_client::Client<AddTwoInts>, eyre::Error> {
let service_qos = {
rustdds::QosPolicyBuilder::new()
.reliability(policy::Reliability::Reliable {
max_blocking_time: rustdds::Duration::from_millis(100),
})
.history(policy::History::KeepLast { depth: 1 })
.build()
};
let add_client = ros_node.create_client::<AddTwoInts>(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new("/", service_name).unwrap(),
&ros2_client::ServiceTypeName::new("example_interfaces", "AddTwoInts"),
service_qos.clone(),
service_qos.clone(),
)?;

Ok(add_client)
}

async fn add_two_ints_request(
add_client: &ros2_client::Client<AddTwoInts>,
a: i64,


+ 6
- 0
libraries/extensions/ros2-bridge/msg-gen/src/lib.rs View File

@@ -140,6 +140,12 @@ where
}
}

impl Drop for Ros2Context {
fn drop(&mut self) {
tracing::debug!("dropping Ros2Context");
}
}

struct Ros2Node {
node : ros2_client::Node,
executor: std::sync::Arc<futures::executor::ThreadPool>,


+ 67
- 1
libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs View File

@@ -85,6 +85,11 @@ impl Service {
let create_client = format_ident!("new_Client__{package_name}__{}", self.name);
let cxx_create_client = format!("create_client_{package_name}_{}", self.name);

let server_name = format_ident!("Server__{package_name}__{}", self.name);
let cxx_server_name = format_ident!("Server_{}", self.name);
let create_server = format_ident!("new_Server__{package_name}__{}", self.name);
let cxx_create_server = format!("create_server_{package_name}_{}", self.name);

let package = format_ident!("{package_name}");
let self_name = format_ident!("{}", self.name);
let self_name_str = &self.name;
@@ -94,6 +99,7 @@ impl Service {
let send_request = format_ident!("send_request__{package_name}__{}", self.name);
let cxx_send_request = format_ident!("send_request");
let req_type_raw = format_ident!("{package_name}__{}_Request", self.name);
let req_type_raw_str = req_type_raw.to_string();
let res_type_raw = format_ident!("{package_name}__{}_Response", self.name);
let res_type_raw_str = res_type_raw.to_string();

@@ -106,10 +112,15 @@ impl Service {
#[namespace = #package_name]
#[cxx_name = #cxx_client_name]
type #client_name;
// TODO: add `merged_streams` argument (for sending replies)
#[cxx_name = #cxx_create_client]
fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#client_name>>;

#[namespace = #package_name]
#[cxx_name = #cxx_server_name]
type #server_name;
#[cxx_name = #cxx_create_server]
fn #create_server(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result<Box<#server_name>>;

#[namespace = #package_name]
#[cxx_name = #cxx_wait_for_service]
fn #wait_for_service(self: &mut #client_name, node: &Box<Ros2Node>) -> Result<()>;
@@ -147,6 +158,29 @@ impl Service {
stream_id: id,
}))
}

#[allow(non_snake_case)]
pub fn #create_server(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result<Box<#server_name>> {
let server = self.node.create_server::< #package :: service :: #self_name >(
ros2_client::ServiceMapping::Enhanced,
&ros2_client::Name::new(name_space, base_name).unwrap(),
&ros2_client::ServiceTypeName::new(#package_name, #self_name_str),
qos.clone().into(),
qos.into(),
)?;
let server = std::sync::Arc::new(server);

let stream = futures::stream::unfold(server.clone(), |server| async move {
let result = server.async_receive_request().await;
Some((Box::new(result) as Box<dyn std::any::Any + 'static>, server))
});
let id = events.events.merge(Box::pin(stream));

Ok(Box::new(#server_name {
server,
stream_id: id,
}))
}
}

#[allow(non_camel_case_types)]
@@ -221,6 +255,38 @@ impl Service {
}
}
}

#[allow(non_camel_case_types)]
pub struct #server_name {
server: std::sync::Arc<ros2_client::service::Server< #package :: service :: #self_name >>,
stream_id: u32,
}

impl #server_name {
#[allow(non_snake_case)]
fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool {
match &event.event.as_ref().0 {
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true,
_ => false
}
}
#[allow(non_snake_case)]
fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result<ffi::#req_type_raw> {
use eyre::WrapErr;

match (*event.event).0 {
Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => {
let result = event.event.downcast::<eyre::Result<ffi::#req_type_raw>>()
.map_err(|_| eyre::eyre!("downcast to {} failed", #req_type_raw_str))?;

let data = result.with_context(|| format!("failed to receive {} request", #self_name_str))?;
Ok(data)
},
_ => eyre::bail!("not a {} request event", #self_name_str),
}
}
}

};
(def, imp)
}


Loading…
Cancel
Save