Browse Source

Add convenience functions for merging external event streams with `EventStream`

Our upcoming ros2<->dora bridge will allow nodes to subscribe to ROS2 topics, which results in a second stream of events. By providing a merge function, this ROS2 event stream can be merged together with the dora event stream.
tags/v0.2.4
Philipp Oppermann 2 years ago
parent
commit
c4bd9d3edf
Failed to extract signature
5 changed files with 64 additions and 3 deletions
  1. +3
    -2
      Cargo.lock
  2. +1
    -0
      apis/rust/node/Cargo.toml
  3. +58
    -0
      apis/rust/node/src/event_stream/merged.rs
  4. +1
    -0
      apis/rust/node/src/event_stream/mod.rs
  5. +1
    -1
      apis/rust/node/src/lib.rs

+ 3
- 2
Cargo.lock View File

@@ -1421,6 +1421,7 @@ dependencies = [
"eyre",
"flume",
"futures",
"futures-concurrency",
"once_cell",
"serde",
"serde_json",
@@ -1855,9 +1856,9 @@ dependencies = [

[[package]]
name = "futures-concurrency"
version = "7.2.0"
version = "7.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30ce9739c5655304eced9aaea4220e4393b8f60a3a5f1b84d09a206d6a5078a9"
checksum = "b726119e6cd29cf120724495b2085e1ed3d17821ea17b86de54576d1aa565f5e"
dependencies = [
"bitvec",
"futures-core",


+ 1
- 0
apis/rust/node/Cargo.toml View File

@@ -28,6 +28,7 @@ shared_memory = "0.12.0"
dora-tracing = { workspace = true, optional = true }
arrow = "35.0.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"

[dev-dependencies]
tokio = { version = "1.24.2", features = ["rt"] }

+ 58
- 0
apis/rust/node/src/event_stream/merged.rs View File

@@ -0,0 +1,58 @@
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge;

pub enum MergedEvent<E> {
Dora(super::Event),
External(E),
}

pub enum Either<A, B> {
First(A),
Second(B),
}

pub trait MergeExternal<'a, E> {
type Item;

fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a>;
}

impl<'a, E> MergeExternal<'a, E> for super::EventStream
where
E: 'static,
{
type Item = MergedEvent<E>;

fn merge_external(
self,
external_events: impl Stream<Item = E> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
let dora = self.map(MergedEvent::Dora);
let external = external_events.map(MergedEvent::External);
Box::new((dora, external).merge())
}
}

impl<'a, E, F, S> MergeExternal<'a, F> for S
where
S: Stream<Item = MergedEvent<E>> + Unpin + 'a,
E: 'a,
F: 'a,
{
type Item = MergedEvent<Either<E, F>>;

fn merge_external(
self,
external_events: impl Stream<Item = F> + Unpin + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + 'a> {
let first = self.map(|e| match e {
MergedEvent::Dora(d) => MergedEvent::Dora(d),
MergedEvent::External(e) => MergedEvent::External(Either::First(e)),
});
let second = external_events.map(|e| MergedEvent::External(Either::Second(e)));
Box::new((first, second).merge())
}
}

+ 1
- 0
apis/rust/node/src/event_stream/mod.rs View File

@@ -15,6 +15,7 @@ use dora_core::{
use eyre::{eyre, Context};

mod event;
pub mod merged;
mod thread;

pub struct EventStream {


+ 1
- 1
apis/rust/node/src/lib.rs View File

@@ -15,7 +15,7 @@
//!
pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use event_stream::{Data, Event, EventStream, MappedInputData};
pub use event_stream::{merged, Data, Event, EventStream, MappedInputData};
pub use flume::Receiver;
pub use node::{DataSample, DoraNode};



Loading…
Cancel
Save