| @@ -12,7 +12,7 @@ use dora_daemon::Daemon; | |||
| use duration_str::parse; | |||
| use eyre::{bail, Context}; | |||
| use formatting::FormatDataflowError; | |||
| use std::{io::Write, net::SocketAddr}; | |||
| use std::{io::Write, net::SocketAddr, path::Path}; | |||
| use std::{ | |||
| net::{IpAddr, Ipv4Addr}, | |||
| path::PathBuf, | |||
| @@ -243,7 +243,7 @@ enum Lang { | |||
| Cxx, | |||
| } | |||
| pub fn run(command: Command) -> eyre::Result<()> { | |||
| pub fn run(command: Command, dora_cli_path: PathBuf) -> eyre::Result<()> { | |||
| let log_level = env_logger::Builder::new() | |||
| .filter_level(log::LevelFilter::Info) | |||
| .parse_default_env() | |||
| @@ -283,7 +283,7 @@ pub fn run(command: Command) -> eyre::Result<()> { | |||
| internal_create_with_path_dependencies, | |||
| } => template::create(args, internal_create_with_path_dependencies)?, | |||
| Command::Up { config } => { | |||
| up::up(config.as_deref())?; | |||
| up::up(config.as_deref(), &dora_cli_path)?; | |||
| } | |||
| Command::Logs { | |||
| dataflow, | |||
| @@ -445,14 +445,14 @@ pub fn run(command: Command) -> eyre::Result<()> { | |||
| ); | |||
| } | |||
| let result = Daemon::run_dataflow(&dataflow_path).await?; | |||
| let result = Daemon::run_dataflow(&dataflow_path, dora_cli_path.to_owned()).await?; | |||
| handle_dataflow_result(result, None) | |||
| } | |||
| None => { | |||
| if coordinator_addr.ip() == LOCALHOST { | |||
| tracing::info!("Starting in local mode"); | |||
| } | |||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port).await | |||
| Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), inter_daemon_addr, local_listen_port, dora_cli_path.to_owned()).await | |||
| } | |||
| } | |||
| }) | |||
| @@ -50,5 +50,7 @@ fn main_inner() -> eyre::Result<()> { | |||
| } | |||
| }; | |||
| run(args.command) | |||
| let dora_cli_path = | |||
| std::env::current_exe().wrap_err("failed to get current executable path")?; | |||
| run(args.command, dora_cli_path) | |||
| } | |||
| @@ -5,13 +5,13 @@ use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; | |||
| #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] | |||
| struct UpConfig {} | |||
| pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { | |||
| pub(crate) fn up(config_path: Option<&Path>, dora_cli_path: &Path) -> eyre::Result<()> { | |||
| let UpConfig {} = parse_dora_config(config_path)?; | |||
| let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into(); | |||
| let mut session = match connect_to_coordinator(coordinator_addr) { | |||
| Ok(session) => session, | |||
| Err(_) => { | |||
| start_coordinator().wrap_err("failed to start dora-coordinator")?; | |||
| start_coordinator(dora_cli_path).wrap_err("failed to start dora-coordinator")?; | |||
| loop { | |||
| match connect_to_coordinator(coordinator_addr) { | |||
| @@ -26,7 +26,7 @@ pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { | |||
| }; | |||
| if !daemon_running(&mut *session)? { | |||
| start_daemon().wrap_err("failed to start dora-daemon")?; | |||
| start_daemon(dora_cli_path).wrap_err("failed to start dora-daemon")?; | |||
| // wait a bit until daemon is connected | |||
| let mut i = 0; | |||
| @@ -81,9 +81,8 @@ fn parse_dora_config(config_path: Option<&Path>) -> Result<UpConfig, eyre::ErrRe | |||
| Ok(config) | |||
| } | |||
| fn start_coordinator() -> eyre::Result<()> { | |||
| let mut cmd = | |||
| Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); | |||
| fn start_coordinator(dora_cli_path: &Path) -> eyre::Result<()> { | |||
| let mut cmd = Command::new(dora_cli_path); | |||
| cmd.arg("coordinator"); | |||
| cmd.arg("--quiet"); | |||
| cmd.spawn().wrap_err("failed to run `dora coordinator`")?; | |||
| @@ -93,9 +92,8 @@ fn start_coordinator() -> eyre::Result<()> { | |||
| Ok(()) | |||
| } | |||
| fn start_daemon() -> eyre::Result<()> { | |||
| let mut cmd = | |||
| Command::new(std::env::current_exe().wrap_err("failed to get current executable path")?); | |||
| fn start_daemon(dora_cli_path: &Path) -> eyre::Result<()> { | |||
| let mut cmd = Command::new(dora_cli_path); | |||
| cmd.arg("daemon"); | |||
| cmd.arg("--quiet"); | |||
| cmd.spawn().wrap_err("failed to run `dora daemon`")?; | |||
| @@ -86,6 +86,7 @@ pub struct Daemon { | |||
| dataflow_node_results: BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>, | |||
| clock: Arc<uhlc::HLC>, | |||
| dora_cli_path: PathBuf, | |||
| } | |||
| type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; | |||
| @@ -96,6 +97,7 @@ impl Daemon { | |||
| machine_id: String, | |||
| inter_daemon_addr: SocketAddr, | |||
| local_listen_port: u16, | |||
| dora_cli_path: PathBuf, | |||
| ) -> eyre::Result<()> { | |||
| let clock = Arc::new(HLC::default()); | |||
| @@ -150,12 +152,16 @@ impl Daemon { | |||
| machine_id, | |||
| None, | |||
| clock, | |||
| dora_cli_path, | |||
| ) | |||
| .await | |||
| .map(|_| ()) | |||
| } | |||
| pub async fn run_dataflow(dataflow_path: &Path) -> eyre::Result<DataflowResult> { | |||
| pub async fn run_dataflow( | |||
| dataflow_path: &Path, | |||
| dora_cli_path: PathBuf, | |||
| ) -> eyre::Result<DataflowResult> { | |||
| let working_dir = dataflow_path | |||
| .canonicalize() | |||
| .context("failed to canoncialize dataflow path")? | |||
| @@ -200,6 +206,7 @@ impl Daemon { | |||
| "".to_string(), | |||
| Some(exit_when_done), | |||
| clock.clone(), | |||
| dora_cli_path, | |||
| ); | |||
| let spawn_result = reply_rx | |||
| @@ -230,6 +237,7 @@ impl Daemon { | |||
| machine_id: String, | |||
| exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | |||
| clock: Arc<HLC>, | |||
| dora_cli_path: PathBuf, | |||
| ) -> eyre::Result<DaemonRunResult> { | |||
| let coordinator_connection = match coordinator_addr { | |||
| Some(addr) => { | |||
| @@ -256,6 +264,7 @@ impl Daemon { | |||
| exit_when_done, | |||
| dataflow_node_results: BTreeMap::new(), | |||
| clock, | |||
| dora_cli_path, | |||
| }; | |||
| let dora_events = ReceiverStream::new(dora_events_rx); | |||
| @@ -667,6 +676,7 @@ impl Daemon { | |||
| dataflow_descriptor.clone(), | |||
| self.clock.clone(), | |||
| node_stderr_most_recent, | |||
| &self.dora_cli_path, | |||
| ) | |||
| .await | |||
| .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) | |||
| @@ -44,6 +44,7 @@ pub async fn spawn_node( | |||
| dataflow_descriptor: Descriptor, | |||
| clock: Arc<HLC>, | |||
| node_stderr_most_recent: Arc<ArrayQueue<String>>, | |||
| dora_cli_path: &Path, | |||
| ) -> eyre::Result<RunningNode> { | |||
| let node_id = node.id.clone(); | |||
| tracing::debug!("Spawning node `{dataflow_id}/{node_id}`"); | |||
| @@ -223,9 +224,7 @@ pub async fn spawn_node( | |||
| command | |||
| } | |||
| } else if python_operators.is_empty() && other_operators { | |||
| let mut cmd = tokio::process::Command::new( | |||
| std::env::current_exe().wrap_err("failed to get current executable path")?, | |||
| ); | |||
| let mut cmd = tokio::process::Command::new(dora_cli_path); | |||
| cmd.arg("runtime"); | |||
| cmd | |||
| } else { | |||