Browse Source

Use duration instead of f32 secs

tags/v0.3.1-rc
haixuanTao 2 years ago
parent
commit
0cd5281325
3 changed files with 15 additions and 15 deletions
  1. +4
    -2
      apis/python/node/src/lib.rs
  2. +10
    -12
      apis/rust/node/src/event_stream/mod.rs
  3. +1
    -1
      apis/rust/node/src/event_stream/thread.rs

+ 4
- 2
apis/python/node/src/lib.rs View File

@@ -1,5 +1,7 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
@@ -58,7 +60,7 @@ impl Node {
/// ```
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
let event = py.allow_threads(|| self.events.recv(timeout));
let event = py.allow_threads(|| self.events.recv(timeout.map(Duration::from_secs_f32)));
Ok(event)
}

@@ -157,7 +159,7 @@ enum Events {
}

impl Events {
fn recv(&mut self, timeout: Option<f32>) -> Option<PyEvent> {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
match self {
Events::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),


+ 10
- 12
apis/rust/node/src/event_stream/mod.rs View File

@@ -112,22 +112,20 @@ impl EventStream {
}

/// wait for the next event on the events stream until timeout
pub fn recv_timeout(&mut self, secs: f32) -> Option<Event> {
futures::executor::block_on(self.recv_async(Some(secs)))
pub fn recv_timeout(&mut self, dur: Duration) -> Option<Event> {
futures::executor::block_on(self.recv_async(Some(dur)))
}

pub async fn recv_async(&mut self, secs: Option<f32>) -> Option<Event> {
pub async fn recv_async(&mut self, dur: Option<Duration>) -> Option<Event> {
let receive_event = self.receiver.next();
match secs {
match dur {
None => receive_event.await,
Some(secs) => {
match select(Delay::new(Duration::from_secs_f32(secs)), receive_event).await {
Either::Left((_elapsed, _)) => {
Some(EventItem::TimedoutError(eyre!("Receiver timed out")))
}
Either::Right((event, _)) => event,
Some(dur) => match select(Delay::new(dur), receive_event).await {
Either::Left((_elapsed, _)) => {
Some(EventItem::TimeoutError(eyre!("Receiver timed out")))
}
}
Either::Right((event, _)) => event,
},
}
.map(Self::convert_event_item)
}
@@ -182,7 +180,7 @@ impl EventStream {
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
EventItem::TimedoutError(err) => {
EventItem::TimeoutError(err) => {
Event::Error(format!("Timeout event stream error: {err:?}"))
}
}


+ 1
- 1
apis/rust/node/src/event_stream/thread.rs View File

@@ -30,7 +30,7 @@ pub enum EventItem {
ack_channel: flume::Sender<()>,
},
FatalError(eyre::Report),
TimedoutError(eyre::Report),
TimeoutError(eyre::Report),
}

pub struct EventStreamThreadHandle {


Loading…
Cancel
Save