Browse Source

Wait for dataflow finish on `dora stop`

tags/v0.2.4-rc
Philipp Oppermann 2 years ago
parent
commit
868bee5897
Failed to extract signature
2 changed files with 118 additions and 70 deletions
  1. +116
    -69
      binaries/coordinator/src/lib.rs
  2. +2
    -1
      libraries/core/src/topics.rs

+ 116
- 69
binaries/coordinator/src/lib.rs View File

@@ -18,7 +18,7 @@ use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use run::SpawnedDataflow;
use std::{
collections::{BTreeSet, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
net::SocketAddr,
path::PathBuf,
time::{Duration, Instant},
@@ -156,7 +156,7 @@ async fn start_inner(
let mut events = (abortable_events, daemon_events).merge();

let mut running_dataflows: HashMap<Uuid, RunningDataflow> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, Result<(), String>> = HashMap::new();
let mut dataflow_results: HashMap<Uuid, BTreeMap<String, Result<(), String>>> = HashMap::new();
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections: HashMap<_, DaemonConnection> = HashMap::new();

@@ -271,9 +271,6 @@ async fn start_inner(
.insert(uuid, ArchivedDataflow::from(entry.get()));
}
entry.get_mut().machines.remove(&machine_id);
let result = result.wrap_err(format!(
"error occured in dataflow `{uuid}` on machine `{machine_id}`"
));
match &result {
Ok(()) => {
tracing::info!("dataflow `{uuid}` finished successfully on machine `{machine_id}`");
@@ -282,10 +279,19 @@ async fn start_inner(
tracing::error!("{err:?}");
}
}
dataflow_results
.entry(uuid)
.or_default()
.insert(machine_id, result.map_err(|err| format!("{err:?}")));
if entry.get_mut().machines.is_empty() {
entry.remove();
dataflow_results
.insert(uuid, result.map_err(|err| format!("{err:?}")));
let finished_dataflow = entry.remove();
let reply = ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_result(&dataflow_results, uuid),
};
for sender in finished_dataflow.reply_senders {
let _ = sender.send(Ok(reply.clone()));
}
}
}
std::collections::hash_map::Entry::Vacant(_) => {
@@ -300,7 +306,7 @@ async fn start_inner(
request,
reply_sender,
} => {
let reply = match request {
match request {
ControlRequest::Start {
dataflow,
name,
@@ -327,11 +333,12 @@ async fn start_inner(
.await?;
Ok(dataflow)
};
inner.await.map(|dataflow| {
let reply = inner.await.map(|dataflow| {
let uuid = dataflow.uuid;
running_dataflows.insert(uuid, dataflow);
ControlRequestReply::DataflowStarted { uuid }
})
});
let _ = reply_sender.send(reply);
}
ControlRequest::Check { dataflow_uuid } => {
let status = match &running_dataflows.get(&dataflow_uuid) {
@@ -340,13 +347,10 @@ async fn start_inner(
},
None => ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_results
.get(&dataflow_uuid)
.cloned()
.unwrap_or(Ok(())),
result: dataflow_result(&dataflow_results, dataflow_uuid),
},
};
Ok(status)
let _ = reply_sender.send(Ok(status));
}
ControlRequest::Reload {
dataflow_id,
@@ -364,45 +368,38 @@ async fn start_inner(
.await?;
Result::<_, eyre::Report>::Ok(())
};
reload
.await
.map(|()| ControlRequestReply::DataflowReloaded {
uuid: dataflow_id,
})
let reply =
reload
.await
.map(|()| ControlRequestReply::DataflowReloaded {
uuid: dataflow_id,
});
let _ = reply_sender.send(reply);
}
ControlRequest::Stop { dataflow_uuid } => {
let stop = async {
stop_dataflow(
&running_dataflows,
dataflow_uuid,
&mut daemon_connections,
)
.await?;
Result::<_, eyre::Report>::Ok(())
};
stop.await.map(|()| ControlRequestReply::DataflowStopped {
uuid: dataflow_uuid,
result: dataflow_results
.get(&dataflow_uuid)
.cloned()
.unwrap_or(Ok(())),
})
stop_dataflow_by_uuid(
&mut running_dataflows,
dataflow_uuid,
&mut daemon_connections,
reply_sender,
)
.await?;
}
ControlRequest::StopByName { name } => {
let stop = async {
let dataflow_uuid = resolve_name(name, &running_dataflows, None)?;
stop_dataflow(
&running_dataflows,
dataflow_uuid,
&mut daemon_connections,
)
.await?;
Result::<_, eyre::Report>::Ok(dataflow_uuid)
};
stop.await.map(|uuid| ControlRequestReply::DataflowStopped {
uuid,
result: dataflow_results.get(&uuid).cloned().unwrap_or(Ok(())),
})
match resolve_name(name, &running_dataflows, None) {
Ok(uuid) => {
stop_dataflow_by_uuid(
&mut running_dataflows,
uuid,
&mut daemon_connections,
reply_sender,
)
.await?
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::Logs { uuid, name, node } => {
let dataflow_uuid = if let Some(uuid) = uuid {
@@ -413,7 +410,7 @@ async fn start_inner(
bail!("No uuid")
};

retrieve_logs(
let reply = retrieve_logs(
&running_dataflows,
&archived_dataflows,
dataflow_uuid,
@@ -421,25 +418,27 @@ async fn start_inner(
&mut daemon_connections,
)
.await
.map(|logs| ControlRequestReply::Logs(logs))
.map(|logs| ControlRequestReply::Logs(logs));
let _ = reply_sender.send(reply);
}
ControlRequest::Destroy => {
tracing::info!("Received destroy command");

handle_destroy(
let reply = handle_destroy(
&running_dataflows,
&mut daemon_connections,
&abort_handle,
&mut daemon_events_tx,
)
.await
.map(|()| ControlRequestReply::DestroyOk)
.map(|()| ControlRequestReply::DestroyOk);
let _ = reply_sender.send(reply);
}
ControlRequest::List => {
let mut dataflows: Vec<_> = running_dataflows.values().collect();
dataflows.sort_by_key(|d| (&d.name, d.uuid));

Ok(ControlRequestReply::DataflowList {
let reply = Ok(ControlRequestReply::DataflowList {
dataflows: dataflows
.into_iter()
.map(|d| DataflowId {
@@ -447,19 +446,21 @@ async fn start_inner(
name: d.name.clone(),
})
.collect(),
})
});
let _ = reply_sender.send(reply);
}
ControlRequest::DaemonConnected => {
let running = !daemon_connections.is_empty();
Ok(ControlRequestReply::DaemonConnected(running))
let _ = reply_sender
.send(Ok(ControlRequestReply::DaemonConnected(running)));
}
ControlRequest::ConnectedMachines => {
Ok(ControlRequestReply::ConnectedMachines(
let reply = Ok(ControlRequestReply::ConnectedMachines(
daemon_connections.keys().cloned().collect(),
))
));
let _ = reply_sender.send(reply);
}
};
let _ = reply_sender.send(reply);
}
}
ControlEvent::Error(err) => tracing::error!("{err:?}"),
},
@@ -512,6 +513,53 @@ async fn start_inner(
Ok(())
}

async fn stop_dataflow_by_uuid(
running_dataflows: &mut HashMap<Uuid, RunningDataflow>,
dataflow_uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
reply_sender: tokio::sync::oneshot::Sender<Result<ControlRequestReply, eyre::ErrReport>>,
) -> Result<(), eyre::ErrReport> {
let Some(dataflow) = running_dataflows.get_mut(&dataflow_uuid) else {
bail!("No running dataflow found with UUID `{dataflow_uuid}`")
};
let stop = async {
stop_dataflow(dataflow, dataflow_uuid, daemon_connections).await?;
Result::<_, eyre::Report>::Ok(())
};
match stop.await {
Ok(()) => {
dataflow.reply_senders.push(reply_sender);
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
};
Ok(())
}

fn dataflow_result(
dataflow_results: &HashMap<Uuid, BTreeMap<String, Result<(), String>>>,
dataflow_uuid: Uuid,
) -> Result<(), String> {
let Some(results) = dataflow_results.get(&dataflow_uuid) else { return Ok(()) };

let mut errors = Vec::new();
for (machine, result) in results {
if let Err(err) = result {
let err: String = err.lines().map(|line| format!(" {line}\n")).collect();
errors.push(format!("- machine `{machine}`:\n{err}\n"));
}
}

if errors.is_empty() {
Ok(())
} else {
let mut formatted = format!("errors occurred in dataflow {dataflow_uuid}:\n");
formatted.push_str(&errors.join("\n"));
Err(formatted)
}
}

struct DaemonConnection {
stream: TcpStream,
listen_socket: SocketAddr,
@@ -547,8 +595,8 @@ async fn handle_destroy(
daemon_events_tx: &mut Option<mpsc::Sender<Event>>,
) -> Result<(), eyre::ErrReport> {
abortable_events.abort();
for &uuid in running_dataflows.keys() {
stop_dataflow(running_dataflows, uuid, daemon_connections).await?;
for (&uuid, dataflow) in running_dataflows {
stop_dataflow(dataflow, uuid, daemon_connections).await?;
}
destroy_daemons(daemon_connections).await?;
*daemon_events_tx = None;
@@ -563,7 +611,6 @@ async fn send_watchdog_message(connection: &mut TcpStream) -> eyre::Result<()> {
.wrap_err("failed to send watchdog message to daemon")
}

#[allow(dead_code)] // Keeping the communication layer for later use.
struct RunningDataflow {
name: Option<String>,
uuid: Uuid,
@@ -573,6 +620,8 @@ struct RunningDataflow {
pending_machines: BTreeSet<String>,
init_success: bool,
nodes: Vec<ResolvedNode>,

reply_senders: Vec<tokio::sync::oneshot::Sender<eyre::Result<ControlRequestReply>>>,
}

struct ArchivedDataflow {
@@ -598,13 +647,10 @@ impl PartialEq for RunningDataflow {
impl Eq for RunningDataflow {}

async fn stop_dataflow(
running_dataflows: &HashMap<Uuid, RunningDataflow>,
dataflow: &RunningDataflow,
uuid: Uuid,
daemon_connections: &mut HashMap<String, DaemonConnection>,
) -> eyre::Result<()> {
let Some(dataflow) = running_dataflows.get(&uuid) else {
bail!("No running dataflow found with UUID `{uuid}`")
};
let message = serde_json::to_vec(&DaemonCoordinatorEvent::StopDataflow { dataflow_id: uuid })?;

for machine_id in &dataflow.machines {
@@ -757,6 +803,7 @@ async fn start_dataflow(
init_success: true,
machines,
nodes,
reply_senders: Vec::new(),
})
}



+ 2
- 1
libraries/core/src/topics.rs View File

@@ -53,7 +53,7 @@ pub enum ControlRequest {
ConnectedMachines,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
Error(String),
CoordinatorStopped,
@@ -67,6 +67,7 @@ pub enum ControlRequestReply {
uuid: Uuid,
result: Result<(), String>,
},

DataflowList {
dataflows: Vec<DataflowId>,
},


Loading…
Cancel
Save