diff --git a/rcl-sys/src/wrapper.h b/rcl-sys/src/wrapper.h index 1cc88bd1..db7129ab 100644 --- a/rcl-sys/src/wrapper.h +++ b/rcl-sys/src/wrapper.h @@ -1,3 +1,4 @@ +#include #include #include #include diff --git a/rclrust/Cargo.toml b/rclrust/Cargo.toml index db4e0a52..ad8aab6a 100644 --- a/rclrust/Cargo.toml +++ b/rclrust/Cargo.toml @@ -14,6 +14,7 @@ categories = ["science::robotics"] [dependencies] anyhow = "1.0" +futures = "0.3" once_cell = "1.8" parking_lot = "0.11" rcl-sys = { path = "../rcl-sys", version = "0.0.1" } diff --git a/rclrust/examples/client.rs b/rclrust/examples/client.rs new file mode 100644 index 00000000..ec2bedda --- /dev/null +++ b/rclrust/examples/client.rs @@ -0,0 +1,36 @@ +use std::thread; + +use anyhow::Result; +use rclrust::prelude::*; +use rclrust::qos::QoSProfile; +use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Request}; + +fn main() -> Result<()> { + let ctx = rclrust::init()?; + let node = ctx.create_node("examples_client")?; + let logger = node.logger(); + + let client = node.create_client::("add_ints", &QoSProfile::default())?; + + while !client.service_is_available()? { + std::thread::sleep(std::time::Duration::from_secs_f32(0.1)); + } + + let req = AddTwoInts_Request { a: 17, b: 25 }; + let mut recv = client.send_request(&req)?; + + thread::spawn(move || loop { + match recv.try_recv() { + Ok(Some(res)) => { + rclrust_info!(logger, "{} + {} = {}", req.a, req.b, res.sum); + return; + } + Ok(None) => continue, + Err(_) => rclrust_error!(logger, "Request is cancelled"), + } + }); + + rclrust::spin(&node)?; + + Ok(()) +} diff --git a/rclrust/src/client.rs b/rclrust/src/client.rs new file mode 100644 index 00000000..e30bbd1b --- /dev/null +++ b/rclrust/src/client.rs @@ -0,0 +1,217 @@ +use std::collections::HashMap; +use std::ffi::CString; +use std::marker::PhantomData; +use std::mem::MaybeUninit; +use std::os::raw::c_void; +use std::sync::{Arc, Mutex}; + +use anyhow::{anyhow, Context, Result}; +use futures::channel::oneshot; +use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT}; + +use crate::error::{RclRustError, ToRclRustResult}; +use crate::internal::ffi::*; +use crate::log::Logger; +use crate::node::{Node, RclNode}; +use crate::qos::QoSProfile; +use crate::rclrust_error; + +pub(crate) struct RclClient(rcl_sys::rcl_client_t); + +impl RclClient { + pub fn new(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result + where + Srv: ServiceT, + { + let mut client = unsafe { rcl_sys::rcl_get_zero_initialized_client() }; + let service_c_str = CString::new(service_name)?; + let mut options = unsafe { rcl_sys::rcl_client_get_default_options() }; + options.qos = qos.into(); + + unsafe { + rcl_sys::rcl_client_init( + &mut client, + node.raw(), + Srv::type_support() as *const _, + service_c_str.as_ptr(), + &options, + ) + .to_result() + .with_context(|| "rcl_sys::rcl_client_init in RclClient::new")?; + } + + Ok(Self(client)) + } + + pub const fn raw(&self) -> &rcl_sys::rcl_client_t { + &self.0 + } + + unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> { + rcl_sys::rcl_client_fini(&mut self.0, node.raw_mut()) + .to_result() + .with_context(|| "rcl_sys::rcl_client_fini in RclClient::fini") + } + + fn send_request(&self, request: &Srv::Request) -> Result + where + Srv: ServiceT, + { + let mut sequence_number = 0; + unsafe { + rcl_sys::rcl_send_request( + &self.0, + &request.to_raw_ref() as *const _ as *const c_void, + &mut sequence_number, + ) + .to_result() + .with_context(|| "rcl_sys::rcl_send_request in RclClient::send_request")?; + } + Ok(sequence_number) + } + + fn take_response( + &self, + ) -> Result<(rcl_sys::rmw_request_id_t, ::Raw)> + where + Srv: ServiceT, + { + let mut request_header = MaybeUninit::uninit(); + let mut response = ::Raw::default(); + unsafe { + rcl_sys::rcl_take_response( + &self.0, + request_header.as_mut_ptr(), + &mut response as *mut _ as *mut c_void, + ) + .to_result() + .with_context(|| "rcl_sys::rcl_take_response in RclClient::take_response")?; + } + + Ok((unsafe { request_header.assume_init() }, response)) + } + + fn service_name(&self) -> String { + unsafe { + let name = rcl_sys::rcl_client_get_service_name(&self.0); + String::from_c_char(name).unwrap_or_default() + } + } + + fn service_is_available(&self, node: &RclNode) -> Result { + let mut is_available = false; + unsafe { + rcl_sys::rcl_service_server_is_available(node.raw(), &self.0, &mut is_available) + .to_result() + .with_context(|| { + "rcl_sys::rcl_service_server_is_available in RclClient::service_is_available" + })?; + } + Ok(is_available) + } + + fn is_valid(&self) -> bool { + unsafe { rcl_sys::rcl_client_is_valid(&self.0) } + } +} + +pub(crate) trait ClientBase { + fn handle(&self) -> &RclClient; + fn process_requests(&self) -> Result<()>; +} + +pub struct Client +where + Srv: ServiceT, +{ + handle: RclClient, + node_handle: Arc>, + pendings: Mutex>>, + _phantom: PhantomData, +} + +impl Client +where + Srv: ServiceT, +{ + pub(crate) fn new<'ctx>( + node: &Node<'ctx>, + service_name: &str, + qos: &QoSProfile, + ) -> Result> { + let node_handle = node.clone_handle(); + let handle = RclClient::new::(&node_handle.lock().unwrap(), service_name, qos)?; + + Ok(Arc::new(Self { + handle, + node_handle, + pendings: Mutex::new(HashMap::new()), + _phantom: Default::default(), + })) + } + + pub fn send_request(&self, request: &Srv::Request) -> Result> { + let id = self.handle.send_request::(request)?; + + let (sender, receiver) = oneshot::channel::(); + self.pendings.lock().unwrap().insert(id, sender); + + Ok(receiver) + } + + pub fn service_name(&self) -> String { + self.handle.service_name() + } + + pub fn service_is_available(&self) -> Result { + self.handle + .service_is_available(&self.node_handle.lock().unwrap()) + } + + pub fn is_valid(&self) -> bool { + self.handle.is_valid() + } +} + +impl ClientBase for Client +where + Srv: ServiceT, +{ + fn handle(&self) -> &RclClient { + &self.handle + } + + fn process_requests(&self) -> Result<()> { + match self.handle.take_response::() { + Ok((req_header, res)) => { + let mut pendings = self.pendings.lock().unwrap(); + let sender = pendings + .remove(&req_header.sequence_number) + .ok_or_else(|| anyhow!("fail to find key in Client::process_requests"))?; + sender.send(unsafe { res.to_rust() }).map_err(|_| { + anyhow!("fail to send response via channel in Client::process_requests") + })?; + Ok(()) + } + Err(e) => match e.downcast_ref::() { + Some(RclRustError::RclClientTakeFailed(_)) => Ok(()), + _ => Err(e), + }, + } + } +} + +impl Drop for Client +where + Srv: ServiceT, +{ + fn drop(&mut self) { + if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } { + rclrust_error!( + Logger::new("rclrust"), + "Failed to clean up rcl client handle: {}", + e + ) + } + } +} diff --git a/rclrust/src/executor.rs b/rclrust/src/executor.rs index d7ddc24d..d05d5b34 100644 --- a/rclrust/src/executor.rs +++ b/rclrust/src/executor.rs @@ -51,7 +51,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { } pub fn spin_some(&self, max_duration: Duration) -> Result<()> { - let (n_subscriptions, _, n_timers, _, n_services, _) = + let (n_subscriptions, _, n_timers, n_clients, n_services, _) = self.nodes.iter().filter_map(|n| n.upgrade()).fold( (0, 0, 0, 0, 0, 0), |(subs, guards, timers, clients, services, events), node| { @@ -59,7 +59,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { subs + node.subscriptions.lock().unwrap().len(), guards, timers + node.timers.lock().unwrap().len(), - clients, + clients + node.clients.lock().unwrap().len(), services + node.services.lock().unwrap().len(), events, ) @@ -71,7 +71,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> { n_subscriptions, 0, n_timers, - 0, + n_clients, n_services, 0, )?; diff --git a/rclrust/src/lib.rs b/rclrust/src/lib.rs index 27247f35..1dc000ab 100644 --- a/rclrust/src/lib.rs +++ b/rclrust/src/lib.rs @@ -5,6 +5,7 @@ clippy::nursery )] +pub mod client; pub mod clock; pub mod context; pub mod error; diff --git a/rclrust/src/node.rs b/rclrust/src/node.rs index 6aeddb2a..4047f9e0 100644 --- a/rclrust/src/node.rs +++ b/rclrust/src/node.rs @@ -5,6 +5,7 @@ use std::time::Duration; use anyhow::{ensure, Context as _, Result}; use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT}; +use crate::client::{Client, ClientBase}; use crate::clock::ClockType; use crate::context::{Context, RclContext}; use crate::error::ToRclRustResult; @@ -102,6 +103,7 @@ pub struct Node<'ctx> { context: &'ctx Context, pub(crate) subscriptions: Mutex>>, pub(crate) timers: Mutex>>, + pub(crate) clients: Mutex>>, pub(crate) services: Mutex>>, } @@ -128,6 +130,7 @@ impl<'ctx> Node<'ctx> { context, subscriptions: Mutex::new(Vec::new()), timers: Mutex::new(Vec::new()), + clients: Mutex::new(Vec::new()), services: Mutex::new(Vec::new()), })) } @@ -275,6 +278,20 @@ impl<'ctx> Node<'ctx> { self.create_timer(period, ClockType::SteadyTime, callback) } + pub fn create_client( + &self, + service_name: &str, + qos: &QoSProfile, + ) -> Result>> + where + Srv: ServiceT + 'static, + { + let client = Client::::new(self, service_name, qos)?; + let weak = Arc::downgrade(&client) as Weak; + self.clients.lock().unwrap().push(weak); + Ok(client) + } + pub fn create_service( &self, service_name: &str, @@ -327,6 +344,13 @@ impl<'ctx> Node<'ctx> { .filter_map(|weak| weak.upgrade()) .try_for_each(|timer| wait_set.add_timer(&timer.handle().lock().unwrap()))?; + self.clients + .lock() + .unwrap() + .iter() + .filter_map(|weak| weak.upgrade()) + .try_for_each(|client| wait_set.add_client(client.handle()))?; + self.services .lock() .unwrap() @@ -352,6 +376,13 @@ impl<'ctx> Node<'ctx> { .filter_map(|weak| weak.upgrade()) .try_for_each(|timer| timer.call_callback())?; + self.clients + .lock() + .unwrap() + .iter() + .filter_map(|weak| weak.upgrade()) + .try_for_each(|client| client.process_requests())?; + self.services .lock() .unwrap() diff --git a/rclrust/src/wait_set.rs b/rclrust/src/wait_set.rs index 8960a543..3968e865 100644 --- a/rclrust/src/wait_set.rs +++ b/rclrust/src/wait_set.rs @@ -1,5 +1,6 @@ use anyhow::{Context, Result}; +use crate::client::RclClient; use crate::context::RclContext; use crate::error::ToRclRustResult; use crate::log::Logger; @@ -85,6 +86,14 @@ impl RclWaitSet { } } + pub fn add_client(&mut self, client: &RclClient) -> Result<()> { + unsafe { + rcl_sys::rcl_wait_set_add_client(&mut self.0, client.raw(), std::ptr::null_mut()) + .to_result() + .with_context(|| "rcl_sys::rcl_wait_set_add_client in RclWaitSet::add_client") + } + } + pub fn add_service(&mut self, service: &RclService) -> Result<()> { unsafe { rcl_sys::rcl_wait_set_add_service(&mut self.0, service.raw(), std::ptr::null_mut())