From 28be4a6663142af63e2ab530be26b7c1b931132e Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Mon, 15 Jan 2024 17:03:30 +0100 Subject: [PATCH] Fix example by using the right daemon command --- binaries/cli/src/main.rs | 22 ++++++++++++---------- binaries/daemon/src/lib.rs | 10 +++++++--- examples/multiple-daemons/run.rs | 11 +++++------ examples/python-ros2-dataflow/run.rs | 2 +- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 404d2794..cc71e198 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -256,7 +256,7 @@ fn run() -> eyre::Result<()> { .build() .context("tokio runtime failed")?; rt.block_on(async { - let (_, task) = + let (_port, task) = dora_coordinator::start(port, futures::stream::empty::()).await?; task.await }) @@ -276,19 +276,21 @@ fn run() -> eyre::Result<()> { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - Daemon::run_dataflow(&dataflow_path).await - } - None => { - Daemon::run( - coordinator_addr.unwrap_or_else(|| { - tracing::info!("Starting in local mode"); - let localhost = Ipv4Addr::new(127, 0, 0, 1); - (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }), + Daemon::run_dataflow( + &dataflow_path, + coordinator_addr, machine_id.unwrap_or_default(), ) .await } + None => { + let addr = coordinator_addr.unwrap_or_else(|| { + tracing::info!("Starting in local mode"); + let localhost = Ipv4Addr::new(127, 0, 0, 1); + (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() + }); + Daemon::run(addr, machine_id.unwrap_or_default()).await + } } }) .context("failed to run dora-daemon")? diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 9bf6d189..f5a2b8a5 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -116,7 +116,11 @@ impl Daemon { .map(|_| ()) } - pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { + pub async fn run_dataflow( + dataflow_path: &Path, + coordinator_addr: Option, + machine_id: String, + ) -> eyre::Result<()> { let working_dir = dataflow_path .canonicalize() .context("failed to canoncialize dataflow path")? @@ -156,8 +160,8 @@ impl Daemon { }); let run_result = Self::run_general( Box::pin(coordinator_events), - None, - "".into(), + coordinator_addr, + machine_id, Some(exit_when_done), clock, ); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 78c50d3d..d8fb13f6 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -35,10 +35,10 @@ async fn main() -> eyre::Result<()> { let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); let (coordinator_port, coordinator) = - dora_coordinator::start(Some(0), ReceiverStream::new(coordinator_events_rx)).await?; + dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); - let daemon_a = run_dataflow(coordinator_addr.to_string(), "A".into(), dataflow); - let daemon_b = run_dataflow(coordinator_addr.to_string(), "B".into(), dataflow); + let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); + let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); tracing::info!("Spawning coordinator and daemons"); let mut tasks = JoinSet::new(); @@ -46,6 +46,7 @@ async fn main() -> eyre::Result<()> { tasks.spawn(daemon_a); tasks.spawn(daemon_b); + std::thread::sleep(Duration::from_secs(20)); // wait until both daemons are connected tracing::info!("waiting until daemons are connected to coordinator"); let mut retries = 0; @@ -197,15 +198,13 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { Ok(()) } -async fn run_dataflow(coordinator: String, machine_id: &str, dataflow: &Path) -> eyre::Result<()> { +async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { let cargo = std::env::var("CARGO").unwrap(); let mut cmd = tokio::process::Command::new(&cargo); cmd.arg("run"); cmd.arg("--package").arg("dora-cli"); cmd.arg("--") .arg("daemon") - .arg("--run-dataflow") - .arg(dataflow) .arg("--machine-id") .arg(machine_id) .arg("--coordinator-addr") diff --git a/examples/python-ros2-dataflow/run.rs b/examples/python-ros2-dataflow/run.rs index 6a9435d9..8eecdf64 100644 --- a/examples/python-ros2-dataflow/run.rs +++ b/examples/python-ros2-dataflow/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_pip_path, get_python_path, run}; use dora_tracing::set_up_tracing; -use eyre::{ContextCompat, WrapErr}; +use eyre::{bail, ContextCompat, WrapErr}; use std::path::Path; #[tokio::main]