From 4d454e3db94a39f5c508573389fd9676c8d9dd34 Mon Sep 17 00:00:00 2001 From: haixuantao Date: Sun, 16 Mar 2025 19:51:32 +0100 Subject: [PATCH] Adding run comand within python API --- Cargo.lock | 3 +++ apis/python/node/Cargo.toml | 3 +++ apis/python/node/src/lib.rs | 43 +++++++++++++++++++++++++++++++++++++ tests/llm/qwen2.5.yaml | 6 +++--- tests/llm/run_script.py | 6 ++++++ 5 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 tests/llm/run_script.py diff --git a/Cargo.lock b/Cargo.lock index e4159af3..eee14643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2874,6 +2874,8 @@ name = "dora-node-api-python" version = "0.3.10" dependencies = [ "arrow 54.2.1", + "dora-daemon", + "dora-download", "dora-node-api", "dora-operator-api-python", "dora-ros2-bridge-python", @@ -2884,6 +2886,7 @@ dependencies = [ "pyo3", "pythonize", "serde_yaml 0.8.26", + "tokio", ] [[package]] diff --git a/apis/python/node/Cargo.toml b/apis/python/node/Cargo.toml index f90f3cdb..025eb103 100644 --- a/apis/python/node/Cargo.toml +++ b/apis/python/node/Cargo.toml @@ -22,11 +22,14 @@ eyre = "0.6" serde_yaml = "0.8.23" flume = "0.10.14" dora-runtime = { workspace = true, features = ["tracing", "metrics", "python"] } +dora-daemon = { workspace = true } +dora-download = { workspace = true } arrow = { workspace = true, features = ["pyarrow"] } pythonize = { workspace = true } futures = "0.3.28" dora-ros2-bridge-python = { workspace = true } # pyo3_special_method_derive = "0.4.2" +tokio = { version = "1.24.2", features = ["rt"] } [lib] name = "dora" diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index 35b371c8..a54a17ed 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -1,10 +1,15 @@ #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] +use std::env::current_dir; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use arrow::pyarrow::{FromPyArrow, ToPyArrow}; +use dora_daemon::Daemon; +use dora_download::download_file; use dora_node_api::dora_core::config::NodeId; +use dora_node_api::dora_core::descriptor::source_is_url; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DataflowId, DoraNode, EventStream}; use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; @@ -310,11 +315,49 @@ pub fn start_runtime() -> eyre::Result<()> { dora_runtime::main().wrap_err("Dora Runtime raised an error.") } +pub fn resolve_dataflow(dataflow: String) -> eyre::Result { + let dataflow = if source_is_url(&dataflow) { + // try to download the shared library + let target_path = current_dir().context("Could not access the current dir")?; + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + rt.block_on(async { download_file(&dataflow, &target_path).await }) + .wrap_err("failed to download dataflow yaml file")? + } else { + PathBuf::from(dataflow) + }; + Ok(dataflow) +} + +/// Run a Dataflow +/// +/// :rtype: None +#[pyfunction] +#[pyo3(signature = (dataflow_path, uv=None))] +pub fn run(dataflow_path: String, uv: Option) -> eyre::Result<()> { + let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?; + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?; + match result.is_ok() { + true => Ok(()), + false => Err(eyre::eyre!( + "Dataflow failed to run with error: {:?}", + result.node_results + )), + } +} + #[pymodule] fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { dora_ros2_bridge_python::create_dora_ros2_bridge_module(&m)?; m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; + m.add_function(wrap_pyfunction!(run, &m)?)?; m.add_class::()?; m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; m.setattr("__author__", "Dora-rs Authors")?; diff --git a/tests/llm/qwen2.5.yaml b/tests/llm/qwen2.5.yaml index fc7ff8ec..fe8f5c18 100644 --- a/tests/llm/qwen2.5.yaml +++ b/tests/llm/qwen2.5.yaml @@ -8,8 +8,8 @@ nodes: DATA: "'Please only output: This is a test'" - id: dora-qwen2.5 - build: pip install -e ../../node-hub/dora-qwen2.5 - path: dora-qwen2-5 + build: pip install -e ../../node-hub/dora-qwen + path: dora-qwen inputs: text: pyarrow-sender/data outputs: @@ -19,6 +19,6 @@ nodes: build: pip install -e ../../node-hub/pyarrow-assert path: pyarrow-assert inputs: - data: dora-phi4/text + data: dora-qwen2.5/text env: DATA: "This is a test" diff --git a/tests/llm/run_script.py b/tests/llm/run_script.py new file mode 100644 index 00000000..5b53f710 --- /dev/null +++ b/tests/llm/run_script.py @@ -0,0 +1,6 @@ +from dora import run + +# Make sure to build it first with the CLI. +# Build step is on the todo list. + +run("qwen2.5.yaml", uv=True)