Browse Source

Merge pull request #513 from Gege-Wang/multiple-daemon

Make dora cli connect to remote coordinator
tags/v0.3.5-fix4
Haixuan Xavier Tao GitHub 1 year ago
parent
commit
f50478b1a7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
5 changed files with 81 additions and 32 deletions
  1. +1
    -0
      .github/workflows/ci.yml
  2. +6
    -3
      binaries/cli/src/check.rs
  3. +60
    -19
      binaries/cli/src/main.rs
  4. +9
    -9
      binaries/cli/src/up.rs
  5. +5
    -1
      libraries/core/src/topics.rs

+ 1
- 0
.github/workflows/ci.yml View File

@@ -290,6 +290,7 @@ jobs:
sleep 10
dora stop --name ci-rust-test --grace-duration 5s
dora destroy
- name: "Test CLI (Python)"
timeout-minutes: 30
# fail-fast by using bash shell explictly


+ 6
- 3
binaries/cli/src/check.rs View File

@@ -2,10 +2,13 @@ use crate::connect_to_coordinator;
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::io::{IsTerminal, Write};
use std::{
io::{IsTerminal, Write},
net::IpAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

pub fn check_environment() -> eyre::Result<()> {
pub fn check_environment(coordinator_addr: Option<IpAddr>) -> eyre::Result<()> {
let mut error_occured = false;

let color_choice = if std::io::stdout().is_terminal() {
@@ -17,7 +20,7 @@ pub fn check_environment() -> eyre::Result<()> {

// check whether coordinator is running
write!(stdout, "Dora Coordinator: ")?;
let mut session = match connect_to_coordinator() {
let mut session = match connect_to_coordinator(coordinator_addr) {
Ok(session) => {
let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green)));
writeln!(stdout, "ok")?;


+ 60
- 19
binaries/cli/src/main.rs View File

@@ -6,7 +6,7 @@ use dora_core::{
descriptor::Descriptor,
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_DEFAULT,
DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
@@ -45,6 +45,8 @@ enum Command {
Check {
#[clap(long)]
dataflow: Option<PathBuf>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
@@ -67,17 +69,23 @@ enum Command {
Up {
#[clap(long)]
config: Option<PathBuf>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
#[clap(long)]
config: Option<PathBuf>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
dataflow: PathBuf,
#[clap(long)]
name: Option<String>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
#[clap(long, action)]
attach: bool,
#[clap(long, action)]
@@ -91,9 +99,14 @@ enum Command {
#[clap(long)]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
/// List running dataflows.
List,
List {
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
// Planned for future releases:
// Dashboard,
/// Show logs of a given dataflow and node.
@@ -101,6 +114,8 @@ enum Command {
Logs {
dataflow: Option<String>,
node: String,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
// Metrics,
// Stats,
@@ -184,7 +199,10 @@ fn run() -> eyre::Result<()> {
};

match args.command {
Command::Check { dataflow } => match dataflow {
Command::Check {
dataflow,
coordinator_addr,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
.canonicalize()
@@ -193,9 +211,9 @@ fn run() -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment()?
check::check_environment(coordinator_addr)?
}
None => check::check_environment()?,
None => check::check_environment(coordinator_addr)?,
},
Command::Graph {
dataflow,
@@ -211,11 +229,19 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up { config } => up::up(config.as_deref())?,

Command::Logs { dataflow, node } => {
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
Command::Up {
config,
coordinator_addr,
} => {
up::up(config.as_deref(), coordinator_addr)?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
} => {
let mut session = connect_to_coordinator(coordinator_addr)
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
if let Some(dataflow) = dataflow {
@@ -234,6 +260,7 @@ fn run() -> eyre::Result<()> {
Command::Start {
dataflow,
name,
coordinator_addr,
attach,
hot_reload,
} => {
@@ -248,8 +275,9 @@ fn run() -> eyre::Result<()> {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;

let mut session = connect_to_coordinator(coordinator_addr)
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
@@ -267,7 +295,7 @@ fn run() -> eyre::Result<()> {
)?
}
}
Command::List => match connect_to_coordinator() {
Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
@@ -277,16 +305,20 @@ fn run() -> eyre::Result<()> {
uuid,
name,
grace_duration,
coordinator_addr,
} => {
let mut session =
connect_to_coordinator().wrap_err("could not connect to dora coordinator")?;
let mut session = connect_to_coordinator(coordinator_addr)
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
(None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?,
(None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?,
}
}
Command::Destroy { config } => up::destroy(config.as_deref())?,
Command::Destroy {
config,
coordinator_addr,
} => up::destroy(config.as_deref(), coordinator_addr)?,
Command::Coordinator { addr } => {
let rt = Builder::new_multi_thread()
.enable_all()
@@ -327,7 +359,7 @@ fn run() -> eyre::Result<()> {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
});
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
}
@@ -466,6 +498,15 @@ fn query_running_dataflows(
Ok(ids)
}

fn connect_to_coordinator() -> std::io::Result<Box<TcpRequestReplyConnection>> {
TcpLayer::new().connect(control_socket_addr())
fn connect_to_coordinator(
coordinator_addr: Option<IpAddr>,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
if let Some(coordinator_addr) = coordinator_addr {
TcpLayer::new().connect(SocketAddr::new(
coordinator_addr,
DORA_COORDINATOR_PORT_CONTROL,
))
} else {
TcpLayer::new().connect(control_socket_addr())
}
}

+ 9
- 9
binaries/cli/src/up.rs View File

@@ -1,21 +1,19 @@
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use eyre::Context;
use std::{fs, path::Path, process::Command, time::Duration};

use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option<IpAddr>) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;

let mut session = match connect_to_coordinator() {
let mut session = match connect_to_coordinator(coordinator_addr) {
Ok(session) => session,
Err(_) => {
start_coordinator().wrap_err("failed to start dora-coordinator")?;

loop {
match connect_to_coordinator() {
match connect_to_coordinator(coordinator_addr) {
Ok(session) => break session,
Err(_) => {
// sleep a bit until the coordinator accepts connections
@@ -47,10 +45,12 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
Ok(())
}

pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> {
pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: Option<IpAddr>,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;

match connect_to_coordinator() {
match connect_to_coordinator(coordinator_addr) {
Ok(mut session) => {
// send destroy command to dora-coordinator
session


+ 5
- 1
libraries/core/src/topics.rs View File

@@ -13,11 +13,15 @@ use crate::{
};

pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A;
pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C;

pub const MANUAL_STOP: &str = "dora/stop";

pub fn control_socket_addr() -> SocketAddr {
SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 6012)
SocketAddr::new(
Ipv4Addr::new(127, 0, 0, 1).into(),
DORA_COORDINATOR_PORT_CONTROL,
)
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]


Loading…
Cancel
Save