Browse Source

Fix multiple daemon example

tags/v0.3.2-rc0
haixuanTao 2 years ago
parent
commit
054ae5afcc
3 changed files with 8 additions and 5 deletions
  1. +3
    -1
      binaries/cli/src/main.rs
  2. +2
    -1
      binaries/coordinator/src/lib.rs
  3. +3
    -3
      examples/multiple-daemons/run.rs

+ 3
- 1
binaries/cli/src/main.rs View File

@@ -3,6 +3,7 @@ use std::{net::Ipv4Addr, path::PathBuf};
use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
@@ -255,7 +256,8 @@ fn run() -> eyre::Result<()> {
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_, task) = dora_coordinator::start(port).await?;
let (_, task) =
dora_coordinator::start(port, futures::stream::empty::<Event>()).await?;
task.await
})
.context("failed to run dora-coordinator")?


+ 2
- 1
binaries/coordinator/src/lib.rs View File

@@ -40,6 +40,7 @@ mod tcp_utils;

pub async fn start(
port: Option<u16>,
external_events: impl Stream<Item = Event> + Unpin,
) -> Result<(u16, impl Future<Output = eyre::Result<()>>), eyre::ErrReport> {
let port = port.unwrap_or(DORA_COORDINATOR_PORT_DEFAULT);
let listener = listener::create_listener(port).await?;
@@ -53,7 +54,7 @@ pub async fn start(
let ctrlc_events = set_up_ctrlc_handler()?;

let future = async move {
start_inner(listener, &tasks, ctrlc_events).await?;
start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?;

tracing::debug!("coordinator main loop finished, waiting on spawned tasks");
while let Some(join_result) = tasks.next().await {


+ 3
- 3
examples/multiple-daemons/run.rs View File

@@ -5,7 +5,7 @@ use dora_core::{
};
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use futures::stream;
use std::{
collections::BTreeSet,
net::{Ipv4Addr, SocketAddr},
@@ -37,8 +37,8 @@ async fn main() -> eyre::Result<()> {
let (coordinator_port, coordinator) =
dora_coordinator::start(Some(0), ReceiverStream::new(coordinator_events_rx)).await?;
let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port);
let daemon_a = dora_daemon::Daemon::run(coordinator_addr, "A".into(), stream::empty());
let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into(), stream::empty());
let daemon_a = dora_daemon::Daemon::run(coordinator_addr, "A".into());
let daemon_b = dora_daemon::Daemon::run(coordinator_addr, "B".into());

tracing::info!("Spawning coordinator and daemons");
let mut tasks = JoinSet::new();


Loading…
Cancel
Save