diff --git a/Cargo.lock b/Cargo.lock index 9b6e26a8..bb05ca38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1015,7 +1015,7 @@ dependencies = [ name = "communication-layer-pub-sub" version = "0.3.2" dependencies = [ - "flume", + "flume 0.10.14", "zenoh", ] @@ -1488,7 +1488,7 @@ dependencies = [ "dora-node-api", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "serde_json", @@ -1562,7 +1562,7 @@ dependencies = [ "dora-core", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "futures-timer", @@ -1610,7 +1610,7 @@ dependencies = [ "dora-ros2-bridge-python", "dora-runtime", "eyre", - "flume", + "flume 0.10.14", "futures", "pyo3", "pythonize", @@ -1660,7 +1660,7 @@ dependencies = [ "arrow-schema", "dora-node-api", "eyre", - "flume", + "flume 0.10.14", "pyo3", "serde_yaml 0.8.26", ] @@ -1692,7 +1692,9 @@ dependencies = [ "dora-daemon", "dora-ros2-bridge-msg-gen", "eyre", + "flume 0.11.0", "futures", + "futures-timer", "rand", "ros2-client", "rust-format", @@ -1747,7 +1749,7 @@ dependencies = [ "dora-operator-api-types", "dora-tracing", "eyre", - "flume", + "flume 0.10.14", "futures", "futures-concurrency", "libloading", @@ -1998,6 +2000,18 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -6530,7 +6544,7 @@ dependencies = [ "base64 0.13.1", "env_logger", "event-listener 2.5.3", - "flume", + "flume 0.10.14", "form_urlencoded", "futures", "git-version", @@ -6594,7 +6608,7 @@ checksum = "1e256d7aff2c9af765d77efbfae7fcb708d2d7f4e179aa201bff2f81ad7a3845" dependencies = [ "async-std", "async-trait", - "flume", + "flume 0.10.14", "log", "zenoh-core", "zenoh-sync", @@ -6606,7 +6620,7 @@ version = "0.7.0-rc" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bad1ff61abf28c57e8879ec4286fa29becf7e9bf12555df9a7faddff3bc9ea1b" dependencies = [ - "flume", + "flume 0.10.14", "json5", "num_cpus", "serde", @@ -6673,7 +6687,7 @@ checksum = "21aab9eeb2aba53e37aae57467ffca1268d209811c5e2f39761aab4c1343bce3" dependencies = [ "async-std", "async-trait", - "flume", + "flume 0.10.14", "serde", "zenoh-buffers", "zenoh-cfg-properties", @@ -6845,7 +6859,7 @@ checksum = "821070b62a55d4c8a22e1e06c939c1f2d94767e660df9fcbea377781f72f59bf" dependencies = [ "async-std", "event-listener 2.5.3", - "flume", + "flume 0.10.14", "futures", "tokio", "zenoh-core", @@ -6861,7 +6875,7 @@ dependencies = [ "async-global-executor", "async-std", "async-trait", - "flume", + "flume 0.10.14", "log", "paste", "rand", diff --git a/libraries/extensions/ros2-bridge/Cargo.toml b/libraries/extensions/ros2-bridge/Cargo.toml index 20b2d03f..967403f2 100644 --- a/libraries/extensions/ros2-bridge/Cargo.toml +++ b/libraries/extensions/ros2-bridge/Cargo.toml @@ -24,6 +24,9 @@ tokio = { version = "1.29.1", features = ["full"], optional = true } dora-daemon = { path = "../../../binaries/daemon", optional = true } tracing = "0.1.37" tracing-subscriber = "0.3.17" +flume = "0.11.0" +futures = { version = "0.3.21", features = ["thread-pool"] } +futures-timer = "3.0.3" [dev-dependencies] rand = "0.8.5" diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs index 49360375..e338410c 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/lib.rs @@ -107,22 +107,43 @@ where } }, quote! { - struct Ros2Context(ros2_client::Context); + struct Ros2Context{ + context: ros2_client::Context, + executor: std::sync::Arc, + } fn init_ros2_context() -> eyre::Result> { - Ok(Box::new(Ros2Context(ros2_client::Context::new()?))) + Ok(Box::new(Ros2Context{ + context: ros2_client::Context::new()?, + executor: std::sync::Arc::new(futures::executor::ThreadPool::new()?), + })) } impl Ros2Context { fn new_node(&self, name_space: &str, base_name: &str) -> eyre::Result> { + use futures::task::SpawnExt as _; + use eyre::WrapErr as _; + let name = ros2_client::NodeName::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; let options = ros2_client::NodeOptions::new().enable_rosout(true); - let node = self.0.new_node(name, options)?; - Ok(Box::new(Ros2Node(node))) + let mut node = self.context.new_node(name, options)?; + + let spinner = node.spinner(); + self.executor.spawn(async { + if let Err(err) = spinner.spin().await { + eprintln!("ros2 spinner failed: {err:?}"); + } + }) + .context("failed to spawn ros2 spinner")?; + + Ok(Box::new(Ros2Node{ node, executor: self.executor.clone(), })) } } - struct Ros2Node(ros2_client::Node); + struct Ros2Node { + node : ros2_client::Node, + executor: std::sync::Arc, + } fn qos_default() -> ffi::Ros2QosPolicies { ffi::Ros2QosPolicies::new(None, None, None, None, None, None, None) diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs index 157d88ce..cd4a92df 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs @@ -1,4 +1,3 @@ -use heck::SnakeCase; use quote::{format_ident, quote, ToTokens}; use syn::Ident; @@ -314,19 +313,19 @@ impl Message { pub fn #create_topic(&self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result> { let name = crate::ros2_client::Name::new(name_space, base_name).map_err(|e| eyre::eyre!(e))?; let type_name = crate::ros2_client::MessageTypeName::new(#package_name, #self_name); - let topic = self.0.create_topic(&name, type_name, &qos.into())?; + let topic = self.node.create_topic(&name, type_name, &qos.into())?; Ok(Box::new(#topic_name(topic))) } #[allow(non_snake_case)] pub fn #create_publisher(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies) -> eyre::Result> { - let publisher = self.0.create_publisher(&topic.0, Some(qos.into()))?; + let publisher = self.node.create_publisher(&topic.0, Some(qos.into()))?; Ok(Box::new(#publisher_name(publisher))) } #[allow(non_snake_case)] pub fn #create_subscription(&mut self, topic: &Box<#topic_name>, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result> { - let subscription = self.0.create_subscription::(&topic.0, Some(qos.into()))?; + let subscription = self.node.create_subscription::(&topic.0, Some(qos.into()))?; let stream = futures_lite::stream::unfold(subscription, |sub| async { let item = sub.async_take().await; let item_boxed: Box = Box::new(item); diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs index 70069df6..ca1617e2 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs @@ -89,28 +89,136 @@ impl Service { let self_name = format_ident!("{}", self.name); let self_name_str = &self.name; + let wait_for_service = format_ident!("wait_for_service__{package_name}__{}", self.name); + let cxx_wait_for_service = format_ident!("wait_for_service"); + let send_request = format_ident!("send_request__{package_name}__{}", self.name); + let cxx_send_request = format_ident!("send_request"); + let req_type_raw = format_ident!("{package_name}__{}_Request", self.name); + let res_type_raw = format_ident!("{package_name}__{}_Response", self.name); + let res_type_raw_str = res_type_raw.to_string(); + + let matches = format_ident!("matches__{package_name}__{}", self.name); + let cxx_matches = format_ident!("matches"); + let downcast = format_ident!("downcast__{package_name}__{}", self.name); + let cxx_downcast = format_ident!("downcast"); + let def = quote! { #[namespace = #package_name] #[cxx_name = #cxx_client_name] type #client_name; + // TODO: add `merged_streams` argument (for sending replies) #[cxx_name = #cxx_create_client] - fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies) -> Result>; + fn #create_client(self: &mut Ros2Node, name_space: &str, base_name: &str, qos: Ros2QosPolicies, events: &mut CombinedEvents) -> Result>; + + #[namespace = #package_name] + #[cxx_name = #cxx_wait_for_service] + fn #wait_for_service(self: &mut #client_name, node: &Box) -> Result<()>; + #[namespace = #package_name] + #[cxx_name = #cxx_send_request] + fn #send_request(self: &mut #client_name, request: #req_type_raw) -> Result<()>; + #[namespace = #package_name] + #[cxx_name = #cxx_matches] + fn #matches(self: &#client_name, event: &CombinedEvent) -> bool; + #[namespace = #package_name] + #[cxx_name = #cxx_downcast] + fn #downcast(self: &#client_name, event: CombinedEvent) -> Result<#res_type_raw>; }; let imp = quote! { - #[allow(non_camel_case_types)] - pub struct #client_name(ros2_client::service::Client< #package :: service :: #self_name >); - impl Ros2Node { #[allow(non_snake_case)] - pub fn #create_client(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies) -> eyre::Result> { - let client = self.0.create_client::< #package :: service :: #self_name >( + pub fn #create_client(&mut self, name_space: &str, base_name: &str, qos: ffi::Ros2QosPolicies, events: &mut crate::ffi::CombinedEvents) -> eyre::Result> { + use futures::StreamExt as _; + + let client = self.node.create_client::< #package :: service :: #self_name >( ros2_client::ServiceMapping::Enhanced, &ros2_client::Name::new(name_space, base_name).unwrap(), &ros2_client::ServiceTypeName::new(#package_name, #self_name_str), qos.clone().into(), qos.into(), )?; - Ok(Box::new(#client_name(client))) + let (response_tx, response_rx) = flume::bounded(1); + let stream = response_rx.into_stream().map(|v: eyre::Result<_>| Box::new(v) as Box); + let id = events.events.merge(Box::pin(stream)); + + Ok(Box::new(#client_name { + client: std::sync::Arc::new(client), + response_tx: std::sync::Arc::new(response_tx), + executor: self.executor.clone(), + stream_id: id, + })) + } + } + + #[allow(non_camel_case_types)] + pub struct #client_name { + client: std::sync::Arc>, + response_tx: std::sync::Arc>>, + executor: std::sync::Arc, + stream_id: u32, + } + + impl #client_name { + #[allow(non_snake_case)] + fn #wait_for_service(self: &mut #client_name, node: &Box) -> eyre::Result<()> { + let service_ready = async { + for _ in 0..10 { + let ready = self.client.wait_for_service(&node.node); + futures::pin_mut!(ready); + let timeout = futures_timer::Delay::new(std::time::Duration::from_secs(2)); + match futures::future::select(ready, timeout).await { + futures::future::Either::Left(((), _)) => { + return Ok(()); + } + futures::future::Either::Right(_) => { + eprintln!("timeout while waiting for service, retrying"); + } + } + } + eyre::bail!("service not available"); + }; + futures::executor::block_on(service_ready)?; + Ok(()) + } + + #[allow(non_snake_case)] + fn #send_request(&mut self, request: ffi::#req_type_raw) -> eyre::Result<()> { + use eyre::WrapErr; + use futures::task::SpawnExt as _; + + let request_id = futures::executor::block_on(self.client.async_send_request(request.clone())).context("failed to send request")?; + let client = self.client.clone(); + let response_tx = self.response_tx.clone(); + let send_result = async move { + let response = client.async_receive_response(request_id).await.with_context(|| format!("failed to receive response for request {request_id:?}")); + if response_tx.send_async(response).await.is_err() { + tracing::warn!("failed to send service response"); + } + }; + self.executor.spawn(send_result).context("failed to spawn response task")?; + Ok(()) + } + + #[allow(non_snake_case)] + fn #matches(&self, event: &crate::ffi::CombinedEvent) -> bool { + match &event.event.as_ref().0 { + Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => true, + _ => false + } + } + #[allow(non_snake_case)] + fn #downcast(&self, event: crate::ffi::CombinedEvent) -> eyre::Result { + use eyre::WrapErr; + + match (*event.event).0 { + Some(crate::MergedEvent::External(event)) if event.id == self.stream_id => { + let result = event.event.downcast::>() + .map_err(|_| eyre::eyre!("downcast to {} failed", #res_type_raw_str))?; + + let data = result.with_context(|| format!("failed to receive {} response", #self_name_str))?; + Ok(data) + }, + _ => eyre::bail!("not a {} response event", #self_name_str), + } } } }; diff --git a/libraries/extensions/ros2-bridge/src/lib.rs b/libraries/extensions/ros2-bridge/src/lib.rs index da41c521..7a3163b4 100644 --- a/libraries/extensions/ros2-bridge/src/lib.rs +++ b/libraries/extensions/ros2-bridge/src/lib.rs @@ -1,5 +1,9 @@ +pub use flume; +pub use futures; +pub use futures_timer; pub use ros2_client; pub use rustdds; +pub use tracing; #[cfg(feature = "generate-messages")] pub mod messages {