Browse Source

Allow setting custom control port in coordinator

tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
2ccba2b953
Failed to extract signature
3 changed files with 56 additions and 54 deletions
  1. +30
    -11
      binaries/cli/src/main.rs
  2. +24
    -28
      binaries/coordinator/src/lib.rs
  3. +2
    -15
      libraries/core/src/topics.rs

+ 30
- 11
binaries/cli/src/main.rs View File

@@ -5,8 +5,8 @@ use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT,
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
@@ -140,10 +140,18 @@ enum Command {
Runtime,
/// Run coordinator
Coordinator {
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT)
)]
addr: SocketAddr,
/// Network interface to bind to for daemon communication
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))]
interface: IpAddr,
/// Port number to bind to for daemon communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)]
port: u16,
/// Network interface to bind to for control communication
#[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))]
control_interface: IpAddr,
/// Port number to bind to for control communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
control_port: u16,
},
}

@@ -319,14 +327,22 @@ fn run() -> eyre::Result<()> {
config,
coordinator_addr,
} => up::destroy(config.as_deref(), coordinator_addr)?,
Command::Coordinator { addr } => {
Command::Coordinator {
interface,
port,
control_interface,
control_port,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(addr, futures::stream::empty::<Event>()).await?;
let bind = SocketAddr::new(interface, port);
let bind_control = SocketAddr::new(control_interface, control_port);
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
task.await
})
.context("failed to run dora-coordinator")?
@@ -504,9 +520,12 @@ fn connect_to_coordinator(
if let Some(coordinator_addr) = coordinator_addr {
TcpLayer::new().connect(SocketAddr::new(
coordinator_addr,
DORA_COORDINATOR_PORT_CONTROL,
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
))
} else {
TcpLayer::new().connect(control_socket_addr())
TcpLayer::new().connect(SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
))
}
}

+ 24
- 28
binaries/coordinator/src/lib.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped},
descriptor::{Descriptor, ResolvedNode},
message::uhlc::{self, HLC},
topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
topics::{ControlRequest, ControlRequestReply, DataflowId},
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use futures::{stream::FuturesUnordered, Future, Stream, StreamExt};
@@ -22,11 +22,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tokio::{
net::{TcpListener, TcpStream},
sync::mpsc,
task::JoinHandle,
};
use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
use uuid::Uuid;

@@ -37,6 +33,7 @@ mod tcp_utils;

pub async fn start(
bind: SocketAddr,
bind_control: SocketAddr,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let listener = listener::create_listener(bind).await?;
@@ -44,13 +41,30 @@ pub async fn start(
.local_addr()
.wrap_err("failed to get local addr of listener")?
.port();
let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
.wrap_err("failed to open connection")
.unwrap_or_else(Event::DaemonConnectError)
});

let mut tasks = FuturesUnordered::new();
let control_events = control::control_events(bind_control, &tasks)
.await
.wrap_err("failed to create control events")?;

// Setup ctrl-c handler
let ctrlc_events = set_up_ctrlc_handler()?;

let events = (
external_events,
new_daemon_connections,
control_events,
ctrlc_events,
)
.merge();

let future = async move {
start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?;
start_inner(events, &tasks).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {
@@ -100,40 +114,22 @@ fn resolve_name(
}

async fn start_inner(
listener: TcpListener,
events: impl Stream<Item = Event> + Unpin,
tasks: &FuturesUnordered<JoinHandle<()>>,
external_events: impl Stream<Item = Event> + Unpin,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

let new_daemon_connections = TcpListenerStream::new(listener).map(|c| {
c.map(Event::NewDaemonConnection)
.wrap_err("failed to open connection")
.unwrap_or_else(Event::DaemonConnectError)
});

let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2);
let mut daemon_events_tx = Some(daemon_events_tx);
let daemon_events = ReceiverStream::new(daemon_events);

let control_events = control::control_events(control_socket_addr(), tasks)
.await
.wrap_err("failed to create control events")?;

let daemon_heartbeat_interval =
tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3)))
.map(|_| Event::DaemonHeartbeatInterval);

// events that should be aborted on `dora destroy`
let (abortable_events, abort_handle) = futures::stream::abortable(
(
control_events,
new_daemon_connections,
external_events,
daemon_heartbeat_interval,
)
.merge(),
);
let (abortable_events, abort_handle) =
futures::stream::abortable((events, daemon_heartbeat_interval).merge());

let mut events = (abortable_events, daemon_events).merge();



+ 2
- 15
libraries/core/src/topics.rs View File

@@ -1,10 +1,4 @@
use std::{
collections::BTreeSet,
fmt::Display,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
time::Duration,
};
use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration};
use uuid::Uuid;

use crate::{
@@ -13,17 +7,10 @@ use crate::{
};

pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A;
pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C;
pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C;

pub const MANUAL_STOP: &str = "dora/stop";

pub fn control_socket_addr() -> SocketAddr {
SocketAddr::new(
Ipv4Addr::new(127, 0, 0, 1).into(),
DORA_COORDINATOR_PORT_CONTROL,
)
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
Start {


Loading…
Cancel
Save