diff --git a/Cargo.lock b/Cargo.lock index 6dca9cb0..04ca09c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1421,6 +1421,7 @@ dependencies = [ "eyre", "flume", "futures", + "futures-concurrency", "once_cell", "serde", "serde_json", @@ -1855,9 +1856,9 @@ dependencies = [ [[package]] name = "futures-concurrency" -version = "7.2.0" +version = "7.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ce9739c5655304eced9aaea4220e4393b8f60a3a5f1b84d09a206d6a5078a9" +checksum = "b726119e6cd29cf120724495b2085e1ed3d17821ea17b86de54576d1aa565f5e" dependencies = [ "bitvec", "futures-core", diff --git a/apis/rust/node/Cargo.toml b/apis/rust/node/Cargo.toml index c242d44c..023904d6 100644 --- a/apis/rust/node/Cargo.toml +++ b/apis/rust/node/Cargo.toml @@ -28,6 +28,7 @@ shared_memory = "0.12.0" dora-tracing = { workspace = true, optional = true } arrow = "35.0.0" futures = "0.3.28" +futures-concurrency = "7.3.0" [dev-dependencies] tokio = { version = "1.24.2", features = ["rt"] } diff --git a/apis/rust/node/src/event_stream/merged.rs b/apis/rust/node/src/event_stream/merged.rs new file mode 100644 index 00000000..b9a10aa2 --- /dev/null +++ b/apis/rust/node/src/event_stream/merged.rs @@ -0,0 +1,58 @@ +use futures::{Stream, StreamExt}; +use futures_concurrency::stream::Merge; + +pub enum MergedEvent { + Dora(super::Event), + External(E), +} + +pub enum Either { + First(A), + Second(B), +} + +pub trait MergeExternal<'a, E> { + type Item; + + fn merge_external( + self, + external_events: impl Stream + Unpin + 'a, + ) -> Box + Unpin + 'a>; +} + +impl<'a, E> MergeExternal<'a, E> for super::EventStream +where + E: 'static, +{ + type Item = MergedEvent; + + fn merge_external( + self, + external_events: impl Stream + Unpin + 'a, + ) -> Box + Unpin + 'a> { + let dora = self.map(MergedEvent::Dora); + let external = external_events.map(MergedEvent::External); + Box::new((dora, external).merge()) + } +} + +impl<'a, E, F, S> MergeExternal<'a, F> for S +where + S: Stream> + Unpin + 'a, + E: 'a, + F: 'a, +{ + type Item = MergedEvent>; + + fn merge_external( + self, + external_events: impl Stream + Unpin + 'a, + ) -> Box + Unpin + 'a> { + let first = self.map(|e| match e { + MergedEvent::Dora(d) => MergedEvent::Dora(d), + MergedEvent::External(e) => MergedEvent::External(Either::First(e)), + }); + let second = external_events.map(|e| MergedEvent::External(Either::Second(e))); + Box::new((first, second).merge()) + } +} diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 104722f8..f052c123 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -15,6 +15,7 @@ use dora_core::{ use eyre::{eyre, Context}; mod event; +pub mod merged; mod thread; pub struct EventStream { diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index a07be734..d23bae9f 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -15,7 +15,7 @@ //! pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; -pub use event_stream::{Data, Event, EventStream, MappedInputData}; +pub use event_stream::{merged, Data, Event, EventStream, MappedInputData}; pub use flume::Receiver; pub use node::{DataSample, DoraNode};