Browse Source

Update client API

tags/v0.2.5-alpha.2
Yuma Hiramatsu 4 years ago
parent
commit
feea852574
3 changed files with 73 additions and 22 deletions
  1. +8
    -16
      rclrust/examples/client.rs
  2. +63
    -6
      rclrust/src/client.rs
  3. +2
    -0
      rclrust/src/error.rs

+ 8
- 16
rclrust/examples/client.rs View File

@@ -1,5 +1,3 @@
use std::thread;

use anyhow::Result;
use rclrust::prelude::*;
use rclrust::qos::QoSProfile;
@@ -17,20 +15,14 @@ fn main() -> Result<()> {
}

let req = AddTwoInts_Request { a: 17, b: 25 };
let mut recv = client.send_request(&req)?;

thread::spawn(move || loop {
match recv.try_recv() {
Ok(Some(res)) => {
rclrust_info!(logger, "{} + {} = {}", req.a, req.b, res.sum);
return;
}
Ok(None) => continue,
Err(_) => rclrust_error!(logger, "Request is cancelled"),
}
});

rclrust::spin(&node)?;
let task = client.send_request(&req)?;
rclrust_info!(
logger,
"{} + {} = {}",
req.a,
req.b,
task.wait_response(&ctx)?.unwrap().sum
);

Ok(())
}

+ 63
- 6
rclrust/src/client.rs View File

@@ -3,21 +3,27 @@ use std::ffi::CString;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::os::raw::c_void;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, Weak};
use std::time::Duration;

use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Context as _, Result};
use futures::channel::oneshot;
use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT};

use crate::context::Context;
use crate::error::{RclRustError, ToRclRustResult};
use crate::internal::ffi::*;
use crate::log::Logger;
use crate::node::{Node, RclNode};
use crate::qos::QoSProfile;
use crate::rclrust_error;
use crate::wait_set::RclWaitSet;

#[derive(Debug)]
pub(crate) struct RclClient(rcl_sys::rcl_client_t);

unsafe impl Send for RclClient {}

impl RclClient {
pub fn new<Srv>(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result<Self>
where
@@ -122,7 +128,7 @@ pub(crate) trait ClientBase {

pub struct Client<Srv>
where
Srv: ServiceT,
Srv: ServiceT + 'static,
{
handle: RclClient,
node_handle: Arc<Mutex<RclNode>>,
@@ -150,13 +156,18 @@ where
}))
}

pub fn send_request(&self, request: &Srv::Request) -> Result<oneshot::Receiver<Srv::Response>> {
pub fn send_request(
self: &Arc<Self>,
request: &Srv::Request,
) -> Result<ServiceResponseTask<Srv>> {
let id = self.handle.send_request::<Srv>(request)?;

let (sender, receiver) = oneshot::channel::<Srv::Response>();
self.pendings.lock().unwrap().insert(id, sender);

Ok(receiver)
Ok(ServiceResponseTask {
receiver,
client: Arc::downgrade(self) as Weak<dyn ClientBase>,
})
}

pub fn service_name(&self) -> String {
@@ -215,3 +226,49 @@ where
}
}
}

pub struct ServiceResponseTask<Srv>
where
Srv: ServiceT,
{
receiver: oneshot::Receiver<Srv::Response>,
client: Weak<dyn ClientBase>,
}

impl<Srv> ServiceResponseTask<Srv>
where
Srv: ServiceT,
{
pub fn wait_response(mut self, context: &Context) -> Result<Option<Srv::Response>> {
while context.is_valid() {
match self.spin_some(context, Duration::from_nanos(500)) {
Ok(_) => {}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclTimeout(_)) => {}
_ => return Err(e),
},
}
match self.receiver.try_recv() {
Ok(Some(v)) => return Ok(Some(v)),
Ok(None) => {}
Err(_) => return Err(RclRustError::ServiceIsCanceled.into()),
}
}

Ok(None)
}

fn spin_some(&self, context: &Context, max_duration: Duration) -> Result<()> {
if let Some(client) = self.client.upgrade() {
let mut wait_set =
RclWaitSet::new(&mut context.handle().lock().unwrap(), 0, 0, 0, 1, 0, 0)?;

wait_set.clear()?;
wait_set.add_client(client.handle())?;
wait_set.wait(max_duration.as_nanos() as i64)?;
client.process_requests()?;
}

Ok(())
}
}

+ 2
- 0
rclrust/src/error.rs View File

@@ -135,6 +135,8 @@ pub enum RclRustError {

#[error("Runtime Error: {0}")]
RuntimeError(&'static str),
#[error("Service is canceled.")]
ServiceIsCanceled,
}

pub(crate) fn result_from_rcl_ret(ret: rcl_sys::rcl_ret_t) -> Result<()> {


Loading…
Cancel
Save