Browse Source

Adding customizing `conda_env` within Python operator

This commit makes it possible to specify the conda env that we want to use in
a specific operator.

```yaml
  - id: robot
    operator:
      python:
        source: ../operators/robot.py
        conda_env: robomaster
```

This will call:
```bash
conda run -n robomaster python -c "import dora; dora.start_runtime()"
```
tags/v0.3.3
haixuanTao 1 year ago
parent
commit
15af539729
6 changed files with 105 additions and 19 deletions
  1. +1
    -0
      Cargo.lock
  2. +5
    -3
      binaries/cli/src/attach.rs
  3. +1
    -0
      binaries/daemon/Cargo.toml
  4. +50
    -13
      binaries/daemon/src/spawn.rs
  5. +45
    -1
      libraries/core/src/descriptor/mod.rs
  6. +3
    -2
      libraries/core/src/descriptor/validate.rs

+ 1
- 0
Cargo.lock View File

@@ -1557,6 +1557,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"uuid",
"which",
]

[[package]]


+ 5
- 3
binaries/cli/src/attach.rs View File

@@ -38,10 +38,12 @@ pub fn attach_dataflow(
CoreNodeKind::Custom(_cn) => (),
CoreNodeKind::Runtime(rn) => {
for op in rn.operators.iter() {
if let dora_core::descriptor::OperatorSource::Python(source) = &op.config.source
if let dora_core::descriptor::OperatorSource::Python(python_source) =
&op.config.source
{
let path = resolve_path(source, &working_dir).wrap_err_with(|| {
format!("failed to resolve node source `{}`", source)
let path = resolve_path(&python_source.source, &working_dir)
.wrap_err_with(|| {
format!("failed to resolve node source `{}`", python_source.source)
})?;
node_path_lookup
.insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone())));


+ 1
- 0
binaries/daemon/Cargo.toml View File

@@ -37,3 +37,4 @@ bincode = "1.3.3"
async-trait = "0.1.64"
aligned-vec = "0.5.0"
ctrlc = "3.2.5"
which = "5.0.0"

+ 50
- 13
binaries/daemon/src/spawn.rs View File

@@ -8,7 +8,8 @@ use dora_core::{
config::{DataId, NodeRunConfig},
daemon_messages::{DataMessage, DataflowId, NodeConfig, RuntimeConfig, Timestamped},
descriptor::{
resolve_path, source_is_url, Descriptor, OperatorSource, ResolvedNode, SHELL_SOURCE,
resolve_path, source_is_url, Descriptor, OperatorDefinition, OperatorSource, PythonSource,
ResolvedNode, SHELL_SOURCE,
},
get_python_path,
message::uhlc::HLC,
@@ -149,26 +150,62 @@ pub async fn spawn_node(
})?
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
let has_python_operator = n
let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
.any(|x| matches!(x.config.source, OperatorSource::Python { .. }));
.filter(|x| matches!(x.config.source, OperatorSource::Python { .. }))
.collect();

let has_other_operator = n
let other_operators = n
.operators
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if has_python_operator && !has_other_operator {
let mut command = if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator
let python = get_python_path().context("Could not find python in daemon")?;
let mut command = tokio::process::Command::new(python);
command.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
} else if !has_python_operator && has_other_operator {

// TODO: Handle multi-operator runtime once sub-interpreter is supported
if python_operators.len() > 2 {
eyre::bail!(
"Runtime currently only support one Python Operator.
This is because pyo4 sub-interpreter is not yet available.
See: https://github.com/PyO4/pyo3/issues/576"
);
}

let python_operator = python_operators
.first()
.context("Runtime had no operators definition.")?;

if let OperatorSource::Python(PythonSource {
source: _,
conda_env: Some(conda_env),
}) = &python_operator.config.source
{
let conda = which::which("conda").context(
"failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.",
)?;
let mut command = tokio::process::Command::new(conda);
command.args([
"run",
"-n",
&conda_env,
"python",
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
} else {
let python = get_python_path()
.context("Could not find python path when spawning runtime node")?;
let mut command = tokio::process::Command::new(python);
command.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
command
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe().wrap_err("failed to get current executable path")?,
);


+ 45
- 1
libraries/core/src/descriptor/mod.rs View File

@@ -260,9 +260,53 @@ pub struct OperatorConfig {
#[serde(rename_all = "kebab-case")]
pub enum OperatorSource {
SharedLibrary(String),
Python(String),
Python(PythonSource),
Wasm(String),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(
deny_unknown_fields,
from = "PythonSourceDef",
into = "PythonSourceDef"
)]
pub struct PythonSource {
pub source: String,
pub conda_env: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum PythonSourceDef {
SourceOnly(String),
WithOptions {
source: String,
conda_env: Option<String>,
},
}

impl From<PythonSource> for PythonSourceDef {
fn from(input: PythonSource) -> Self {
match input {
PythonSource {
source,
conda_env: None,
} => Self::SourceOnly(source),
PythonSource { source, conda_env } => Self::WithOptions { source, conda_env },
}
}
}

impl From<PythonSourceDef> for PythonSource {
fn from(value: PythonSourceDef) -> Self {
match value {
PythonSourceDef::SourceOnly(source) => Self {
source,
conda_env: None,
},
PythonSourceDef::WithOptions { source, conda_env } => Self { source, conda_env },
}
}
}

pub fn source_is_url(source: &str) -> bool {
source.contains("://")


+ 3
- 2
libraries/core/src/descriptor/validate.rs View File

@@ -50,9 +50,10 @@ pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result
}
}
}
OperatorSource::Python(path) => {
OperatorSource::Python(python_source) => {
has_python_operator = true;
if source_is_url(path) {
let path = &python_source.source;
if source_is_url(&path) {
info!("{path} is a URL."); // TODO: Implement url check.
} else if !working_dir.join(path).exists() {
bail!("no Python library at `{path}`");


Loading…
Cancel
Save