|
|
|
@@ -1,10 +1,15 @@ |
|
|
|
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods] |
|
|
|
|
|
|
|
use std::env::current_dir; |
|
|
|
use std::path::PathBuf; |
|
|
|
use std::sync::Arc; |
|
|
|
use std::time::Duration; |
|
|
|
|
|
|
|
use arrow::pyarrow::{FromPyArrow, ToPyArrow}; |
|
|
|
use dora_daemon::Daemon; |
|
|
|
use dora_download::download_file; |
|
|
|
use dora_node_api::dora_core::config::NodeId; |
|
|
|
use dora_node_api::dora_core::descriptor::source_is_url; |
|
|
|
use dora_node_api::merged::{MergeExternalSend, MergedEvent}; |
|
|
|
use dora_node_api::{DataflowId, DoraNode, EventStream}; |
|
|
|
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; |
|
|
|
@@ -310,11 +315,49 @@ pub fn start_runtime() -> eyre::Result<()> { |
|
|
|
dora_runtime::main().wrap_err("Dora Runtime raised an error.") |
|
|
|
} |
|
|
|
|
|
|
|
pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> { |
|
|
|
let dataflow = if source_is_url(&dataflow) { |
|
|
|
// try to download the shared library |
|
|
|
let target_path = current_dir().context("Could not access the current dir")?; |
|
|
|
let rt = tokio::runtime::Builder::new_current_thread() |
|
|
|
.enable_all() |
|
|
|
.build() |
|
|
|
.context("tokio runtime failed")?; |
|
|
|
rt.block_on(async { download_file(&dataflow, &target_path).await }) |
|
|
|
.wrap_err("failed to download dataflow yaml file")? |
|
|
|
} else { |
|
|
|
PathBuf::from(dataflow) |
|
|
|
}; |
|
|
|
Ok(dataflow) |
|
|
|
} |
|
|
|
|
|
|
|
/// Run a Dataflow |
|
|
|
/// |
|
|
|
/// :rtype: None |
|
|
|
#[pyfunction] |
|
|
|
#[pyo3(signature = (dataflow_path, uv=None))] |
|
|
|
pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> { |
|
|
|
let dataflow_path = resolve_dataflow(dataflow_path).context("could not resolve dataflow")?; |
|
|
|
let rt = tokio::runtime::Builder::new_multi_thread() |
|
|
|
.enable_all() |
|
|
|
.build() |
|
|
|
.context("tokio runtime failed")?; |
|
|
|
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv.unwrap_or_default()))?; |
|
|
|
match result.is_ok() { |
|
|
|
true => Ok(()), |
|
|
|
false => Err(eyre::eyre!( |
|
|
|
"Dataflow failed to run with error: {:?}", |
|
|
|
result.node_results |
|
|
|
)), |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
#[pymodule] |
|
|
|
fn dora(_py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { |
|
|
|
dora_ros2_bridge_python::create_dora_ros2_bridge_module(&m)?; |
|
|
|
|
|
|
|
m.add_function(wrap_pyfunction!(start_runtime, &m)?)?; |
|
|
|
m.add_function(wrap_pyfunction!(run, &m)?)?; |
|
|
|
m.add_class::<Node>()?; |
|
|
|
m.setattr("__version__", env!("CARGO_PKG_VERSION"))?; |
|
|
|
m.setattr("__author__", "Dora-rs Authors")?; |
|
|
|
|