Browse Source

Implement ros2 service

tags/v0.2.5-alpha.2
Yuma Hiramatsu 4 years ago
parent
commit
e997fa18ab
6 changed files with 268 additions and 4 deletions
  1. +18
    -0
      rclrust/examples/service.rs
  2. +3
    -3
      rclrust/src/executor.rs
  3. +1
    -0
      rclrust/src/lib.rs
  4. +51
    -1
      rclrust/src/node.rs
  5. +186
    -0
      rclrust/src/service.rs
  6. +9
    -0
      rclrust/src/wait_set.rs

+ 18
- 0
rclrust/examples/service.rs View File

@@ -0,0 +1,18 @@
use anyhow::Result;
use rclrust::qos::QoSProfile;
use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Response};

fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_service")?;

let _service = node.create_service::<AddTwoInts, _>(
"add_ints",
move |req| AddTwoInts_Response { sum: req.a + req.b },
&QoSProfile::default(),
)?;

rclrust::spin(&node)?;

Ok(())
}

+ 3
- 3
rclrust/src/executor.rs View File

@@ -51,7 +51,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
} }


pub fn spin_some(&self, max_duration: Duration) -> Result<()> { pub fn spin_some(&self, max_duration: Duration) -> Result<()> {
let (n_subscriptions, _, n_timers, _, _, _) =
let (n_subscriptions, _, n_timers, _, n_services, _) =
self.nodes.iter().filter_map(|n| n.upgrade()).fold( self.nodes.iter().filter_map(|n| n.upgrade()).fold(
(0, 0, 0, 0, 0, 0), (0, 0, 0, 0, 0, 0),
|(subs, guards, timers, clients, services, events), node| { |(subs, guards, timers, clients, services, events), node| {
@@ -60,7 +60,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
guards, guards,
timers + node.timers.lock().unwrap().len(), timers + node.timers.lock().unwrap().len(),
clients, clients,
services,
services + node.services.lock().unwrap().len(),
events, events,
) )
}, },
@@ -72,7 +72,7 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
0, 0,
n_timers, n_timers,
0, 0,
0,
n_services,
0, 0,
)?; )?;




+ 1
- 0
rclrust/src/lib.rs View File

@@ -17,6 +17,7 @@ pub mod node_options;
pub mod prelude; pub mod prelude;
pub mod publisher; pub mod publisher;
pub mod qos; pub mod qos;
pub mod service;
pub mod subscription; pub mod subscription;
pub mod time; pub mod time;
pub mod timer; pub mod timer;


+ 51
- 1
rclrust/src/node.rs View File

@@ -3,7 +3,7 @@ use std::sync::{Arc, Mutex, Weak};
use std::time::Duration; use std::time::Duration;


use anyhow::{ensure, Context as _, Result}; use anyhow::{ensure, Context as _, Result};
use rclrust_msg::_core::MessageT;
use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT};


use crate::clock::ClockType; use crate::clock::ClockType;
use crate::context::{Context, RclContext}; use crate::context::{Context, RclContext};
@@ -14,6 +14,7 @@ use crate::node_options::NodeOptions;
use crate::publisher::Publisher; use crate::publisher::Publisher;
use crate::qos::QoSProfile; use crate::qos::QoSProfile;
use crate::rclrust_error; use crate::rclrust_error;
use crate::service::{Service, ServiceBase};
use crate::subscription::{Subscription, SubscriptionBase}; use crate::subscription::{Subscription, SubscriptionBase};
use crate::timer::Timer; use crate::timer::Timer;
use crate::wait_set::RclWaitSet; use crate::wait_set::RclWaitSet;
@@ -101,6 +102,7 @@ pub struct Node<'ctx> {
context: &'ctx Context, context: &'ctx Context,
pub(crate) subscriptions: Mutex<Vec<Weak<dyn SubscriptionBase>>>, pub(crate) subscriptions: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
pub(crate) timers: Mutex<Vec<Weak<Timer>>>, pub(crate) timers: Mutex<Vec<Weak<Timer>>>,
pub(crate) services: Mutex<Vec<Weak<dyn ServiceBase>>>,
} }


impl<'ctx> Node<'ctx> { impl<'ctx> Node<'ctx> {
@@ -126,6 +128,7 @@ impl<'ctx> Node<'ctx> {
context, context,
subscriptions: Mutex::new(Vec::new()), subscriptions: Mutex::new(Vec::new()),
timers: Mutex::new(Vec::new()), timers: Mutex::new(Vec::new()),
services: Mutex::new(Vec::new()),
})) }))
} }


@@ -272,6 +275,43 @@ impl<'ctx> Node<'ctx> {
self.create_timer(period, ClockType::SteadyTime, callback) self.create_timer(period, ClockType::SteadyTime, callback)
} }


pub fn create_service<Srv, F>(
&self,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Service<Srv>>>
where
Srv: ServiceT + 'static,
F: Fn(Srv::Request) -> Srv::Response + 'static,
{
let srv = Service::<Srv>::new(
self,
service_name,
move |req_raw| (callback)(unsafe { req_raw.to_rust() }),
qos,
)?;
let weak_srv = Arc::downgrade(&srv) as Weak<dyn ServiceBase>;
self.services.lock().unwrap().push(weak_srv);
Ok(srv)
}

pub fn create_raw_service<Srv, F>(
&self,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Service<Srv>>>
where
Srv: ServiceT + 'static,
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + 'static,
{
let srv = Service::new(self, service_name, callback, qos)?;
let weak_srv = Arc::downgrade(&srv) as Weak<dyn ServiceBase>;
self.services.lock().unwrap().push(weak_srv);
Ok(srv)
}

pub(crate) fn add_to_wait_set(&self, wait_set: &mut RclWaitSet) -> Result<()> { pub(crate) fn add_to_wait_set(&self, wait_set: &mut RclWaitSet) -> Result<()> {
for subscription in self.subscriptions.lock().unwrap().iter() { for subscription in self.subscriptions.lock().unwrap().iter() {
if let Some(subscription) = subscription.upgrade() { if let Some(subscription) = subscription.upgrade() {
@@ -283,6 +323,11 @@ impl<'ctx> Node<'ctx> {
wait_set.add_timer(&timer.handle().lock().unwrap())?; wait_set.add_timer(&timer.handle().lock().unwrap())?;
} }
} }
for service in self.services.lock().unwrap().iter() {
if let Some(service) = service.upgrade() {
wait_set.add_service(service.handle())?;
}
}
Ok(()) Ok(())
} }


@@ -297,6 +342,11 @@ impl<'ctx> Node<'ctx> {
timer.call_callback()?; timer.call_callback()?;
} }
} }
for service in self.services.lock().unwrap().iter() {
if let Some(service) = service.upgrade() {
service.call_callback()?;
}
}


Ok(()) Ok(())
} }


+ 186
- 0
rclrust/src/service.rs View File

@@ -0,0 +1,186 @@
use std::ffi::{c_void, CString};
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};

use anyhow::{Context, Result};
use rclrust_msg::_core::{MessageT, ServiceT};

use crate::error::{RclRustError, ToRclRustResult};
use crate::internal::ffi::*;
use crate::log::Logger;
use crate::node::{Node, RclNode};
use crate::qos::QoSProfile;
use crate::rclrust_error;

pub struct RclService(rcl_sys::rcl_service_t);

unsafe impl Send for RclService {}

impl RclService {
fn new<Srv>(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result<Self>
where
Srv: ServiceT,
{
let mut service = unsafe { rcl_sys::rcl_get_zero_initialized_service() };
let service_c_str = CString::new(service_name)?;
let mut options = unsafe { rcl_sys::rcl_service_get_default_options() };
options.qos = qos.into();

unsafe {
rcl_sys::rcl_service_init(
&mut service,
node.raw(),
Srv::type_support() as *const _,
service_c_str.as_ptr(),
&options,
)
.to_result()?;
}

Ok(Self(service))
}

pub const fn raw(&self) -> &rcl_sys::rcl_service_t {
&self.0
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_service_fini(&mut self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_service_fini in RclService::fini")
}

fn take_request<Srv>(
&self,
) -> Result<(rcl_sys::rmw_request_id_t, <Srv::Request as MessageT>::Raw)>
where
Srv: ServiceT,
{
let mut request_header = MaybeUninit::uninit();
let mut request = <Srv::Request as MessageT>::Raw::default();
unsafe {
rcl_sys::rcl_take_request(
&self.0,
request_header.as_mut_ptr(),
&mut request as *mut _ as *mut c_void,
)
.to_result()
.with_context(|| "rcl_sys::rcl_take_request in RclService::take_request")?;
}

Ok((unsafe { request_header.assume_init() }, request))
}

fn send_response<Srv>(
&self,
response_header: &mut rcl_sys::rmw_request_id_t,
response: Srv::Response,
) -> Result<()>
where
Srv: ServiceT,
{
unsafe {
rcl_sys::rcl_send_response(
&self.0,
response_header,
&response.to_raw_ref() as *const _ as *mut c_void,
)
.to_result()
.with_context(|| "rcl_sys::rcl_send_response in RclService::send_response")
}
}

fn service_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_service_get_service_name(&self.0);
String::from_c_char(name).unwrap_or_default()
}
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_service_is_valid(&self.0) }
}
}

pub(crate) trait ServiceBase {
fn handle(&self) -> &RclService;
fn call_callback(&self) -> Result<()>;
}

pub struct Service<Srv>
where
Srv: ServiceT,
{
handle: RclService,
callback: Box<dyn Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response>,
node_handle: Arc<Mutex<RclNode>>,
}

impl<Srv> Service<Srv>
where
Srv: ServiceT,
{
pub(crate) fn new<'ctx, F>(
node: &Node<'ctx>,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Self>>
where
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + 'static,
{
let node_handle = node.clone_handle();
let handle = RclService::new::<Srv>(&node_handle.lock().unwrap(), service_name, qos)?;

Ok(Arc::new(Self {
handle,
callback: Box::new(callback),
node_handle,
}))
}

pub fn service_name(&self) -> String {
self.handle.service_name()
}

pub fn is_valid(&self) -> bool {
self.handle.is_valid()
}
}

impl<Srv> ServiceBase for Service<Srv>
where
Srv: ServiceT,
{
fn handle(&self) -> &RclService {
&self.handle
}

fn call_callback(&self) -> Result<()> {
match self.handle.take_request::<Srv>() {
Ok((mut req_header, req)) => {
let res = (self.callback)(&req);
self.handle.send_response::<Srv>(&mut req_header, res)
}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclSubscriptionTakeFailed(_)) => Ok(()),
_ => Err(e),
},
}
}
}

impl<Srv> Drop for Service<Srv>
where
Srv: ServiceT,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl service handle: {}",
e
)
}
}
}

+ 9
- 0
rclrust/src/wait_set.rs View File

@@ -4,6 +4,7 @@ use crate::context::RclContext;
use crate::error::ToRclRustResult; use crate::error::ToRclRustResult;
use crate::log::Logger; use crate::log::Logger;
use crate::rclrust_error; use crate::rclrust_error;
use crate::service::RclService;
use crate::subscription::RclSubscription; use crate::subscription::RclSubscription;
use crate::timer::RclTimer; use crate::timer::RclTimer;


@@ -83,6 +84,14 @@ impl RclWaitSet {
.with_context(|| "rcl_sys::rcl_wait_set_add_timer in RclWaitSet::add_timer") .with_context(|| "rcl_sys::rcl_wait_set_add_timer in RclWaitSet::add_timer")
} }
} }

pub fn add_service(&mut self, service: &RclService) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_service(&mut self.0, service.raw(), std::ptr::null_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_wait_set_add_service in RclWaitSet::add_service")
}
}
} }


impl Drop for RclWaitSet { impl Drop for RclWaitSet {


Loading…
Cancel
Save