From b20ef2391d4beea6d5ef40d0b7ade50ad2ff9163 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Sat, 12 Aug 2023 17:51:51 +0200 Subject: [PATCH] Optimise sending of small vector for python --- apis/rust/node/src/lib.rs | 2 +- binaries/runtime/src/operator/python.rs | 48 ++++++++++++++----------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index d23bae9f..b183a94d 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -17,7 +17,7 @@ pub use dora_core; pub use dora_core::message::{uhlc, Metadata, MetadataParameters}; pub use event_stream::{merged, Data, Event, EventStream, MappedInputData}; pub use flume::Receiver; -pub use node::{DataSample, DoraNode}; +pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD}; mod daemon_connection; mod event_stream; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index acfaf6c8..fcef9a9a 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -202,7 +202,6 @@ pub fn run( } = &mut event { use dora_tracing::telemetry::{deserialize_context, serialize_context}; - use std::borrow::Cow; use tracing_opentelemetry::OpenTelemetrySpanExt; span.record("input_id", input_id.as_str()); @@ -210,7 +209,7 @@ pub fn run( span.set_parent(cx); let cx = span.context(); let string_cx = serialize_context(&cx); - metadata.parameters.open_telemetry_context = Cow::Owned(string_cx); + metadata.parameters.open_telemetry_context = string_cx; } let py_event = PyEvent::from(event); @@ -270,7 +269,10 @@ mod callback_impl { use crate::operator::OperatorEvent; use super::SendOutputCallback; - use dora_operator_api_python::{process_python_output, pydict_to_metadata, python_output_len}; + use dora_node_api::ZERO_COPY_THRESHOLD; + use dora_operator_api_python::{ + process_python_output, process_python_type, pydict_to_metadata, python_output_len, + }; use eyre::{eyre, Context, Result}; use pyo3::{pymethods, types::PyDict, PyObject, Python}; use tokio::sync::oneshot; @@ -290,20 +292,24 @@ mod callback_impl { py: Python, ) -> Result<()> { let data_len = python_output_len(&data, py)?; - let mut sample = py.allow_threads(|| { - let (tx, rx) = oneshot::channel(); - self.events_tx - .blocking_send(OperatorEvent::AllocateOutputSample { - len: data_len, - sample: tx, - }) - .map_err(|_| eyre!("failed to send output to runtime"))?; - let sample = rx - .blocking_recv() - .wrap_err("failed to request output sample")? - .wrap_err("failed to allocate output sample")?; - Result::<_, eyre::Report>::Ok(sample) - })?; + let mut sample = { + if data_len > ZERO_COPY_THRESHOLD { + let (tx, rx) = oneshot::channel(); + self.events_tx + .blocking_send(OperatorEvent::AllocateOutputSample { + len: data_len, + sample: tx, + }) + .map_err(|_| eyre!("failed to send output to runtime"))?; + let sample = rx + .blocking_recv() + .wrap_err("failed to request output sample")? + .wrap_err("failed to allocate output sample")?; + sample + } else { + vec![0; data_len].into() + } + }; process_python_output(&data, py, |data| { sample.copy_from_slice(data); @@ -316,12 +322,12 @@ mod callback_impl { let data_type = process_python_type(&data, py)?; py.allow_threads(|| { - let event = OperatorEvent::Output { - output_id: output.to_owned().into(), + let event = OperatorEvent::Output { + output_id: output.to_owned().into(), data_type, parameters, - data: Some(sample), - }; + data: Some(sample), + }; self.events_tx .blocking_send(event) .map_err(|_| eyre!("failed to send output to runtime"))