Browse Source

List failed and finished dataflows in `dora list`

tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
8056c23db7
Failed to extract signature
4 changed files with 57 additions and 22 deletions
  1. +27
    -12
      binaries/cli/src/main.rs
  2. +19
    -4
      binaries/coordinator/src/lib.rs
  3. +3
    -3
      examples/multiple-daemons/run.rs
  4. +8
    -3
      libraries/core/src/topics.rs

+ 27
- 12
binaries/cli/src/main.rs View File

@@ -5,7 +5,7 @@ use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
ControlRequest, ControlRequestReply, DataflowList, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
@@ -321,17 +321,18 @@ fn run() -> eyre::Result<()> {
} => {
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
let list = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
logs::logs(&mut *session, uuid, name, node)?
} else {
let uuid = match &uuids[..] {
let uuid = match &list.active[..] {
[] => bail!("No dataflows are running"),
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", uuids).prompt()?,
_ => inquire::Select::new("Choose dataflow to show logs:", list.active)
.prompt()?,
};
logs::logs(&mut *session, Some(uuid.uuid), None, node)?
}
@@ -509,11 +510,11 @@ fn stop_dataflow_interactive(
grace_duration: Option<Duration>,
session: &mut TcpRequestReplyConnection,
) -> eyre::Result<()> {
let uuids = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if uuids.is_empty() {
let list = query_running_dataflows(session).wrap_err("failed to query running dataflows")?;
if list.active.is_empty() {
eprintln!("No dataflows are running");
} else {
let selection = inquire::Select::new("Choose dataflow to stop:", uuids).prompt()?;
let selection = inquire::Select::new("Choose dataflow to stop:", list.active).prompt()?;
stop_dataflow(selection.uuid, grace_duration, session)?;
}

@@ -571,13 +572,27 @@ fn stop_dataflow_by_name(
}

fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport> {
let ids = query_running_dataflows(session)?;
let list = query_running_dataflows(session)?;

if ids.is_empty() {
if list.active.is_empty() {
eprintln!("No dataflows are running");
} else {
println!("Running dataflows:");
for id in ids {
for id in list.active {
println!("- {id}");
}
}

if !list.failed.is_empty() {
println!("Failed dataflows:");
for id in list.failed {
println!("- {id}");
}
}

if !list.finished.is_empty() {
println!("Finished dataflows:");
for id in list.finished {
println!("- {id}");
}
}
@@ -587,14 +602,14 @@ fn list(session: &mut TcpRequestReplyConnection) -> Result<(), eyre::ErrReport>

fn query_running_dataflows(
session: &mut TcpRequestReplyConnection,
) -> Result<Vec<DataflowId>, eyre::ErrReport> {
) -> Result<DataflowList, eyre::ErrReport> {
let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::List).unwrap())
.wrap_err("failed to send list message")?;
let reply: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
let ids = match reply {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(list) => list,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected list dataflow reply: {other:?}"),
};


+ 19
- 4
binaries/coordinator/src/lib.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{ControlRequest, ControlRequestReply, DataflowId},
topics::{ControlRequest, ControlRequestReply, DataflowId, DataflowList},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -458,15 +458,30 @@ async fn start_inner(
let mut dataflows: Vec<_> = running_dataflows.values().collect();
dataflows.sort_by_key(|d| (&d.name, d.uuid));

let reply = Ok(ControlRequestReply::DataflowList {
dataflows: dataflows
let mut finished = Vec::new();
let mut failed = Vec::new();
for (&uuid, results) in &dataflow_results {
let name =
archived_dataflows.get(&uuid).and_then(|d| d.name.clone());
let id = DataflowId { uuid, name };
if results.values().all(|r| r.is_ok()) {
finished.push(id);
} else {
failed.push(id);
}
}

let reply = Ok(ControlRequestReply::DataflowList(DataflowList {
active: dataflows
.into_iter()
.map(|d| DataflowId {
uuid: d.uuid,
name: d.name.clone(),
})
.collect(),
});
finished,
failed,
}));
let _ = reply_sender.send(reply);
}
ControlRequest::DaemonConnected => {


+ 3
- 3
examples/multiple-daemons/run.rs View File

@@ -2,8 +2,8 @@ use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
ControlRequest, ControlRequestReply, DataflowId, DataflowList,
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_tracing::set_up_tracing;
@@ -176,7 +176,7 @@ async fn running_dataflows(coordinator_events_tx: &Sender<Event>) -> eyre::Resul
.await?;
let result = reply.await??;
let dataflows = match result {
ControlRequestReply::DataflowList { dataflows } => dataflows,
ControlRequestReply::DataflowList(DataflowList { active, .. }) => active,
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
};


+ 8
- 3
libraries/core/src/topics.rs View File

@@ -55,6 +55,13 @@ pub enum ControlRequest {
ConnectedMachines,
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct DataflowList {
pub active: Vec<DataflowId>,
pub finished: Vec<DataflowId>,
pub failed: Vec<DataflowId>,
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequestReply {
Error(String),
@@ -70,9 +77,7 @@ pub enum ControlRequestReply {
result: Result<(), String>,
},

DataflowList {
dataflows: Vec<DataflowId>,
},
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),
ConnectedMachines(BTreeSet<String>),


Loading…
Cancel
Save