|
|
|
@@ -3,7 +3,7 @@ |
|
|
|
use super::{OperatorEvent, StopReason}; |
|
|
|
use dora_core::{ |
|
|
|
config::{NodeId, OperatorId}, |
|
|
|
descriptor::{source_is_url, Descriptor}, |
|
|
|
descriptor::{source_is_url, Descriptor, PythonSource}, |
|
|
|
}; |
|
|
|
use dora_download::download_file; |
|
|
|
use dora_node_api::Event; |
|
|
|
@@ -35,13 +35,13 @@ fn traceback(err: pyo3::PyErr) -> eyre::Report { |
|
|
|
pub fn run( |
|
|
|
node_id: &NodeId, |
|
|
|
operator_id: &OperatorId, |
|
|
|
source: &str, |
|
|
|
python_source: &PythonSource, |
|
|
|
events_tx: Sender<OperatorEvent>, |
|
|
|
incoming_events: flume::Receiver<Event>, |
|
|
|
init_done: oneshot::Sender<Result<()>>, |
|
|
|
dataflow_descriptor: &Descriptor, |
|
|
|
) -> eyre::Result<()> { |
|
|
|
let path = if source_is_url(source) { |
|
|
|
let path = if source_is_url(&python_source.source) { |
|
|
|
let target_path = Path::new("build") |
|
|
|
.join(node_id.to_string()) |
|
|
|
.join(format!("{}.py", operator_id)); |
|
|
|
@@ -49,11 +49,11 @@ pub fn run( |
|
|
|
let rt = tokio::runtime::Builder::new_current_thread() |
|
|
|
.enable_all() |
|
|
|
.build()?; |
|
|
|
rt.block_on(download_file(source, &target_path)) |
|
|
|
rt.block_on(download_file(&python_source.source, &target_path)) |
|
|
|
.wrap_err("failed to download Python operator")?; |
|
|
|
target_path |
|
|
|
} else { |
|
|
|
Path::new(source).to_owned() |
|
|
|
Path::new(&python_source.source).to_owned() |
|
|
|
}; |
|
|
|
|
|
|
|
if !path.exists() { |
|
|
|
|