Browse Source

Add support for custom coordinator control port numbers in CLI

tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
142d218610
Failed to extract signature
4 changed files with 88 additions and 38 deletions
  1. +2
    -2
      binaries/cli/src/check.rs
  2. +69
    -29
      binaries/cli/src/main.rs
  3. +3
    -3
      binaries/cli/src/up.rs
  4. +14
    -4
      examples/multiple-daemons/run.rs

+ 2
- 2
binaries/cli/src/check.rs View File

@@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::{
io::{IsTerminal, Write},
net::IpAddr,
net::SocketAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

pub fn check_environment(coordinator_addr: IpAddr) -> eyre::Result<()> {
pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
let mut error_occured = false;

let color_choice = if std::io::stdout().is_terminal() {


+ 69
- 29
binaries/cli/src/main.rs View File

@@ -45,8 +45,12 @@ enum Command {
Check {
#[clap(long)]
dataflow: Option<PathBuf>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
@@ -69,23 +73,35 @@ enum Command {
Up {
#[clap(long)]
config: Option<PathBuf>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
#[clap(long)]
config: Option<PathBuf>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
dataflow: PathBuf,
#[clap(long)]
name: Option<String>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
#[clap(long, action)]
attach: bool,
#[clap(long, action)]
@@ -99,13 +115,21 @@ enum Command {
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// List running dataflows.
List {
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Planned for future releases:
// Dashboard,
@@ -114,8 +138,12 @@ enum Command {
Logs {
dataflow: Option<String>,
node: String,
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Metrics,
// Stats,
@@ -130,9 +158,9 @@ enum Command {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
addr: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,
/// Address and port number of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long)]
run_dataflow: Option<PathBuf>,
},
@@ -210,6 +238,7 @@ fn run() -> eyre::Result<()> {
Command::Check {
dataflow,
coordinator_addr,
coordinator_port,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
@@ -219,9 +248,9 @@ fn run() -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment(coordinator_addr)?
check::check_environment((coordinator_addr, coordinator_port).into())?
}
None => check::check_environment(coordinator_addr)?,
None => check::check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
@@ -240,15 +269,20 @@ fn run() -> eyre::Result<()> {
Command::Up {
config,
coordinator_addr,
coordinator_port,
} => {
up::up(config.as_deref(), coordinator_addr)?;
up::up(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
@@ -269,6 +303,7 @@ fn run() -> eyre::Result<()> {
dataflow,
name,
coordinator_addr,
coordinator_port,
attach,
hot_reload,
} => {
@@ -284,7 +319,7 @@ fn run() -> eyre::Result<()> {
.check(&working_dir)
.wrap_err("Could not validate yaml")?;

let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
@@ -303,7 +338,10 @@ fn run() -> eyre::Result<()> {
)?
}
}
Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) {
Command::List {
coordinator_addr,
coordinator_port,
} => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
@@ -314,8 +352,9 @@ fn run() -> eyre::Result<()> {
name,
grace_duration,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
@@ -326,7 +365,11 @@ fn run() -> eyre::Result<()> {
Command::Destroy {
config,
coordinator_addr,
} => up::destroy(config.as_deref(), coordinator_addr)?,
coordinator_port,
} => up::destroy(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?,
Command::Coordinator {
interface,
port,
@@ -343,6 +386,7 @@ fn run() -> eyre::Result<()> {
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
println!("Listening for incoming daemon connection on {port}");
task.await
})
.context("failed to run dora-coordinator")?
@@ -357,11 +401,12 @@ fn run() -> eyre::Result<()> {
.enable_all()
.build()
.context("tokio runtime failed")?;
let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
rt.block_on(async {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if let Some(coordinator_addr) = coordinator_addr {
if coordinator_addr != SocketAddr::new(localhost, DORA_COORDINATOR_PORT_DEFAULT){
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
@@ -371,12 +416,10 @@ fn run() -> eyre::Result<()> {
Daemon::run_dataflow(&dataflow_path).await
}
None => {
let coordination_addr = coordinator_addr.unwrap_or_else(|| {
if coordinator_addr.ip() == localhost {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await
}
}
})
@@ -515,10 +558,7 @@ fn query_running_dataflows(
}

fn connect_to_coordinator(
coordinator_addr: IpAddr,
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(SocketAddr::new(
coordinator_addr,
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
))
TcpLayer::new().connect(coordinator_addr)
}

+ 3
- 3
binaries/cli/src/up.rs View File

@@ -1,11 +1,11 @@
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use eyre::Context;
use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration};
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre::Result<()> {
pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: SocketAddr) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;
let mut session = match connect_to_coordinator(coordinator_addr) {
Ok(session) => session,
@@ -47,7 +47,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre::

pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: IpAddr,
coordinator_addr: SocketAddr,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;
match connect_to_coordinator(coordinator_addr) {


+ 14
- 4
examples/multiple-daemons/run.rs View File

@@ -1,7 +1,10 @@
use dora_coordinator::{ControlEvent, Event};
use dora_core::{
descriptor::Descriptor,
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT},
topics::{
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
@@ -38,9 +41,16 @@ async fn main() -> eyre::Result<()> {
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_DEFAULT,
);
let (coordinator_port, coordinator) =
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx))
.await?;
let coordinator_control_bind = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
);
let (coordinator_port, coordinator) = dora_coordinator::start(
coordinator_bind,
coordinator_control_bind,
ReceiverStream::new(coordinator_events_rx),
)
.await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = run_daemon(coordinator_addr.to_string(), "A");
let daemon_b = run_daemon(coordinator_addr.to_string(), "B");


Loading…
Cancel
Save