Compare commits

...

1 Commits

Author SHA1 Message Date
  Philipp Oppermann a643f8d3da Use `Receiver::try_recv` instead of `Stream::next` with timeout 1 year ago
1 changed files with 8 additions and 8 deletions
Split View
  1. +8
    -8
      apis/rust/node/src/event_stream/mod.rs

+ 8
- 8
apis/rust/node/src/event_stream/mod.rs View File

@@ -37,7 +37,7 @@ mod thread;

pub struct EventStream {
node_id: NodeId,
receiver: flume::r#async::RecvStream<'static, EventItem>,
receiver: flume::Receiver<EventItem>,
_thread_handle: EventStreamThreadHandle,
close_channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
@@ -149,7 +149,7 @@ impl EventStream {

Ok(EventStream {
node_id: node_id.clone(),
receiver: rx.into_stream(),
receiver: rx,
_thread_handle: thread_handle,
close_channel,
clock,
@@ -170,16 +170,15 @@ impl EventStream {
pub async fn recv_async(&mut self) -> Option<Event> {
loop {
if self.scheduler.is_empty() {
if let Some(event) = self.receiver.next().await {
if let Ok(event) = self.receiver.recv_async().await {
self.scheduler.add_event(event);
} else {
break;
}
} else {
match select(Delay::new(Duration::from_micros(300)), self.receiver.next()).await {
Either::Left((_elapsed, _)) => break,
Either::Right((Some(event), _)) => self.scheduler.add_event(event),
Either::Right((None, _)) => break,
match self.receiver.try_recv() {
Ok(event) => self.scheduler.add_event(event),
Err(_) => break, // no other ready events
};
}
}
@@ -258,10 +257,11 @@ impl Stream for EventStream {
type Item = Event;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.receiver
.stream()
.poll_next_unpin(cx)
.map(|item| item.map(Self::convert_event_item))
}


Loading…
Cancel
Save