diff --git a/rclrust/examples/service.rs b/rclrust/examples/service.rs new file mode 100644 index 00000000..e03b6e00 --- /dev/null +++ b/rclrust/examples/service.rs @@ -0,0 +1,18 @@ +use anyhow::Result; +use rclrust::qos::QoSProfile; +use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Response}; + +fn main() -> Result<()> { + let ctx = rclrust::init()?; + let node = ctx.create_node("examples_service")?; + + let _service = node.create_service::( + "add_ints", + move |req| AddTwoInts_Response { sum: req.a + req.b }, + &QoSProfile::default(), + )?; + + rclrust::spin(&node)?; + + Ok(()) +} diff --git a/rclrust/src/executor.rs b/rclrust/src/executor.rs index 7d3e6011..d7ddc24d 100644 --- a/rclrust/src/executor.rs +++ b/rclrust/src/executor.rs @@ -51,7 +51,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { } pub fn spin_some(&self, max_duration: Duration) -> Result<()> { - let (n_subscriptions, _, n_timers, _, _, _) = + let (n_subscriptions, _, n_timers, _, n_services, _) = self.nodes.iter().filter_map(|n| n.upgrade()).fold( (0, 0, 0, 0, 0, 0), |(subs, guards, timers, clients, services, events), node| { @@ -60,7 +60,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { guards, timers + node.timers.lock().unwrap().len(), clients, - services, + services + node.services.lock().unwrap().len(), events, ) }, @@ -72,7 +72,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { 0, n_timers, 0, - 0, + n_services, 0, )?; diff --git a/rclrust/src/lib.rs b/rclrust/src/lib.rs index 4299f1d5..27247f35 100644 --- a/rclrust/src/lib.rs +++ b/rclrust/src/lib.rs @@ -17,6 +17,7 @@ pub mod node_options; pub mod prelude; pub mod publisher; pub mod qos; +pub mod service; pub mod subscription; pub mod time; pub mod timer; diff --git a/rclrust/src/node.rs b/rclrust/src/node.rs index a75d8b36..1a57e03b 100644 --- a/rclrust/src/node.rs +++ b/rclrust/src/node.rs @@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, Weak}; use std::time::Duration; use anyhow::{ensure, Context as _, Result}; -use rclrust_msg::_core::MessageT; +use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT}; use crate::clock::ClockType; use crate::context::{Context, RclContext}; @@ -14,6 +14,7 @@ use crate::node_options::NodeOptions; use crate::publisher::Publisher; use crate::qos::QoSProfile; use crate::rclrust_error; +use crate::service::{Service, ServiceBase}; use crate::subscription::{Subscription, SubscriptionBase}; use crate::timer::Timer; use crate::wait_set::RclWaitSet; @@ -101,6 +102,7 @@ pub struct Node<'ctx> { context: &'ctx Context, pub(crate) subscriptions: Mutex>>, pub(crate) timers: Mutex>>, + pub(crate) services: Mutex>>, } impl<'ctx> Node<'ctx> { @@ -126,6 +128,7 @@ impl<'ctx> Node<'ctx> { context, subscriptions: Mutex::new(Vec::new()), timers: Mutex::new(Vec::new()), + services: Mutex::new(Vec::new()), })) } @@ -272,6 +275,43 @@ impl<'ctx> Node<'ctx> { self.create_timer(period, ClockType::SteadyTime, callback) } + pub fn create_service( + &self, + service_name: &str, + callback: F, + qos: &QoSProfile, + ) -> Result>> + where + Srv: ServiceT + 'static, + F: Fn(Srv::Request) -> Srv::Response + 'static, + { + let srv = Service::::new( + self, + service_name, + move |req_raw| (callback)(unsafe { req_raw.to_rust() }), + qos, + )?; + let weak_srv = Arc::downgrade(&srv) as Weak; + self.services.lock().unwrap().push(weak_srv); + Ok(srv) + } + + pub fn create_raw_service( + &self, + service_name: &str, + callback: F, + qos: &QoSProfile, + ) -> Result>> + where + Srv: ServiceT + 'static, + F: Fn(&::Raw) -> Srv::Response + 'static, + { + let srv = Service::new(self, service_name, callback, qos)?; + let weak_srv = Arc::downgrade(&srv) as Weak; + self.services.lock().unwrap().push(weak_srv); + Ok(srv) + } + pub(crate) fn add_to_wait_set(&self, wait_set: &mut RclWaitSet) -> Result<()> { for subscription in self.subscriptions.lock().unwrap().iter() { if let Some(subscription) = subscription.upgrade() { @@ -283,6 +323,11 @@ impl<'ctx> Node<'ctx> { wait_set.add_timer(&timer.handle().lock().unwrap())?; } } + for service in self.services.lock().unwrap().iter() { + if let Some(service) = service.upgrade() { + wait_set.add_service(service.handle())?; + } + } Ok(()) } @@ -297,6 +342,11 @@ impl<'ctx> Node<'ctx> { timer.call_callback()?; } } + for service in self.services.lock().unwrap().iter() { + if let Some(service) = service.upgrade() { + service.call_callback()?; + } + } Ok(()) } diff --git a/rclrust/src/service.rs b/rclrust/src/service.rs new file mode 100644 index 00000000..283c37c1 --- /dev/null +++ b/rclrust/src/service.rs @@ -0,0 +1,186 @@ +use std::ffi::{c_void, CString}; +use std::mem::MaybeUninit; +use std::sync::{Arc, Mutex}; + +use anyhow::{Context, Result}; +use rclrust_msg::_core::{MessageT, ServiceT}; + +use crate::error::{RclRustError, ToRclRustResult}; +use crate::internal::ffi::*; +use crate::log::Logger; +use crate::node::{Node, RclNode}; +use crate::qos::QoSProfile; +use crate::rclrust_error; + +pub struct RclService(rcl_sys::rcl_service_t); + +unsafe impl Send for RclService {} + +impl RclService { + fn new(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result + where + Srv: ServiceT, + { + let mut service = unsafe { rcl_sys::rcl_get_zero_initialized_service() }; + let service_c_str = CString::new(service_name)?; + let mut options = unsafe { rcl_sys::rcl_service_get_default_options() }; + options.qos = qos.into(); + + unsafe { + rcl_sys::rcl_service_init( + &mut service, + node.raw(), + Srv::type_support() as *const _, + service_c_str.as_ptr(), + &options, + ) + .to_result()?; + } + + Ok(Self(service)) + } + + pub const fn raw(&self) -> &rcl_sys::rcl_service_t { + &self.0 + } + + unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> { + rcl_sys::rcl_service_fini(&mut self.0, node.raw_mut()) + .to_result() + .with_context(|| "rcl_sys::rcl_service_fini in RclService::fini") + } + + fn take_request( + &self, + ) -> Result<(rcl_sys::rmw_request_id_t, ::Raw)> + where + Srv: ServiceT, + { + let mut request_header = MaybeUninit::uninit(); + let mut request = ::Raw::default(); + unsafe { + rcl_sys::rcl_take_request( + &self.0, + request_header.as_mut_ptr(), + &mut request as *mut _ as *mut c_void, + ) + .to_result() + .with_context(|| "rcl_sys::rcl_take_request in RclService::take_request")?; + } + + Ok((unsafe { request_header.assume_init() }, request)) + } + + fn send_response( + &self, + response_header: &mut rcl_sys::rmw_request_id_t, + response: Srv::Response, + ) -> Result<()> + where + Srv: ServiceT, + { + unsafe { + rcl_sys::rcl_send_response( + &self.0, + response_header, + &response.to_raw_ref() as *const _ as *mut c_void, + ) + .to_result() + .with_context(|| "rcl_sys::rcl_send_response in RclService::send_response") + } + } + + fn service_name(&self) -> String { + unsafe { + let name = rcl_sys::rcl_service_get_service_name(&self.0); + String::from_c_char(name).unwrap_or_default() + } + } + + fn is_valid(&self) -> bool { + unsafe { rcl_sys::rcl_service_is_valid(&self.0) } + } +} + +pub(crate) trait ServiceBase { + fn handle(&self) -> &RclService; + fn call_callback(&self) -> Result<()>; +} + +pub struct Service +where + Srv: ServiceT, +{ + handle: RclService, + callback: Box::Raw) -> Srv::Response>, + node_handle: Arc>, +} + +impl Service +where + Srv: ServiceT, +{ + pub(crate) fn new<'ctx, F>( + node: &Node<'ctx>, + service_name: &str, + callback: F, + qos: &QoSProfile, + ) -> Result> + where + F: Fn(&::Raw) -> Srv::Response + 'static, + { + let node_handle = node.clone_handle(); + let handle = RclService::new::(&node_handle.lock().unwrap(), service_name, qos)?; + + Ok(Arc::new(Self { + handle, + callback: Box::new(callback), + node_handle, + })) + } + + pub fn service_name(&self) -> String { + self.handle.service_name() + } + + pub fn is_valid(&self) -> bool { + self.handle.is_valid() + } +} + +impl ServiceBase for Service +where + Srv: ServiceT, +{ + fn handle(&self) -> &RclService { + &self.handle + } + + fn call_callback(&self) -> Result<()> { + match self.handle.take_request::() { + Ok((mut req_header, req)) => { + let res = (self.callback)(&req); + self.handle.send_response::(&mut req_header, res) + } + Err(e) => match e.downcast_ref::() { + Some(RclRustError::RclSubscriptionTakeFailed(_)) => Ok(()), + _ => Err(e), + }, + } + } +} + +impl Drop for Service +where + Srv: ServiceT, +{ + fn drop(&mut self) { + if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } { + rclrust_error!( + Logger::new("rclrust"), + "Failed to clean up rcl service handle: {}", + e + ) + } + } +} diff --git a/rclrust/src/wait_set.rs b/rclrust/src/wait_set.rs index 470ddf7d..8960a543 100644 --- a/rclrust/src/wait_set.rs +++ b/rclrust/src/wait_set.rs @@ -4,6 +4,7 @@ use crate::context::RclContext; use crate::error::ToRclRustResult; use crate::log::Logger; use crate::rclrust_error; +use crate::service::RclService; use crate::subscription::RclSubscription; use crate::timer::RclTimer; @@ -83,6 +84,14 @@ impl RclWaitSet { .with_context(|| "rcl_sys::rcl_wait_set_add_timer in RclWaitSet::add_timer") } } + + pub fn add_service(&mut self, service: &RclService) -> Result<()> { + unsafe { + rcl_sys::rcl_wait_set_add_service(&mut self.0, service.raw(), std::ptr::null_mut()) + .to_result() + .with_context(|| "rcl_sys::rcl_wait_set_add_service in RclWaitSet::add_service") + } + } } impl Drop for RclWaitSet {