|
|
|
@@ -45,6 +45,7 @@ enum Command { |
|
|
|
Check { |
|
|
|
#[clap(long)] |
|
|
|
dataflow: Option<PathBuf>, |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. |
|
|
|
Graph { |
|
|
|
@@ -67,17 +68,21 @@ enum Command { |
|
|
|
Up { |
|
|
|
#[clap(long)] |
|
|
|
config: Option<PathBuf>, |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. |
|
|
|
Destroy { |
|
|
|
#[clap(long)] |
|
|
|
config: Option<PathBuf>, |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
/// Start the given dataflow path. Attach a name to the running dataflow by using --name. |
|
|
|
Start { |
|
|
|
dataflow: PathBuf, |
|
|
|
#[clap(long)] |
|
|
|
name: Option<String>, |
|
|
|
#[clap(long)] |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
#[clap(long, action)] |
|
|
|
attach: bool, |
|
|
|
#[clap(long, action)] |
|
|
|
@@ -91,9 +96,12 @@ enum Command { |
|
|
|
#[clap(long)] |
|
|
|
#[arg(value_parser = parse)] |
|
|
|
grace_duration: Option<Duration>, |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
/// List running dataflows. |
|
|
|
List, |
|
|
|
List { |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
// Planned for future releases: |
|
|
|
// Dashboard, |
|
|
|
/// Show logs of a given dataflow and node. |
|
|
|
@@ -101,6 +109,7 @@ enum Command { |
|
|
|
Logs { |
|
|
|
dataflow: Option<String>, |
|
|
|
node: String, |
|
|
|
coordinator_addr: Option<SocketAddr>, |
|
|
|
}, |
|
|
|
// Metrics, |
|
|
|
// Stats, |
|
|
|
@@ -184,7 +193,10 @@ fn run() -> eyre::Result<()> { |
|
|
|
}; |
|
|
|
|
|
|
|
match args.command { |
|
|
|
Command::Check { dataflow } => match dataflow { |
|
|
|
Command::Check { |
|
|
|
dataflow, |
|
|
|
coordinator_addr, |
|
|
|
} => match dataflow { |
|
|
|
Some(dataflow) => { |
|
|
|
let working_dir = dataflow |
|
|
|
.canonicalize() |
|
|
|
@@ -193,9 +205,9 @@ fn run() -> eyre::Result<()> { |
|
|
|
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? |
|
|
|
.to_owned(); |
|
|
|
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; |
|
|
|
check::check_environment()? |
|
|
|
check::check_environment(coordinator_addr)? |
|
|
|
} |
|
|
|
None => check::check_environment()?, |
|
|
|
None => check::check_environment(coordinator_addr)?, |
|
|
|
}, |
|
|
|
Command::Graph { |
|
|
|
dataflow, |
|
|
|
@@ -211,11 +223,20 @@ fn run() -> eyre::Result<()> { |
|
|
|
args, |
|
|
|
internal_create_with_path_dependencies, |
|
|
|
} => template::create(args, internal_create_with_path_dependencies)?, |
|
|
|
Command::Up { config } => up::up(config.as_deref())?, |
|
|
|
|
|
|
|
Command::Logs { dataflow, node } => { |
|
|
|
let mut session = |
|
|
|
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; |
|
|
|
Command::Up { |
|
|
|
config, |
|
|
|
coordinator_addr, |
|
|
|
} => { |
|
|
|
up::up(config.as_deref(), coordinator_addr)?; |
|
|
|
} |
|
|
|
Command::Logs { |
|
|
|
dataflow, |
|
|
|
node, |
|
|
|
coordinator_addr, |
|
|
|
} => { |
|
|
|
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); |
|
|
|
let mut session = connect_to_coordinator(coordination_addr) |
|
|
|
.wrap_err("failed to connect to dora coordinator")?; |
|
|
|
let uuids = query_running_dataflows(&mut *session) |
|
|
|
.wrap_err("failed to query running dataflows")?; |
|
|
|
if let Some(dataflow) = dataflow { |
|
|
|
@@ -234,6 +255,7 @@ fn run() -> eyre::Result<()> { |
|
|
|
Command::Start { |
|
|
|
dataflow, |
|
|
|
name, |
|
|
|
coordinator_addr, |
|
|
|
attach, |
|
|
|
hot_reload, |
|
|
|
} => { |
|
|
|
@@ -248,8 +270,10 @@ fn run() -> eyre::Result<()> { |
|
|
|
dataflow_descriptor |
|
|
|
.check(&working_dir) |
|
|
|
.wrap_err("Could not validate yaml")?; |
|
|
|
let mut session = |
|
|
|
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; |
|
|
|
|
|
|
|
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); |
|
|
|
let mut session = connect_to_coordinator(coordination_addr) |
|
|
|
.wrap_err("failed to connect to dora coordinator")?; |
|
|
|
let dataflow_id = start_dataflow( |
|
|
|
dataflow_descriptor.clone(), |
|
|
|
name, |
|
|
|
@@ -267,26 +291,34 @@ fn run() -> eyre::Result<()> { |
|
|
|
)? |
|
|
|
} |
|
|
|
} |
|
|
|
Command::List => match connect_to_coordinator() { |
|
|
|
Ok(mut session) => list(&mut *session)?, |
|
|
|
Err(_) => { |
|
|
|
bail!("No dora coordinator seems to be running."); |
|
|
|
Command::List { coordinator_addr } => { |
|
|
|
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); |
|
|
|
match connect_to_coordinator(coordination_addr) { |
|
|
|
Ok(mut session) => list(&mut *session)?, |
|
|
|
Err(_) => { |
|
|
|
bail!("No dora coordinator seems to be running."); |
|
|
|
} |
|
|
|
} |
|
|
|
}, |
|
|
|
} |
|
|
|
Command::Stop { |
|
|
|
uuid, |
|
|
|
name, |
|
|
|
grace_duration, |
|
|
|
coordinator_addr, |
|
|
|
} => { |
|
|
|
let mut session = |
|
|
|
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; |
|
|
|
let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); |
|
|
|
let mut session = connect_to_coordinator(coordination_addr) |
|
|
|
.wrap_err("could not connect to dora coordinator")?; |
|
|
|
match (uuid, name) { |
|
|
|
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, |
|
|
|
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, |
|
|
|
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, |
|
|
|
} |
|
|
|
} |
|
|
|
Command::Destroy { config } => up::destroy(config.as_deref())?, |
|
|
|
Command::Destroy { |
|
|
|
config, |
|
|
|
coordinator_addr, |
|
|
|
} => up::destroy(config.as_deref(), coordinator_addr)?, |
|
|
|
Command::Coordinator { addr } => { |
|
|
|
let rt = Builder::new_multi_thread() |
|
|
|
.enable_all() |
|
|
|
@@ -327,7 +359,7 @@ fn run() -> eyre::Result<()> { |
|
|
|
tracing::info!("Starting in local mode"); |
|
|
|
let localhost = Ipv4Addr::new(127, 0, 0, 1); |
|
|
|
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into() |
|
|
|
}); |
|
|
|
}); |
|
|
|
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await |
|
|
|
} |
|
|
|
} |
|
|
|
@@ -466,6 +498,8 @@ fn query_running_dataflows( |
|
|
|
Ok(ids) |
|
|
|
} |
|
|
|
|
|
|
|
fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> { |
|
|
|
TcpLayer::new().connect(control_socket_addr()) |
|
|
|
fn connect_to_coordinator( |
|
|
|
coordinator_addr: SocketAddr, |
|
|
|
) -> std::io::Result<Box<TcpRequestReplyConnection>> { |
|
|
|
TcpLayer::new().connect(coordinator_addr) |
|
|
|
} |