diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs
index 31725c89..1b34cda1 100644
--- a/apis/rust/node/src/node/mod.rs
+++ b/apis/rust/node/src/node/mod.rs
@@ -9,7 +9,7 @@ use aligned_vec::{AVec, ConstAlign};
 use arrow::array::Array;
 use dora_core::{
     config::{DataId, NodeId, NodeRunConfig},
-    daemon_messages::{DataMessage, DropToken, NodeConfig},
+    daemon_messages::{DataMessage, DataflowId, DropToken, NodeConfig},
     descriptor::Descriptor,
     message::{uhlc, ArrowTypeInfo, Metadata, MetadataParameters},
 };
@@ -33,6 +33,7 @@ pub const ZERO_COPY_THRESHOLD: usize = 4096;
 
 pub struct DoraNode {
     id: NodeId,
+    dataflow_id: DataflowId,
     node_config: NodeRunConfig,
     control_channel: ControlChannel,
     clock: Arc<uhlc::HLC>,
@@ -89,6 +90,7 @@ impl DoraNode {
 
         let node = Self {
             id: node_id,
+            dataflow_id,
             node_config: run_config,
             control_channel,
             clock,
@@ -243,6 +245,10 @@ impl DoraNode {
         &self.id
     }
 
+    pub fn dataflow_id(&self) -> &DataflowId {
+        &self.dataflow_id
+    }
+
     pub fn node_config(&self) -> &NodeRunConfig {
         &self.node_config
     }
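Note: the new `dataflow_id()` getter mirrors the existing `id()` accessor and exposes the UUID of the running dataflow, so nodes can namespace per-run artifacts. A minimal sketch of a consumer (a hypothetical node, not part of this diff; assumes the `eyre` crate as in the rest of the API):

    use dora_node_api::DoraNode;

    fn main() -> eyre::Result<()> {
        let (node, _events) = DoraNode::init_from_env()?;
        // The DataflowId is a UUID shared by every node in the same dataflow
        // run, making it a natural key for per-run output directories.
        println!("node {} runs in dataflow {}", node.id(), node.dataflow_id());
        Ok(())
    }
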
.context("Could not finish writing arrow file")?; @@ -83,11 +89,12 @@ fn main() -> eyre::Result<()> { Ok(()) } -/// Write a row of data into the writer +/// Write a row of data into the writerr fn write_event( - writer: &mut FileWriter, + writer: &mut StreamWriter, data: Arc, metadata: &Metadata, + schema: Arc, ) -> eyre::Result<()> { let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, data.len() as i32])); let field = Arc::new(Field::new("item", data.data_type().clone(), true)); @@ -119,7 +126,7 @@ fn write_event( let span_id_array = make_array(span_id_array.into()); let record = RecordBatch::try_new( - writer.schema().clone(), + schema, vec![ trace_id_array, span_id_array,