| @@ -12,7 +12,7 @@ use dora_core::{ | |||||
| daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, | daemon_messages::{DaemonRequest, DataMessage, DataflowId, DropToken, NodeConfig, Timestamped}, | ||||
| descriptor::Descriptor, | descriptor::Descriptor, | ||||
| message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, | message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters}, | ||||
| topics::{DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, LOCALHOST}, | |||||
| topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST}, | |||||
| }; | }; | ||||
| use eyre::{bail, WrapErr}; | use eyre::{bail, WrapErr}; | ||||
| @@ -90,7 +90,7 @@ impl DoraNode { | |||||
| return Self::init(node_config); | return Self::init(node_config); | ||||
| } | } | ||||
| let daemon_address = (LOCALHOST, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT).into(); | |||||
| let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into(); | |||||
| let mut channel = | let mut channel = | ||||
| DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; | DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?; | ||||
| @@ -6,7 +6,7 @@ use dora_core::{ | |||||
| descriptor::Descriptor, | descriptor::Descriptor, | ||||
| topics::{ | topics::{ | ||||
| ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, | ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, | ||||
| DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT, | |||||
| DORA_COORDINATOR_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, | |||||
| }, | }, | ||||
| }; | }; | ||||
| use dora_daemon::Daemon; | use dora_daemon::Daemon; | ||||
| @@ -176,10 +176,10 @@ enum Command { | |||||
| machine_id: Option<String>, | machine_id: Option<String>, | ||||
| /// The inter daemon IP address and port this daemon will bind to. | /// The inter daemon IP address and port this daemon will bind to. | ||||
| #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] | #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] | ||||
| addr: SocketAddr, | |||||
| /// The dynamic node port this daemon will bind to. | |||||
| #[clap(long, default_value_t = DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT)] | |||||
| dynamic_node_port: u16, | |||||
| inter_daemon_addr: SocketAddr, | |||||
| /// Local listen port for event such as dynamic node. | |||||
| #[clap(long, default_value_t = DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT)] | |||||
| local_listen_port: u16, | |||||
| /// Address and port number of the dora coordinator | /// Address and port number of the dora coordinator | ||||
| #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] | #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] | ||||
| coordinator_addr: SocketAddr, | coordinator_addr: SocketAddr, | ||||
| @@ -413,8 +413,8 @@ fn run() -> eyre::Result<()> { | |||||
| } | } | ||||
| Command::Daemon { | Command::Daemon { | ||||
| coordinator_addr, | coordinator_addr, | ||||
| addr, | |||||
| dynamic_node_port, | |||||
| inter_daemon_addr, | |||||
| local_listen_port, | |||||
| machine_id, | machine_id, | ||||
| run_dataflow, | run_dataflow, | ||||
| } => { | } => { | ||||
| @@ -439,7 +439,7 @@ fn run() -> eyre::Result<()> { | |||||
| if coordinator_addr.ip() == LOCALHOST { | if coordinator_addr.ip() == LOCALHOST { | ||||
| tracing::info!("Starting in local mode"); | tracing::info!("Starting in local mode"); | ||||
| } | } | ||||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr, dynamic_node_port).await | |||||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await | |||||
| } | } | ||||
| } | } | ||||
| }) | }) | ||||
| @@ -19,11 +19,11 @@ use dora_core::{ | |||||
| descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, | descriptor::{CoreNodeKind, Descriptor, ResolvedNode}, | ||||
| }; | }; | ||||
| use dynamic_node::DynamicNodeEventWrapper; | |||||
| use eyre::{bail, eyre, Context, ContextCompat, Result}; | use eyre::{bail, eyre, Context, ContextCompat, Result}; | ||||
| use futures::{future, stream, FutureExt, TryFutureExt}; | use futures::{future, stream, FutureExt, TryFutureExt}; | ||||
| use futures_concurrency::stream::Merge; | use futures_concurrency::stream::Merge; | ||||
| use inter_daemon::InterDaemonConnection; | use inter_daemon::InterDaemonConnection; | ||||
| use local_listener::DynamicNodeEventWrapper; | |||||
| use pending::PendingNodes; | use pending::PendingNodes; | ||||
| use shared_memory_server::ShmemConf; | use shared_memory_server::ShmemConf; | ||||
| use std::sync::Arc; | use std::sync::Arc; | ||||
| @@ -49,8 +49,8 @@ use tracing::{error, warn}; | |||||
| use uuid::{NoContext, Timestamp, Uuid}; | use uuid::{NoContext, Timestamp, Uuid}; | ||||
| mod coordinator; | mod coordinator; | ||||
| mod dynamic_node; | |||||
| mod inter_daemon; | mod inter_daemon; | ||||
| mod local_listener; | |||||
| mod log; | mod log; | ||||
| mod node_communication; | mod node_communication; | ||||
| mod pending; | mod pending; | ||||
| @@ -88,7 +88,7 @@ impl Daemon { | |||||
| coordinator_addr: SocketAddr, | coordinator_addr: SocketAddr, | ||||
| machine_id: String, | machine_id: String, | ||||
| inter_daemon_addr: SocketAddr, | inter_daemon_addr: SocketAddr, | ||||
| dynamic_node_port: u16, | |||||
| local_listen_port: u16, | |||||
| ) -> eyre::Result<()> { | ) -> eyre::Result<()> { | ||||
| let clock = Arc::new(HLC::default()); | let clock = Arc::new(HLC::default()); | ||||
| @@ -119,10 +119,10 @@ impl Daemon { | |||||
| }, | }, | ||||
| ); | ); | ||||
| // Spawn dynamic node listener loop | |||||
| // Spawn local listener loop | |||||
| let (events_tx, events_rx) = flume::bounded(10); | let (events_tx, events_rx) = flume::bounded(10); | ||||
| let _listen_port = dynamic_node::spawn_listener_loop( | |||||
| (LOCALHOST, dynamic_node_port).into(), | |||||
| let _listen_port = local_listener::spawn_listener_loop( | |||||
| (LOCALHOST, local_listen_port).into(), | |||||
| machine_id.clone(), | machine_id.clone(), | ||||
| events_tx, | events_tx, | ||||
| ) | ) | ||||
| @@ -31,7 +31,7 @@ pub async fn spawn_listener_loop( | |||||
| tokio::spawn(async move { | tokio::spawn(async move { | ||||
| listener_loop(socket, events_tx).await; | listener_loop(socket, events_tx).await; | ||||
| tracing::debug!("Dynamic node listener loop finished for machine `{machine_id}`"); | |||||
| tracing::debug!("Local listener loop finished for machine `{machine_id}`"); | |||||
| }); | }); | ||||
| Ok(listen_port) | Ok(listen_port) | ||||
| @@ -100,7 +100,7 @@ async fn handle_connection_loop( | |||||
| } | } | ||||
| }; | }; | ||||
| if let Err(err) = tcp_send(&mut connection, &serialized).await { | if let Err(err) = tcp_send(&mut connection, &serialized).await { | ||||
| tracing::warn!("failed to send reply to dynamic node: {err}"); | |||||
| tracing::warn!("failed to send reply: {err}"); | |||||
| continue; | continue; | ||||
| }; | }; | ||||
| } | } | ||||
| @@ -111,7 +111,7 @@ async fn handle_connection_loop( | |||||
| break; | break; | ||||
| } | } | ||||
| _ => tracing::warn!( | _ => tracing::warn!( | ||||
| "Unexpected Daemon Request that is not yet by Additional dynamic node controls" | |||||
| "Unexpected Daemon Request that is not yet by Additional local listener controls" | |||||
| ), | ), | ||||
| } | } | ||||
| } | } | ||||
| @@ -128,11 +128,11 @@ async fn receive_message( | |||||
| | ErrorKind::ConnectionReset => return Ok(None), | | ErrorKind::ConnectionReset => return Ok(None), | ||||
| _other => { | _other => { | ||||
| return Err(err) | return Err(err) | ||||
| .context("unexpected I/O error while trying to receive DynamicNodeEvent") | |||||
| .context("unexpected I/O error while trying to receive DaemonRequest") | |||||
| } | } | ||||
| }, | }, | ||||
| }; | }; | ||||
| bincode::deserialize(&raw) | bincode::deserialize(&raw) | ||||
| .wrap_err("failed to deserialize DynamicNodeEvent") | |||||
| .wrap_err("failed to deserialize DaemonRequest") | |||||
| .map(Some) | .map(Some) | ||||
| } | } | ||||
| @@ -214,7 +214,7 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||||
| async fn run_daemon( | async fn run_daemon( | ||||
| coordinator: String, | coordinator: String, | ||||
| machine_id: &str, | machine_id: &str, | ||||
| dynamic_node_port: u16, | |||||
| local_listen_port: u16, | |||||
| ) -> eyre::Result<()> { | ) -> eyre::Result<()> { | ||||
| let cargo = std::env::var("CARGO").unwrap(); | let cargo = std::env::var("CARGO").unwrap(); | ||||
| let mut cmd = tokio::process::Command::new(&cargo); | let mut cmd = tokio::process::Command::new(&cargo); | ||||
| @@ -227,7 +227,7 @@ async fn run_daemon( | |||||
| .arg("--coordinator-addr") | .arg("--coordinator-addr") | ||||
| .arg(coordinator) | .arg(coordinator) | ||||
| .arg("--dynamic-node-port") | .arg("--dynamic-node-port") | ||||
| .arg(dynamic_node_port.to_string()); | |||||
| .arg(local_listen_port.to_string()); | |||||
| if !cmd.status().await?.success() { | if !cmd.status().await?.success() { | ||||
| bail!("failed to run dataflow"); | bail!("failed to run dataflow"); | ||||
| }; | }; | ||||
| @@ -14,7 +14,7 @@ use crate::{ | |||||
| pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); | pub const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); | ||||
| pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; | pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; | ||||
| pub const DORA_DAEMON_DYNAMIC_NODE_PORT_DEFAULT: u16 = 0xD02B; | |||||
| pub const DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT: u16 = 0xD02B; | |||||
| pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; | pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; | ||||
| pub const MANUAL_STOP: &str = "dora/stop"; | pub const MANUAL_STOP: &str = "dora/stop"; | ||||