Browse Source

Implement ros2 service client

tags/v0.2.5-alpha.2
Yuma Hiramatsu 4 years ago
parent
commit
5aabba27cc
8 changed files with 299 additions and 3 deletions
  1. +1
    -0
      rcl-sys/src/wrapper.h
  2. +1
    -0
      rclrust/Cargo.toml
  3. +36
    -0
      rclrust/examples/client.rs
  4. +217
    -0
      rclrust/src/client.rs
  5. +3
    -3
      rclrust/src/executor.rs
  6. +1
    -0
      rclrust/src/lib.rs
  7. +31
    -0
      rclrust/src/node.rs
  8. +9
    -0
      rclrust/src/wait_set.rs

+ 1
- 0
rcl-sys/src/wrapper.h View File

@@ -1,3 +1,4 @@
#include <rcl/graph.h>
#include <rcl/logging.h>
#include <rcl/logging_rosout.h>
#include <rcl/rcl.h>


+ 1
- 0
rclrust/Cargo.toml View File

@@ -14,6 +14,7 @@ categories = ["science::robotics"]

[dependencies]
anyhow = "1.0"
futures = "0.3"
once_cell = "1.8"
parking_lot = "0.11"
rcl-sys = { path = "../rcl-sys", version = "0.0.1" }


+ 36
- 0
rclrust/examples/client.rs View File

@@ -0,0 +1,36 @@
use std::thread;

use anyhow::Result;
use rclrust::prelude::*;
use rclrust::qos::QoSProfile;
use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Request};

fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_client")?;
let logger = node.logger();

let client = node.create_client::<AddTwoInts>("add_ints", &QoSProfile::default())?;

while !client.service_is_available()? {
std::thread::sleep(std::time::Duration::from_secs_f32(0.1));
}

let req = AddTwoInts_Request { a: 17, b: 25 };
let mut recv = client.send_request(&req)?;

thread::spawn(move || loop {
match recv.try_recv() {
Ok(Some(res)) => {
rclrust_info!(logger, "{} + {} = {}", req.a, req.b, res.sum);
return;
}
Ok(None) => continue,
Err(_) => rclrust_error!(logger, "Request is cancelled"),
}
});

rclrust::spin(&node)?;

Ok(())
}

+ 217
- 0
rclrust/src/client.rs View File

@@ -0,0 +1,217 @@
use std::collections::HashMap;
use std::ffi::CString;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::os::raw::c_void;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Context, Result};
use futures::channel::oneshot;
use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT};

use crate::error::{RclRustError, ToRclRustResult};
use crate::internal::ffi::*;
use crate::log::Logger;
use crate::node::{Node, RclNode};
use crate::qos::QoSProfile;
use crate::rclrust_error;

pub(crate) struct RclClient(rcl_sys::rcl_client_t);

impl RclClient {
pub fn new<Srv>(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result<Self>
where
Srv: ServiceT,
{
let mut client = unsafe { rcl_sys::rcl_get_zero_initialized_client() };
let service_c_str = CString::new(service_name)?;
let mut options = unsafe { rcl_sys::rcl_client_get_default_options() };
options.qos = qos.into();

unsafe {
rcl_sys::rcl_client_init(
&mut client,
node.raw(),
Srv::type_support() as *const _,
service_c_str.as_ptr(),
&options,
)
.to_result()
.with_context(|| "rcl_sys::rcl_client_init in RclClient::new")?;
}

Ok(Self(client))
}

pub const fn raw(&self) -> &rcl_sys::rcl_client_t {
&self.0
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_client_fini(&mut self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_client_fini in RclClient::fini")
}

fn send_request<Srv>(&self, request: &Srv::Request) -> Result<i64>
where
Srv: ServiceT,
{
let mut sequence_number = 0;
unsafe {
rcl_sys::rcl_send_request(
&self.0,
&request.to_raw_ref() as *const _ as *const c_void,
&mut sequence_number,
)
.to_result()
.with_context(|| "rcl_sys::rcl_send_request in RclClient::send_request")?;
}
Ok(sequence_number)
}

fn take_response<Srv>(
&self,
) -> Result<(rcl_sys::rmw_request_id_t, <Srv::Response as MessageT>::Raw)>
where
Srv: ServiceT,
{
let mut request_header = MaybeUninit::uninit();
let mut response = <Srv::Response as MessageT>::Raw::default();
unsafe {
rcl_sys::rcl_take_response(
&self.0,
request_header.as_mut_ptr(),
&mut response as *mut _ as *mut c_void,
)
.to_result()
.with_context(|| "rcl_sys::rcl_take_response in RclClient::take_response")?;
}

Ok((unsafe { request_header.assume_init() }, response))
}

fn service_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_client_get_service_name(&self.0);
String::from_c_char(name).unwrap_or_default()
}
}

fn service_is_available(&self, node: &RclNode) -> Result<bool> {
let mut is_available = false;
unsafe {
rcl_sys::rcl_service_server_is_available(node.raw(), &self.0, &mut is_available)
.to_result()
.with_context(|| {
"rcl_sys::rcl_service_server_is_available in RclClient::service_is_available"
})?;
}
Ok(is_available)
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_client_is_valid(&self.0) }
}
}

pub(crate) trait ClientBase {
fn handle(&self) -> &RclClient;
fn process_requests(&self) -> Result<()>;
}

pub struct Client<Srv>
where
Srv: ServiceT,
{
handle: RclClient,
node_handle: Arc<Mutex<RclNode>>,
pendings: Mutex<HashMap<i64, oneshot::Sender<Srv::Response>>>,
_phantom: PhantomData<Srv>,
}

impl<Srv> Client<Srv>
where
Srv: ServiceT,
{
pub(crate) fn new<'ctx>(
node: &Node<'ctx>,
service_name: &str,
qos: &QoSProfile,
) -> Result<Arc<Self>> {
let node_handle = node.clone_handle();
let handle = RclClient::new::<Srv>(&node_handle.lock().unwrap(), service_name, qos)?;

Ok(Arc::new(Self {
handle,
node_handle,
pendings: Mutex::new(HashMap::new()),
_phantom: Default::default(),
}))
}

pub fn send_request(&self, request: &Srv::Request) -> Result<oneshot::Receiver<Srv::Response>> {
let id = self.handle.send_request::<Srv>(request)?;

let (sender, receiver) = oneshot::channel::<Srv::Response>();
self.pendings.lock().unwrap().insert(id, sender);

Ok(receiver)
}

pub fn service_name(&self) -> String {
self.handle.service_name()
}

pub fn service_is_available(&self) -> Result<bool> {
self.handle
.service_is_available(&self.node_handle.lock().unwrap())
}

pub fn is_valid(&self) -> bool {
self.handle.is_valid()
}
}

impl<Srv> ClientBase for Client<Srv>
where
Srv: ServiceT,
{
fn handle(&self) -> &RclClient {
&self.handle
}

fn process_requests(&self) -> Result<()> {
match self.handle.take_response::<Srv>() {
Ok((req_header, res)) => {
let mut pendings = self.pendings.lock().unwrap();
let sender = pendings
.remove(&req_header.sequence_number)
.ok_or_else(|| anyhow!("fail to find key in Client::process_requests"))?;
sender.send(unsafe { res.to_rust() }).map_err(|_| {
anyhow!("fail to send response via channel in Client::process_requests")
})?;
Ok(())
}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclClientTakeFailed(_)) => Ok(()),
_ => Err(e),
},
}
}
}

impl<Srv> Drop for Client<Srv>
where
Srv: ServiceT,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl client handle: {}",
e
)
}
}
}

+ 3
- 3
rclrust/src/executor.rs View File

@@ -51,7 +51,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
}

pub fn spin_some(&self, max_duration: Duration) -> Result<()> {
let (n_subscriptions, _, n_timers, _, n_services, _) =
let (n_subscriptions, _, n_timers, n_clients, n_services, _) =
self.nodes.iter().filter_map(|n| n.upgrade()).fold(
(0, 0, 0, 0, 0, 0),
|(subs, guards, timers, clients, services, events), node| {
@@ -59,7 +59,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
subs + node.subscriptions.lock().unwrap().len(),
guards,
timers + node.timers.lock().unwrap().len(),
clients,
clients + node.clients.lock().unwrap().len(),
services + node.services.lock().unwrap().len(),
events,
)
@@ -71,7 +71,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
n_subscriptions,
0,
n_timers,
0,
n_clients,
n_services,
0,
)?;


+ 1
- 0
rclrust/src/lib.rs View File

@@ -5,6 +5,7 @@
clippy::nursery
)]

pub mod client;
pub mod clock;
pub mod context;
pub mod error;


+ 31
- 0
rclrust/src/node.rs View File

@@ -5,6 +5,7 @@ use std::time::Duration;
use anyhow::{ensure, Context as _, Result};
use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT};

use crate::client::{Client, ClientBase};
use crate::clock::ClockType;
use crate::context::{Context, RclContext};
use crate::error::ToRclRustResult;
@@ -102,6 +103,7 @@ pub struct Node<'ctx> {
context: &'ctx Context,
pub(crate) subscriptions: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
pub(crate) timers: Mutex<Vec<Weak<Timer>>>,
pub(crate) clients: Mutex<Vec<Weak<dyn ClientBase>>>,
pub(crate) services: Mutex<Vec<Weak<dyn ServiceBase>>>,
}

@@ -128,6 +130,7 @@ impl<'ctx> Node<'ctx> {
context,
subscriptions: Mutex::new(Vec::new()),
timers: Mutex::new(Vec::new()),
clients: Mutex::new(Vec::new()),
services: Mutex::new(Vec::new()),
}))
}
@@ -275,6 +278,20 @@ impl<'ctx> Node<'ctx> {
self.create_timer(period, ClockType::SteadyTime, callback)
}

pub fn create_client<Srv>(
&self,
service_name: &str,
qos: &QoSProfile,
) -> Result<Arc<Client<Srv>>>
where
Srv: ServiceT + 'static,
{
let client = Client::<Srv>::new(self, service_name, qos)?;
let weak = Arc::downgrade(&client) as Weak<dyn ClientBase>;
self.clients.lock().unwrap().push(weak);
Ok(client)
}

pub fn create_service<Srv, F>(
&self,
service_name: &str,
@@ -327,6 +344,13 @@ impl<'ctx> Node<'ctx> {
.filter_map(|weak| weak.upgrade())
.try_for_each(|timer| wait_set.add_timer(&timer.handle().lock().unwrap()))?;

self.clients
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|client| wait_set.add_client(client.handle()))?;

self.services
.lock()
.unwrap()
@@ -352,6 +376,13 @@ impl<'ctx> Node<'ctx> {
.filter_map(|weak| weak.upgrade())
.try_for_each(|timer| timer.call_callback())?;

self.clients
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|client| client.process_requests())?;

self.services
.lock()
.unwrap()


+ 9
- 0
rclrust/src/wait_set.rs View File

@@ -1,5 +1,6 @@
use anyhow::{Context, Result};

use crate::client::RclClient;
use crate::context::RclContext;
use crate::error::ToRclRustResult;
use crate::log::Logger;
@@ -85,6 +86,14 @@ impl RclWaitSet {
}
}

pub fn add_client(&mut self, client: &RclClient) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_client(&mut self.0, client.raw(), std::ptr::null_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_wait_set_add_client in RclWaitSet::add_client")
}
}

pub fn add_service(&mut self, service: &RclService) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_service(&mut self.0, service.raw(), std::ptr::null_mut())


Loading…
Cancel
Save