|
|
|
@@ -8,7 +8,7 @@ use dora_core::{ |
|
|
|
use dora_node_api::DoraNode; |
|
|
|
use eyre::{bail, Context, Result}; |
|
|
|
use futures::{Stream, StreamExt}; |
|
|
|
use futures_concurrency::Merge; |
|
|
|
use futures_concurrency::stream::Merge; |
|
|
|
use operator::{run_operator, OperatorEvent, StopReason}; |
|
|
|
|
|
|
|
use std::{collections::HashMap, mem}; |
|
|
|
@@ -47,7 +47,7 @@ pub fn main() -> eyre::Result<()> { |
|
|
|
id: operator_id.clone(), |
|
|
|
event, |
|
|
|
}); |
|
|
|
let daemon_events = futures::stream::unfold(daemon_events, |mut stream| async { |
|
|
|
let daemon_events = Box::pin(futures::stream::unfold(daemon_events, |mut stream| async { |
|
|
|
let event = stream.recv_async().await.map(|event| match event { |
|
|
|
dora_node_api::daemon::Event::Stop => Event::Stop, |
|
|
|
dora_node_api::daemon::Event::Input { id, metadata, data } => Event::Input { |
|
|
|
@@ -60,7 +60,7 @@ pub fn main() -> eyre::Result<()> { |
|
|
|
_ => todo!(), |
|
|
|
}); |
|
|
|
event.map(|event| (event, stream)) |
|
|
|
}); |
|
|
|
})); |
|
|
|
let events = (operator_events, daemon_events).merge(); |
|
|
|
let tokio_runtime = Builder::new_current_thread() |
|
|
|
.enable_all() |
|
|
|
|