diff --git a/rclrust/examples/client.rs b/rclrust/examples/client.rs index ec2bedda..d448d11b 100644 --- a/rclrust/examples/client.rs +++ b/rclrust/examples/client.rs @@ -1,5 +1,3 @@ -use std::thread; - use anyhow::Result; use rclrust::prelude::*; use rclrust::qos::QoSProfile; @@ -17,20 +15,14 @@ fn main() -> Result<()> { } let req = AddTwoInts_Request { a: 17, b: 25 }; - let mut recv = client.send_request(&req)?; - - thread::spawn(move || loop { - match recv.try_recv() { - Ok(Some(res)) => { - rclrust_info!(logger, "{} + {} = {}", req.a, req.b, res.sum); - return; - } - Ok(None) => continue, - Err(_) => rclrust_error!(logger, "Request is cancelled"), - } - }); - - rclrust::spin(&node)?; + let task = client.send_request(&req)?; + rclrust_info!( + logger, + "{} + {} = {}", + req.a, + req.b, + task.wait_response(&ctx)?.unwrap().sum + ); Ok(()) } diff --git a/rclrust/src/client.rs b/rclrust/src/client.rs index e30bbd1b..ac99ae98 100644 --- a/rclrust/src/client.rs +++ b/rclrust/src/client.rs @@ -3,21 +3,27 @@ use std::ffi::CString; use std::marker::PhantomData; use std::mem::MaybeUninit; use std::os::raw::c_void; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Weak}; +use std::time::Duration; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Context as _, Result}; use futures::channel::oneshot; use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT}; +use crate::context::Context; use crate::error::{RclRustError, ToRclRustResult}; use crate::internal::ffi::*; use crate::log::Logger; use crate::node::{Node, RclNode}; use crate::qos::QoSProfile; use crate::rclrust_error; +use crate::wait_set::RclWaitSet; +#[derive(Debug)] pub(crate) struct RclClient(rcl_sys::rcl_client_t); +unsafe impl Send for RclClient {} + impl RclClient { pub fn new(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result where @@ -122,7 +128,7 @@ pub(crate) trait ClientBase { pub struct Client where - Srv: ServiceT, + Srv: ServiceT + 'static, { handle: RclClient, node_handle: Arc>, @@ -150,13 +156,18 @@ where })) } - pub fn send_request(&self, request: &Srv::Request) -> Result> { + pub fn send_request( + self: &Arc, + request: &Srv::Request, + ) -> Result> { let id = self.handle.send_request::(request)?; - let (sender, receiver) = oneshot::channel::(); self.pendings.lock().unwrap().insert(id, sender); - Ok(receiver) + Ok(ServiceResponseTask { + receiver, + client: Arc::downgrade(self) as Weak, + }) } pub fn service_name(&self) -> String { @@ -215,3 +226,49 @@ where } } } + +pub struct ServiceResponseTask +where + Srv: ServiceT, +{ + receiver: oneshot::Receiver, + client: Weak, +} + +impl ServiceResponseTask +where + Srv: ServiceT, +{ + pub fn wait_response(mut self, context: &Context) -> Result> { + while context.is_valid() { + match self.spin_some(context, Duration::from_nanos(500)) { + Ok(_) => {} + Err(e) => match e.downcast_ref::() { + Some(RclRustError::RclTimeout(_)) => {} + _ => return Err(e), + }, + } + match self.receiver.try_recv() { + Ok(Some(v)) => return Ok(Some(v)), + Ok(None) => {} + Err(_) => return Err(RclRustError::ServiceIsCanceled.into()), + } + } + + Ok(None) + } + + fn spin_some(&self, context: &Context, max_duration: Duration) -> Result<()> { + if let Some(client) = self.client.upgrade() { + let mut wait_set = + RclWaitSet::new(&mut context.handle().lock().unwrap(), 0, 0, 0, 1, 0, 0)?; + + wait_set.clear()?; + wait_set.add_client(client.handle())?; + wait_set.wait(max_duration.as_nanos() as i64)?; + client.process_requests()?; + } + + Ok(()) + } +} diff --git a/rclrust/src/error.rs b/rclrust/src/error.rs index ac1a19ce..a17c8539 100644 --- a/rclrust/src/error.rs +++ b/rclrust/src/error.rs @@ -135,6 +135,8 @@ pub enum RclRustError { #[error("Runtime Error: {0}")] RuntimeError(&'static str), + #[error("Service is canceled.")] + ServiceIsCanceled, } pub(crate) fn result_from_rcl_ret(ret: rcl_sys::rcl_ret_t) -> Result<()> {