Compare commits

...

Author SHA1 Message Date
  Philipp Oppermann ac23cb4f08
Fix `dora runtime` spawn command for dora installed through `pip` 9 months ago
5 changed files with 78 additions and 17 deletions
Unified View
  1. +7
    -2
      apis/python/node/src/lib.rs
  2. +9
    -9
      binaries/cli/src/lib.rs
  3. +6
    -1
      binaries/cli/src/main.rs
  4. +53
    -1
      binaries/daemon/src/lib.rs
  5. +3
    -4
      binaries/daemon/src/spawn.rs

+ 7
- 2
apis/python/node/src/lib.rs View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;


use arrow::pyarrow::{FromPyArrow, ToPyArrow}; use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_daemon::Daemon;
use dora_daemon::{Daemon, DoraCommand};
use dora_download::download_file; use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId; use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url; use dora_node_api::dora_core::descriptor::source_is_url;
@@ -387,7 +387,12 @@ pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
.enable_all() .enable_all()
.build() .build()
.context("tokio runtime failed")?; .context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
let dora_command = DoraCommand::from_python_env()?;
let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
uv.unwrap_or_default(),
dora_command,
))?;
match result.is_ok() { match result.is_ok() {
true => Ok(()), true => Ok(()),
false => Err(eyre::eyre!( false => Err(eyre::eyre!(


+ 9
- 9
binaries/cli/src/lib.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
}, },
}; };
use dora_daemon::Daemon;
use dora_daemon::{Daemon, DoraCommand};
use dora_download::download_file; use dora_download::download_file;
use dora_message::{ use dora_message::{
cli_to_coordinator::ControlRequest, cli_to_coordinator::ControlRequest,
@@ -285,15 +285,15 @@ enum Lang {
Cxx, Cxx,
} }


pub fn lib_main(args: Args) {
if let Err(err) = run(args) {
pub fn lib_main(args: Args, dora_command: DoraCommand) {
if let Err(err) = run(args, dora_command) {
eprintln!("\n\n{}", "[ERROR]".bold().red()); eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:?}"); eprintln!("{err:?}");
std::process::exit(1); std::process::exit(1);
} }
} }


fn run(args: Args) -> eyre::Result<()> {
fn run(args: Args, dora_command: DoraCommand) -> eyre::Result<()> {
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
match &args.command { match &args.command {
Command::Daemon { Command::Daemon {
@@ -379,7 +379,7 @@ fn run(args: Args) -> eyre::Result<()> {
.enable_all() .enable_all()
.build() .build()
.context("tokio runtime failed")?; .context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv, dora_command))?;
handle_dataflow_result(result, None)? handle_dataflow_result(result, None)?
} }
Command::Up { config } => { Command::Up { config } => {
@@ -541,11 +541,11 @@ fn run(args: Args) -> eyre::Result<()> {
); );
} }


let result = Daemon::run_dataflow(&dataflow_path, false).await?;
let result = Daemon::run_dataflow(&dataflow_path, false, dora_command).await?;
handle_dataflow_result(result, None) handle_dataflow_result(result, None)
} }
None => { None => {
Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port).await
Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port, dora_command).await
} }
} }
}) })
@@ -799,9 +799,9 @@ fn py_main(_py: Python) -> PyResult<()> {
pyo3::prepare_freethreaded_python(); pyo3::prepare_freethreaded_python();
// Skip first argument as it is a python call. // Skip first argument as it is a python call.
let args = std::env::args_os().skip(1).collect::<Vec<_>>(); let args = std::env::args_os().skip(1).collect::<Vec<_>>();
let dora_command = DoraCommand::from_python_env()?;
match Args::try_parse_from(args) { match Args::try_parse_from(args) {
Ok(args) => lib_main(args),
Ok(args) => lib_main(args, dora_command),
Err(err) => { Err(err) => {
eprintln!("{err}"); eprintln!("{err}");
} }


+ 6
- 1
binaries/cli/src/main.rs View File

@@ -1,7 +1,12 @@
use clap::Parser; use clap::Parser;
use dora_cli::Args; use dora_cli::Args;
use dora_daemon::DoraCommand;


fn main() { fn main() {
let args = Args::parse(); let args = Args::parse();
dora_cli::lib_main(args);
let dora_command = DoraCommand {
executable: std::env::current_exe().expect("failed to get current executable"),
args: Vec::new(),
};
dora_cli::lib_main(args, dora_command);
} }

+ 53
- 1
binaries/daemon/src/lib.rs View File

@@ -97,6 +97,8 @@ pub struct Daemon {
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,


logger: DaemonLogger, logger: DaemonLogger,

dora_command: DoraCommand,
} }


type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>; type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
@@ -106,6 +108,7 @@ impl Daemon {
coordinator_addr: SocketAddr, coordinator_addr: SocketAddr,
machine_id: Option<String>, machine_id: Option<String>,
local_listen_port: u16, local_listen_port: u16,
dora_command: DoraCommand,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let clock = Arc::new(HLC::default()); let clock = Arc::new(HLC::default());


@@ -137,12 +140,17 @@ impl Daemon {
None, None,
clock, clock,
Some(remote_daemon_events_tx), Some(remote_daemon_events_tx),
dora_command,
) )
.await .await
.map(|_| ()) .map(|_| ())
} }


pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
pub async fn run_dataflow(
dataflow_path: &Path,
uv: bool,
dora_command: DoraCommand,
) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path let working_dir = dataflow_path
.canonicalize() .canonicalize()
.context("failed to canonicalize dataflow path")? .context("failed to canonicalize dataflow path")?
@@ -192,6 +200,7 @@ impl Daemon {
Some(exit_when_done), Some(exit_when_done),
clock.clone(), clock.clone(),
None, None,
dora_command,
); );


let spawn_result = reply_rx let spawn_result = reply_rx
@@ -223,6 +232,7 @@ impl Daemon {
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>, exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>, clock: Arc<HLC>,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>, remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
dora_command: DoraCommand,
) -> eyre::Result<DaemonRunResult> { ) -> eyre::Result<DaemonRunResult> {
let coordinator_connection = match coordinator_addr { let coordinator_connection = match coordinator_addr {
Some(addr) => { Some(addr) => {
@@ -286,6 +296,7 @@ impl Daemon {
clock, clock,
zenoh_session, zenoh_session,
remote_daemon_events_tx, remote_daemon_events_tx,
dora_command,
}; };


let dora_events = ReceiverStream::new(dora_events_rx); let dora_events = ReceiverStream::new(dora_events_rx);
@@ -762,6 +773,7 @@ impl Daemon {
node_stderr_most_recent, node_stderr_most_recent,
uv, uv,
&mut logger, &mut logger,
&self.dora_command,
) )
.await .await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`")) .wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -2180,3 +2192,43 @@ impl CoreNodeKindExt for CoreNodeKind {
} }
} }
} }

/// Enables dora to spawn itself as a subprocess.
#[derive(Clone, Debug)]
pub struct DoraCommand {
/// Path to the `dora` (or `python`) executable.
///
/// When dora is installed as a standalone binary, this points to the
/// `dora` executable directly. When installed through `pip`, this will
/// point to the Python executable.
pub executable: PathBuf,
/// Additional arguments required to invoke dora.
///
/// This will be empty when dora is installed as standalone binary. For
/// pip installations, this will point to the dora python executable.
pub args: Vec<String>,
}

impl DoraCommand {
pub fn from_python_env() -> eyre::Result<Self> {
let mut args = std::env::args();
// first arg is executable name
// (we use first arg instead of std::env::current_exe() because the
// latter follows symlinks on some platforms, which affects the python
// import folders so that the dora_cli is not found)
let executable = args.next().context("no executable name arg")?.into();
// second argument should be the python executable wrapping dora
let python_file = args.next().context("no python file argument")?;

Ok(DoraCommand {
executable,
args: [python_file].to_vec(),
})
}

fn to_tokio_command(&self) -> tokio::process::Command {
let mut cmd = tokio::process::Command::new(&self.executable);
cmd.args(&self.args);
cmd
}
}

+ 3
- 4
binaries/daemon/src/spawn.rs View File

@@ -1,7 +1,7 @@
use crate::{ use crate::{
log::{self, NodeLogger}, log::{self, NodeLogger},
node_communication::spawn_listener_loop, node_communication::spawn_listener_loop,
node_inputs, CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode,
node_inputs, CoreNodeKindExt, DoraCommand, DoraEvent, Event, OutputId, RunningNode,
}; };
use aligned_vec::{AVec, ConstAlign}; use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue; use crossbeam::queue::ArrayQueue;
@@ -51,6 +51,7 @@ pub async fn spawn_node(
node_stderr_most_recent: Arc<ArrayQueue<String>>, node_stderr_most_recent: Arc<ArrayQueue<String>>,
uv: bool, uv: bool,
logger: &mut NodeLogger<'_>, logger: &mut NodeLogger<'_>,
dora_command: &DoraCommand,
) -> eyre::Result<RunningNode> { ) -> eyre::Result<RunningNode> {
let node_id = node.id.clone(); let node_id = node.id.clone();
logger logger
@@ -301,9 +302,7 @@ pub async fn spawn_node(
cmd cmd
} }
} else if python_operators.is_empty() && other_operators { } else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);
let mut cmd = dora_command.to_tokio_command();
cmd.arg("runtime"); cmd.arg("runtime");
cmd cmd
} else { } else {


Loading…
Cancel
Save