Browse Source

Desynchronize

tags/v0.2.5-alpha.2
Yuma Hiramatsu 4 years ago
parent
commit
fb5a76d38c
28 changed files with 870 additions and 588 deletions
  1. +3
    -0
      rclrust-msg-gen/src/types/message.rs
  2. +9
    -8
      rclrust-msg/src/_core/string.rs
  3. +4
    -4
      rclrust-msg/src/_core/traits.rs
  4. +5
    -0
      rclrust/Cargo.toml
  5. +7
    -15
      rclrust/examples/client.rs
  6. +0
    -2
      rclrust/examples/parameters.rs
  7. +1
    -1
      rclrust/examples/publisher.rs
  8. +5
    -5
      rclrust/examples/raw_subscription.rs
  9. +4
    -4
      rclrust/examples/service.rs
  10. +7
    -5
      rclrust/examples/subscription.rs
  11. +11
    -8
      rclrust/examples/timer.rs
  12. +149
    -123
      rclrust/src/client.rs
  13. +12
    -16
      rclrust/src/context.rs
  14. +2
    -0
      rclrust/src/error.rs
  15. +91
    -60
      rclrust/src/executor.rs
  16. +2
    -0
      rclrust/src/internal/async_api/mod.rs
  17. +1
    -0
      rclrust/src/internal/async_api/tokio.rs
  18. +2
    -0
      rclrust/src/internal/mod.rs
  19. +56
    -0
      rclrust/src/internal/worker.rs
  20. +3
    -4
      rclrust/src/lib.rs
  21. +121
    -156
      rclrust/src/node.rs
  22. +11
    -3
      rclrust/src/parameter/parameters.rs
  23. +35
    -37
      rclrust/src/publisher.rs
  24. +124
    -55
      rclrust/src/service.rs
  25. +114
    -57
      rclrust/src/subscription.rs
  26. +61
    -17
      rclrust/src/timer.rs
  27. +5
    -7
      rclrust/src/utility.rs
  28. +25
    -1
      rclrust/src/wait_set.rs

+ 3
- 0
rclrust-msg-gen/src/types/message.rs View File

@@ -267,6 +267,9 @@ impl Message {
}
}

unsafe impl std::marker::Send for #raw_type {}
unsafe impl std::marker::Sync for #raw_type {}

impl std::ops::Drop for #raw_type {
fn drop(&mut self) {
unsafe {


+ 9
- 8
rclrust-msg/src/_core/string.rs View File

@@ -26,20 +26,21 @@ impl FFIString {
pub const fn is_empty(&self) -> bool {
self.len() == 0
}

pub unsafe fn to_str(&self) -> Result<&str, std::str::Utf8Error> {
if self.is_empty() {
Ok("")
} else {
CStr::from_ptr(self.data).to_str()
}
}
}

impl FFIToRust for FFIString {
type Target = String;

unsafe fn to_rust(&self) -> Self::Target {
if self.is_empty() {
"".to_string()
} else {
CStr::from_ptr(self.data)
.to_str()
.expect("CStr::to_str failed")
.to_string()
}
self.to_str().expect("CStr::to_str failed").to_string()
}
}



+ 4
- 4
rclrust-msg/src/_core/traits.rs View File

@@ -3,8 +3,8 @@ use std::{convert::TryInto, os::raw::c_void};
use array_init::array_init;
use widestring::U16String;

pub trait MessageT: Default {
type Raw: FFIToRust<Target = Self> + Default + Drop;
pub trait MessageT: Default + Send + Sync {
type Raw: FFIToRust<Target = Self> + Default + Drop + Send + Sync;
type RawRef: FFIFromRust<From = Self>;

fn type_support() -> *const c_void;
@@ -18,14 +18,14 @@ pub trait MessageT: Default {
}
}

pub trait ServiceT {
pub trait ServiceT: Send {
type Request: MessageT;
type Response: MessageT;

fn type_support() -> *const c_void;
}

pub trait ActionT {
pub trait ActionT: Send {
type Goal: MessageT;
type Result: MessageT;
type Feedback: MessageT;


+ 5
- 0
rclrust/Cargo.toml View File

@@ -16,9 +16,14 @@ workspace = ".."

[dependencies]
anyhow = "1.0"
derive-new = "0.5"
futures = "0.3"
once_cell = "1.8"
parking_lot = "0.11"
rcl-sys = { path = "../rcl-sys", version = "0.1.0" }
rclrust-msg = { path = "../rclrust-msg", version = "0.1.0" }
thiserror = "1.0"
tokio = { version = "1", features = ["rt"] }

[dev-dependencies]
tokio = { version = "1", features = ["full"] }

+ 7
- 15
rclrust/examples/client.rs View File

@@ -2,26 +2,18 @@ use anyhow::Result;
use rclrust::{qos::QoSProfile, rclrust_info};
use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Request};

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_client")?;
let mut node = ctx.create_node("examples_client")?;
let logger = node.logger();

let client = node.create_client::<AddTwoInts>("add_ints", &QoSProfile::default())?;

while !client.service_is_available()? {
std::thread::sleep(std::time::Duration::from_secs_f32(0.1));
}
let mut client = node.create_client::<AddTwoInts>("add_ints", &QoSProfile::default())?;
client.wait_service()?;

let req = AddTwoInts_Request { a: 17, b: 25 };
let task = client.send_request(&req)?;
rclrust_info!(
logger,
"{} + {} = {}",
req.a,
req.b,
task.wait_response(&ctx)?.unwrap().sum
);
let task = client.send_request(&req).await?;
rclrust_info!(logger, "{} + {} = {}", req.a, req.b, task.sum);

Ok(())
}

+ 0
- 2
rclrust/examples/parameters.rs View File

@@ -14,7 +14,5 @@ fn main() -> Result<()> {

rclrust_info!(logger, "param0 = {}", node.get_parameter("param0").unwrap());

std::thread::sleep(std::time::Duration::from_secs(20));

Ok(())
}

+ 1
- 1
rclrust/examples/publisher.rs View File

@@ -15,7 +15,7 @@ fn main() -> Result<()> {
data: format!("hello {}", count),
})?;
rclrust_info!(logger, "hello {}", count);
sleep(Duration::from_secs_f32(0.1));
sleep(Duration::from_millis(100));
}

Ok(())


+ 5
- 5
rclrust/examples/raw_subscription.rs View File

@@ -2,20 +2,20 @@ use anyhow::Result;
use rclrust::{qos::QoSProfile, rclrust_info};
use rclrust_msg::{_core::FFIToRust, std_msgs::msg::String as String_};

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_subscriber")?;
let mut node = ctx.create_node("examples_subscriber")?;
let logger = node.logger();

let _subscription = node.create_raw_subscription::<String_, _>(
"message",
move |msg| {
rclrust_info!(logger, "{}", unsafe { msg.to_rust() }.data);
rclrust_info!(logger, "{}", unsafe { msg.data.to_rust() });
},
&QoSProfile::default(),
)?;

rclrust::spin(&node)?;

node.wait();
Ok(())
}

+ 4
- 4
rclrust/examples/service.rs View File

@@ -2,9 +2,10 @@ use anyhow::Result;
use rclrust::qos::QoSProfile;
use rclrust_msg::example_interfaces::srv::{AddTwoInts, AddTwoInts_Response};

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_service")?;
let mut node = ctx.create_node("examples_service")?;

let _service = node.create_service::<AddTwoInts, _>(
"add_ints",
@@ -12,7 +13,6 @@ fn main() -> Result<()> {
&QoSProfile::default(),
)?;

rclrust::spin(&node)?;

node.wait();
Ok(())
}

+ 7
- 5
rclrust/examples/subscription.rs View File

@@ -1,21 +1,23 @@
use std::sync::Arc;

use anyhow::Result;
use rclrust::{qos::QoSProfile, rclrust_info};
use rclrust_msg::std_msgs::msg::String as String_;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_subscriber")?;
let mut node = ctx.create_node("examples_subscriber")?;
let logger = node.logger();

let _subscription = node.create_subscription(
"message",
move |msg: String_| {
move |msg: Arc<String_>| {
rclrust_info!(logger, "{}", msg.data);
},
&QoSProfile::default(),
)?;

rclrust::spin(&node)?;

node.wait();
Ok(())
}

+ 11
- 8
rclrust/examples/timer.rs View File

@@ -1,19 +1,22 @@
use std::{cell::Cell, time::Duration};
use std::{
sync::atomic::{AtomicU32, Ordering},
time::Duration,
};

use anyhow::Result;
use rclrust::rclrust_info;

fn main() -> Result<()> {
#[tokio::main]
async fn main() -> Result<()> {
let ctx = rclrust::init()?;
let node = ctx.create_node("examples_timer")?;
let mut node = ctx.create_node("examples_timer")?;
let logger = node.logger();
let count = Cell::new(0);
let count = AtomicU32::default();
let _timer = node.create_wall_timer(Duration::from_millis(100), move || {
rclrust_info!(logger, "count: {}", count.get());
count.set(count.get() + 1);
rclrust_info!(logger, "count: {}", count.load(Ordering::Relaxed));
count.fetch_add(1, Ordering::Relaxed);
})?;

rclrust::spin(&node)?;

node.wait();
Ok(())
}

+ 149
- 123
rclrust/src/client.rs View File

@@ -1,35 +1,41 @@
use std::{
collections::HashMap,
ffi::CString,
marker::PhantomData,
fmt,
mem::MaybeUninit,
os::raw::c_void,
sync::{Arc, Mutex, Weak},
sync::{Arc, Mutex},
thread,
time::Duration,
};

use anyhow::{anyhow, Context as _, Result};
use futures::channel::oneshot;
use anyhow::{Context as _, Result};
use futures::channel::{mpsc, oneshot};
use rclrust_msg::_core::{FFIToRust, MessageT, ServiceT};

use crate::{
context::Context,
error::{RclRustError, ToRclRustResult},
internal::ffi::*,
internal::{
ffi::*,
worker::{ReceiveWorker, WorkerMessage},
},
log::Logger,
node::{Node, RclNode},
qos::QoSProfile,
rclrust_error,
wait_set::RclWaitSet,
};

#[derive(Debug)]
pub(crate) struct RclClient(Box<rcl_sys::rcl_client_t>);
pub(crate) struct RclClient {
r#impl: Box<rcl_sys::rcl_client_t>,
node: Arc<Mutex<RclNode>>,
}

unsafe impl Send for RclClient {}
unsafe impl Sync for RclClient {}

impl RclClient {
pub fn new<Srv>(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result<Self>
pub fn new<Srv>(node: Arc<Mutex<RclNode>>, service_name: &str, qos: &QoSProfile) -> Result<Self>
where
Srv: ServiceT,
{
@@ -41,7 +47,7 @@ impl RclClient {
unsafe {
rcl_sys::rcl_client_init(
&mut *client,
node.raw(),
node.lock().unwrap().raw(),
Srv::type_support() as *const _,
service_c_str.as_ptr(),
&options,
@@ -50,17 +56,14 @@ impl RclClient {
.with_context(|| "rcl_sys::rcl_client_init in RclClient::new")?;
}

Ok(Self(client))
Ok(Self {
r#impl: client,
node,
})
}

pub const fn raw(&self) -> &rcl_sys::rcl_client_t {
&self.0
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_client_fini(&mut *self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_client_fini in RclClient::fini")
&self.r#impl
}

fn send_request<Srv>(&self, request: &Srv::Request) -> Result<i64>
@@ -70,7 +73,7 @@ impl RclClient {
let mut sequence_number = 0;
unsafe {
rcl_sys::rcl_send_request(
&*self.0,
&*self.r#impl,
&request.to_raw_ref() as *const _ as *const c_void,
&mut sequence_number,
)
@@ -90,7 +93,7 @@ impl RclClient {
let mut response = <Srv::Response as MessageT>::Raw::default();
unsafe {
rcl_sys::rcl_take_response(
&*self.0,
&*self.r#impl,
request_header.as_mut_ptr(),
&mut response as *mut _ as *mut c_void,
)
@@ -103,84 +106,138 @@ impl RclClient {

fn service_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_client_get_service_name(&*self.0);
String::from_c_char(name).unwrap_or_default()
let name = rcl_sys::rcl_client_get_service_name(&*self.r#impl);
String::from_c_char(name).unwrap()
}
}

fn service_is_available(&self, node: &RclNode) -> Result<bool> {
fn service_is_available(&self) -> Result<bool> {
let mut is_available = false;
unsafe {
rcl_sys::rcl_service_server_is_available(node.raw(), &*self.0, &mut is_available)
.to_result()
.with_context(|| {
"rcl_sys::rcl_service_server_is_available in RclClient::service_is_available"
})?;
rcl_sys::rcl_service_server_is_available(
self.node.lock().unwrap().raw(),
&*self.r#impl,
&mut is_available,
)
.to_result()
.with_context(|| {
"rcl_sys::rcl_service_server_is_available in RclClient::service_is_available"
})?;
}
Ok(is_available)
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_client_is_valid(&*self.0) }
unsafe { rcl_sys::rcl_client_is_valid(&*self.r#impl) }
}
}

pub(crate) trait ClientBase {
fn handle(&self) -> &RclClient;
fn process_requests(&self) -> Result<()>;
impl Drop for RclClient {
fn drop(&mut self) {
if let Err(e) = unsafe {
rcl_sys::rcl_client_fini(&mut *self.r#impl, self.node.lock().unwrap().raw_mut())
.to_result()
} {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl client handle: {}",
e
)
}
}
}

type ChannelMessage<Srv> = (
rcl_sys::rmw_request_id_t,
<<Srv as ServiceT>::Response as MessageT>::Raw,
);

pub struct Client<Srv>
where
Srv: ServiceT + 'static,
{
handle: RclClient,
node_handle: Arc<Mutex<RclNode>>,
pendings: Mutex<HashMap<i64, oneshot::Sender<Srv::Response>>>,
_phantom: PhantomData<Srv>,
handle: Arc<RclClient>,
worker: ReceiveWorker<ChannelMessage<Srv>>,
pendings: Arc<Mutex<HashMap<i64, oneshot::Sender<Srv::Response>>>>,
}

impl<Srv> Client<Srv>
where
Srv: ServiceT,
{
pub(crate) fn new<'ctx>(
node: &Node<'ctx>,
service_name: &str,
qos: &QoSProfile,
) -> Result<Arc<Self>> {
let node_handle = node.clone_handle();
let handle = RclClient::new::<Srv>(&node_handle.lock().unwrap(), service_name, qos)?;

Ok(Arc::new(Self {
pub(crate) fn new(node: &Node, service_name: &str, qos: &QoSProfile) -> Result<Self> {
let handle = Arc::new(RclClient::new::<Srv>(
node.clone_handle(),
service_name,
qos,
)?);

let pendings = Arc::new(Mutex::new(
HashMap::<i64, oneshot::Sender<Srv::Response>>::new(),
));

let callback = {
let pendings = Arc::clone(&pendings);

move |(req_header, res): (
rcl_sys::rmw_request_id_t,
<Srv::Response as MessageT>::Raw,
)| {
pendings
.lock()
.unwrap()
.remove(&req_header.sequence_number)
.unwrap_or_else(|| panic!("fail to find key in Client::process_requests"))
.send(unsafe { res.to_rust() })
.unwrap_or_else(|_| {
panic!("fail to send response via channel in Client::process_requests")
});
}
};

Ok(Self {
handle,
node_handle,
pendings: Default::default(),
_phantom: Default::default(),
}))
worker: ReceiveWorker::new(callback),
pendings,
})
}

pub fn send_request(
self: &Arc<Self>,
request: &Srv::Request,
) -> Result<ServiceResponseTask<Srv>> {
pub(crate) fn create_invoker(&self) -> ClientInvoker<Srv> {
ClientInvoker {
handle: self.clone_handle(),
tx: Some(self.clone_tx()),
}
}

pub(crate) fn clone_handle(&self) -> Arc<RclClient> {
Arc::clone(&self.handle)
}

pub(crate) fn clone_tx(&self) -> mpsc::Sender<WorkerMessage<ChannelMessage<Srv>>> {
self.worker.clone_tx()
}

pub async fn send_request(&mut self, request: &Srv::Request) -> Result<Srv::Response> {
let id = self.handle.send_request::<Srv>(request)?;
let (sender, receiver) = oneshot::channel::<Srv::Response>();
self.pendings.lock().unwrap().insert(id, sender);
let (tx, rx) = oneshot::channel::<Srv::Response>();
self.pendings.lock().unwrap().insert(id, tx);

Ok(ServiceResponseTask {
receiver,
client: Arc::downgrade(self) as Weak<dyn ClientBase>,
})
Ok(rx.await?)
}

pub fn service_name(&self) -> String {
self.handle.service_name()
}

pub fn wait_service(&self) -> Result<()> {
while !self.service_is_available()? {
thread::sleep(Duration::from_millis(1));
}
Ok(())
}

pub fn service_is_available(&self) -> Result<bool> {
self.handle
.service_is_available(&self.node_handle.lock().unwrap())
self.handle.service_is_available()
}

pub fn is_valid(&self) -> bool {
@@ -188,89 +245,58 @@ where
}
}

impl<Srv> ClientBase for Client<Srv>
pub(crate) trait ClientInvokerBase: fmt::Debug {
fn handle(&self) -> &RclClient;
fn invoke(&mut self) -> Result<()>;
}

pub(crate) struct ClientInvoker<Srv>
where
Srv: ServiceT,
{
fn handle(&self) -> &RclClient {
&self.handle
}

fn process_requests(&self) -> Result<()> {
match self.handle.take_response::<Srv>() {
Ok((req_header, res)) => {
let mut pendings = self.pendings.lock().unwrap();
let sender = pendings
.remove(&req_header.sequence_number)
.ok_or_else(|| anyhow!("fail to find key in Client::process_requests"))?;
sender.send(unsafe { res.to_rust() }).map_err(|_| {
anyhow!("fail to send response via channel in Client::process_requests")
})?;
Ok(())
}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclClientTakeFailed(_)) => Ok(()),
_ => Err(e),
},
}
}
handle: Arc<RclClient>,
tx: Option<mpsc::Sender<WorkerMessage<ChannelMessage<Srv>>>>,
}

impl<Srv> Drop for Client<Srv>
impl<Srv> ClientInvoker<Srv>
where
Srv: ServiceT,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl client handle: {}",
e
)
}
fn stop(&mut self) {
self.tx.take();
}
}

pub struct ServiceResponseTask<Srv>
impl<Srv> fmt::Debug for ClientInvoker<Srv>
where
Srv: ServiceT,
{
receiver: oneshot::Receiver<Srv::Response>,
client: Weak<dyn ClientBase>,
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ClientInvoker {{{:?}}}", self.handle)
}
}

impl<Srv> ServiceResponseTask<Srv>
impl<Srv> ClientInvokerBase for ClientInvoker<Srv>
where
Srv: ServiceT,
{
pub fn wait_response(mut self, context: &Context) -> Result<Option<Srv::Response>> {
while context.is_valid() {
match self.spin_some(context, Duration::from_nanos(500)) {
Ok(_) => {}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclTimeout(_)) => {}
_ => return Err(e),
},
}
match self.receiver.try_recv() {
Ok(Some(v)) => return Ok(Some(v)),
Ok(None) => {}
Err(_) => return Err(RclRustError::ServiceIsCanceled.into()),
}
}

Ok(None)
fn handle(&self) -> &RclClient {
&self.handle
}

fn spin_some(&self, context: &Context, max_duration: Duration) -> Result<()> {
if let Some(client) = self.client.upgrade() {
let mut wait_set =
RclWaitSet::new(&mut context.handle.lock().unwrap(), 0, 0, 0, 1, 0, 0)?;

wait_set.clear()?;
wait_set.add_client(client.handle())?;
wait_set.wait(max_duration.as_nanos() as i64)?;
client.process_requests()?;
fn invoke(&mut self) -> Result<()> {
if let Some(ref mut tx) = self.tx {
match tx.try_send(WorkerMessage::Message(self.handle.take_response::<Srv>()?)) {
Ok(_) => (),
Err(e) if e.is_disconnected() => self.stop(),
Err(_) => {
return Err(RclRustError::MessageQueueIsFull {
type_: "Client",
name: self.handle.service_name(),
}
.into())
}
}
}

Ok(())


+ 12
- 16
rclrust/src/context.rs View File

@@ -64,7 +64,7 @@ impl RclContext {
self.0.as_mut()
}

fn is_valid(&mut self) -> bool {
pub(crate) fn is_valid(&mut self) -> bool {
unsafe { rcl_sys::rcl_context_is_valid(self.0.as_mut()) }
}

@@ -105,18 +105,18 @@ impl Drop for RclContext {

#[derive(Debug)]
pub struct Context {
pub(crate) handle: Mutex<RclContext>,
pub(crate) handle: Arc<Mutex<RclContext>>,
shutdown_reason: Mutex<Option<String>>,
}

impl Context {
pub(crate) fn new(args: Vec<String>, init_options: InitOptions) -> Result<Arc<Self>> {
pub(crate) fn new(args: Vec<String>, init_options: InitOptions) -> Result<Self> {
let handle = RclContext::new(args, &init_options)?;

Ok(Arc::new(Self {
handle: Mutex::new(handle),
Ok(Self {
handle: Arc::new(Mutex::new(handle)),
shutdown_reason: Default::default(),
}))
})
}

pub fn is_valid(&self) -> bool {
@@ -138,7 +138,7 @@ impl Context {
/// let node = ctx.create_node("test_node").unwrap();
/// assert_eq!(&node.fully_qualified_name(), "/test_node");
/// ```
pub fn create_node<'a>(&'a self, name: &str) -> Result<Arc<Node<'a>>> {
pub fn create_node(&self, name: &str) -> Result<Node> {
Node::new(self, name, None, &NodeOptions::new())
}

@@ -153,11 +153,7 @@ impl Context {
/// let node = ctx.create_node_with_options("test_node", &options).unwrap();
/// assert_eq!(&node.fully_qualified_name(), "/test_node");
/// ```
pub fn create_node_with_options<'a>(
&'a self,
name: &str,
options: &NodeOptions,
) -> Result<Arc<Node<'a>>> {
pub fn create_node_with_options(&self, name: &str, options: &NodeOptions) -> Result<Node> {
Node::new(self, name, None, options)
}

@@ -169,7 +165,7 @@ impl Context {
/// let node = ctx.create_node_with_ns("test_node", "ns").unwrap();
/// assert_eq!(&node.fully_qualified_name(), "/ns/test_node");
/// ```
pub fn create_node_with_ns<'a>(&'a self, name: &str, namespace: &str) -> Result<Arc<Node<'a>>> {
pub fn create_node_with_ns(&self, name: &str, namespace: &str) -> Result<Node> {
Node::new(self, name, Some(namespace), &NodeOptions::new())
}

@@ -186,12 +182,12 @@ impl Context {
/// .unwrap();
/// assert_eq!(&node.fully_qualified_name(), "/ns/test_node");
/// ```
pub fn create_node_with_ns_and_options<'a>(
&'a self,
pub fn create_node_with_ns_and_options(
&self,
name: &str,
namespace: &str,
options: &NodeOptions,
) -> Result<Arc<Node<'a>>> {
) -> Result<Node> {
Node::new(self, name, Some(namespace), options)
}
}


+ 2
- 0
rclrust/src/error.rs View File

@@ -137,6 +137,8 @@ pub enum RclRustError {
RuntimeError(&'static str),
#[error("Service is canceled.")]
ServiceIsCanceled,
#[error("Internal message queue if full: {type_} {name}")]
MessageQueueIsFull { type_: &'static str, name: String },

// Parameter
#[error(r#"Parameter "{name}" cannot be set because it was not declared."#)]


+ 91
- 60
rclrust/src/executor.rs View File

@@ -1,46 +1,64 @@
use std::{
sync::{Arc, Weak},
sync::{Arc, Mutex},
thread,
time::Duration,
};

use anyhow::Result;
use derive_new::new;
use futures::channel::mpsc;

use crate::{context::Context, error::RclRustError, node::Node, wait_set::RclWaitSet};

pub fn spin(node: &Arc<Node<'_>>) -> Result<()> {
let mut exec = SingleThreadExecutor::new(node.context)?;
exec.add_node(node);
exec.spin()?;

Ok(())
}

pub fn spin_some(node: &Arc<Node<'_>>) -> Result<()> {
let mut exec = SingleThreadExecutor::new(node.context)?;
exec.add_node(node);
exec.spin_some(Duration::ZERO)?;
use crate::{
client::ClientInvokerBase, context::RclContext, error::RclRustError,
service::ServiceInvokerBase, subscription::SubscriptionInvokerBase, timer::TimerInvoker,
wait_set::RclWaitSet,
};

Ok(())
#[derive(Debug)]
pub(super) enum ExecutorMessage {
Subscription(Box<dyn SubscriptionInvokerBase + Send>),
Timer(TimerInvoker),
Client(Box<dyn ClientInvokerBase + Send>),
Service(Box<dyn ServiceInvokerBase + Send>),
}

pub struct SingleThreadExecutor<'ctx> {
context: &'ctx Context,
nodes: Vec<Weak<Node<'ctx>>>,
#[derive(new)]
pub(super) struct Executor {
context: Arc<Mutex<RclContext>>,
rx: mpsc::Receiver<ExecutorMessage>,
#[new(default)]
subscriptions: Vec<Box<dyn SubscriptionInvokerBase + Send>>,
#[new(default)]
timers: Vec<TimerInvoker>,
#[new(default)]
clients: Vec<Box<dyn ClientInvokerBase + Send>>,
#[new(default)]
services: Vec<Box<dyn ServiceInvokerBase + Send>>,
}

impl<'ctx> SingleThreadExecutor<'ctx> {
pub const fn new(context: &'ctx Context) -> Result<Self> {
Ok(Self {
context,
nodes: Vec::new(),
})
}
impl Executor {
pub fn spin(&mut self) -> Result<()> {
let max_duration = Duration::from_millis(50);

while self.context.lock().unwrap().is_valid() {
loop {
match self.rx.try_next() {
Ok(Some(ExecutorMessage::Subscription(v))) => self.subscriptions.push(v),
Ok(Some(ExecutorMessage::Timer(v))) => self.timers.push(v),
Ok(Some(ExecutorMessage::Client(v))) => self.clients.push(v),
Ok(Some(ExecutorMessage::Service(v))) => self.services.push(v),
Ok(None) => return Ok(()),
Err(_) => break,
}
}

pub fn spin(&self) -> Result<()> {
while self.context.is_valid() {
if let Err(e) = self.spin_some(Duration::from_nanos(500)) {
if let Err(e) = self.spin_some(max_duration) {
match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclTimeout(_)) => continue,
Some(RclRustError::RclWaitSetEmpty(_)) => {
thread::sleep(max_duration);
continue;
}
_ => return Err(e),
}
}
@@ -49,48 +67,61 @@ impl<'ctx> SingleThreadExecutor<'ctx> {
Ok(())
}

pub fn spin_some(&self, max_duration: Duration) -> Result<()> {
let (n_subscriptions, _, n_timers, n_clients, n_services, _) =
self.nodes.iter().filter_map(|n| n.upgrade()).fold(
(0, 0, 0, 0, 0, 0),
|(subs, guards, timers, clients, services, events), node| {
(
subs + node.subscriptions.lock().unwrap().len(),
guards,
timers + node.timers.lock().unwrap().len(),
clients + node.clients.lock().unwrap().len(),
services + node.services.lock().unwrap().len(),
events,
)
},
);

pub fn spin_some(&mut self, max_duration: Duration) -> Result<()> {
let mut wait_set = RclWaitSet::new(
&mut self.context.handle.lock().unwrap(),
n_subscriptions,
&mut self.context.lock().unwrap(),
self.subscriptions.len(),
0,
n_timers,
n_clients,
n_services,
self.timers.len(),
self.clients.len(),
self.services.len(),
0,
)?;

wait_set.clear()?;

for node in self.nodes.iter().filter_map(|n| n.upgrade()) {
node.add_to_wait_set(&mut wait_set)?;
}
self.subscriptions
.iter()
.try_for_each(|subscription| wait_set.add_subscription(subscription.handle()))?;

self.timers
.iter()
.try_for_each(|timer| wait_set.add_timer(&timer.handle.lock().unwrap()))?;

self.clients
.iter()
.try_for_each(|client| wait_set.add_client(client.handle()))?;

self.services
.iter()
.try_for_each(|service| wait_set.add_service(service.handle()))?;

wait_set.wait(max_duration.as_nanos() as i64)?;

for node in self.nodes.iter().filter_map(|n| n.upgrade()) {
node.call_callbacks()?;
}
self.subscriptions
.iter_mut()
.zip(wait_set.subscriptions_ready())
.filter(|(_, ready)| *ready)
.try_for_each(|(subscription, _)| subscription.invoke())?;

self.timers
.iter_mut()
.zip(wait_set.timers_ready())
.filter(|(_, ready)| *ready)
.try_for_each(|(timer, _)| timer.invoke())?;

self.clients
.iter_mut()
.zip(wait_set.clients_ready())
.filter(|(_, ready)| *ready)
.try_for_each(|(client, _)| client.invoke())?;

self.services
.iter_mut()
.zip(wait_set.services_ready())
.filter(|(_, ready)| *ready)
.try_for_each(|(service, _)| service.invoke())?;

Ok(())
}

fn add_node(&mut self, node: &Arc<Node<'ctx>>) {
self.nodes.push(Arc::downgrade(node));
}
}

+ 2
- 0
rclrust/src/internal/async_api/mod.rs View File

@@ -0,0 +1,2 @@
mod tokio;
pub use self::tokio::*;

+ 1
- 0
rclrust/src/internal/async_api/tokio.rs View File

@@ -0,0 +1 @@
pub use tokio::{spawn, task::JoinHandle};

+ 2
- 0
rclrust/src/internal/mod.rs View File

@@ -1,2 +1,4 @@
pub mod async_api;
pub mod ffi;
mod macros;
pub mod worker;

+ 56
- 0
rclrust/src/internal/worker.rs View File

@@ -0,0 +1,56 @@
use futures::{channel::mpsc, StreamExt};

use super::async_api::{self, JoinHandle};
use crate::{log::Logger, rclrust_debug};

#[derive(Debug)]
pub enum WorkerMessage<T> {
Message(T),
Terminate,
}

#[derive(Debug)]
pub struct ReceiveWorker<T> {
tx: mpsc::Sender<WorkerMessage<T>>,
thread: Option<JoinHandle<()>>,
}

impl<T> ReceiveWorker<T> {
pub fn new<F>(callback: F) -> Self
where
T: Send + 'static,
F: Fn(T) + Send + 'static,
{
let (tx, mut rx) = mpsc::channel(10);

let thread = async_api::spawn(async move {
while let Some(message) = rx.next().await {
match message {
WorkerMessage::Message(msg) => callback(msg),
WorkerMessage::Terminate => break,
}
}

rclrust_debug!(Logger::new("rclrust"), "Channel is closed.");
});

Self {
tx,
thread: Some(thread),
}
}

pub fn clone_tx(&self) -> mpsc::Sender<WorkerMessage<T>> {
self.tx.clone()
}

pub fn terminate(&mut self) {
let _ = self.tx.try_send(WorkerMessage::Terminate);
}
}

impl<T> Drop for ReceiveWorker<T> {
fn drop(&mut self) {
self.terminate();
}
}

+ 3
- 4
rclrust/src/lib.rs View File

@@ -10,9 +10,9 @@ pub mod client;
pub mod clock;
pub mod context;
pub mod error;
pub mod executor;
mod executor;
pub mod init_options;
pub(crate) mod internal;
mod internal;
pub mod log;
pub mod node;
pub mod node_options;
@@ -24,11 +24,10 @@ pub mod subscription;
pub mod time;
pub mod timer;
pub mod utility;
pub mod wait_set;
mod wait_set;

pub use clock::{Clock, ClockType};
pub use context::Context;
pub use executor::{spin, spin_some};
pub use init_options::InitOptions;
pub use log::Logger;
pub use node_options::NodeOptions;


+ 121
- 156
rclrust/src/node.rs View File

@@ -1,20 +1,23 @@
use std::{
ffi::CString,
sync::{Arc, Mutex, Weak},
sync::{Arc, Mutex},
thread::{self, JoinHandle},
time::Duration,
};

use anyhow::{ensure, Context as _, Result};
use futures::channel::mpsc;
use rclrust_msg::{
_core::{FFIToRust, MessageT, ServiceT},
rcl_interfaces::msg::ParameterDescriptor,
};

use crate::{
client::{Client, ClientBase},
client::Client,
clock::ClockType,
context::{Context, RclContext},
error::ToRclRustResult,
executor::{Executor, ExecutorMessage},
internal::ffi::*,
log::Logger,
node_options::NodeOptions,
@@ -22,20 +25,22 @@ use crate::{
publisher::Publisher,
qos::QoSProfile,
rclrust_error,
service::{Service, ServiceBase},
subscription::{Subscription, SubscriptionBase},
service::Service,
subscription::Subscription,
timer::Timer,
wait_set::RclWaitSet,
};

#[derive(Debug)]
pub(crate) struct RclNode(Box<rcl_sys::rcl_node_t>);
pub(crate) struct RclNode {
r#impl: Box<rcl_sys::rcl_node_t>,
context: Arc<Mutex<RclContext>>,
}

unsafe impl Send for RclNode {}

impl RclNode {
fn new(
context: &mut RclContext,
context: Arc<Mutex<RclContext>>,
name: &str,
namespace: Option<&str>,
options: &NodeOptions,
@@ -49,103 +54,126 @@ impl RclNode {
&mut *node,
name_c_str.as_ptr(),
namespace_c_str.as_ptr(),
context.raw_mut(),
context.lock().unwrap().raw_mut(),
options.raw(),
)
.to_result()
.with_context(|| "rcl_sys::rcl_node_init in RclNode::new")?;
}

Ok(Self(node))
Ok(Self {
r#impl: node,
context,
})
}

pub(crate) const fn raw(&self) -> &rcl_sys::rcl_node_t {
&self.0
&self.r#impl
}

pub(crate) unsafe fn raw_mut(&mut self) -> &mut rcl_sys::rcl_node_t {
&mut self.0
&mut self.r#impl
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_node_is_valid(&*self.0) }
unsafe { rcl_sys::rcl_node_is_valid(&*self.r#impl) }
}

fn name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_node_get_name(&*self.0);
let name = rcl_sys::rcl_node_get_name(&*self.r#impl);
String::from_c_char(name).unwrap()
}
}

fn namespace(&self) -> String {
unsafe {
let namespace = rcl_sys::rcl_node_get_namespace(&*self.0);
let namespace = rcl_sys::rcl_node_get_namespace(&*self.r#impl);
String::from_c_char(namespace).unwrap()
}
}

pub(crate) fn fully_qualified_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_node_get_fully_qualified_name(&*self.0);
let name = rcl_sys::rcl_node_get_fully_qualified_name(&*self.r#impl);
String::from_c_char(name).unwrap()
}
}

fn logger_name(&self) -> String {
unsafe {
let logger_name = rcl_sys::rcl_node_get_logger_name(&*self.0);
let logger_name = rcl_sys::rcl_node_get_logger_name(&*self.r#impl);
String::from_c_char(logger_name).unwrap()
}
}

pub fn get_options(&self) -> Option<&rcl_sys::rcl_node_options_t> {
unsafe { rcl_sys::rcl_node_get_options(&*self.0).as_ref() }
unsafe { rcl_sys::rcl_node_get_options(&*self.r#impl).as_ref() }
}

pub fn use_global_arguments(&self) -> Option<bool> {
self.get_options().map(|opt| opt.use_global_arguments)
}
}

unsafe fn fini(&mut self, _ctx: &RclContext) -> Result<()> {
rcl_sys::rcl_node_fini(&mut *self.0)
.to_result()
.with_context(|| "rcl_sys::rcl_node_fini in RclNode::fini")
impl Drop for RclNode {
fn drop(&mut self) {
let result = unsafe {
let _guard = self.context.lock().unwrap();
rcl_sys::rcl_node_fini(&mut *self.r#impl).to_result()
};
if let Err(e) = result {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl node handle: {}",
e
)
}
}
}

pub struct Node<'ctx> {
#[derive(Debug)]
pub struct Node {
pub(crate) handle: Arc<Mutex<RclNode>>,
pub(crate) context: &'ctx Context,
pub(crate) subscriptions: Mutex<Vec<Weak<dyn SubscriptionBase>>>,
pub(crate) timers: Mutex<Vec<Weak<Timer>>>,
pub(crate) clients: Mutex<Vec<Weak<dyn ClientBase>>>,
pub(crate) services: Mutex<Vec<Weak<dyn ServiceBase>>>,
pub(crate) context: Arc<Mutex<RclContext>>,
parameters: Parameters,
wait_thread: Option<JoinHandle<Result<()>>>,
tx: mpsc::Sender<ExecutorMessage>,
}

impl<'ctx> Node<'ctx> {
impl Node {
pub(crate) fn new(
context: &'ctx Context,
context: &Context,
name: &str,
namespace: Option<&str>,
options: &NodeOptions,
) -> Result<Arc<Self>> {
) -> Result<Self> {
ensure!(context.is_valid(), "given Context is not valid");
let mut context_handle = context.handle.lock().unwrap();

let handle = { RclNode::new(&mut context_handle, name, namespace, options)? };
let parameters = Parameters::new(&context_handle, &handle)?;
let context = Arc::clone(&context.handle);
let handle = RclNode::new(Arc::clone(&context), name, namespace, options)?;
let parameters = Parameters::new(Arc::clone(&context), &handle)?;

let (tx, rx) = mpsc::channel(10);

Ok(Arc::new(Self {
let wait_thread = {
let context = Arc::clone(&context);

thread::spawn(move || {
let mut executor = Executor::new(context, rx);
loop {
executor.spin()?
}
})
};

Ok(Self {
handle: Arc::new(Mutex::new(handle)),
context,
subscriptions: Default::default(),
timers: Default::default(),
clients: Default::default(),
services: Default::default(),
parameters,
}))
wait_thread: Some(wait_thread),
tx,
})
}

pub(crate) fn clone_handle(&self) -> Arc<Mutex<RclNode>> {
@@ -262,193 +290,130 @@ impl<'ctx> Node<'ctx> {
}

pub fn create_subscription<T, F>(
&self,
&mut self,
topic_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Subscription<T>>>
) -> Result<Subscription<T>>
where
T: MessageT + 'static,
F: Fn(T) + 'static,
F: Fn(Arc<T>) + Send + 'static,
{
let sub = Subscription::new(
let subscription = Subscription::new(
self,
topic_name,
move |msg| callback(unsafe { T::from_raw(msg) }),
move |msg| callback(Arc::new(unsafe { T::from_raw(&msg) })),
qos,
)?;
let weak_sub = Arc::downgrade(&sub) as Weak<dyn SubscriptionBase>;
self.subscriptions.lock().unwrap().push(weak_sub);
Ok(sub)
self.tx
.try_send(ExecutorMessage::Subscription(Box::new(
subscription.create_invoker(),
)))
.expect("try_send should succeed");
Ok(subscription)
}

pub fn create_raw_subscription<T, F>(
&self,
&mut self,
topic_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Subscription<T>>>
) -> Result<Subscription<T>>
where
T: MessageT + 'static,
F: Fn(&T::Raw) + 'static,
F: Fn(Arc<T::Raw>) + Send + 'static,
{
let sub = Subscription::new(self, topic_name, callback, qos)?;
let weak_sub = Arc::downgrade(&sub) as Weak<dyn SubscriptionBase>;
self.subscriptions.lock().unwrap().push(weak_sub);
Ok(sub)
let subscription = Subscription::new(self, topic_name, callback, qos)?;
self.tx
.try_send(ExecutorMessage::Subscription(Box::new(
subscription.create_invoker(),
)))
.expect("try_send should succeed");
Ok(subscription)
}

pub fn create_timer<F>(
&self,
&mut self,
period: Duration,
clock_type: ClockType,
callback: F,
) -> Result<Arc<Timer>>
where
F: Fn() + 'static,
F: Fn() + Send + 'static,
{
let timer = Timer::new(self, period, clock_type, callback)?;
let weak_timer = Arc::downgrade(&timer);
self.timers.lock().unwrap().push(weak_timer);
self.tx
.try_send(ExecutorMessage::Timer(timer.create_invoker()))
.expect("try_send should succeed");
Ok(timer)
}

pub fn create_wall_timer<F>(&self, period: Duration, callback: F) -> Result<Arc<Timer>>
pub fn create_wall_timer<F>(&mut self, period: Duration, callback: F) -> Result<Arc<Timer>>
where
F: Fn() + 'static,
F: Fn() + Send + 'static,
{
self.create_timer(period, ClockType::SteadyTime, callback)
}

pub fn create_client<Srv>(
&self,
&mut self,
service_name: &str,
qos: &QoSProfile,
) -> Result<Arc<Client<Srv>>>
) -> Result<Client<Srv>>
where
Srv: ServiceT + 'static,
{
let client = Client::<Srv>::new(self, service_name, qos)?;
let weak = Arc::downgrade(&client) as Weak<dyn ClientBase>;
self.clients.lock().unwrap().push(weak);
self.tx
.try_send(ExecutorMessage::Client(Box::new(client.create_invoker())))
.expect("try_send should succeed");
Ok(client)
}

pub fn create_service<Srv, F>(
&self,
&mut self,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Service<Srv>>>
) -> Result<Service<Srv>>
where
Srv: ServiceT + 'static,
F: Fn(Srv::Request) -> Srv::Response + 'static,
F: Fn(Srv::Request) -> Srv::Response + Send + 'static,
{
let srv = Service::<Srv>::new(
let service = Service::<Srv>::new(
self,
service_name,
move |req_raw| (callback)(unsafe { req_raw.to_rust() }),
qos,
)?;
let weak_srv = Arc::downgrade(&srv) as Weak<dyn ServiceBase>;
self.services.lock().unwrap().push(weak_srv);
Ok(srv)
self.tx
.try_send(ExecutorMessage::Service(Box::new(service.create_invoker())))
.expect("try_send should succeed");
Ok(service)
}

pub fn create_raw_service<Srv, F>(
&self,
&mut self,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Service<Srv>>>
) -> Result<Service<Srv>>
where
Srv: ServiceT + 'static,
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + 'static,
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + Send + 'static,
{
let srv = Service::new(self, service_name, callback, qos)?;
let weak_srv = Arc::downgrade(&srv) as Weak<dyn ServiceBase>;
self.services.lock().unwrap().push(weak_srv);
Ok(srv)
}

pub(crate) fn add_to_wait_set(&self, wait_set: &mut RclWaitSet) -> Result<()> {
self.subscriptions
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|subscription| wait_set.add_subscription(subscription.handle()))?;

self.timers
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|timer| wait_set.add_timer(&timer.handle().lock().unwrap()))?;

self.clients
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|client| wait_set.add_client(client.handle()))?;

self.services
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|service| wait_set.add_service(service.handle()))?;

Ok(())
}

pub(crate) fn call_callbacks(&self) -> Result<()> {
self.subscriptions
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|subscription| subscription.call_callback())?;

self.timers
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|timer| timer.call_callback())?;

self.clients
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|client| client.process_requests())?;

self.services
.lock()
.unwrap()
.iter()
.filter_map(|weak| weak.upgrade())
.try_for_each(|service| service.call_callback())?;

Ok(())
let service = Service::new(self, service_name, callback, qos)?;
self.tx
.try_send(ExecutorMessage::Service(Box::new(service.create_invoker())))
.expect("try_send should succeed");
Ok(service)
}
}

impl Drop for Node<'_> {
fn drop(&mut self) {
if let Err(e) = unsafe {
self.handle
.lock()
.unwrap()
.fini(&self.context.handle.lock().unwrap())
} {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl node handle: {}",
e
)
pub fn wait(&mut self) {
if let Some(handle) = self.wait_thread.take() {
if let Err(e) = handle.join() {
rclrust_error!(Logger::new("rclrust"), "{:?}", e);
}
}
}
}


+ 11
- 3
rclrust/src/parameter/parameters.rs View File

@@ -1,4 +1,7 @@
use std::{collections::HashMap, sync::Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use anyhow::Result;

@@ -27,11 +30,16 @@ pub struct Parameters {
}

impl Parameters {
pub(crate) fn new(context_handle: &RclContext, node_handle: &RclNode) -> Result<Self> {
pub(crate) fn new(
context_handle: Arc<Mutex<RclContext>>,
node_handle: &RclNode,
) -> Result<Self> {
let mut parameter_overrides = HashMap::new();

if node_handle.use_global_arguments().unwrap() {
if let Some(rcl_params) = RclParams::new(context_handle.global_arguments())? {
if let Some(rcl_params) =
RclParams::new(context_handle.lock().unwrap().global_arguments())?
{
parameter_overrides =
rcl_params.to_parameters(&node_handle.fully_qualified_name())?;
}


+ 35
- 37
rclrust/src/publisher.rs View File

@@ -18,12 +18,16 @@ use crate::{
};

#[derive(Debug)]
pub(crate) struct RclPublisher(Box<rcl_sys::rcl_publisher_t>);
pub(crate) struct RclPublisher {
r#impl: Box<rcl_sys::rcl_publisher_t>,
node: Arc<Mutex<RclNode>>,
}

unsafe impl Send for RclPublisher {}
unsafe impl Sync for RclPublisher {}

impl RclPublisher {
fn new<T>(node: &RclNode, topic_name: &str, qos: &QoSProfile) -> Result<Self>
fn new<T>(node: Arc<Mutex<RclNode>>, topic_name: &str, qos: &QoSProfile) -> Result<Self>
where
T: MessageT,
{
@@ -35,7 +39,7 @@ impl RclPublisher {
unsafe {
rcl_sys::rcl_publisher_init(
&mut *publisher,
node.raw(),
node.lock().unwrap().raw(),
T::type_support() as *const _,
topic_c_str.as_ptr(),
&options,
@@ -44,13 +48,10 @@ impl RclPublisher {
.with_context(|| "rcl_sys::rcl_publisher_init in RclPublisher::new")?;
}

Ok(Self(publisher))
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_publisher_fini(&mut *self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_publisher_fini in RclPublisher::fini")
Ok(Self {
r#impl: publisher,
node,
})
}

fn publish<T>(&self, message: &T) -> Result<()>
@@ -59,7 +60,7 @@ impl RclPublisher {
{
unsafe {
rcl_sys::rcl_publish(
&*self.0,
&*self.r#impl,
&message.to_raw_ref() as *const _ as *const c_void,
std::ptr::null_mut(),
)
@@ -70,21 +71,21 @@ impl RclPublisher {
Ok(())
}

fn topic_name(&self) -> Option<String> {
fn topic_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_publisher_get_topic_name(&*self.0);
String::from_c_char(name)
let name = rcl_sys::rcl_publisher_get_topic_name(&*self.r#impl);
String::from_c_char(name).unwrap()
}
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_publisher_is_valid(&*self.0) }
unsafe { rcl_sys::rcl_publisher_is_valid(&*self.r#impl) }
}

fn subscription_count(&self) -> Result<usize> {
let mut size = 0;
unsafe {
rcl_sys::rcl_publisher_get_subscription_count(&*self.0, &mut size)
rcl_sys::rcl_publisher_get_subscription_count(&*self.r#impl, &mut size)
.to_result()
.with_context(|| {
"rcl_sys::rcl_publisher_get_subscription_count in RclPublisher::subscription_count"
@@ -94,12 +95,26 @@ impl RclPublisher {
}
}

impl Drop for RclPublisher {
fn drop(&mut self) {
if let Err(e) = unsafe {
rcl_sys::rcl_publisher_fini(&mut *self.r#impl, self.node.lock().unwrap().raw_mut())
.to_result()
} {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl publisher handle: {}",
e
)
}
}
}

pub struct Publisher<T>
where
T: MessageT,
{
handle: RclPublisher,
node_handle: Arc<Mutex<RclNode>>,
_phantom: PhantomData<T>,
}

@@ -107,13 +122,11 @@ impl<T> Publisher<T>
where
T: MessageT,
{
pub(crate) fn new<'a>(node: &'a Node<'a>, topic_name: &str, qos: &QoSProfile) -> Result<Self> {
let node_handle = node.clone_handle();
let handle = RclPublisher::new::<T>(&node_handle.lock().unwrap(), topic_name, qos)?;
pub(crate) fn new(node: &Node, topic_name: &str, qos: &QoSProfile) -> Result<Self> {
let handle = RclPublisher::new::<T>(node.clone_handle(), topic_name, qos)?;

Ok(Self {
handle,
node_handle,
_phantom: Default::default(),
})
}
@@ -122,7 +135,7 @@ where
self.handle.publish(message)
}

pub fn topic_name(&self) -> Option<String> {
pub fn topic_name(&self) -> String {
self.handle.topic_name()
}

@@ -135,21 +148,6 @@ where
}
}

impl<T> Drop for Publisher<T>
where
T: MessageT,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl publisher handle: {}",
e
)
}
}
}

#[cfg(test)]
mod test {
use super::*;


+ 124
- 55
rclrust/src/service.rs View File

@@ -1,27 +1,37 @@
use std::{
ffi::{c_void, CString},
fmt,
mem::MaybeUninit,
sync::{Arc, Mutex},
};

use anyhow::{Context, Result};
use futures::channel::mpsc;
use rclrust_msg::_core::{MessageT, ServiceT};

use crate::{
error::{RclRustError, ToRclRustResult},
internal::ffi::*,
internal::{
ffi::*,
worker::{ReceiveWorker, WorkerMessage},
},
log::Logger,
node::{Node, RclNode},
qos::QoSProfile,
rclrust_error,
};

pub struct RclService(Box<rcl_sys::rcl_service_t>);
#[derive(Debug)]
pub struct RclService {
r#impl: Box<rcl_sys::rcl_service_t>,
node: Arc<Mutex<RclNode>>,
}

unsafe impl Send for RclService {}
unsafe impl Sync for RclService {}

impl RclService {
fn new<Srv>(node: &RclNode, service_name: &str, qos: &QoSProfile) -> Result<Self>
fn new<Srv>(node: Arc<Mutex<RclNode>>, service_name: &str, qos: &QoSProfile) -> Result<Self>
where
Srv: ServiceT,
{
@@ -33,7 +43,7 @@ impl RclService {
unsafe {
rcl_sys::rcl_service_init(
&mut *service,
node.raw(),
node.lock().unwrap().raw(),
Srv::type_support() as *const _,
service_c_str.as_ptr(),
&options,
@@ -41,17 +51,14 @@ impl RclService {
.to_result()?;
}

Ok(Self(service))
Ok(Self {
r#impl: service,
node,
})
}

pub const fn raw(&self) -> &rcl_sys::rcl_service_t {
&self.0
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_service_fini(&mut *self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_service_fini in RclService::fini")
&self.r#impl
}

fn take_request<Srv>(
@@ -64,7 +71,7 @@ impl RclService {
let mut request = <Srv::Request as MessageT>::Raw::default();
unsafe {
rcl_sys::rcl_take_request(
&*self.0,
&*self.r#impl,
request_header.as_mut_ptr(),
&mut request as *mut _ as *mut c_void,
)
@@ -85,7 +92,7 @@ impl RclService {
{
unsafe {
rcl_sys::rcl_send_response(
&*self.0,
&*self.r#impl,
response_header,
&response.to_raw_ref() as *const _ as *mut c_void,
)
@@ -96,51 +103,92 @@ impl RclService {

fn service_name(&self) -> String {
unsafe {
let name = rcl_sys::rcl_service_get_service_name(&*self.0);
String::from_c_char(name).unwrap_or_default()
let name = rcl_sys::rcl_service_get_service_name(&*self.r#impl);
String::from_c_char(name).unwrap()
}
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_service_is_valid(&*self.0) }
unsafe { rcl_sys::rcl_service_is_valid(&*self.r#impl) }
}
}

pub(crate) trait ServiceBase {
fn handle(&self) -> &RclService;
fn call_callback(&self) -> Result<()>;
impl Drop for RclService {
fn drop(&mut self) {
if let Err(e) = unsafe {
rcl_sys::rcl_service_fini(&mut *self.r#impl, self.node.lock().unwrap().raw_mut())
.to_result()
} {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl service handle: {}",
e
)
}
}
}

type ChannelMessage<Srv> = (
rcl_sys::rmw_request_id_t,
<<Srv as ServiceT>::Request as MessageT>::Raw,
);

pub struct Service<Srv>
where
Srv: ServiceT,
{
handle: RclService,
callback: Box<dyn Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response>,
node_handle: Arc<Mutex<RclNode>>,
handle: Arc<RclService>,
worker: ReceiveWorker<ChannelMessage<Srv>>,
}

impl<Srv> Service<Srv>
where
Srv: ServiceT,
{
pub(crate) fn new<'ctx, F>(
node: &Node<'ctx>,
pub(crate) fn new<F>(
node: &Node,
service_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Self>>
) -> Result<Self>
where
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + 'static,
<Srv::Request as MessageT>::Raw: 'static,
F: Fn(&<Srv::Request as MessageT>::Raw) -> Srv::Response + Send + 'static,
{
let node_handle = node.clone_handle();
let handle = RclService::new::<Srv>(&node_handle.lock().unwrap(), service_name, qos)?;
let handle = Arc::new(RclService::new::<Srv>(
node.clone_handle(),
service_name,
qos,
)?);

let callback = {
let handle = Arc::clone(&handle);

move |(mut req_header, req)| {
let res = (callback)(&req);
handle.send_response::<Srv>(&mut req_header, res).unwrap();
}
};

Ok(Arc::new(Self {
Ok(Self {
handle,
callback: Box::new(callback),
node_handle,
}))
worker: ReceiveWorker::new(callback),
})
}

pub(crate) fn create_invoker(&self) -> ServiceInvoker<Srv> {
ServiceInvoker {
handle: self.clone_handle(),
tx: Some(self.clone_tx()),
}
}

pub(crate) fn clone_handle(&self) -> Arc<RclService> {
Arc::clone(&self.handle)
}

pub(crate) fn clone_tx(&self) -> mpsc::Sender<WorkerMessage<ChannelMessage<Srv>>> {
self.worker.clone_tx()
}

pub fn service_name(&self) -> String {
@@ -152,39 +200,60 @@ where
}
}

impl<Srv> ServiceBase for Service<Srv>
pub(crate) trait ServiceInvokerBase: fmt::Debug {
fn handle(&self) -> &RclService;
fn invoke(&mut self) -> Result<()>;
}

pub(crate) struct ServiceInvoker<Srv>
where
Srv: ServiceT,
{
fn handle(&self) -> &RclService {
&self.handle
handle: Arc<RclService>,
tx: Option<mpsc::Sender<WorkerMessage<ChannelMessage<Srv>>>>,
}

impl<Srv> ServiceInvoker<Srv>
where
Srv: ServiceT,
{
fn stop(&mut self) {
self.tx.take();
}
}

fn call_callback(&self) -> Result<()> {
match self.handle.take_request::<Srv>() {
Ok((mut req_header, req)) => {
let res = (self.callback)(&req);
self.handle.send_response::<Srv>(&mut req_header, res)
}
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclSubscriptionTakeFailed(_)) => Ok(()),
_ => Err(e),
},
}
impl<Srv> fmt::Debug for ServiceInvoker<Srv>
where
Srv: ServiceT,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "ServiceInvoker {{{:?}}}", self.handle)
}
}

impl<Srv> Drop for Service<Srv>
impl<Srv> ServiceInvokerBase for ServiceInvoker<Srv>
where
Srv: ServiceT,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl service handle: {}",
e
)
fn handle(&self) -> &RclService {
&self.handle
}

fn invoke(&mut self) -> Result<()> {
if let Some(ref mut tx) = self.tx {
match tx.try_send(WorkerMessage::Message(self.handle.take_request::<Srv>()?)) {
Ok(_) => (),
Err(e) if e.is_disconnected() => self.stop(),
Err(_) => {
return Err(RclRustError::MessageQueueIsFull {
type_: "Service",
name: self.handle.service_name(),
}
.into())
}
}
}

Ok(())
}
}

+ 114
- 57
rclrust/src/subscription.rs View File

@@ -1,15 +1,20 @@
use std::{
ffi::CString,
fmt,
os::raw::c_void,
sync::{Arc, Mutex},
};

use anyhow::{Context, Result};
use futures::channel::mpsc;
use rclrust_msg::_core::MessageT;

use crate::{
error::{RclRustError, ToRclRustResult},
internal::ffi::*,
internal::{
ffi::*,
worker::{ReceiveWorker, WorkerMessage},
},
log::Logger,
node::{Node, RclNode},
qos::QoSProfile,
@@ -17,12 +22,16 @@ use crate::{
};

#[derive(Debug)]
pub(crate) struct RclSubscription(Box<rcl_sys::rcl_subscription_t>);
pub(crate) struct RclSubscription {
r#impl: Box<rcl_sys::rcl_subscription_t>,
node: Arc<Mutex<RclNode>>,
}

unsafe impl Send for RclSubscription {}
unsafe impl Sync for RclSubscription {}

impl RclSubscription {
fn new<T>(node: &RclNode, topic_name: &str, qos: &QoSProfile) -> Result<Self>
fn new<T>(node: Arc<Mutex<RclNode>>, topic_name: &str, qos: &QoSProfile) -> Result<Self>
where
T: MessageT,
{
@@ -35,7 +44,7 @@ impl RclSubscription {
unsafe {
rcl_sys::rcl_subscription_init(
&mut *subscription,
node.raw(),
node.lock().unwrap().raw(),
T::type_support() as *const _,
topic_c_str.as_ptr(),
&options,
@@ -44,27 +53,24 @@ impl RclSubscription {
.with_context(|| "rcl_sys::rcl_subscription_init in RclSubscription::new")?;
}

Ok(Self(subscription))
Ok(Self {
r#impl: subscription,
node,
})
}

pub const fn raw(&self) -> &rcl_sys::rcl_subscription_t {
&self.0
}

unsafe fn fini(&mut self, node: &mut RclNode) -> Result<()> {
rcl_sys::rcl_subscription_fini(&mut *self.0, node.raw_mut())
.to_result()
.with_context(|| "rcl_sys::rcl_subscription_init in RclSubscription::fini")
&self.r#impl
}

fn take<T>(&self) -> Result<T::Raw>
fn take<T>(&self) -> Result<Arc<T::Raw>>
where
T: MessageT,
{
let mut message = T::Raw::default();
unsafe {
rcl_sys::rcl_take(
&*self.0,
&*self.r#impl,
&mut message as *mut _ as *mut c_void,
std::ptr::null_mut(),
std::ptr::null_mut(),
@@ -73,24 +79,24 @@ impl RclSubscription {
.with_context(|| "rcl_sys::rcl_take in RclSubscription::take")?;
}

Ok(message)
Ok(Arc::new(message))
}

fn topic_name(&self) -> Option<String> {
fn topic_name(&self) -> String {
unsafe {
let topic_name = rcl_sys::rcl_subscription_get_topic_name(&*self.0);
String::from_c_char(topic_name)
let topic_name = rcl_sys::rcl_subscription_get_topic_name(&*self.r#impl);
String::from_c_char(topic_name).unwrap()
}
}

fn is_valid(&self) -> bool {
unsafe { rcl_sys::rcl_subscription_is_valid(&*self.0) }
unsafe { rcl_sys::rcl_subscription_is_valid(&*self.r#impl) }
}

fn publisher_count(&self) -> Result<usize> {
let mut size = 0;
unsafe {
rcl_sys::rcl_subscription_get_publisher_count(&*self.0, &mut size)
rcl_sys::rcl_subscription_get_publisher_count(&*self.r#impl, &mut size)
.to_result()
.with_context(|| {
"rcl_sys::rcl_subscription_get_publisher_count in RclSubscription::publisher_count"
@@ -100,44 +106,71 @@ impl RclSubscription {
}
}

pub(crate) trait SubscriptionBase {
fn handle(&self) -> &RclSubscription;
fn call_callback(&self) -> Result<()>;
impl Drop for RclSubscription {
fn drop(&mut self) {
if let Err(e) = unsafe {
rcl_sys::rcl_subscription_fini(&mut *self.r#impl, self.node.lock().unwrap().raw_mut())
.to_result()
} {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl node handle: {}",
e
)
}
}
}

pub struct Subscription<T>
where
T: MessageT,
{
handle: RclSubscription,
callback: Box<dyn Fn(&T::Raw)>,
node_handle: Arc<Mutex<RclNode>>,
handle: Arc<RclSubscription>,
worker: ReceiveWorker<Arc<T::Raw>>,
}

impl<T> Subscription<T>
where
T: MessageT,
{
pub(crate) fn new<'ctx, F>(
node: &Node<'ctx>,
pub(crate) fn new<F>(
node: &Node,
topic_name: &str,
callback: F,
qos: &QoSProfile,
) -> Result<Arc<Self>>
) -> Result<Self>
where
F: Fn(&T::Raw) + 'static,
T::Raw: 'static,
F: Fn(Arc<T::Raw>) + Send + 'static,
{
let node_handle = node.clone_handle();
let handle = RclSubscription::new::<T>(&node_handle.lock().unwrap(), topic_name, qos)?;
let handle = Arc::new(RclSubscription::new::<T>(
node.clone_handle(),
topic_name,
qos,
)?);

Ok(Arc::new(Self {
Ok(Self {
handle,
callback: Box::new(callback),
node_handle,
}))
worker: ReceiveWorker::new(callback),
})
}

pub(crate) fn create_invoker(&self) -> SubscriptionInvoker<T> {
SubscriptionInvoker {
handle: self.clone_handle(),
tx: Some(self.clone_tx()),
}
}

pub(crate) fn clone_handle(&self) -> Arc<RclSubscription> {
Arc::clone(&self.handle)
}

pub fn topic_name(&self) -> Option<String> {
pub(crate) fn clone_tx(&self) -> mpsc::Sender<WorkerMessage<Arc<T::Raw>>> {
self.worker.clone_tx()
}

pub fn topic_name(&self) -> String {
self.handle.topic_name()
}

@@ -150,37 +183,61 @@ where
}
}

impl<T> SubscriptionBase for Subscription<T>
pub(crate) trait SubscriptionInvokerBase: fmt::Debug {
fn handle(&self) -> &RclSubscription;
fn invoke(&mut self) -> Result<()>;
}

pub(crate) struct SubscriptionInvoker<T>
where
T: MessageT,
{
fn handle(&self) -> &RclSubscription {
&self.handle
handle: Arc<RclSubscription>,
tx: Option<mpsc::Sender<WorkerMessage<Arc<T::Raw>>>>,
}

impl<T> SubscriptionInvoker<T>
where
T: MessageT,
{
fn stop(&mut self) {
self.tx.take();
}
}

fn call_callback(&self) -> Result<()> {
match self.handle.take::<T>() {
Ok(message) => (self.callback)(&message),
Err(e) => match e.downcast_ref::<RclRustError>() {
Some(RclRustError::RclSubscriptionTakeFailed(_)) => {}
_ => return Err(e),
},
}
Ok(())
impl<T> fmt::Debug for SubscriptionInvoker<T>
where
T: MessageT,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "SubscriptionInvoker {{{:?}}}", self.handle)
}
}

impl<T> Drop for Subscription<T>
impl<T> SubscriptionInvokerBase for SubscriptionInvoker<T>
where
T: MessageT,
T::Raw: 'static,
{
fn drop(&mut self) {
if let Err(e) = unsafe { self.handle.fini(&mut self.node_handle.lock().unwrap()) } {
rclrust_error!(
Logger::new("rclrust"),
"Failed to clean up rcl node handle: {}",
e
)
fn handle(&self) -> &RclSubscription {
&self.handle
}

fn invoke(&mut self) -> Result<()> {
if let Some(ref mut tx) = self.tx {
match tx.try_send(WorkerMessage::Message(self.handle.take::<T>()?)) {
Ok(_) => (),
Err(e) if e.is_disconnected() => self.stop(),
Err(_) => {
return Err(RclRustError::MessageQueueIsFull {
type_: "Subscription",
name: self.handle.topic_name(),
}
.into())
}
}
}

Ok(())
}
}

+ 61
- 17
rclrust/src/timer.rs View File

@@ -1,15 +1,18 @@
use std::{
convert::TryInto,
fmt,
sync::{Arc, Mutex},
time::Duration,
};

use anyhow::{Context, Result};
use futures::channel::mpsc;

use crate::{
clock::{Clock, ClockType},
context::RclContext,
error::ToRclRustResult,
error::{RclRustError, ToRclRustResult},
internal::worker::{ReceiveWorker, WorkerMessage},
log::Logger,
node::Node,
rclrust_error,
@@ -44,6 +47,7 @@ impl RclTimer {
&self.0
}

#[allow(dead_code)]
fn is_ready(&self) -> Result<bool> {
let mut ready = false;
unsafe {
@@ -76,42 +80,82 @@ impl Drop for RclTimer {
}

pub struct Timer {
handle: Mutex<RclTimer>,
handle: Arc<Mutex<RclTimer>>,
_clock: Box<Clock>,
callback: Box<dyn Fn()>,
worker: ReceiveWorker<()>,
}

impl<'ctx> Timer {
impl Timer {
pub(crate) fn new<F>(
node: &Node<'ctx>,
node: &Node,
period: Duration,
clock_type: ClockType,
callback: F,
) -> Result<Arc<Self>>
where
F: Fn() + 'static,
F: Fn() + Send + 'static,
{
let mut clock = Box::new(Clock::new(clock_type)?);
let handle = RclTimer::new(&mut clock, &mut node.context.handle.lock().unwrap(), period)?;
let handle = Arc::new(Mutex::new(RclTimer::new(
&mut clock,
&mut node.context.lock().unwrap(),
period,
)?));
Ok(Arc::new(Self {
handle: Mutex::new(handle),
handle,
_clock: clock,
callback: Box::new(callback),
worker: ReceiveWorker::new(move |_| callback()),
}))
}

pub(crate) const fn handle(&self) -> &Mutex<RclTimer> {
&self.handle
pub(crate) fn create_invoker(&self) -> TimerInvoker {
TimerInvoker {
handle: self.clone_handle(),
tx: Some(self.clone_tx()),
}
}

pub(crate) fn clone_handle(&self) -> Arc<Mutex<RclTimer>> {
Arc::clone(&self.handle)
}

pub(crate) fn call_callback(&self) -> Result<()> {
let mut handle = self.handle.lock().unwrap();
pub(crate) fn clone_tx(&self) -> mpsc::Sender<WorkerMessage<()>> {
self.worker.clone_tx()
}
}

pub(crate) struct TimerInvoker {
pub handle: Arc<Mutex<RclTimer>>,
tx: Option<mpsc::Sender<WorkerMessage<()>>>,
}

if handle.is_ready()? {
handle.call()?;
drop(handle);
(self.callback)()
impl fmt::Debug for TimerInvoker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "TimerInvoker {{{:?}}}", self.handle)
}
}

impl TimerInvoker {
fn stop(&mut self) {
self.tx.take();
}

pub fn invoke(&mut self) -> Result<()> {
self.handle.lock().unwrap().call()?;
if let Some(ref mut tx) = self.tx {
match tx.try_send(WorkerMessage::Message(())) {
Ok(_) => (),
Err(e) if e.is_disconnected() => self.stop(),
Err(_) => {
return Err(RclRustError::MessageQueueIsFull {
type_: "Timer",
name: "<none>".into(),
}
.into())
}
}
}

Ok(())
}
}

+ 5
- 7
rclrust/src/utility.rs View File

@@ -1,5 +1,3 @@
use std::sync::Arc;

use anyhow::Result;

use crate::{context::Context, init_options::InitOptions};
@@ -12,7 +10,7 @@ use crate::{context::Context, init_options::InitOptions};
/// let ctx = rclrust::init().unwrap();
/// assert!(ctx.is_valid());
/// ```
pub fn init() -> Result<Arc<Context>> {
pub fn init() -> Result<Context> {
init_with_options(InitOptions::new()?)
}

@@ -27,17 +25,17 @@ pub fn init() -> Result<Arc<Context>> {
/// let ctx = rclrust::init_with_options(init_options).unwrap();
/// assert!(ctx.is_valid());
/// ```
pub fn init_with_options(init_options: InitOptions) -> Result<Arc<Context>> {
pub fn init_with_options(init_options: InitOptions) -> Result<Context> {
Context::new(std::env::args().collect::<Vec<_>>(), init_options)
}

/// Check rclrust's status.
pub fn ok(ctx: Arc<Context>) -> bool {
pub fn ok(ctx: &Context) -> bool {
ctx.is_valid()
}

/// Shutdown rclrust context
pub fn shutdown(ctx: Arc<Context>, reason: &str) -> Result<()> {
pub fn shutdown(ctx: &Context, reason: &str) -> Result<()> {
ctx.shutdown(reason)
}

@@ -48,7 +46,7 @@ mod test {
#[test]
fn rclrust_init() -> Result<()> {
let ctx = init()?;
assert!(ok(ctx));
assert!(ok(&ctx));

Ok(())
}


+ 25
- 1
rclrust/src/wait_set.rs View File

@@ -6,7 +6,7 @@ use crate::{
};

#[derive(Debug)]
pub(crate) struct RclWaitSet(rcl_sys::rcl_wait_set_t);
pub(super) struct RclWaitSet(rcl_sys::rcl_wait_set_t);

impl RclWaitSet {
pub fn new(
@@ -74,6 +74,12 @@ impl RclWaitSet {
}
}

pub fn subscriptions_ready(&self) -> impl Iterator<Item = bool> {
unsafe { std::slice::from_raw_parts(self.0.subscriptions, self.0.size_of_subscriptions) }
.iter()
.map(|p| !p.is_null())
}

pub fn add_timer(&mut self, timer: &RclTimer) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_timer(&mut self.0, timer.raw(), std::ptr::null_mut())
@@ -82,6 +88,12 @@ impl RclWaitSet {
}
}

pub fn timers_ready(&self) -> impl Iterator<Item = bool> {
unsafe { std::slice::from_raw_parts(self.0.timers, self.0.size_of_timers) }
.iter()
.map(|p| !p.is_null())
}

pub fn add_client(&mut self, client: &RclClient) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_client(&mut self.0, client.raw(), std::ptr::null_mut())
@@ -90,6 +102,12 @@ impl RclWaitSet {
}
}

pub fn clients_ready(&self) -> impl Iterator<Item = bool> {
unsafe { std::slice::from_raw_parts(self.0.clients, self.0.size_of_clients) }
.iter()
.map(|p| !p.is_null())
}

pub fn add_service(&mut self, service: &RclService) -> Result<()> {
unsafe {
rcl_sys::rcl_wait_set_add_service(&mut self.0, service.raw(), std::ptr::null_mut())
@@ -97,6 +115,12 @@ impl RclWaitSet {
.with_context(|| "rcl_sys::rcl_wait_set_add_service in RclWaitSet::add_service")
}
}

pub fn services_ready(&self) -> impl Iterator<Item = bool> {
unsafe { std::slice::from_raw_parts(self.0.services, self.0.size_of_services) }
.iter()
.map(|p| !p.is_null())
}
}

impl Drop for RclWaitSet {


Loading…
Cancel
Save