| @@ -256,7 +256,7 @@ fn run() -> eyre::Result<()> { | |||
| .build() | |||
| .context("tokio runtime failed")?; | |||
| rt.block_on(async { | |||
| let (_, task) = | |||
| let (_port, task) = | |||
| dora_coordinator::start(port, futures::stream::empty::<Event>()).await?; | |||
| task.await | |||
| }) | |||
| @@ -276,19 +276,21 @@ fn run() -> eyre::Result<()> { | |||
| Some(dataflow_path) => { | |||
| tracing::info!("Starting dataflow `{}`", dataflow_path.display()); | |||
| Daemon::run_dataflow(&dataflow_path).await | |||
| } | |||
| None => { | |||
| Daemon::run( | |||
| coordinator_addr.unwrap_or_else(|| { | |||
| tracing::info!("Starting in local mode"); | |||
| let localhost = Ipv4Addr::new(127, 0, 0, 1); | |||
| (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() | |||
| }), | |||
| Daemon::run_dataflow( | |||
| &dataflow_path, | |||
| coordinator_addr, | |||
| machine_id.unwrap_or_default(), | |||
| ) | |||
| .await | |||
| } | |||
| None => { | |||
| let addr = coordinator_addr.unwrap_or_else(|| { | |||
| tracing::info!("Starting in local mode"); | |||
| let localhost = Ipv4Addr::new(127, 0, 0, 1); | |||
| (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() | |||
| }); | |||
| Daemon::run(addr, machine_id.unwrap_or_default()).await | |||
| } | |||
| } | |||
| }) | |||
| .context("failed to run dora-daemon")? | |||
| @@ -116,7 +116,11 @@ impl Daemon { | |||
| .map(|_| ()) | |||
| } | |||
| pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<()> { | |||
| pub async fn run_dataflow( | |||
| dataflow_path: &Path, | |||
| coordinator_addr: Option<SocketAddr>, | |||
| machine_id: String, | |||
| ) -> eyre::Result<()> { | |||
| let working_dir = dataflow_path | |||
| .canonicalize() | |||
| .context("failed to canoncialize dataflow path")? | |||
| @@ -156,8 +160,8 @@ impl Daemon { | |||
| }); | |||
| let run_result = Self::run_general( | |||
| Box::pin(coordinator_events), | |||
| None, | |||
| "".into(), | |||
| coordinator_addr, | |||
| machine_id, | |||
| Some(exit_when_done), | |||
| clock, | |||
| ); | |||
| @@ -35,10 +35,10 @@ async fn main() -> eyre::Result<()> { | |||
| let (coordinator_events_tx, coordinator_events_rx) = mpsc::channel(1); | |||
| let (coordinator_port, coordinator) = | |||
| dora_coordinator::start(Some(0), ReceiverStream::new(coordinator_events_rx)).await?; | |||
| dora_coordinator::start(None, ReceiverStream::new(coordinator_events_rx)).await?; | |||
| let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); | |||
| let daemon_a = run_dataflow(coordinator_addr.to_string(), "A".into(), dataflow); | |||
| let daemon_b = run_dataflow(coordinator_addr.to_string(), "B".into(), dataflow); | |||
| let daemon_a = run_daemon(coordinator_addr.to_string(), "A".into()); | |||
| let daemon_b = run_daemon(coordinator_addr.to_string(), "B".into()); | |||
| tracing::info!("Spawning coordinator and daemons"); | |||
| let mut tasks = JoinSet::new(); | |||
| @@ -46,6 +46,7 @@ async fn main() -> eyre::Result<()> { | |||
| tasks.spawn(daemon_a); | |||
| tasks.spawn(daemon_b); | |||
| std::thread::sleep(Duration::from_secs(20)); | |||
| // wait until both daemons are connected | |||
| tracing::info!("waiting until daemons are connected to coordinator"); | |||
| let mut retries = 0; | |||
| @@ -197,15 +198,13 @@ async fn build_dataflow(dataflow: &Path) -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| async fn run_dataflow(coordinator: String, machine_id: &str, dataflow: &Path) -> eyre::Result<()> { | |||
| async fn run_daemon(coordinator: String, machine_id: &str) -> eyre::Result<()> { | |||
| let cargo = std::env::var("CARGO").unwrap(); | |||
| let mut cmd = tokio::process::Command::new(&cargo); | |||
| cmd.arg("run"); | |||
| cmd.arg("--package").arg("dora-cli"); | |||
| cmd.arg("--") | |||
| .arg("daemon") | |||
| .arg("--run-dataflow") | |||
| .arg(dataflow) | |||
| .arg("--machine-id") | |||
| .arg(machine_id) | |||
| .arg("--coordinator-addr") | |||
| @@ -1,6 +1,6 @@ | |||
| use dora_core::{get_pip_path, get_python_path, run}; | |||
| use dora_tracing::set_up_tracing; | |||
| use eyre::{ContextCompat, WrapErr}; | |||
| use eyre::{bail, ContextCompat, WrapErr}; | |||
| use std::path::Path; | |||
| #[tokio::main] | |||