Browse Source

Report results of spawned nodes as events

tags/v0.2.0-candidate
Philipp Oppermann 3 years ago
parent
commit
f0e31e2cc6
Failed to extract signature
2 changed files with 57 additions and 9 deletions
  1. +40
    -5
      binaries/daemon/src/main.rs
  2. +17
    -4
      binaries/daemon/src/spawn.rs

+ 40
- 5
binaries/daemon/src/main.rs View File

@@ -160,10 +160,9 @@ impl Daemon {
}
}

let task = spawn::spawn_node(dataflow_id, params, self.port)
spawn::spawn_node(dataflow_id, params, self.port, self.dora_events_tx.clone())
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))?;
dataflow.node_tasks.insert(node_id, task);
}

// spawn timer tasks
@@ -311,6 +310,12 @@ impl Daemon {
}

// TODO: notify remote nodes

dataflow.subscribe_channels.remove(&node_id);
if dataflow.subscribe_channels.is_empty() {
tracing::info!("Dataflow `{dataflow_id}` finished");
self.running.remove(&dataflow_id);
}
}
}
Ok(())
@@ -353,17 +358,42 @@ impl Daemon {
for id in closed {
dataflow.subscribe_channels.remove(id);
}

Ok(())
}
DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id,
result,
} => {
if self
.running
.get(&dataflow_id)
.and_then(|d| d.subscribe_channels.get(&node_id))
.is_some()
{
tracing::warn!(
"node `{dataflow_id}/{node_id}` finished without sending `Stopped` message"
);
}
match result {
Ok(()) => {
tracing::info!("node {dataflow_id}/{node_id} finished");
}
Err(err) => {
tracing::error!(
"{:?}",
err.wrap_err(format!("error in node `{dataflow_id}/{node_id}`"))
);
}
}
}
}
Ok(())
}
}

#[derive(Default)]
pub struct RunningDataflow {
subscribe_channels: HashMap<NodeId, flume::Sender<daemon_messages::NodeEvent>>,
node_tasks: HashMap<NodeId, tokio::task::JoinHandle<eyre::Result<()>>>,
mappings: HashMap<OutputId, BTreeSet<InputId>>,
timers: BTreeMap<Duration, BTreeSet<InputId>>,
}
@@ -406,6 +436,11 @@ pub enum DoraEvent {
interval: Duration,
metadata: dora_message::Metadata<'static>,
},
SpawnedNodeResult {
dataflow_id: DataflowId,
node_id: NodeId,
result: eyre::Result<()>,
},
}

type MessageId = String;


+ 17
- 4
binaries/daemon/src/spawn.rs View File

@@ -1,3 +1,4 @@
use crate::DoraEvent;
use dora_core::{
daemon_messages::{DataflowId, NodeConfig, SpawnNodeParams},
descriptor::{resolve_path, source_is_url},
@@ -5,13 +6,15 @@ use dora_core::{
use dora_download::download_file;
use eyre::{eyre, WrapErr};
use std::{env::consts::EXE_EXTENSION, path::Path};
use tokio::sync::mpsc;

#[tracing::instrument]
pub async fn spawn_node(
dataflow_id: DataflowId,
params: SpawnNodeParams,
daemon_port: u16,
) -> eyre::Result<tokio::task::JoinHandle<eyre::Result<()>>> {
result_tx: mpsc::Sender<DoraEvent>,
) -> eyre::Result<()> {
let SpawnNodeParams {
node_id,
node,
@@ -63,16 +66,26 @@ pub async fn spawn_node(
node.args.as_deref().unwrap_or_default()
)
})?;
let result = tokio::spawn(async move {
let node_id_cloned = node_id.clone();
let wait_task = async move {
let status = child.wait().await.context("child process failed")?;
if status.success() {
tracing::info!("node {node_id} finished");
Ok(())
} else if let Some(code) = status.code() {
Err(eyre!("node {node_id} failed with exit code: {code}"))
} else {
Err(eyre!("node {node_id} failed (unknown exit code)"))
}
};
tokio::spawn(async move {
let result = wait_task.await;
let _ = result_tx
.send(DoraEvent::SpawnedNodeResult {
dataflow_id,
node_id: node_id_cloned,
result,
})
.await;
});
Ok(result)
Ok(())
}

Loading…
Cancel
Save