|
|
|
@@ -5,7 +5,7 @@ use dora_core::{ |
|
|
|
daemon_messages::{ |
|
|
|
DaemonCoordinatorEvent, DaemonCoordinatorReply, SpawnDataflowNodes, SpawnNodeParams, |
|
|
|
}, |
|
|
|
descriptor::{collect_dora_timers, CoreNodeKind, Descriptor}, |
|
|
|
descriptor::{CoreNodeKind, Descriptor}, |
|
|
|
}; |
|
|
|
use eyre::{bail, eyre, ContextCompat, WrapErr}; |
|
|
|
use futures::{stream::FuturesUnordered, StreamExt}; |
|
|
|
@@ -38,7 +38,6 @@ pub async fn spawn_dataflow( |
|
|
|
.ok_or_else(|| eyre!("canonicalized dataflow path has no parent"))? |
|
|
|
.to_owned(); |
|
|
|
let nodes = descriptor.resolve_aliases(); |
|
|
|
let dora_timers = collect_dora_timers(&nodes); |
|
|
|
let uuid = Uuid::new_v4(); |
|
|
|
let communication_config = { |
|
|
|
let mut config = descriptor.communication; |
|
|
|
@@ -125,17 +124,6 @@ pub struct SpawnedDataflow { |
|
|
|
pub machines: BTreeSet<String>, |
|
|
|
} |
|
|
|
|
|
|
|
pub async fn await_tasks( |
|
|
|
mut tasks: FuturesUnordered<tokio::task::JoinHandle<Result<(), eyre::ErrReport>>>, |
|
|
|
) -> eyre::Result<()> { |
|
|
|
while let Some(task_result) = tasks.next().await { |
|
|
|
task_result |
|
|
|
.wrap_err("failed to join async task")? |
|
|
|
.wrap_err("custom node failed")?; |
|
|
|
} |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
|
|
|
|
async fn read_descriptor(file: &Path) -> Result<Descriptor, eyre::Error> { |
|
|
|
let descriptor_file = tokio::fs::read(file) |
|
|
|
.await |
|
|
|
|