Browse Source

Merge pull request #434 from dora-rs/dora-record-async

Use Async Parquet Writer for `dora-record`
tags/v0.3.3-rc1
Haixuan Xavier Tao GitHub 1 year ago
parent
commit
9cb8d3c204
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
3 changed files with 226 additions and 44 deletions
  1. +167
    -17
      Cargo.lock
  2. +2
    -0
      libraries/extensions/dora-record/Cargo.toml
  3. +57
    -27
      libraries/extensions/dora-record/src/main.rs

+ 167
- 17
Cargo.lock View File

@@ -60,6 +60,21 @@ dependencies = [
"serde",
]

[[package]]
name = "alloc-no-stdlib"
version = "2.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3"

[[package]]
name = "alloc-stdlib"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece"
dependencies = [
"alloc-no-stdlib",
]

[[package]]
name = "android-tzdata"
version = "0.1.1"
@@ -759,6 +774,27 @@ dependencies = [
"tracing",
]

[[package]]
name = "brotli"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "516074a47ef4bce09577a3b379392300159ce5b1ba2e501ff1c819950066100f"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
"brotli-decompressor",
]

[[package]]
name = "brotli-decompressor"
version = "2.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e2e4afe60d7dd600fdd3de8d0f08c2b7ec039712e3b6137ff98b7004e82de4f"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
]

[[package]]
name = "bstr"
version = "1.6.2"
@@ -1166,7 +1202,7 @@ dependencies = [
"bitflags 1.3.2",
"crossterm_winapi",
"libc",
"mio 0.8.8",
"mio 0.8.10",
"parking_lot",
"signal-hook",
"signal-hook-mio",
@@ -1682,6 +1718,8 @@ dependencies = [
"dora-node-api",
"dora-tracing",
"eyre",
"parquet",
"tokio",
]

[[package]]
@@ -3034,6 +3072,15 @@ dependencies = [
"value-bag",
]

[[package]]
name = "lz4_flex"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "912b45c753ff5f7f5208307e8ace7d2a2e30d024e26d3509f3dce546c044ce15"
dependencies = [
"twox-hash",
]

[[package]]
name = "macro_rules_attribute"
version = "0.1.3"
@@ -3164,9 +3211,9 @@ dependencies = [

[[package]]
name = "mio"
version = "0.8.8"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"log",
@@ -3360,7 +3407,7 @@ dependencies = [
"inotify",
"kqueue",
"libc",
"mio 0.8.8",
"mio 0.8.10",
"walkdir",
"windows-sys 0.45.0",
]
@@ -3622,7 +3669,7 @@ dependencies = [
"opentelemetry 0.18.0",
"opentelemetry-semantic-conventions 0.10.0",
"thiserror",
"thrift",
"thrift 0.16.0",
"tokio",
]

@@ -3764,6 +3811,15 @@ dependencies = [
"num-traits",
]

[[package]]
name = "ordered-float"
version = "2.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68f19d67e5a2795c94e73e0bb1cc1a7edeb2e28efd39e2e1c9b7a40c1108b11c"
dependencies = [
"num-traits",
]

[[package]]
name = "ordered-float"
version = "3.9.1"
@@ -3823,6 +3879,39 @@ dependencies = [
"windows-targets 0.48.5",
]

[[package]]
name = "parquet"
version = "48.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "239229e6a668ab50c61de3dce61cf0fa1069345f7aa0f4c934491f92205a4945"
dependencies = [
"ahash",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-ipc",
"arrow-schema",
"arrow-select",
"base64 0.21.4",
"brotli",
"bytes",
"chrono",
"flate2",
"futures",
"hashbrown 0.14.3",
"lz4_flex",
"num",
"num-bigint",
"paste",
"seq-macro",
"snap",
"thrift 0.17.0",
"tokio",
"twox-hash",
"zstd",
]

[[package]]
name = "paste"
version = "1.0.14"
@@ -4706,7 +4795,7 @@ dependencies = [
"log",
"md5",
"mio 0.6.23",
"mio 0.8.8",
"mio 0.8.10",
"mio-extras",
"num-derive",
"num-traits",
@@ -4714,7 +4803,7 @@ dependencies = [
"rand",
"serde",
"serde_repr",
"socket2 0.5.4",
"socket2 0.5.6",
"socketpair",
"speedy",
"static_assertions",
@@ -4921,6 +5010,12 @@ version = "1.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad977052201c6de01a8ef2aa3378c4bd23217a056337d1d6da40468d267a4fb0"

[[package]]
name = "seq-macro"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"

[[package]]
name = "serde"
version = "1.0.195"
@@ -5130,7 +5225,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29ad2e15f37ec9a6cc544097b78a1ec90001e9f71b81338ca39f430adaca99af"
dependencies = [
"libc",
"mio 0.8.8",
"mio 0.8.10",
"signal-hook",
]

@@ -5168,6 +5263,12 @@ version = "1.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "942b4a808e05215192e39f4ab80813e599068285906cc91aa64f923db842bd5a"

[[package]]
name = "snap"
version = "1.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b"

[[package]]
name = "socket2"
version = "0.4.9"
@@ -5180,12 +5281,12 @@ dependencies = [

[[package]]
name = "socket2"
version = "0.5.4"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e"
checksum = "05ffd9c0a93b7543e062e759284fcf5f5e3b098501104bfbdde4d404db792871"
dependencies = [
"libc",
"windows-sys 0.48.0",
"windows-sys 0.52.0",
]

[[package]]
@@ -5473,6 +5574,17 @@ dependencies = [
"threadpool",
]

[[package]]
name = "thrift"
version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09"
dependencies = [
"byteorder",
"integer-encoding",
"ordered-float 2.10.1",
]

[[package]]
name = "time"
version = "0.3.29"
@@ -5527,19 +5639,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"

[[package]]
name = "tokio"
version = "1.32.0"
version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9"
checksum = "61285f6515fa018fb2d1e46eb21223fff441ee8db5d0f1435e8ab4f5cdb80931"
dependencies = [
"backtrace",
"bytes",
"libc",
"mio 0.8.8",
"mio 0.8.10",
"num_cpus",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2 0.5.4",
"socket2 0.5.6",
"tokio-macros",
"windows-sys 0.48.0",
]
@@ -5556,9 +5668,9 @@ dependencies = [

[[package]]
name = "tokio-macros"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
@@ -5741,6 +5853,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed"

[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if 1.0.0",
"static_assertions",
]

[[package]]
name = "typenum"
version = "1.17.0"
@@ -6930,3 +7052,31 @@ name = "zeroize"
version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"

[[package]]
name = "zstd"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110"
dependencies = [
"zstd-safe",
]

[[package]]
name = "zstd-safe"
version = "7.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e"
dependencies = [
"zstd-sys",
]

[[package]]
name = "zstd-sys"
version = "2.0.9+zstd.1.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656"
dependencies = [
"cc",
"pkg-config",
]

+ 2
- 0
libraries/extensions/dora-record/Cargo.toml View File

@@ -9,7 +9,9 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.36.0", features = ["fs", "rt", "rt-multi-thread"] }
dora-node-api = { workspace = true, features = ["tracing"] }
eyre = "0.6.8"
chrono = "0.4.31"
dora-tracing = { workspace = true }
parquet = { version = "48.0.0", features = ["async"] }

+ 57
- 27
libraries/extensions/dora-record/src/main.rs View File

@@ -7,23 +7,26 @@ use dora_node_api::{
},
buffer::{OffsetBuffer, ScalarBuffer},
datatypes::{DataType, Field, Schema},
ipc::writer::StreamWriter,
record_batch::RecordBatch,
},
DoraNode, Event, Metadata,
};
use dora_tracing::telemetry::deserialize_to_hashmap;
use eyre::{Context, ContextCompat};
use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc};
use parquet::{arrow::AsyncArrowWriter, basic::BrotliLevel, file::properties::WriterProperties};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use tokio::sync::mpsc;

fn main() -> eyre::Result<()> {
#[tokio::main]
async fn main() -> eyre::Result<()> {
let (node, mut events) = DoraNode::init_from_env()?;
let dataflow_id = node.dataflow_id();
let mut writers = HashMap::new();

while let Some(event) = events.recv() {
match event {
Event::Input { id, data, metadata } => {
match writers.get_mut(&id) {
match writers.get(&id) {
None => {
let field_uhlc = Field::new("timestamp_uhlc", DataType::UInt64, false);
let field_utc_epoch = Field::new(
@@ -52,46 +55,72 @@ fn main() -> eyre::Result<()> {
std::fs::create_dir_all(&dataflow_dir)
.context("could not create dataflow_dir")?;
}
let file = std::fs::File::create(dataflow_dir.join(format!("{id}.arrow")))
.context("Couldn't create write file")?;
let file =
tokio::fs::File::create(dataflow_dir.join(format!("{id}.parquet")))
.await
.context("Couldn't create write file")?;
let mut writer = AsyncArrowWriter::try_new(
file,
schema.clone(),
0,
Some(
WriterProperties::builder()
.set_compression(parquet::basic::Compression::BROTLI(
BrotliLevel::default(),
))
.build(),
),
)
.context("Could not create parquet writer")?;
let (tx, mut rx) = mpsc::channel(10);

let writer = StreamWriter::try_new(file, &schema).unwrap();
let mut writer = writer;
write_event(&mut writer, data.into(), &metadata, schema.clone())
.context("could not write first record data")?;
writers.insert(id.clone(), (writer, schema));
// Per Input thread
let join_handle = tokio::spawn(async move {
while let Some((data, metadata)) = rx.recv().await {
if let Err(e) =
write_event(&mut writer, data, &metadata, schema.clone()).await
{
println!("Error writing event data into parquet file: {:?}", e)
};
}
writer.close().await
});
tx.send((data.into(), metadata))
.await
.context("Could not send event data into writer loop")?;
writers.insert(id, (tx, join_handle));
}
Some((writer, schema)) => {
write_event(writer, data.into(), &metadata, schema.clone())
.context("could not write record data")?;
Some((tx, _)) => {
tx.send((data.into(), metadata))
.await
.context("Could not send event data into writer loop")?;
}
};
}
Event::InputClosed { id } => match writers.remove(&id) {
None => {}
Some((mut writer, _)) => writer.finish().context("Could not finish arrow file")?,
Some(tx) => drop(tx),
},
_ => {}
}
}

let result: eyre::Result<Vec<_>> = writers
.iter_mut()
.map(|(_, (writer, _))| -> eyre::Result<()> {
writer
.finish()
.context("Could not finish writing arrow file")?;
Ok(())
})
.collect();
result.context("At least one of the input recorder file writer failed to finish")?;
for (id, (tx, join_handle)) in writers {
drop(tx);
join_handle
.await
.context("Writer thread failed")?
.context(format!(
"Could not close the Parquet writer for {id} parquet writer"
))?;
}

Ok(())
}

/// Write a row of data into the writer
fn write_event(
writer: &mut StreamWriter<File>,
async fn write_event(
writer: &mut AsyncArrowWriter<tokio::fs::File>,
data: Arc<dyn Array>,
metadata: &Metadata,
schema: Arc<Schema>,
@@ -138,6 +167,7 @@ fn write_event(
.context("Could not create record batch with the given data")?;
writer
.write(&record)
.await
.context("Could not write recordbatch to file")?;

Ok(())


Loading…
Cancel
Save