Browse Source

Optimise sending of small vector for python

tags/v0.2.5-alpha
haixuanTao 2 years ago
parent
commit
b20ef2391d
2 changed files with 28 additions and 22 deletions
  1. +1
    -1
      apis/rust/node/src/lib.rs
  2. +27
    -21
      binaries/runtime/src/operator/python.rs

+ 1
- 1
apis/rust/node/src/lib.rs View File

@@ -17,7 +17,7 @@ pub use dora_core;
pub use dora_core::message::{uhlc, Metadata, MetadataParameters};
pub use event_stream::{merged, Data, Event, EventStream, MappedInputData};
pub use flume::Receiver;
pub use node::{DataSample, DoraNode};
pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD};

mod daemon_connection;
mod event_stream;


+ 27
- 21
binaries/runtime/src/operator/python.rs View File

@@ -202,7 +202,6 @@ pub fn run(
} = &mut event
{
use dora_tracing::telemetry::{deserialize_context, serialize_context};
use std::borrow::Cow;
use tracing_opentelemetry::OpenTelemetrySpanExt;
span.record("input_id", input_id.as_str());

@@ -210,7 +209,7 @@ pub fn run(
span.set_parent(cx);
let cx = span.context();
let string_cx = serialize_context(&cx);
metadata.parameters.open_telemetry_context = Cow::Owned(string_cx);
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event);
@@ -270,7 +269,10 @@ mod callback_impl {
use crate::operator::OperatorEvent;

use super::SendOutputCallback;
use dora_operator_api_python::{process_python_output, pydict_to_metadata, python_output_len};
use dora_node_api::ZERO_COPY_THRESHOLD;
use dora_operator_api_python::{
process_python_output, process_python_type, pydict_to_metadata, python_output_len,
};
use eyre::{eyre, Context, Result};
use pyo3::{pymethods, types::PyDict, PyObject, Python};
use tokio::sync::oneshot;
@@ -290,20 +292,24 @@ mod callback_impl {
py: Python,
) -> Result<()> {
let data_len = python_output_len(&data, py)?;
let mut sample = py.allow_threads(|| {
let (tx, rx) = oneshot::channel();
self.events_tx
.blocking_send(OperatorEvent::AllocateOutputSample {
len: data_len,
sample: tx,
})
.map_err(|_| eyre!("failed to send output to runtime"))?;
let sample = rx
.blocking_recv()
.wrap_err("failed to request output sample")?
.wrap_err("failed to allocate output sample")?;
Result::<_, eyre::Report>::Ok(sample)
})?;
let mut sample = {
if data_len > ZERO_COPY_THRESHOLD {
let (tx, rx) = oneshot::channel();
self.events_tx
.blocking_send(OperatorEvent::AllocateOutputSample {
len: data_len,
sample: tx,
})
.map_err(|_| eyre!("failed to send output to runtime"))?;
let sample = rx
.blocking_recv()
.wrap_err("failed to request output sample")?
.wrap_err("failed to allocate output sample")?;
sample
} else {
vec![0; data_len].into()
}
};

process_python_output(&data, py, |data| {
sample.copy_from_slice(data);
@@ -316,12 +322,12 @@ mod callback_impl {
let data_type = process_python_type(&data, py)?;

py.allow_threads(|| {
let event = OperatorEvent::Output {
output_id: output.to_owned().into(),
let event = OperatorEvent::Output {
output_id: output.to_owned().into(),
data_type,
parameters,
data: Some(sample),
};
data: Some(sample),
};
self.events_tx
.blocking_send(event)
.map_err(|_| eyre!("failed to send output to runtime"))


Loading…
Cancel
Save