| Author | SHA1 | Message | Date |
|---|---|---|---|
|
|
ac23cb4f08
|
Fix `dora runtime` spawn command for dora installed through `pip`
Pip installations of dora use a small Python wrapper script, which then calls a functions of the `dora_cli` module. This means that the `current_exe()` will be a python binary instead of a standalone `dora` binary. This leads to errors when trying to start a `dora runtime` instance for shared library operators (see #900). This commit fixes this issue by introducing a new `DoraCommand` struct that is initialized in the respective main function. For normal Rust binaries, this is just initialized with `current_exe()` as before. When invoked from Python, we also include the first argument, which should be the path to the Python wrapper script invoking dora. We also use the 0th argument instead of `current_exe` for invoking the `python` binary because `current_exe` resolves symlinks on some platforms, which affects import statements (different base directory). Fixes #900 |
9 months ago |
| @@ -6,7 +6,7 @@ use std::sync::Arc; | |||||
| use std::time::Duration; | use std::time::Duration; | ||||
| use arrow::pyarrow::{FromPyArrow, ToPyArrow}; | use arrow::pyarrow::{FromPyArrow, ToPyArrow}; | ||||
| use dora_daemon::Daemon; | |||||
| use dora_daemon::{Daemon, DoraCommand}; | |||||
| use dora_download::download_file; | use dora_download::download_file; | ||||
| use dora_node_api::dora_core::config::NodeId; | use dora_node_api::dora_core::config::NodeId; | ||||
| use dora_node_api::dora_core::descriptor::source_is_url; | use dora_node_api::dora_core::descriptor::source_is_url; | ||||
| @@ -387,7 +387,12 @@ pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> { | |||||
| .enable_all() | .enable_all() | ||||
| .build() | .build() | ||||
| .context("tokio runtime failed")?; | .context("tokio runtime failed")?; | ||||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?; | |||||
| let dora_command = DoraCommand::from_python_env()?; | |||||
| let result = rt.block_on(Daemon::run_dataflow( | |||||
| &dataflow_path, | |||||
| uv.unwrap_or_default(), | |||||
| dora_command, | |||||
| ))?; | |||||
| match result.is_ok() { | match result.is_ok() { | ||||
| true => Ok(()), | true => Ok(()), | ||||
| false => Err(eyre::eyre!( | false => Err(eyre::eyre!( | ||||
| @@ -9,7 +9,7 @@ use dora_core::{ | |||||
| DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, | DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, | ||||
| }, | }, | ||||
| }; | }; | ||||
| use dora_daemon::Daemon; | |||||
| use dora_daemon::{Daemon, DoraCommand}; | |||||
| use dora_download::download_file; | use dora_download::download_file; | ||||
| use dora_message::{ | use dora_message::{ | ||||
| cli_to_coordinator::ControlRequest, | cli_to_coordinator::ControlRequest, | ||||
| @@ -285,15 +285,15 @@ enum Lang { | |||||
| Cxx, | Cxx, | ||||
| } | } | ||||
| pub fn lib_main(args: Args) { | |||||
| if let Err(err) = run(args) { | |||||
| pub fn lib_main(args: Args, dora_command: DoraCommand) { | |||||
| if let Err(err) = run(args, dora_command) { | |||||
| eprintln!("\n\n{}", "[ERROR]".bold().red()); | eprintln!("\n\n{}", "[ERROR]".bold().red()); | ||||
| eprintln!("{err:?}"); | eprintln!("{err:?}"); | ||||
| std::process::exit(1); | std::process::exit(1); | ||||
| } | } | ||||
| } | } | ||||
| fn run(args: Args) -> eyre::Result<()> { | |||||
| fn run(args: Args, dora_command: DoraCommand) -> eyre::Result<()> { | |||||
| #[cfg(feature = "tracing")] | #[cfg(feature = "tracing")] | ||||
| match &args.command { | match &args.command { | ||||
| Command::Daemon { | Command::Daemon { | ||||
| @@ -379,7 +379,7 @@ fn run(args: Args) -> eyre::Result<()> { | |||||
| .enable_all() | .enable_all() | ||||
| .build() | .build() | ||||
| .context("tokio runtime failed")?; | .context("tokio runtime failed")?; | ||||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?; | |||||
| let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv, dora_command))?; | |||||
| handle_dataflow_result(result, None)? | handle_dataflow_result(result, None)? | ||||
| } | } | ||||
| Command::Up { config } => { | Command::Up { config } => { | ||||
| @@ -541,11 +541,11 @@ fn run(args: Args) -> eyre::Result<()> { | |||||
| ); | ); | ||||
| } | } | ||||
| let result = Daemon::run_dataflow(&dataflow_path, false).await?; | |||||
| let result = Daemon::run_dataflow(&dataflow_path, false, dora_command).await?; | |||||
| handle_dataflow_result(result, None) | handle_dataflow_result(result, None) | ||||
| } | } | ||||
| None => { | None => { | ||||
| Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port).await | |||||
| Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port, dora_command).await | |||||
| } | } | ||||
| } | } | ||||
| }) | }) | ||||
| @@ -799,9 +799,9 @@ fn py_main(_py: Python) -> PyResult<()> { | |||||
| pyo3::prepare_freethreaded_python(); | pyo3::prepare_freethreaded_python(); | ||||
| // Skip first argument as it is a python call. | // Skip first argument as it is a python call. | ||||
| let args = std::env::args_os().skip(1).collect::<Vec<_>>(); | let args = std::env::args_os().skip(1).collect::<Vec<_>>(); | ||||
| let dora_command = DoraCommand::from_python_env()?; | |||||
| match Args::try_parse_from(args) { | match Args::try_parse_from(args) { | ||||
| Ok(args) => lib_main(args), | |||||
| Ok(args) => lib_main(args, dora_command), | |||||
| Err(err) => { | Err(err) => { | ||||
| eprintln!("{err}"); | eprintln!("{err}"); | ||||
| } | } | ||||
| @@ -1,7 +1,12 @@ | |||||
| use clap::Parser; | use clap::Parser; | ||||
| use dora_cli::Args; | use dora_cli::Args; | ||||
| use dora_daemon::DoraCommand; | |||||
| fn main() { | fn main() { | ||||
| let args = Args::parse(); | let args = Args::parse(); | ||||
| dora_cli::lib_main(args); | |||||
| let dora_command = DoraCommand { | |||||
| executable: std::env::current_exe().expect("failed to get current executable"), | |||||
| args: Vec::new(), | |||||
| }; | |||||
| dora_cli::lib_main(args, dora_command); | |||||
| } | } | ||||
| @@ -97,6 +97,8 @@ pub struct Daemon { | |||||
| remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | ||||
| logger: DaemonLogger, | logger: DaemonLogger, | ||||
| dora_command: DoraCommand, | |||||
| } | } | ||||
| type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; | type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; | ||||
| @@ -106,6 +108,7 @@ impl Daemon { | |||||
| coordinator_addr: SocketAddr, | coordinator_addr: SocketAddr, | ||||
| machine_id: Option<String>, | machine_id: Option<String>, | ||||
| local_listen_port: u16, | local_listen_port: u16, | ||||
| dora_command: DoraCommand, | |||||
| ) -> eyre::Result<()> { | ) -> eyre::Result<()> { | ||||
| let clock = Arc::new(HLC::default()); | let clock = Arc::new(HLC::default()); | ||||
| @@ -137,12 +140,17 @@ impl Daemon { | |||||
| None, | None, | ||||
| clock, | clock, | ||||
| Some(remote_daemon_events_tx), | Some(remote_daemon_events_tx), | ||||
| dora_command, | |||||
| ) | ) | ||||
| .await | .await | ||||
| .map(|_| ()) | .map(|_| ()) | ||||
| } | } | ||||
| pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> { | |||||
| pub async fn run_dataflow( | |||||
| dataflow_path: &Path, | |||||
| uv: bool, | |||||
| dora_command: DoraCommand, | |||||
| ) -> eyre::Result<DataflowResult> { | |||||
| let working_dir = dataflow_path | let working_dir = dataflow_path | ||||
| .canonicalize() | .canonicalize() | ||||
| .context("failed to canonicalize dataflow path")? | .context("failed to canonicalize dataflow path")? | ||||
| @@ -192,6 +200,7 @@ impl Daemon { | |||||
| Some(exit_when_done), | Some(exit_when_done), | ||||
| clock.clone(), | clock.clone(), | ||||
| None, | None, | ||||
| dora_command, | |||||
| ); | ); | ||||
| let spawn_result = reply_rx | let spawn_result = reply_rx | ||||
| @@ -223,6 +232,7 @@ impl Daemon { | |||||
| exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, | ||||
| clock: Arc<HLC>, | clock: Arc<HLC>, | ||||
| remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, | ||||
| dora_command: DoraCommand, | |||||
| ) -> eyre::Result<DaemonRunResult> { | ) -> eyre::Result<DaemonRunResult> { | ||||
| let coordinator_connection = match coordinator_addr { | let coordinator_connection = match coordinator_addr { | ||||
| Some(addr) => { | Some(addr) => { | ||||
| @@ -286,6 +296,7 @@ impl Daemon { | |||||
| clock, | clock, | ||||
| zenoh_session, | zenoh_session, | ||||
| remote_daemon_events_tx, | remote_daemon_events_tx, | ||||
| dora_command, | |||||
| }; | }; | ||||
| let dora_events = ReceiverStream::new(dora_events_rx); | let dora_events = ReceiverStream::new(dora_events_rx); | ||||
| @@ -762,6 +773,7 @@ impl Daemon { | |||||
| node_stderr_most_recent, | node_stderr_most_recent, | ||||
| uv, | uv, | ||||
| &mut logger, | &mut logger, | ||||
| &self.dora_command, | |||||
| ) | ) | ||||
| .await | .await | ||||
| .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) | .wrap_err_with(|| format!("failed to spawn node `{node_id}`")) | ||||
| @@ -2180,3 +2192,43 @@ impl CoreNodeKindExt for CoreNodeKind { | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| /// Enables dora to spawn itself as a subprocess. | |||||
| #[derive(Clone, Debug)] | |||||
| pub struct DoraCommand { | |||||
| /// Path to the `dora` (or `python`) executable. | |||||
| /// | |||||
| /// When dora is installed as a standalone binary, this points to the | |||||
| /// `dora` executable directly. When installed through `pip`, this will | |||||
| /// point to the Python executable. | |||||
| pub executable: PathBuf, | |||||
| /// Additional arguments required to invoke dora. | |||||
| /// | |||||
| /// This will be empty when dora is installed as standalone binary. For | |||||
| /// pip installations, this will point to the dora python executable. | |||||
| pub args: Vec<String>, | |||||
| } | |||||
| impl DoraCommand { | |||||
| pub fn from_python_env() -> eyre::Result<Self> { | |||||
| let mut args = std::env::args(); | |||||
| // first arg is executable name | |||||
| // (we use first arg instead of std::env::current_exe() because the | |||||
| // latter follows symlinks on some platforms, which affects the python | |||||
| // import folders so that the dora_cli is not found) | |||||
| let executable = args.next().context("no executable name arg")?.into(); | |||||
| // second argument should be the python executable wrapping dora | |||||
| let python_file = args.next().context("no python file argument")?; | |||||
| Ok(DoraCommand { | |||||
| executable, | |||||
| args: [python_file].to_vec(), | |||||
| }) | |||||
| } | |||||
| fn to_tokio_command(&self) -> tokio::process::Command { | |||||
| let mut cmd = tokio::process::Command::new(&self.executable); | |||||
| cmd.args(&self.args); | |||||
| cmd | |||||
| } | |||||
| } | |||||
| @@ -1,7 +1,7 @@ | |||||
| use crate::{ | use crate::{ | ||||
| log::{self, NodeLogger}, | log::{self, NodeLogger}, | ||||
| node_communication::spawn_listener_loop, | node_communication::spawn_listener_loop, | ||||
| node_inputs, CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode, | |||||
| node_inputs, CoreNodeKindExt, DoraCommand, DoraEvent, Event, OutputId, RunningNode, | |||||
| }; | }; | ||||
| use aligned_vec::{AVec, ConstAlign}; | use aligned_vec::{AVec, ConstAlign}; | ||||
| use crossbeam::queue::ArrayQueue; | use crossbeam::queue::ArrayQueue; | ||||
| @@ -51,6 +51,7 @@ pub async fn spawn_node( | |||||
| node_stderr_most_recent: Arc<ArrayQueue<String>>, | node_stderr_most_recent: Arc<ArrayQueue<String>>, | ||||
| uv: bool, | uv: bool, | ||||
| logger: &mut NodeLogger<'_>, | logger: &mut NodeLogger<'_>, | ||||
| dora_command: &DoraCommand, | |||||
| ) -> eyre::Result<RunningNode> { | ) -> eyre::Result<RunningNode> { | ||||
| let node_id = node.id.clone(); | let node_id = node.id.clone(); | ||||
| logger | logger | ||||
| @@ -301,9 +302,7 @@ pub async fn spawn_node( | |||||
| cmd | cmd | ||||
| } | } | ||||
| } else if python_operators.is_empty() && other_operators { | } else if python_operators.is_empty() && other_operators { | ||||
| let mut cmd = tokio::process::Command::new( | |||||
| std::env::current_exe().wrap_err("failed to get current executable path")?, | |||||
| ); | |||||
| let mut cmd = dora_command.to_tokio_command(); | |||||
| cmd.arg("runtime"); | cmd.arg("runtime"); | ||||
| cmd | cmd | ||||
| } else { | } else { | ||||