From 26c40c5aacde2ea6401e0c5ef2121d4e431a77f0 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Thu, 27 Mar 2025 12:01:54 +0100 Subject: [PATCH] av1 --- node-hub/dora-av1-encoder/Cargo.toml | 1 + node-hub/dora-av1-encoder/src/main.rs | 128 ++++++++++++-------------- node-hub/dora-dav1d/src/main.rs | 108 ++++++---------------- 3 files changed, 89 insertions(+), 148 deletions(-) diff --git a/node-hub/dora-av1-encoder/Cargo.toml b/node-hub/dora-av1-encoder/Cargo.toml index b0ee3d8a..188ec001 100644 --- a/node-hub/dora-av1-encoder/Cargo.toml +++ b/node-hub/dora-av1-encoder/Cargo.toml @@ -13,3 +13,4 @@ repository.workspace = true rav1e = { version = "0.7.1", features = ["serialize"] } dora-node-api = { workspace = true, features = ["tracing"] } eyre = "0.6.8" +log = "0.4" diff --git a/node-hub/dora-av1-encoder/src/main.rs b/node-hub/dora-av1-encoder/src/main.rs index dbad450e..b53a8684 100644 --- a/node-hub/dora-av1-encoder/src/main.rs +++ b/node-hub/dora-av1-encoder/src/main.rs @@ -13,6 +13,7 @@ use dora_node_api::arrow::array::UInt8Array; use dora_node_api::dora_core::config::DataId; use dora_node_api::{DoraNode, Event, IntoArrow, MetadataParameters, Parameter}; use eyre::{Context as EyreContext, Result}; +use log::warn; // Encode the same tiny blank frame 30 times use rav1e::config::RateControlConfig; use rav1e::config::SpeedSettings; @@ -50,7 +51,7 @@ fn bgr_to_yuv(bgr_data: Vec, width: usize, height: usize) -> (Vec, Vec, width: usize, height: usize) -> (Vec, Vec, Vec) { +fn get_yuv_planes(buffer: &[u8], width: usize, height: usize) -> (&[u8], &[u8], &[u8]) { // Calculate sizes of Y, U, and V planes for YUV420 format let y_size = width * height; // Y has full resolution let uv_width = width / 2; // U and V are subsampled by 2 in both dimensions @@ -63,9 +64,9 @@ fn get_yuv_planes(buffer: Vec, width: usize, height: usize) -> (Vec, Vec // } // Extract Y, U, and V planes - let y_plane = buffer[0..y_size].to_vec(); - let u_plane = buffer[y_size..y_size + uv_size].to_vec(); - let v_plane = buffer[y_size + uv_size..].to_vec(); + let y_plane = &buffer[0..y_size]; //.to_vec(); + let u_plane = &buffer[y_size..y_size + uv_size]; //.to_vec(); + let v_plane = &buffer[y_size + uv_size..]; //.to_vec(); (y_plane, u_plane, v_plane) } @@ -82,7 +83,7 @@ fn main() -> Result<()> { let enc = EncoderConfig { width, height, - speed_settings: SpeedSettings::from_preset(8), + speed_settings: SpeedSettings::from_preset(10), low_latency: true, ..Default::default() }; @@ -91,6 +92,7 @@ fn main() -> Result<()> { // .with_rate_control(RateControlConfig::new().with_emit_data(true)) .with_encoder_config(enc.clone()) .with_threads(16); + cfg.validate()?; let mut ctx: Context = cfg.new_context().unwrap(); @@ -125,8 +127,58 @@ fn main() -> Result<()> { //un } else if encoding == "yuv420" { let buffer: &UInt8Array = data.as_any().downcast_ref().unwrap(); - let buffer: Vec = buffer.values().to_vec(); - buffer + let buffer = buffer.values(); //.to_vec(); + + let (y, u, v) = get_yuv_planes(buffer, width, height); + let mut f = ctx.new_frame(); + + let xdec = f.planes[0].cfg.xdec; + let stride = (enc.width + xdec) >> xdec; + f.planes[0].copy_from_raw_u8(&y, stride, 1); + let xdec = f.planes[1].cfg.xdec; + let stride = (enc.width + xdec) >> xdec; + f.planes[1].copy_from_raw_u8(&u, stride, 1); + let xdec = f.planes[2].cfg.xdec; + let stride = (enc.width + xdec) >> xdec; + f.planes[2].copy_from_raw_u8(&v, stride, 1); + + match ctx.send_frame(f) { + Ok(_) => {} + Err(e) => match e { + EncoderStatus::EnoughData => { + warn!("Unable to send frame "); + } + _ => { + warn!("Unable to send frame "); + } + }, + } + match ctx.receive_packet() { + Ok(pkt) => { + println!("Time to encode: {:?}", time.elapsed()); + time = std::time::Instant::now(); + let data = pkt.data; + println!("frame compression: {:#?}", width * height * 3 / data.len()); + println!("frame size: {:#?}", data.len()); + let arrow = data.into_arrow(); + node.send_output( + DataId::from("frame".to_owned()), + MetadataParameters::default(), + arrow, + ) + .context("could not send output") + .unwrap(); + } + Err(e) => match e { + EncoderStatus::LimitReached => {} + EncoderStatus::Encoded => {} + EncoderStatus::NeedMoreData => {} + _ => { + panic!("Unable to receive packet",); + } + }, + } + vec![] } else if encoding == "rgb8" { unimplemented!("We haven't worked on additional encodings."); let buffer: &UInt8Array = data.as_any().downcast_ref().unwrap(); @@ -149,68 +201,6 @@ fn main() -> Result<()> { _ => break, }; //let (y, u, v) = bgr_to_yuv(buffer, 640 as usize, 480 as usize); - - let (y, u, v) = get_yuv_planes(buffer, width, height); - let mut f = ctx.new_frame(); - - let xdec = f.planes[0].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; - f.planes[0].copy_from_raw_u8(&y, stride, 1); - let xdec = f.planes[1].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; - f.planes[1].copy_from_raw_u8(&u, stride, 1); - let xdec = f.planes[2].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; - f.planes[2].copy_from_raw_u8(&v, stride, 1); - - match ctx.send_frame(f) { - Ok(_) => {} - Err(e) => match e { - EncoderStatus::EnoughData => { - println!("Unable to append frame to the internal queue"); - panic!("Unable to send frame "); - } - _ => { - panic!("Unable to send frame "); - } - }, - } - println!("Frame sent to encoder"); - for _ in 0..1 { - match ctx.receive_packet() { - Ok(pkt) => { - println!("Time to encode: {:?}", time.elapsed()); - time = std::time::Instant::now(); - let data = pkt.data; - println!("frame compression: {:#?}", width * height * 3 / data.len()); - println!("frame size: {:#?}", data.len()); - let arrow = data.into_arrow(); - node.send_output( - DataId::from("frame".to_owned()), - MetadataParameters::default(), - arrow, - ) - .context("could not send output") - .unwrap(); - - break; - } - Err(e) => match e { - EncoderStatus::LimitReached => { - break; - } - EncoderStatus::Encoded => { - break; - } - EncoderStatus::NeedMoreData => { - break; - } - _ => { - panic!("Unable to receive packet",); - } - }, - } - } } Ok(()) diff --git a/node-hub/dora-dav1d/src/main.rs b/node-hub/dora-dav1d/src/main.rs index 0dd758b5..22c160da 100644 --- a/node-hub/dora-dav1d/src/main.rs +++ b/node-hub/dora-dav1d/src/main.rs @@ -6,6 +6,7 @@ use dora_node_api::{ MetadataParameters, }; use eyre::{Context, Result}; +use log::warn; fn yuv420_to_bgr( y_plane: &[u8], @@ -44,6 +45,7 @@ fn yuv420_to_bgr( fn main() -> Result<()> { let mut settings = Settings::new(); settings.set_n_threads(16); + settings.set_max_frame_delay(1); let height: usize = std::env::var("IMAGE_HEIGHT") .unwrap_or_else(|_| "480".to_string()) @@ -57,76 +59,41 @@ fn main() -> Result<()> { let mut dec = dav1d::Decoder::with_settings(&settings).expect("failed to create decoder instance"); - let (tx, rx) = std::sync::mpsc::sync_channel(1); let (mut node, mut events) = DoraNode::init_from_env().context("Could not initialize dora node")?; - let _ = std::thread::spawn(move || { - let mut now = std::time::Instant::now(); - loop { - while let Ok(data) = rx.recv_timeout(Duration::from_millis(100)) { + let mut now = std::time::Instant::now(); + loop { + match events.recv() { + Some(Event::Input { + id: _, + data, + metadata: _, + }) => { + let data = data.as_any().downcast_ref::().unwrap(); + let data = data.values().clone(); + + // Send packet to the decoder + match dec.send_data(data, None, None, None) { - Err(e) if e.is_again() => { - panic!("Error sending data to the decoder: {}", e); - if let Ok(p) = dec.get_picture() { - let mut y = p.plane(dav1d::PlanarImageComponent::Y).to_vec(); - let mut u = p.plane(dav1d::PlanarImageComponent::U).to_vec(); - let mut v = p.plane(dav1d::PlanarImageComponent::V).to_vec(); - y.append(&mut u); - y.append(&mut v); - // let rgb = yuv420_to_rgb(&y, &u, &v, 100, 100); - let arrow = y.into_arrow(); - let mut metadata = MetadataParameters::default(); - metadata.insert( - "width".to_string(), - dora_node_api::Parameter::Integer(640), - ); - metadata.insert( - "height".to_string(), - dora_node_api::Parameter::Integer(480), - ); - metadata.insert( - "encoding".to_string(), - dora_node_api::Parameter::String("yuv420".to_string()), - ); - node.send_output(DataId::from("frame".to_string()), metadata, arrow) - .unwrap(); - println!("Time to decode: {:?}", now.elapsed()); - now = std::time::Instant::now(); - } - // If the decoder did not consume all data, output all - // pending pictures and send pending data to the decoder - // until it is all used up. - //loop { - // handle_pending_pictures(&mut dec, false, &mut node); - - // match dec.send_pending_data() { - // Err(e) if e.is_again() => continue, - // Err(e) => { - // panic!("Error sending pending data to the decoder: {}", e); - // } - // _ => break, - // } - //} - } Err(e) => { - panic!("Error sending data to the decoder: {}", e); + warn!("Error sending data to the decoder: {}", e); } Ok(()) => { if let Ok(p) = dec.get_picture() { - let mut y = p.plane(dav1d::PlanarImageComponent::Y).to_vec(); - let mut u = p.plane(dav1d::PlanarImageComponent::U).to_vec(); - let mut v = p.plane(dav1d::PlanarImageComponent::V).to_vec(); - // u.iter_mut().for_each(|e| { - // if *e < 128 { - // *e = *e + 128 - // } - // }); - // v.iter_mut().for_each(|e: &mut u8| { - // if *e < 128 { - // *e = *e + 128 - // } - // }); + let mut y = p.plane(dav1d::PlanarImageComponent::Y); //.to_vec(); + let mut u = p.plane(dav1d::PlanarImageComponent::U); //.to_vec(); + let mut v = p.plane(dav1d::PlanarImageComponent::V); //.to_vec(); + // u.iter_mut().for_each(|e| { + // if *e < 128 { + // *e = *e + 128 + // } + // }); + // v.iter_mut().for_each(|e: &mut u8| { + // if *e < 128 { + // *e = *e + 128 + // } + // }); // y.append(&mut u); // y.append(&mut v); @@ -153,28 +120,11 @@ fn main() -> Result<()> { node.send_output(DataId::from("frame".to_string()), metadata, arrow) .unwrap(); println!("Time to decode: {:?}", now.elapsed()); - println!("Delay: {:#?}", dec.get_frame_delay()); now = std::time::Instant::now(); } } } - } - } - }); - loop { - match events.recv() { - Some(Event::Input { - id: _, - data, - metadata: _, - }) => { - let data = data.as_any().downcast_ref::().unwrap(); - let data = data.values().clone(); - - // Send packet to the decoder - - tx.try_send(data).unwrap_or_default(); // Handle all pending pictures before sending the next data. // handle_pending_pictures(&mut dec, false, &mut node); }