| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
ac23cb4f08
|
Fix `dora runtime` spawn command for dora installed through `pip`
Pip installations of dora use a small Python wrapper script, which then calls a functions of the `dora_cli` module. This means that the `current_exe()` will be a python binary instead of a standalone `dora` binary. This leads to errors when trying to start a `dora runtime` instance for shared library operators (see #900). This commit fixes this issue by introducing a new `DoraCommand` struct that is initialized in the respective main function. For normal Rust binaries, this is just initialized with `current_exe()` as before. When invoked from Python, we also include the first argument, which should be the path to the Python wrapper script invoking dora. We also use the 0th argument instead of `current_exe` for invoking the `python` binary because `current_exe` resolves symlinks on some platforms, which affects import statements (different base directory). Fixes #900 |
9 months ago |
| @@ -6,7 +6,7 @@ use std::sync::Arc; | |||
| use std::time::Duration; | |||
| use arrow::pyarrow::{FromPyArrow, ToPyArrow}; | |||
| use dora_daemon::Daemon; | |||
| use dora_daemon::{Daemon, DoraCommand}; | |||
| use dora_download::download_file; | |||
| use dora_node_api::dora_core::config::NodeId; | |||
| use dora_node_api::dora_core::descriptor::source_is_url; | |||
| @@ -387,7 +387,12 @@ pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> { | |||
| .enable_all() | |||
| .build() | |||
| .context("tokio runtime failed")?; | |||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?; | |||
| let dora_command = DoraCommand::from_python_env()?; | |||
| let result = rt.block_on(Daemon::run_dataflow( | |||
| &dataflow_path, | |||
| uv.unwrap_or_default(), | |||
| dora_command, | |||
| ))?; | |||
| match result.is_ok() { | |||
| true => Ok(()), | |||
| false => Err(eyre::eyre!( | |||
| @@ -9,7 +9,7 @@ use dora_core::{ | |||
| DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, | |||
| }, | |||
| }; | |||
| use dora_daemon::Daemon; | |||
| use dora_daemon::{Daemon, DoraCommand}; | |||
| use dora_download::download_file; | |||
| use dora_message::{ | |||
| cli_to_coordinator::ControlRequest, | |||
| @@ -285,15 +285,15 @@ enum Lang { | |||
| Cxx, | |||
| } | |||
| pub fn lib_main(args: Args) { | |||
| if let Err(err) = run(args) { | |||
| pub fn lib_main(args: Args, dora_command: DoraCommand) { | |||
| if let Err(err) = run(args, dora_command) { | |||
| eprintln!("\n\n{}", "[ERROR]".bold().red()); | |||
| eprintln!("{err:?}"); | |||
| std::process::exit(1); | |||
| } | |||
| } | |||
| fn run(args: Args) -> eyre::Result<()> { | |||
| fn run(args: Args, dora_command: DoraCommand) -> eyre::Result<()> { | |||
| #[cfg(feature = "tracing")] | |||
| match &args.command { | |||
| Command::Daemon { | |||
| @@ -379,7 +379,7 @@ fn run(args: Args) -> eyre::Result<()> { | |||
| .enable_all() | |||
| .build() | |||
| .context("tokio runtime failed")?; | |||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?; | |||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv, dora_command))?; | |||
| handle_dataflow_result(result, None)? | |||
| } | |||
| Command::Up { config } => { | |||
| @@ -541,11 +541,11 @@ fn run(args: Args) -> eyre::Result<()> { | |||
| ); | |||
| } | |||
| let result = Daemon::run_dataflow(&dataflow_path, false).await?; | |||
| let result = Daemon::run_dataflow(&dataflow_path, false, dora_command).await?; | |||
| handle_dataflow_result(result, None) | |||
| } | |||
| None => { | |||
| Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port).await | |||
| Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port, dora_command).await | |||
| } | |||
| } | |||
| }) | |||
| @@ -799,9 +799,9 @@ fn py_main(_py: Python) -> PyResult<()> { | |||
| pyo3::prepare_freethreaded_python(); | |||
| // Skip first argument as it is a python call. | |||
| let args = std::env::args_os().skip(1).collect::<Vec<_>>(); | |||
| let dora_command = DoraCommand::from_python_env()?; | |||
| match Args::try_parse_from(args) { | |||
| Ok(args) => lib_main(args), | |||
| Ok(args) => lib_main(args, dora_command), | |||
| Err(err) => { | |||
| eprintln!("{err}"); | |||
| } | |||
| @@ -1,7 +1,12 @@ | |||
| use clap::Parser; | |||
| use dora_cli::Args; | |||
| use dora_daemon::DoraCommand; | |||
| fn main() { | |||
| let args = Args::parse(); | |||
| dora_cli::lib_main(args); | |||
| let dora_command = DoraCommand { | |||
| executable: std::env::current_exe().expect("failed to get current executable"), | |||
| args: Vec::new(), | |||
| }; | |||
| dora_cli::lib_main(args, dora_command); | |||
| } | |||
| @@ -97,6 +97,8 @@ pub struct Daemon { | |||
| remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | |||
| logger: DaemonLogger, | |||
| dora_command: DoraCommand, | |||
| } | |||
| type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; | |||
| @@ -106,6 +108,7 @@ impl Daemon { | |||
| coordinator_addr: SocketAddr, | |||
| machine_id: Option<String>, | |||
| local_listen_port: u16, | |||
| dora_command: DoraCommand, | |||
| ) -> eyre::Result<()> { | |||
| let clock = Arc::new(HLC::default()); | |||
| @@ -137,12 +140,17 @@ impl Daemon { | |||
| None, | |||
| clock, | |||
| Some(remote_daemon_events_tx), | |||
| dora_command, | |||
| ) | |||
| .await | |||
| .map(|_| ()) | |||
| } | |||
| pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> { | |||
| pub async fn run_dataflow( | |||
| dataflow_path: &Path, | |||
| uv: bool, | |||
| dora_command: DoraCommand, | |||
| ) -> eyre::Result<DataflowResult> { | |||
| let working_dir = dataflow_path | |||
| .canonicalize() | |||
| .context("failed to canonicalize dataflow path")? | |||
| @@ -192,6 +200,7 @@ impl Daemon { | |||
| Some(exit_when_done), | |||
| clock.clone(), | |||
| None, | |||
| dora_command, | |||
| ); | |||
| let spawn_result = reply_rx | |||
| @@ -223,6 +232,7 @@ impl Daemon { | |||
| exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | |||
| clock: Arc<HLC>, | |||
| remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | |||
| dora_command: DoraCommand, | |||
| ) -> eyre::Result<DaemonRunResult> { | |||
| let coordinator_connection = match coordinator_addr { | |||
| Some(addr) => { | |||
| @@ -286,6 +296,7 @@ impl Daemon { | |||
| clock, | |||
| zenoh_session, | |||
| remote_daemon_events_tx, | |||
| dora_command, | |||
| }; | |||
| let dora_events = ReceiverStream::new(dora_events_rx); | |||
| @@ -762,6 +773,7 @@ impl Daemon { | |||
| node_stderr_most_recent, | |||
| uv, | |||
| &mut logger, | |||
| &self.dora_command, | |||
| ) | |||
| .await | |||
| .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) | |||
| @@ -2180,3 +2192,43 @@ impl CoreNodeKindExt for CoreNodeKind { | |||
| } | |||
| } | |||
| } | |||
| /// Enables dora to spawn itself as a subprocess. | |||
| #[derive(Clone, Debug)] | |||
| pub struct DoraCommand { | |||
| /// Path to the `dora` (or `python`) executable. | |||
| /// | |||
| /// When dora is installed as a standalone binary, this points to the | |||
| /// `dora` executable directly. When installed through `pip`, this will | |||
| /// point to the Python executable. | |||
| pub executable: PathBuf, | |||
| /// Additional arguments required to invoke dora. | |||
| /// | |||
| /// This will be empty when dora is installed as standalone binary. For | |||
| /// pip installations, this will point to the dora python executable. | |||
| pub args: Vec<String>, | |||
| } | |||
| impl DoraCommand { | |||
| pub fn from_python_env() -> eyre::Result<Self> { | |||
| let mut args = std::env::args(); | |||
| // first arg is executable name | |||
| // (we use first arg instead of std::env::current_exe() because the | |||
| // latter follows symlinks on some platforms, which affects the python | |||
| // import folders so that the dora_cli is not found) | |||
| let executable = args.next().context("no executable name arg")?.into(); | |||
| // second argument should be the python executable wrapping dora | |||
| let python_file = args.next().context("no python file argument")?; | |||
| Ok(DoraCommand { | |||
| executable, | |||
| args: [python_file].to_vec(), | |||
| }) | |||
| } | |||
| fn to_tokio_command(&self) -> tokio::process::Command { | |||
| let mut cmd = tokio::process::Command::new(&self.executable); | |||
| cmd.args(&self.args); | |||
| cmd | |||
| } | |||
| } | |||
| @@ -1,7 +1,7 @@ | |||
| use crate::{ | |||
| log::{self, NodeLogger}, | |||
| node_communication::spawn_listener_loop, | |||
| node_inputs, CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode, | |||
| node_inputs, CoreNodeKindExt, DoraCommand, DoraEvent, Event, OutputId, RunningNode, | |||
| }; | |||
| use aligned_vec::{AVec, ConstAlign}; | |||
| use crossbeam::queue::ArrayQueue; | |||
| @@ -51,6 +51,7 @@ pub async fn spawn_node( | |||
| node_stderr_most_recent: Arc<ArrayQueue<String>>, | |||
| uv: bool, | |||
| logger: &mut NodeLogger<'_>, | |||
| dora_command: &DoraCommand, | |||
| ) -> eyre::Result<RunningNode> { | |||
| let node_id = node.id.clone(); | |||
| logger | |||
| @@ -301,9 +302,7 @@ pub async fn spawn_node( | |||
| cmd | |||
| } | |||
| } else if python_operators.is_empty() && other_operators { | |||
| let mut cmd = tokio::process::Command::new( | |||
| std::env::current_exe().wrap_err("failed to get current executable path")?, | |||
| ); | |||
| let mut cmd = dora_command.to_tokio_command(); | |||
| cmd.arg("runtime"); | |||
| cmd | |||
| } else { | |||