From a0a95b730cd12723075c7c6e6ca3ee8de678e60f Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 10 Jun 2024 20:41:04 +0200 Subject: [PATCH] Rename dynamic node listener -> local listener --- apis/rust/node/src/node/mod.rs | 4 ++-- binaries/cli/src/main.rs | 16 ++++++++-------- binaries/daemon/src/lib.rs | 12 ++++++------ .../src/{dynamic_node.rs => local_listener.rs} | 10 +++++----- examples/multiple-daemons/run.rs | 4 ++-- libraries/core/src/topics.rs | 2 +- 6 files changed, 24 insertions(+), 24 deletions(-) rename binaries/daemon/src/{dynamic_node.rs => local_listener.rs} (93%) diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 31fdd8a6..8995a8fc 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -12,7 +12,7 @@ use dora_core::{ daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, descriptor::Descriptor, message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, - topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}, + topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, }; use eyre::{bail, WrapErr}; @@ -90,7 +90,7 @@ impl DoraNode { return Self::init(node_config); } - let daemon_address = (LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT).into(); + let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(); let mut channel = DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index d54a43aa..e3f900b2 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -6,7 +6,7 @@ use dora_core::{ descriptor::Descriptor, topics::{ ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -176,10 +176,10 @@ enum Command { machine_id: Option, /// The inter daemon IP address and port this daemon will bind to. #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] - addr: SocketAddr, - /// The dynamic node port this daemon will bind to. - #[clap(long, default_value_t = DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT)] - dynamic_node_port: u16, + inter_daemon_addr: SocketAddr, + /// Local listen port for event such as dynamic node. + #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] + local_listen_port: u16, /// Address and port number of the dora coordinator #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] coordinator_addr: SocketAddr, @@ -413,8 +413,8 @@ fn run() -> eyre::Result<()> { } Command::Daemon { coordinator_addr, - addr, - dynamic_node_port, + inter_daemon_addr, + local_listen_port, machine_id, run_dataflow, } => { @@ -439,7 +439,7 @@ fn run() -> eyre::Result<()> { if coordinator_addr.ip() == LOCALHOST { tracing::info!("Starting in local mode"); } - Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr, dynamic_node_port).await + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await } } }) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 6c4d58bd..72c699e3 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -19,11 +19,11 @@ use dora_core::{ descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, }; -use dynamic_node::DynamicNodeEventWrapper; use eyre::{bail, eyre, Context, ContextCompat, Result}; use futures::{future, stream, FutureExt, TryFutureExt}; use futures_concurrency::stream::Merge; use inter_daemon::InterDaemonConnection; +use local_listener::DynamicNodeEventWrapper; use pending::PendingNodes; use shared_memory_server::ShmemConf; use std::sync::Arc; @@ -49,8 +49,8 @@ use tracing::{error, warn}; use uuid::{NoContext, Timestamp, Uuid}; mod coordinator; -mod dynamic_node; mod inter_daemon; +mod local_listener; mod log; mod node_communication; mod pending; @@ -88,7 +88,7 @@ impl Daemon { coordinator_addr: SocketAddr, machine_id: String, inter_daemon_addr: SocketAddr, - dynamic_node_port: u16, + local_listen_port: u16, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); @@ -119,10 +119,10 @@ impl Daemon { }, ); - // Spawn dynamic node listener loop + // Spawn local listener loop let (events_tx, events_rx) = flume::bounded(10); - let _listen_port = dynamic_node::spawn_listener_loop( - (LOCALHOST, dynamic_node_port).into(), + let _listen_port = local_listener::spawn_listener_loop( + (LOCALHOST, local_listen_port).into(), machine_id.clone(), events_tx, ) diff --git a/binaries/daemon/src/dynamic_node.rs b/binaries/daemon/src/local_listener.rs similarity index 93% rename from binaries/daemon/src/dynamic_node.rs rename to binaries/daemon/src/local_listener.rs index 7b1f9dc3..dbffe39e 100644 --- a/binaries/daemon/src/dynamic_node.rs +++ b/binaries/daemon/src/local_listener.rs @@ -31,7 +31,7 @@ pub async fn spawn_listener_loop( tokio::spawn(async move { listener_loop(socket, events_tx).await; - tracing::debug!("Dynamic node listener loop finished for machine `{machine_id}`"); + tracing::debug!("Local listener loop finished for machine `{machine_id}`"); }); Ok(listen_port) @@ -100,7 +100,7 @@ async fn handle_connection_loop( } }; if let Err(err) = tcp_send(&mut connection, &serialized).await { - tracing::warn!("failed to send reply to dynamic node: {err}"); + tracing::warn!("failed to send reply: {err}"); continue; }; } @@ -111,7 +111,7 @@ async fn handle_connection_loop( break; } _ => tracing::warn!( - "Unexpected Daemon Request that is not yet by Additional dynamic node controls" + "Unexpected Daemon Request that is not yet by Additional local listener controls" ), } } @@ -128,11 +128,11 @@ async fn receive_message( | ErrorKind::ConnectionReset => return Ok(None), _other => { return Err(err) - .context("unexpected I/O error while trying to receive DynamicNodeEvent") + .context("unexpected I/O error while trying to receive DaemonRequest") } }, }; bincode::deserialize(&raw) - .wrap_err("failed to deserialize DynamicNodeEvent") + .wrap_err("failed to deserialize DaemonRequest") .map(Some) } diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index dae921ee..59e75fb0 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -214,7 +214,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { async fn run_daemon( coordinator: String, machine_id: &str, - dynamic_node_port: u16, + local_listen_port: u16, ) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); @@ -227,7 +227,7 @@ async fn run_daemon( .arg("--coordinator-addr") .arg(coordinator) .arg("--dynamic-node-port") - .arg(dynamic_node_port.to_string()); + .arg(local_listen_port.to_string()); if !cmd.status().await?.success() { bail!("failed to run dataflow"); }; diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 4da3ab7b..8e90e48f 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -14,7 +14,7 @@ use crate::{ pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; -pub const DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT: u16 = 0xD02B; +pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B; pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop";