Compare commits

...

Author SHA1 Message Date
  Philipp Oppermann ac23cb4f08
Fix `dora runtime` spawn command for dora installed through `pip` 9 months ago
5 changed files with 78 additions and 17 deletions
Split View
  1. +7
    -2
      apis/python/node/src/lib.rs
  2. +9
    -9
      binaries/cli/src/lib.rs
  3. +6
    -1
      binaries/cli/src/main.rs
  4. +53
    -1
      binaries/daemon/src/lib.rs
  5. +3
    -4
      binaries/daemon/src/spawn.rs

+ 7
- 2
apis/python/node/src/lib.rs View File

@@ -6,7 +6,7 @@ use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_daemon::Daemon;
use dora_daemon::{Daemon, DoraCommand};
use dora_download::download_file;
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::descriptor::source_is_url;
@@ -387,7 +387,12 @@ pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?;
let dora_command = DoraCommand::from_python_env()?;
let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
uv.unwrap_or_default(),
dora_command,
))?;
match result.is_ok() {
true => Ok(()),
false => Err(eyre::eyre!(


+ 9
- 9
binaries/cli/src/lib.rs View File

@@ -9,7 +9,7 @@ use dora_core::{
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
use dora_daemon::{Daemon, DoraCommand};
use dora_download::download_file;
use dora_message::{
cli_to_coordinator::ControlRequest,
@@ -285,15 +285,15 @@ enum Lang {
Cxx,
}

pub fn lib_main(args: Args) {
if let Err(err) = run(args) {
pub fn lib_main(args: Args, dora_command: DoraCommand) {
if let Err(err) = run(args, dora_command) {
eprintln!("\n\n{}", "[ERROR]".bold().red());
eprintln!("{err:?}");
std::process::exit(1);
}
}

fn run(args: Args) -> eyre::Result<()> {
fn run(args: Args, dora_command: DoraCommand) -> eyre::Result<()> {
#[cfg(feature = "tracing")]
match &args.command {
Command::Daemon {
@@ -379,7 +379,7 @@ fn run(args: Args) -> eyre::Result<()> {
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv, dora_command))?;
handle_dataflow_result(result, None)?
}
Command::Up { config } => {
@@ -541,11 +541,11 @@ fn run(args: Args) -> eyre::Result<()> {
);
}

let result = Daemon::run_dataflow(&dataflow_path, false).await?;
let result = Daemon::run_dataflow(&dataflow_path, false, dora_command).await?;
handle_dataflow_result(result, None)
}
None => {
Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port).await
Daemon::run(SocketAddr::new(coordinator_addr, coordinator_port), machine_id, local_listen_port, dora_command).await
}
}
})
@@ -799,9 +799,9 @@ fn py_main(_py: Python) -> PyResult<()> {
pyo3::prepare_freethreaded_python();
// Skip first argument as it is a python call.
let args = std::env::args_os().skip(1).collect::<Vec<_>>();
let dora_command = DoraCommand::from_python_env()?;
match Args::try_parse_from(args) {
Ok(args) => lib_main(args),
Ok(args) => lib_main(args, dora_command),
Err(err) => {
eprintln!("{err}");
}


+ 6
- 1
binaries/cli/src/main.rs View File

@@ -1,7 +1,12 @@
use clap::Parser;
use dora_cli::Args;
use dora_daemon::DoraCommand;

fn main() {
let args = Args::parse();
dora_cli::lib_main(args);
let dora_command = DoraCommand {
executable: std::env::current_exe().expect("failed to get current executable"),
args: Vec::new(),
};
dora_cli::lib_main(args, dora_command);
}

+ 53
- 1
binaries/daemon/src/lib.rs View File

@@ -97,6 +97,8 @@ pub struct Daemon {
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,

logger: DaemonLogger,

dora_command: DoraCommand,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
@@ -106,6 +108,7 @@ impl Daemon {
coordinator_addr: SocketAddr,
machine_id: Option<String>,
local_listen_port: u16,
dora_command: DoraCommand,
) -> eyre::Result<()> {
let clock = Arc::new(HLC::default());

@@ -137,12 +140,17 @@ impl Daemon {
None,
clock,
Some(remote_daemon_events_tx),
dora_command,
)
.await
.map(|_| ())
}

pub async fn run_dataflow(dataflow_path: &Path, uv: bool) -> eyre::Result<DataflowResult> {
pub async fn run_dataflow(
dataflow_path: &Path,
uv: bool,
dora_command: DoraCommand,
) -> eyre::Result<DataflowResult> {
let working_dir = dataflow_path
.canonicalize()
.context("failed to canonicalize dataflow path")?
@@ -192,6 +200,7 @@ impl Daemon {
Some(exit_when_done),
clock.clone(),
None,
dora_command,
);

let spawn_result = reply_rx
@@ -223,6 +232,7 @@ impl Daemon {
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
dora_command: DoraCommand,
) -> eyre::Result<DaemonRunResult> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
@@ -286,6 +296,7 @@ impl Daemon {
clock,
zenoh_session,
remote_daemon_events_tx,
dora_command,
};

let dora_events = ReceiverStream::new(dora_events_rx);
@@ -762,6 +773,7 @@ impl Daemon {
node_stderr_most_recent,
uv,
&mut logger,
&self.dora_command,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
@@ -2180,3 +2192,43 @@ impl CoreNodeKindExt for CoreNodeKind {
}
}
}

/// Enables dora to spawn itself as a subprocess.
#[derive(Clone, Debug)]
pub struct DoraCommand {
/// Path to the `dora` (or `python`) executable.
///
/// When dora is installed as a standalone binary, this points to the
/// `dora` executable directly. When installed through `pip`, this will
/// point to the Python executable.
pub executable: PathBuf,
/// Additional arguments required to invoke dora.
///
/// This will be empty when dora is installed as standalone binary. For
/// pip installations, this will point to the dora python executable.
pub args: Vec<String>,
}

impl DoraCommand {
pub fn from_python_env() -> eyre::Result<Self> {
let mut args = std::env::args();
// first arg is executable name
// (we use first arg instead of std::env::current_exe() because the
// latter follows symlinks on some platforms, which affects the python
// import folders so that the dora_cli is not found)
let executable = args.next().context("no executable name arg")?.into();
// second argument should be the python executable wrapping dora
let python_file = args.next().context("no python file argument")?;

Ok(DoraCommand {
executable,
args: [python_file].to_vec(),
})
}

fn to_tokio_command(&self) -> tokio::process::Command {
let mut cmd = tokio::process::Command::new(&self.executable);
cmd.args(&self.args);
cmd
}
}

+ 3
- 4
binaries/daemon/src/spawn.rs View File

@@ -1,7 +1,7 @@
use crate::{
log::{self, NodeLogger},
node_communication::spawn_listener_loop,
node_inputs, CoreNodeKindExt, DoraEvent, Event, OutputId, RunningNode,
node_inputs, CoreNodeKindExt, DoraCommand, DoraEvent, Event, OutputId, RunningNode,
};
use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
@@ -51,6 +51,7 @@ pub async fn spawn_node(
node_stderr_most_recent: Arc<ArrayQueue<String>>,
uv: bool,
logger: &mut NodeLogger<'_>,
dora_command: &DoraCommand,
) -> eyre::Result<RunningNode> {
let node_id = node.id.clone();
logger
@@ -301,9 +302,7 @@ pub async fn spawn_node(
cmd
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);
let mut cmd = dora_command.to_tokio_command();
cmd.arg("runtime");
cmd
} else {


Loading…
Cancel
Save