|
|
|
@@ -37,7 +37,7 @@ mod thread; |
|
|
|
|
|
|
|
pub struct EventStream { |
|
|
|
node_id: NodeId, |
|
|
|
receiver: flume::r#async::RecvStream<'static, EventItem>, |
|
|
|
receiver: flume::Receiver<EventItem>, |
|
|
|
_thread_handle: EventStreamThreadHandle, |
|
|
|
close_channel: DaemonChannel, |
|
|
|
clock: Arc<uhlc::HLC>, |
|
|
|
@@ -149,7 +149,7 @@ impl EventStream { |
|
|
|
|
|
|
|
Ok(EventStream { |
|
|
|
node_id: node_id.clone(), |
|
|
|
receiver: rx.into_stream(), |
|
|
|
receiver: rx, |
|
|
|
_thread_handle: thread_handle, |
|
|
|
close_channel, |
|
|
|
clock, |
|
|
|
@@ -170,16 +170,15 @@ impl EventStream { |
|
|
|
pub async fn recv_async(&mut self) -> Option<Event> { |
|
|
|
loop { |
|
|
|
if self.scheduler.is_empty() { |
|
|
|
if let Some(event) = self.receiver.next().await { |
|
|
|
if let Ok(event) = self.receiver.recv_async().await { |
|
|
|
self.scheduler.add_event(event); |
|
|
|
} else { |
|
|
|
break; |
|
|
|
} |
|
|
|
} else { |
|
|
|
match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await { |
|
|
|
Either::Left((_elapsed, _)) => break, |
|
|
|
Either::Right((Some(event), _)) => self.scheduler.add_event(event), |
|
|
|
Either::Right((None, _)) => break, |
|
|
|
match self.receiver.try_recv() { |
|
|
|
Ok(event) => self.scheduler.add_event(event), |
|
|
|
Err(_) => break, // no other ready events |
|
|
|
}; |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -258,10 +257,11 @@ impl Stream for EventStream { |
|
|
|
type Item = Event; |
|
|
|
|
|
|
|
fn poll_next( |
|
|
|
mut self: std::pin::Pin<&mut Self>, |
|
|
|
self: std::pin::Pin<&mut Self>, |
|
|
|
cx: &mut std::task::Context<'_>, |
|
|
|
) -> std::task::Poll<Option<Self::Item>> { |
|
|
|
self.receiver |
|
|
|
.stream() |
|
|
|
.poll_next_unpin(cx) |
|
|
|
.map(|item| item.map(Self::convert_event_item)) |
|
|
|
} |
|
|
|
|