From c75928239e08531e2f4ac02cffa1bf1022bc4c86 Mon Sep 17 00:00:00 2001 From: Miyamo Date: Mon, 27 May 2024 14:00:40 +0800 Subject: [PATCH 1/6] Make dora cli connect to remote coordinator Signed-off-by: Gege-Wang <2891067867@qq.com> fix: localhost to control_socket_addr() --- binaries/cli/src/check.rs | 11 ++++-- binaries/cli/src/main.rs | 78 ++++++++++++++++++++++++++++----------- binaries/cli/src/up.rs | 25 ++++++++----- 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 9a6607b3..6e3b5fd1 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,11 +1,15 @@ use crate::connect_to_coordinator; use communication_layer_request_reply::TcpRequestReplyConnection; +use dora_core::topics::control_socket_addr; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; -use std::io::{IsTerminal, Write}; +use std::{ + io::{IsTerminal, Write}, + net::SocketAddr, +}; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment() -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { @@ -17,7 +21,8 @@ pub fn check_environment() -> eyre::Result<()> { // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - let mut session = match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = match connect_to_coordinator(coordination_addr) { Ok(session) => { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 8ac75485..c10bb8c2 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,6 +45,7 @@ enum Command { Check { #[clap(long)] dataflow: Option, + coordinator_addr: Option, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -67,17 +68,21 @@ enum Command { Up { #[clap(long)] config: Option, + coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, + coordinator_addr: Option, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { dataflow: PathBuf, #[clap(long)] name: Option, + #[clap(long)] + coordinator_addr: Option, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -91,9 +96,12 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, + coordinator_addr: Option, }, /// List running dataflows. - List, + List { + coordinator_addr: Option, + }, // Planned for future releases: // Dashboard, /// Show logs of a given dataflow and node. @@ -101,6 +109,7 @@ enum Command { Logs { dataflow: Option, node: String, + coordinator_addr: Option, }, // Metrics, // Stats, @@ -184,7 +193,10 @@ fn run() -> eyre::Result<()> { }; match args.command { - Command::Check { dataflow } => match dataflow { + Command::Check { + dataflow, + coordinator_addr, + } => match dataflow { Some(dataflow) => { let working_dir = dataflow .canonicalize() @@ -193,9 +205,9 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment()? + check::check_environment(coordinator_addr)? } - None => check::check_environment()?, + None => check::check_environment(coordinator_addr)?, }, Command::Graph { dataflow, @@ -211,11 +223,20 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { config } => up::up(config.as_deref())?, - - Command::Logs { dataflow, node } => { - let mut session = - connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + Command::Up { + config, + coordinator_addr, + } => { + up::up(config.as_deref(), coordinator_addr)?; + } + Command::Logs { + dataflow, + node, + coordinator_addr, + } => { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; if let Some(dataflow) = dataflow { @@ -234,6 +255,7 @@ fn run() -> eyre::Result<()> { Command::Start { dataflow, name, + coordinator_addr, attach, hot_reload, } => { @@ -248,8 +270,10 @@ fn run() -> eyre::Result<()> { dataflow_descriptor .check(&working_dir) .wrap_err("Could not validate yaml")?; - let mut session = - connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?; + + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), name, @@ -267,26 +291,34 @@ fn run() -> eyre::Result<()> { )? } } - Command::List => match connect_to_coordinator() { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); + Command::List { coordinator_addr } => { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + match connect_to_coordinator(coordination_addr) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); + } } - }, + } Command::Stop { uuid, name, grace_duration, + coordinator_addr, } => { - let mut session = - connect_to_coordinator().wrap_err("could not connect to dora coordinator")?; + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = connect_to_coordinator(coordination_addr) + .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, (None, Some(name)) => stop_dataflow_by_name(name, grace_duration, &mut *session)?, (None, None) => stop_dataflow_interactive(grace_duration, &mut *session)?, } } - Command::Destroy { config } => up::destroy(config.as_deref())?, + Command::Destroy { + config, + coordinator_addr, + } => up::destroy(config.as_deref(), coordinator_addr)?, Command::Coordinator { addr } => { let rt = Builder::new_multi_thread() .enable_all() @@ -327,7 +359,7 @@ fn run() -> eyre::Result<()> { tracing::info!("Starting in local mode"); let localhost = Ipv4Addr::new(127, 0, 0, 1); (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }); + }); Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await } } @@ -466,6 +498,8 @@ fn query_running_dataflows( Ok(ids) } -fn connect_to_coordinator() -> std::io::Result> { - TcpLayer::new().connect(control_socket_addr()) +fn connect_to_coordinator( + coordinator_addr: SocketAddr, +) -> std::io::Result> { + TcpLayer::new().connect(coordinator_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index bdb7a0b3..623869ba 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,21 +1,23 @@ use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::ControlRequest; +use dora_core::topics::{control_socket_addr, ControlRequest}; use eyre::Context; -use std::{fs, path::Path, process::Command, time::Duration}; - +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { +pub(crate) fn up( + config_path: Option<&Path>, + coordinator_addr: Option, +) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - - let mut session = match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + let mut session = match connect_to_coordinator(coordination_addr) { Ok(session) => session, Err(_) => { start_coordinator().wrap_err("failed to start dora-coordinator")?; loop { - match connect_to_coordinator() { + match connect_to_coordinator(coordination_addr) { Ok(session) => break session, Err(_) => { // sleep a bit until the coordinator accepts connections @@ -47,10 +49,13 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { Ok(()) } -pub(crate) fn destroy(config_path: Option<&Path>) -> Result<(), eyre::ErrReport> { +pub(crate) fn destroy( + config_path: Option<&Path>, + coordinator_addr: Option, +) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - - match connect_to_coordinator() { + let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); + match connect_to_coordinator(coordination_addr) { Ok(mut session) => { // send destroy command to dora-coordinator session From d9ddeff87cd9cfbbf52428dd85bd287647b5b3d9 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Tue, 28 May 2024 21:22:17 +0800 Subject: [PATCH 2/6] test: add coordinator-addr CI test --- .github/workflows/ci.yml | 15 +++++++++++++++ binaries/cli/src/check.rs | 4 +--- binaries/cli/src/main.rs | 32 ++++++++++++++++---------------- binaries/cli/src/up.rs | 10 ++++------ 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4f0a35c2..8c172af5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -272,6 +272,14 @@ jobs: sleep 10 dora stop --name ci-rust-test --grace-duration 5s dora destroy + export IP="$(curl ipinfo.io/ip)" + dora up --coordinator-addr $IP:6012 + dora list --coordinator-addr $IP:6012 + dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 + sleep 10 + dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora destroy --coordinator-addr $IP:6012 + - name: "Test CLI (Python)" timeout-minutes: 30 # fail-fast by using bash shell explictly @@ -294,6 +302,13 @@ jobs: sleep 10 dora stop --name ci-python-test --grace-duration 5s dora destroy + export IP="$(curl ipinfo.io/ip)" + dora up --coordinator-addr $IP:6012 + dora list --coordinator-addr $IP:6012 + dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 + sleep 10 + dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora destroy --coordinator-addr $IP:6012 clippy: name: "Clippy" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 6e3b5fd1..19dea806 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -1,6 +1,5 @@ use crate::connect_to_coordinator; use communication_layer_request_reply::TcpRequestReplyConnection; -use dora_core::topics::control_socket_addr; use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ @@ -21,8 +20,7 @@ pub fn check_environment(coordinator_addr: Option) -> eyre::Result<( // check whether coordinator is running write!(stdout, "Dora Coordinator: ")?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = match connect_to_coordinator(coordination_addr) { + let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => { let _ = stdout.set_color(ColorSpec::new().set_fg(Some(Color::Green))); writeln!(stdout, "ok")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index c10bb8c2..a746c080 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -68,12 +68,14 @@ enum Command { Up { #[clap(long)] config: Option, + #[clap(long)] coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, + #[clap(long)] coordinator_addr: Option, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. @@ -96,10 +98,12 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, + #[clap(long)] coordinator_addr: Option, }, /// List running dataflows. List { + #[clap(long)] coordinator_addr: Option, }, // Planned for future releases: @@ -109,6 +113,7 @@ enum Command { Logs { dataflow: Option, node: String, + #[clap(long)] coordinator_addr: Option, }, // Metrics, @@ -234,8 +239,7 @@ fn run() -> eyre::Result<()> { node, coordinator_addr, } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; @@ -271,8 +275,7 @@ fn run() -> eyre::Result<()> { .check(&working_dir) .wrap_err("Could not validate yaml")?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -291,23 +294,19 @@ fn run() -> eyre::Result<()> { )? } } - Command::List { coordinator_addr } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - match connect_to_coordinator(coordination_addr) { - Ok(mut session) => list(&mut *session)?, - Err(_) => { - bail!("No dora coordinator seems to be running."); - } + Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { + Ok(mut session) => list(&mut *session)?, + Err(_) => { + bail!("No dora coordinator seems to be running."); } - } + }, Command::Stop { uuid, name, grace_duration, coordinator_addr, } => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = connect_to_coordinator(coordination_addr) + let mut session = connect_to_coordinator(coordinator_addr) .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, @@ -499,7 +498,8 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: SocketAddr, + coordinator_addr: Option, ) -> std::io::Result> { - TcpLayer::new().connect(coordinator_addr) + let coordination_addr = coordinator_addr.unwrap_or_else(control_socket_addr); + TcpLayer::new().connect(coordination_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 623869ba..f4d7e96e 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,5 +1,5 @@ use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::{control_socket_addr, ControlRequest}; +use dora_core::topics::ControlRequest; use eyre::Context; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] @@ -10,14 +10,13 @@ pub(crate) fn up( coordinator_addr: Option, ) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - let mut session = match connect_to_coordinator(coordination_addr) { + let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, Err(_) => { start_coordinator().wrap_err("failed to start dora-coordinator")?; loop { - match connect_to_coordinator(coordination_addr) { + match connect_to_coordinator(coordinator_addr) { Ok(session) => break session, Err(_) => { // sleep a bit until the coordinator accepts connections @@ -54,8 +53,7 @@ pub(crate) fn destroy( coordinator_addr: Option, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; - let coordination_addr = coordinator_addr.unwrap_or_else(|| control_socket_addr()); - match connect_to_coordinator(coordination_addr) { + match connect_to_coordinator(coordinator_addr) { Ok(mut session) => { // send destroy command to dora-coordinator session From 717c5d063934c33971585ef02df1ee38d4676c62 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Wed, 29 May 2024 14:36:27 +0800 Subject: [PATCH 3/6] test: change ip to localhost --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8c172af5..03335bd8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -272,12 +272,12 @@ jobs: sleep 10 dora stop --name ci-rust-test --grace-duration 5s dora destroy - export IP="$(curl ipinfo.io/ip)" + export IP=127.0.0.1 dora up --coordinator-addr $IP:6012 dora list --coordinator-addr $IP:6012 dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 sleep 10 - dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora stop --name ci-rust-test --grace-duration 5s --coordinator-addr $IP:6012 dora destroy --coordinator-addr $IP:6012 - name: "Test CLI (Python)" @@ -302,12 +302,12 @@ jobs: sleep 10 dora stop --name ci-python-test --grace-duration 5s dora destroy - export IP="$(curl ipinfo.io/ip)" + export IP=127.0.0.1 dora up --coordinator-addr $IP:6012 dora list --coordinator-addr $IP:6012 dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 sleep 10 - dora stop --name ci-rust-test --grace-duration 5 --coordinator-addr $IP:6012 + dora stop --name ci-rust-test --grace-duration 5s --coordinator-addr $IP:6012 dora destroy --coordinator-addr $IP:6012 clippy: From ce2b02fb054ed0f892135aa3003654f2078c1053 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Thu, 30 May 2024 11:55:55 +0800 Subject: [PATCH 4/6] fix: change the SocketAddr to IpAddr --- .github/workflows/ci.yml | 14 -------------- binaries/cli/src/check.rs | 4 ++-- binaries/cli/src/main.rs | 26 ++++++++++++++------------ binaries/cli/src/up.rs | 6 +++--- 4 files changed, 19 insertions(+), 31 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 03335bd8..d8a686c0 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -272,13 +272,6 @@ jobs: sleep 10 dora stop --name ci-rust-test --grace-duration 5s dora destroy - export IP=127.0.0.1 - dora up --coordinator-addr $IP:6012 - dora list --coordinator-addr $IP:6012 - dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 - sleep 10 - dora stop --name ci-rust-test --grace-duration 5s --coordinator-addr $IP:6012 - dora destroy --coordinator-addr $IP:6012 - name: "Test CLI (Python)" timeout-minutes: 30 @@ -302,13 +295,6 @@ jobs: sleep 10 dora stop --name ci-python-test --grace-duration 5s dora destroy - export IP=127.0.0.1 - dora up --coordinator-addr $IP:6012 - dora list --coordinator-addr $IP:6012 - dora start dataflow.yml --name ci-rust-test --coordinator-addr $IP:6012 - sleep 10 - dora stop --name ci-rust-test --grace-duration 5s --coordinator-addr $IP:6012 - dora destroy --coordinator-addr $IP:6012 clippy: name: "Clippy" diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 19dea806..277dd002 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ io::{IsTerminal, Write}, - net::SocketAddr, + net::IpAddr, }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index a746c080..58ac8c89 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,8 +5,7 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_DEFAULT, + control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT }, }; use dora_daemon::Daemon; @@ -45,7 +44,7 @@ enum Command { Check { #[clap(long)] dataflow: Option, - coordinator_addr: Option, + coordinator_addr: Option, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -69,14 +68,14 @@ enum Command { #[clap(long)] config: Option, #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { @@ -84,7 +83,7 @@ enum Command { #[clap(long)] name: Option, #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -99,12 +98,12 @@ enum Command { #[arg(value_parser = parse)] grace_duration: Option, #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, }, /// List running dataflows. List { #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, }, // Planned for future releases: // Dashboard, @@ -114,7 +113,7 @@ enum Command { dataflow: Option, node: String, #[clap(long)] - coordinator_addr: Option, + coordinator_addr: Option, }, // Metrics, // Stats, @@ -498,8 +497,11 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: Option, + coordinator_addr: Option, ) -> std::io::Result> { - let coordination_addr = coordinator_addr.unwrap_or_else(control_socket_addr); - TcpLayer::new().connect(coordination_addr) + if let Some(coordinator_addr) = coordinator_addr { + TcpLayer::new().connect(SocketAddr::new(coordinator_addr, DORA_COORDINATOR_PORT_CONTROL)) + } else { + TcpLayer::new().connect(control_socket_addr()) + } } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index f4d7e96e..b2b5ccc2 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,13 +1,13 @@ use crate::{check::daemon_running, connect_to_coordinator}; use dora_core::topics::ControlRequest; use eyre::Context; -use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; +use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} pub(crate) fn up( config_path: Option<&Path>, - coordinator_addr: Option, + coordinator_addr: Option, ) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let mut session = match connect_to_coordinator(coordinator_addr) { @@ -50,7 +50,7 @@ pub(crate) fn up( pub(crate) fn destroy( config_path: Option<&Path>, - coordinator_addr: Option, + coordinator_addr: Option, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; match connect_to_coordinator(coordinator_addr) { From 49e6994cdacbd0f61f2b5a8cbe16a290a7537cae Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Thu, 30 May 2024 12:49:00 +0800 Subject: [PATCH 5/6] fix: add dora coordinator control port --- binaries/cli/src/main.rs | 8 ++++++-- binaries/cli/src/up.rs | 5 +---- libraries/core/src/topics.rs | 6 +++++- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 58ac8c89..4a63b58a 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,7 +5,8 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT + control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, + DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -500,7 +501,10 @@ fn connect_to_coordinator( coordinator_addr: Option, ) -> std::io::Result> { if let Some(coordinator_addr) = coordinator_addr { - TcpLayer::new().connect(SocketAddr::new(coordinator_addr, DORA_COORDINATOR_PORT_CONTROL)) + TcpLayer::new().connect(SocketAddr::new( + coordinator_addr, + DORA_COORDINATOR_PORT_CONTROL, + )) } else { TcpLayer::new().connect(control_socket_addr()) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index b2b5ccc2..b404eaa7 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -5,10 +5,7 @@ use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up( - config_path: Option<&Path>, - coordinator_addr: Option, -) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 48048864..69ad75a5 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -13,11 +13,15 @@ use crate::{ }; pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; +pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; pub fn control_socket_addr() -> SocketAddr { - SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 6012) + SocketAddr::new( + Ipv4Addr::new(127, 0, 0, 1).into(), + DORA_COORDINATOR_PORT_CONTROL, + ) } #[derive(Debug, serde::Deserialize, serde::Serialize)] From 21f29091ccd7cd465fa06c50f59909bcf00fd2c4 Mon Sep 17 00:00:00 2001 From: Gege-Wang <2891067867@qq.com> Date: Thu, 30 May 2024 13:04:16 +0800 Subject: [PATCH 6/6] fix: add check command --- binaries/cli/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 4a63b58a..2f36fc34 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,6 +45,7 @@ enum Command { Check { #[clap(long)] dataflow: Option, + #[clap(long)] coordinator_addr: Option, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.