|
|
|
@@ -1,14 +1,14 @@ |
|
|
|
use dora_coordinator::{ControlEvent, Event}; |
|
|
|
use dora_core::{ |
|
|
|
descriptor::Descriptor, |
|
|
|
topics::{ControlRequest, ControlRequestReply, DataflowId}, |
|
|
|
topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, |
|
|
|
}; |
|
|
|
use dora_tracing::set_up_tracing; |
|
|
|
use eyre::{bail, Context}; |
|
|
|
|
|
|
|
use std::{ |
|
|
|
collections::BTreeSet, |
|
|
|
net::{Ipv4Addr, SocketAddr}, |
|
|
|
net::{IpAddr, Ipv4Addr, SocketAddr}, |
|
|
|
path::Path, |
|
|
|
time::Duration, |
|
|
|
}; |
|
|
|
@@ -34,8 +34,13 @@ async fn main() -> eyre::Result<()> { |
|
|
|
build_dataflow(dataflow).await?; |
|
|
|
|
|
|
|
let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); |
|
|
|
let coordinator_bind = SocketAddr::new( |
|
|
|
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), |
|
|
|
DORA_COORDINATOR_PORT_DEFAULT, |
|
|
|
); |
|
|
|
let (coordinator_port, coordinator) = |
|
|
|
dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; |
|
|
|
dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) |
|
|
|
.await?; |
|
|
|
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); |
|
|
|
let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); |
|
|
|
let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); |
|
|
|
|