| @@ -0,0 +1,20 @@ | |||
| use std::cell::Cell; | |||
| use std::time::Duration; | |||
| use anyhow::Result; | |||
| use rclrust::prelude::*; | |||
| fn main() -> Result<()> { | |||
| let ctx = rclrust::init()?; | |||
| let node = ctx.create_node("examples_timer")?; | |||
| let logger = node.logger(); | |||
| let count = Cell::new(0); | |||
| let _timer = node.create_wall_timer(Duration::from_millis(100), move || { | |||
| rclrust_info!(logger, "count: {}", count.get()); | |||
| count.set(count.get() + 1); | |||
| })?; | |||
| rclrust::spin(&node)?; | |||
| Ok(()) | |||
| } | |||
| @@ -58,6 +58,10 @@ impl Clock { | |||
| } | |||
| } | |||
| pub(crate) fn raw_mut(&mut self) -> &mut rcl_sys::rcl_clock_t { | |||
| &mut self.0 | |||
| } | |||
| /// Construct a new `Clock` with ros time | |||
| /// | |||
| /// # Examples | |||
| @@ -51,25 +51,26 @@ impl<'ctx> SingleThreadExecutor<'ctx> { | |||
| } | |||
| pub fn spin_some(&self, max_duration: Duration) -> Result<()> { | |||
| let (n_subscriptions, _, _, _, _, _) = self.nodes.iter().filter_map(|n| n.upgrade()).fold( | |||
| (0, 0, 0, 0, 0, 0), | |||
| |(subs, guards, timers, clients, services, events), node| { | |||
| ( | |||
| subs + node.subscriptions.lock().unwrap().len(), | |||
| guards, | |||
| timers, | |||
| clients, | |||
| services, | |||
| events, | |||
| ) | |||
| }, | |||
| ); | |||
| let (n_subscriptions, _, n_timers, _, _, _) = | |||
| self.nodes.iter().filter_map(|n| n.upgrade()).fold( | |||
| (0, 0, 0, 0, 0, 0), | |||
| |(subs, guards, timers, clients, services, events), node| { | |||
| ( | |||
| subs + node.subscriptions.lock().unwrap().len(), | |||
| guards, | |||
| timers + node.timers.lock().unwrap().len(), | |||
| clients, | |||
| services, | |||
| events, | |||
| ) | |||
| }, | |||
| ); | |||
| let mut wait_set = RclWaitSet::new( | |||
| &mut self.context.handle().lock().unwrap(), | |||
| n_subscriptions, | |||
| 0, | |||
| 0, | |||
| n_timers, | |||
| 0, | |||
| 0, | |||
| 0, | |||
| @@ -78,21 +79,13 @@ impl<'ctx> SingleThreadExecutor<'ctx> { | |||
| wait_set.clear()?; | |||
| for node in self.nodes.iter().filter_map(|n| n.upgrade()) { | |||
| for subscription in node.subscriptions.lock().unwrap().iter() { | |||
| if let Some(subscription) = subscription.upgrade() { | |||
| wait_set.add_subscription(&subscription.handle())?; | |||
| } | |||
| } | |||
| node.add_to_wait_set(&mut wait_set)?; | |||
| } | |||
| wait_set.wait(max_duration.as_nanos() as i64)?; | |||
| for node in self.nodes.iter().filter_map(|n| n.upgrade()) { | |||
| for subscription in node.subscriptions.lock().unwrap().iter() { | |||
| if let Some(subscription) = subscription.upgrade() { | |||
| subscription.call_callback().unwrap(); | |||
| } | |||
| } | |||
| node.call_callbacks()?; | |||
| } | |||
| Ok(()) | |||
| @@ -19,6 +19,7 @@ pub mod publisher; | |||
| pub mod qos; | |||
| pub mod subscription; | |||
| pub mod time; | |||
| pub mod timer; | |||
| pub mod utility; | |||
| pub mod wait_set; | |||
| @@ -1,8 +1,10 @@ | |||
| use std::ffi::CString; | |||
| use std::sync::{Arc, Mutex, Weak}; | |||
| use std::time::Duration; | |||
| use anyhow::{ensure, Context as _, Result}; | |||
| use crate::clock::ClockType; | |||
| use crate::context::{Context, RclContext}; | |||
| use crate::error::ToRclRustResult; | |||
| use crate::internal::ffi::*; | |||
| @@ -12,6 +14,8 @@ use crate::publisher::Publisher; | |||
| use crate::qos::QoSProfile; | |||
| use crate::rclrust_error; | |||
| use crate::subscription::{Subscription, SubscriptionBase}; | |||
| use crate::timer::Timer; | |||
| use crate::wait_set::RclWaitSet; | |||
| #[derive(Debug)] | |||
| pub(crate) struct RclNode(rcl_sys::rcl_node_t); | |||
| @@ -95,6 +99,7 @@ pub struct Node<'ctx> { | |||
| handle: Arc<Mutex<RclNode>>, | |||
| context: &'ctx Context, | |||
| pub(crate) subscriptions: Mutex<Vec<Weak<dyn SubscriptionBase>>>, | |||
| pub(crate) timers: Mutex<Vec<Weak<Timer>>>, | |||
| } | |||
| impl<'ctx> Node<'ctx> { | |||
| @@ -118,7 +123,8 @@ impl<'ctx> Node<'ctx> { | |||
| Ok(Arc::new(Self { | |||
| handle: Arc::new(Mutex::new(handle)), | |||
| context, | |||
| subscriptions: Mutex::new(vec![]), | |||
| subscriptions: Mutex::new(Vec::new()), | |||
| timers: Mutex::new(Vec::new()), | |||
| })) | |||
| } | |||
| @@ -242,6 +248,57 @@ impl<'ctx> Node<'ctx> { | |||
| self.subscriptions.lock().unwrap().push(weak_sub); | |||
| Ok(sub) | |||
| } | |||
| pub fn create_timer<F>( | |||
| &self, | |||
| period: Duration, | |||
| clock_type: ClockType, | |||
| callback: F, | |||
| ) -> Result<Arc<Timer>> | |||
| where | |||
| F: Fn() + 'static, | |||
| { | |||
| let timer = Timer::new(self, period, clock_type, callback)?; | |||
| let weak_timer = Arc::downgrade(&timer); | |||
| self.timers.lock().unwrap().push(weak_timer); | |||
| Ok(timer) | |||
| } | |||
| pub fn create_wall_timer<F>(&self, period: Duration, callback: F) -> Result<Arc<Timer>> | |||
| where | |||
| F: Fn() + 'static, | |||
| { | |||
| self.create_timer(period, ClockType::SteadyTime, callback) | |||
| } | |||
| pub(crate) fn add_to_wait_set(&self, wait_set: &mut RclWaitSet) -> Result<()> { | |||
| for subscription in self.subscriptions.lock().unwrap().iter() { | |||
| if let Some(subscription) = subscription.upgrade() { | |||
| wait_set.add_subscription(&subscription.handle())?; | |||
| } | |||
| } | |||
| for timer in self.timers.lock().unwrap().iter() { | |||
| if let Some(timer) = timer.upgrade() { | |||
| wait_set.add_timer(&timer.handle().lock().unwrap())?; | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| pub(crate) fn call_callbacks(&self) -> Result<()> { | |||
| for subscription in self.subscriptions.lock().unwrap().iter() { | |||
| if let Some(subscription) = subscription.upgrade() { | |||
| subscription.call_callback()?; | |||
| } | |||
| } | |||
| for timer in self.timers.lock().unwrap().iter() { | |||
| if let Some(timer) = timer.upgrade() { | |||
| timer.call_callback()?; | |||
| } | |||
| } | |||
| Ok(()) | |||
| } | |||
| } | |||
| impl Drop for Node<'_> { | |||
| @@ -0,0 +1,117 @@ | |||
| use std::convert::TryInto; | |||
| use std::sync::{Arc, Mutex}; | |||
| use std::time::Duration; | |||
| use anyhow::{Context, Result}; | |||
| use crate::clock::{Clock, ClockType}; | |||
| use crate::context::RclContext; | |||
| use crate::error::ToRclRustResult; | |||
| use crate::log::Logger; | |||
| use crate::node::Node; | |||
| use crate::rclrust_error; | |||
| #[derive(Debug)] | |||
| pub struct RclTimer(rcl_sys::rcl_timer_t); | |||
| unsafe impl Send for RclTimer {} | |||
| impl RclTimer { | |||
| fn new(clock: &mut Clock, context: &mut RclContext, period: Duration) -> Result<Self> { | |||
| let mut timer = unsafe { rcl_sys::rcl_get_zero_initialized_timer() }; | |||
| unsafe { | |||
| rcl_sys::rcl_timer_init( | |||
| &mut timer, | |||
| clock.raw_mut(), | |||
| context.raw_mut(), | |||
| period.as_nanos().try_into().unwrap(), | |||
| None, | |||
| rcl_sys::rcutils_get_default_allocator(), | |||
| ) | |||
| .to_result() | |||
| .with_context(|| "rcl_sys::rcl_timer_init in RclTimer::new")?; | |||
| } | |||
| Ok(Self(timer)) | |||
| } | |||
| pub const fn raw(&self) -> &rcl_sys::rcl_timer_t { | |||
| &self.0 | |||
| } | |||
| fn is_ready(&self) -> Result<bool> { | |||
| let mut ready = false; | |||
| unsafe { | |||
| rcl_sys::rcl_timer_is_ready(&self.0, &mut ready) | |||
| .to_result() | |||
| .with_context(|| "rcl_sys::rcl_timer_is_ready in RclTimer::is_ready")?; | |||
| } | |||
| Ok(ready) | |||
| } | |||
| fn call(&mut self) -> Result<()> { | |||
| unsafe { | |||
| rcl_sys::rcl_timer_call(&mut self.0) | |||
| .to_result() | |||
| .with_context(|| "rcl_sys::rcl_timer_call in RclTimer::call") | |||
| } | |||
| } | |||
| } | |||
| impl Drop for RclTimer { | |||
| fn drop(&mut self) { | |||
| if let Err(e) = unsafe { rcl_sys::rcl_timer_fini(&mut self.0).to_result() } { | |||
| rclrust_error!( | |||
| Logger::new("rclrust"), | |||
| "Failed to clean up rcl timer handle: {}", | |||
| e | |||
| ) | |||
| } | |||
| } | |||
| } | |||
| pub struct Timer { | |||
| handle: Mutex<RclTimer>, | |||
| _clock: Box<Clock>, | |||
| callback: Box<dyn Fn()>, | |||
| } | |||
| impl<'ctx> Timer { | |||
| pub(crate) fn new<F>( | |||
| node: &Node<'ctx>, | |||
| period: Duration, | |||
| clock_type: ClockType, | |||
| callback: F, | |||
| ) -> Result<Arc<Self>> | |||
| where | |||
| F: Fn() + 'static, | |||
| { | |||
| let mut clock = Box::new(Clock::new(clock_type)?); | |||
| let handle = RclTimer::new( | |||
| &mut clock, | |||
| &mut node.context_ref().handle().lock().unwrap(), | |||
| period, | |||
| )?; | |||
| Ok(Arc::new(Self { | |||
| handle: Mutex::new(handle), | |||
| _clock: clock, | |||
| callback: Box::new(callback), | |||
| })) | |||
| } | |||
| pub(crate) fn handle(&self) -> &Mutex<RclTimer> { | |||
| &self.handle | |||
| } | |||
| pub(crate) fn call_callback(&self) -> Result<()> { | |||
| let mut handle = self.handle.lock().unwrap(); | |||
| if handle.is_ready()? { | |||
| handle.call()?; | |||
| drop(handle); | |||
| (self.callback)() | |||
| } | |||
| Ok(()) | |||
| } | |||
| } | |||
| @@ -5,6 +5,7 @@ use crate::error::ToRclRustResult; | |||
| use crate::log::Logger; | |||
| use crate::rclrust_error; | |||
| use crate::subscription::RclSubscription; | |||
| use crate::timer::RclTimer; | |||
| #[derive(Debug)] | |||
| pub(crate) struct RclWaitSet(rcl_sys::rcl_wait_set_t); | |||
| @@ -74,6 +75,14 @@ impl RclWaitSet { | |||
| }) | |||
| } | |||
| } | |||
| pub fn add_timer(&mut self, timer: &RclTimer) -> Result<()> { | |||
| unsafe { | |||
| rcl_sys::rcl_wait_set_add_timer(&mut self.0, timer.raw(), std::ptr::null_mut()) | |||
| .to_result() | |||
| .with_context(|| "rcl_sys::rcl_wait_set_add_timer in RclWaitSet::add_timer") | |||
| } | |||
| } | |||
| } | |||
| impl Drop for RclWaitSet { | |||