
Send data recordings into an `out/<DATAFLOW_ID>` folder (one `<input>.arrow` file per input) to make it easier to manage data coming from multiple runs of a dataflow

tags/v0.3.3-rc1
haixuanTao committed 1 year ago · commit f4fbd6562f
2 changed files with 29 additions and 16 deletions:

  1. apis/rust/node/src/node/mod.rs (+7 -1)
  2. libraries/extensions/dora-record/src/main.rs (+22 -15)
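For orientation, here is a minimal sketch (not part of the commit) of the layout this change introduces: each input of a dataflow run gets its own Arrow file under a folder named after the dataflow's id. The helper name `recording_path` is hypothetical.

```rust
use std::path::PathBuf;

/// Hypothetical helper mirroring the new layout: out/<DATAFLOW_ID>/<input>.arrow
fn recording_path(dataflow_id: &str, input_id: &str) -> PathBuf {
    PathBuf::from("out")
        .join(dataflow_id)
        .join(format!("{input_id}.arrow"))
}
```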

apis/rust/node/src/node/mod.rs (+7 -1)

@@ -9,7 +9,7 @@ use aligned_vec::{AVec, ConstAlign};
 use arrow::array::Array;
 use dora_core::{
     config::{DataId, NodeId, NodeRunConfig},
-    daemon_messages::{DataMessage, DropToken, NodeConfig},
+    daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
     descriptor::Descriptor,
     message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
 };
@@ -33,6 +33,7 @@ pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 pub struct DoraNode {
     id: NodeId,
+    dataflow_id: DataflowId,
     node_config: NodeRunConfig,
     control_channel: ControlChannel,
     clock: Arc<uhlc::HLC>,
@@ -89,6 +90,7 @@ impl DoraNode {
 
         let node = Self {
             id: node_id,
+            dataflow_id: dataflow_id,
             node_config: run_config,
             control_channel,
             clock,
@@ -243,6 +245,10 @@ impl DoraNode {
         &self.id
     }
 
+    pub fn dataflow_id(&self) -> &DataflowId {
+        &self.dataflow_id
+    }
+
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
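The new getter makes the dataflow's id available to any node, which is what lets dora-record group its output per run below. A minimal usage sketch, assuming the existing `init_from_env` entry point and `id()` getter, with illustrative output wording:

```rust
use dora_node_api::DoraNode;

fn main() -> eyre::Result<()> {
    let (node, _events) = DoraNode::init_from_env()?;
    // dataflow_id() returns the id of the dataflow run this node belongs to.
    println!("node {} is part of dataflow {}", node.id(), node.dataflow_id());
    Ok(())
}
```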


libraries/extensions/dora-record/src/main.rs (+22 -15)

@@ -7,18 +7,18 @@ use dora_node_api::{
         },
         buffer::{OffsetBuffer, ScalarBuffer},
         datatypes::{DataType, Field, Schema},
-        ipc::writer::FileWriter,
+        ipc::writer::StreamWriter,
         record_batch::RecordBatch,
     },
     DoraNode, Event, Metadata,
 };
 use dora_tracing::telemetry::deserialize_to_hashmap;
 use eyre::{Context, ContextCompat};
-use std::{collections::HashMap, fs::File, sync::Arc};
+use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
 
 fn main() -> eyre::Result<()> {
-    let (_node, mut events) = DoraNode::init_from_env()?;
+    let (node, mut events) = DoraNode::init_from_env()?;
+    let dataflow_id = node.dataflow_id();
     let mut writers = HashMap::new();
     while let Some(event) = events.recv() {
         match event {
@@ -47,23 +47,29 @@ fn main() -> eyre::Result<()> {
                             field_utc_epoch,
                             field_data,
                         ]));
-                        let file = std::fs::File::create(format!("{id}.arrow")).unwrap();
+                        let dataflow_dir = PathBuf::from("out").join(dataflow_id.to_string());
+                        if !dataflow_dir.exists() {
+                            std::fs::create_dir_all(&dataflow_dir)
+                                .context("could not create dataflow_dir")?;
+                        }
+                        let file = std::fs::File::create(dataflow_dir.join(format!("{id}.arrow")))
+                            .context("Couldn't create write file")?;
 
-                        let writer = FileWriter::try_new(file, &schema).unwrap();
+                        let writer = StreamWriter::try_new(file, &schema).unwrap();
                         let mut writer = writer;
-                        write_event(&mut writer, data.into(), &metadata)
+                        write_event(&mut writer, data.into(), &metadata, schema.clone())
                             .context("could not write first record data")?;
-                        writers.insert(id.clone(), writer);
+                        writers.insert(id.clone(), (writer, schema));
                     }
-                    Some(writer) => {
-                        write_event(writer, data.into(), &metadata)
+                    Some((writer, schema)) => {
+                        write_event(writer, data.into(), &metadata, schema.clone())
                             .context("could not write record data")?;
                     }
                 };
             }
             Event::InputClosed { id } => match writers.remove(&id) {
                 None => {}
-                Some(mut writer) => writer.finish().context("Could not finish arrow file")?,
+                Some((mut writer, _)) => writer.finish().context("Could not finish arrow file")?,
             },
             _ => {}
         }
@@ -71,7 +77,7 @@ fn main() -> eyre::Result<()> {
 
     let result: eyre::Result<Vec<_>> = writers
         .iter_mut()
-        .map(|(_, writer)| -> eyre::Result<()> {
+        .map(|(_, (writer, _))| -> eyre::Result<()> {
             writer
                 .finish()
                 .context("Could not finish writing arrow file")?;
@@ -83,11 +89,12 @@ fn main() -> eyre::Result<()> {
     Ok(())
 }
 
 /// Write a row of data into the writer
 fn write_event(
-    writer: &mut FileWriter<File>,
+    writer: &mut StreamWriter<File>,
     data: Arc<dyn Array>,
     metadata: &Metadata,
+    schema: Arc<Schema>,
 ) -> eyre::Result<()> {
     let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32]));
     let field = Arc::new(Field::new("item", data.data_type().clone(), true));
@@ -119,7 +126,7 @@ fn write_event(
     let span_id_array = make_array(span_id_array.into());
 
     let record = RecordBatch::try_new(
-        writer.schema().clone(),
+        schema,
         vec![
             trace_id_array,
             span_id_array,
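Switching from `FileWriter` to `StreamWriter` also changes the on-disk format from Arrow's file format to its streaming format, which does not rely on a footer written by `finish()`, so batches already flushed should stay readable even if the recorder exits early; the schema is now threaded through `write_event` presumably because `StreamWriter` does not expose it the way `FileWriter::schema()` did. A minimal read-back sketch (the path placeholders are illustrative, not from the commit):

```rust
use dora_node_api::arrow::ipc::reader::StreamReader;
use std::fs::File;

fn main() -> eyre::Result<()> {
    // Fill in a real dataflow id and input id from the out/ folder.
    let file = File::open("out/<DATAFLOW_ID>/<input>.arrow")?;
    let reader = StreamReader::try_new(file, None)?;
    for batch in reader {
        let batch = batch?;
        println!("read a batch with {} rows", batch.num_rows());
    }
    Ok(())
}
```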

