From b1ab8abaf5753ab25d59ae7fc78db4b2bbff3e49 Mon Sep 17 00:00:00 2001 From: haixuanTao Date: Fri, 28 Mar 2025 18:43:06 +0100 Subject: [PATCH] Enabling remote av1 communication --- Cargo.lock | 138 +++++++++++++++--- Cargo.toml | 2 +- binaries/daemon/src/lib.rs | 22 ++- node-hub/dora-av1-encoder/example.py | 22 --- node-hub/dora-dav1d/dataflow.yml | 84 +++++++++-- node-hub/dora-dav1d/src/main.rs | 70 ++------- .../Cargo.toml | 2 +- .../src/main.rs | 49 ++++--- 8 files changed, 248 insertions(+), 141 deletions(-) delete mode 100644 node-hub/dora-av1-encoder/example.py rename node-hub/{dora-av1-encoder => dora-rav1e}/Cargo.toml (94%) rename node-hub/{dora-av1-encoder => dora-rav1e}/src/main.rs (84%) diff --git a/Cargo.lock b/Cargo.lock index 9482a998..40d2d356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -383,6 +383,17 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" +[[package]] +name = "arg_enum_proc_macro" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ae92a5119aa49cdbcf6b9f893fe4e1d98b04ccbf82ee0584ad948a44a734dea" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "array-init" version = "2.1.0" @@ -406,6 +417,9 @@ name = "arrayvec" version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +dependencies = [ + "serde", +] [[package]] name = "arrow" @@ -1317,7 +1331,7 @@ dependencies = [ "lab", "num-traits", "rayon", - "thiserror 1.0.66", + "thiserror 1.0.69", "v_frame", ] @@ -1330,7 +1344,7 @@ dependencies = [ "anyhow", "arrayvec", "log", - "nom", + "nom 7.1.3", "num-rational", "serde", "v_frame", @@ -1555,6 +1569,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitstream-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6099cdc01846bc367c4e7dd630dc5966dccf36b652fae7a74e17b640411a91b2" + [[package]] name = "block" version = "0.1.6" @@ -2055,7 +2075,7 @@ dependencies = [ "bitflags 1.3.2", "strsim 0.8.0", "textwrap 0.11.0", - "unicode-width", + "unicode-width 0.1.14", "vec_map", ] @@ -2073,7 +2093,7 @@ dependencies = [ "once_cell", "strsim 0.10.0", "termcolor", - "textwrap 0.16.1", + "textwrap 0.16.2", ] [[package]] @@ -2105,7 +2125,7 @@ version = "4.5.47" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06f5378ea264ad4f82bbc826628b5aad714a75abf6ece087e923010eb937fb6" dependencies = [ - "clap 4.5.20", + "clap 4.5.32", ] [[package]] @@ -2757,6 +2777,28 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c297a1c74b71ae29df00c3e22dd9534821d60eb9af5a0192823fa2acea70c2a" +[[package]] +name = "dav1d" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d4b54a40baf633a71c6f0fb49494a7e4ee7bc26f3e727212b6cb915aa1ea1e1" +dependencies = [ + "av-data", + "bitflags 2.9.0", + "dav1d-sys", + "static_assertions", +] + +[[package]] +name = "dav1d-sys" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ecb1c5e8f4dc438eedc1b534a54672fb0e0a56035dae6b50162787bd2c50e95" +dependencies = [ + "libc", + "system-deps", +] + [[package]] name = "defmac" version = "0.1.3" @@ -3019,15 +3061,6 @@ dependencies = [ "num", ] -[[package]] -name = "dora-av1-encoder" -version = "0.3.10" -dependencies = [ - "dora-node-api", - "eyre", - "rav1e", -] - [[package]] name = "dora-cli" version = "0.3.10" @@ -3399,6 +3432,16 @@ dependencies = [ "safer-ffi", ] +[[package]] +name = "dora-rav1e" +version = "0.3.10" +dependencies = [ + "dora-node-api", + "eyre", + "log", + "rav1e", +] + [[package]] name = "dora-record" version = "0.3.10" @@ -5828,6 +5871,17 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "interpolate_name" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34819042dc3d3971c46c2190835914dfbe0c3c13f61449b2997f4e9722dfa60" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.100", +] + [[package]] name = "interprocess" version = "2.2.3" @@ -5954,6 +6008,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -5978,6 +6041,15 @@ version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" +[[package]] +name = "ivf" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "552f657140ee72c552b728601179c10abea14cd7d815de2d75d75dea42485eca" +dependencies = [ + "bitstream-io", +] + [[package]] name = "jiff" version = "0.2.4" @@ -7283,6 +7355,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "noop_proc_macro" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8" + [[package]] name = "notify" version = "5.2.0" @@ -9260,7 +9338,7 @@ dependencies = [ "built", "cc", "cfg-if 1.0.0", - "clap 4.5.20", + "clap 4.5.32", "clap_complete", "console", "fern", @@ -9273,22 +9351,22 @@ dependencies = [ "maybe-rayon", "nasm-rs 0.2.5", "new_debug_unreachable", - "nom", + "nom 7.1.3", "noop_proc_macro", "num-derive", "num-traits", "once_cell", "paste", "profiling", - "rand", - "rand_chacha", + "rand 0.8.5", + "rand_chacha 0.3.1", "scan_fmt", "serde", "serde-big-array", "signal-hook", "simd_helpers", "system-deps", - "thiserror 1.0.66", + "thiserror 1.0.69", "toml", "v_frame", "y4m", @@ -13028,6 +13106,19 @@ dependencies = [ "libc", ] +[[package]] +name = "system-deps" +version = "6.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e535eb8dded36d55ec13eddacd30dec501792ff23a0b1682c38601b8cf2349" +dependencies = [ + "cfg-expr", + "heck 0.5.0", + "pkg-config", + "toml", + "version-compare", +] + [[package]] name = "tabwriter" version = "1.4.1" @@ -13118,6 +13209,15 @@ dependencies = [ "libc", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width 0.1.14", +] + [[package]] name = "textwrap" version = "0.16.2" diff --git a/Cargo.toml b/Cargo.toml index 50bbee92..d7a82e78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ members = [ "node-hub/dora-kit-car", "node-hub/dora-object-to-pose", "node-hub/dora-mistral-rs", - "node-hub/dora-av1-encoder", + "node-hub/dora-rav1e", "node-hub/dora-dav1d", "libraries/extensions/ros2-bridge", "libraries/extensions/ros2-bridge/msg-gen", diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index de531fd0..2e08290f 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -251,11 +251,29 @@ impl Daemon { None => None, }; - let zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) { + let mut zenoh_config = match std::env::var(zenoh::Config::DEFAULT_CONFIG_PATH_ENV) { Ok(path) => zenoh::Config::from_file(&path) .map_err(|e| eyre!(e)) .wrap_err_with(|| format!("failed to read zenoh config from {path}"))?, - Err(std::env::VarError::NotPresent) => zenoh::Config::default(), + Err(std::env::VarError::NotPresent) => { + let mut zenoh_config = zenoh::Config::default(); + + if let Some(addr) = coordinator_addr { + zenoh_config + .insert_json5( + "connect/endpoints", + &format!(r#"["tcp/{}:5456"]"#, addr.ip()), + ) + .unwrap(); + zenoh_config + .insert_json5( + "listen/endpoints", + r#"{ router: ["tcp/[::]:7447"], peer: ["tcp/[::]:0", "tcp/[::]:5456"] }"#, + ) + .unwrap(); + }; + zenoh_config + } Err(std::env::VarError::NotUnicode(_)) => eyre::bail!( "{} env variable is not valid unicode", zenoh::Config::DEFAULT_CONFIG_PATH_ENV diff --git a/node-hub/dora-av1-encoder/example.py b/node-hub/dora-av1-encoder/example.py deleted file mode 100644 index 80d54b89..00000000 --- a/node-hub/dora-av1-encoder/example.py +++ /dev/null @@ -1,22 +0,0 @@ -import av -import cv2 -import numpy as np - -# Open the AV1 video file -container = av.open("video.av1") - -# Get the video stream -stream = next(s for s in container.streams if s.type == "video") - -# Iterate over the video frames -for frame in container.decode(stream): - # Convert the frame to a NumPy array - img = frame.to_ndarray(format="bgr24") - - # Display the frame - cv2.imshow("Frame", img) - if cv2.waitKey(1) & 0xFF == ord("q"): - break - -# Release the resources -cv2.destroyAllWindows() diff --git a/node-hub/dora-dav1d/dataflow.yml b/node-hub/dora-dav1d/dataflow.yml index b4853804..ee76b1f3 100644 --- a/node-hub/dora-dav1d/dataflow.yml +++ b/node-hub/dora-dav1d/dataflow.yml @@ -2,6 +2,8 @@ nodes: - id: camera build: pip install ../../node-hub/opencv-video-capture path: opencv-video-capture + _unstable_deploy: + machine: encoder inputs: tick: dora/timer/millis/50 outputs: @@ -11,43 +13,95 @@ nodes: IMAGE_WIDTH: 640 IMAGE_HEIGHT: 480 ENCODING: yuv420 + - id: camera2 + path: dora-pyrealsense + _unstable_deploy: + machine: encoder + inputs: + tick: dora/timer/millis/10 + outputs: + - image + env: + IMAGE_WIDTH: 640 + IMAGE_HEIGHT: 480 + ENCODING: jpeg - - id: av - path: /home/peter/Documents/work/dora/target/release/dora-av1-encoder - build: cargo build -p dora-av1-encoder --release + - id: echo + build: pip install -e ../../node-hub/dora-echo + path: dora-echo + _unstable_deploy: + machine: decoder inputs: - image: camera/image + image: camera2/image + outputs: + - image + + - id: rav1e-local + path: dora-av1-encoder + build: cargo build -p dora-av1-encoder --release + _unstable_deploy: + machine: encoder + #inputs: + # image: camera/image outputs: - frame env: IMAGE_WIDTH: 640 IMAGE_HEIGHT: 480 - - id: dav1d - path: /home/peter/Documents/work/dora/target/release/dora-dav1d + - id: dav1d-remote + path: dora-dav1d build: cargo build -p dora-dav1d --release + _unstable_deploy: + machine: decoder inputs: - image: av/frame + image: av-local/frame outputs: - frame env: IMAGE_WIDTH: 640 IMAGE_HEIGHT: 480 - - id: plot - build: pip install -e ../../node-hub/opencv-plot - path: opencv-plot + - id: rav1e-remote + path: dora-av1-encoder + build: cargo build -p dora-av1-encoder --release + _unstable_deploy: + machine: decoder inputs: - image: dav1d/frame + image: dav1d-remote/image + outputs: + - frame env: IMAGE_WIDTH: 640 IMAGE_HEIGHT: 480 - - id: plot2 - build: pip install -e ../../node-hub/opencv-plot - path: opencv-plot + - id: dav1d-local + path: dora-dav1d + build: cargo build -p dora-dav1d --release + _unstable_deploy: + machine: decoder inputs: - image: camera/image + image: rav1e-local/frame + outputs: + - frame env: IMAGE_WIDTH: 640 IMAGE_HEIGHT: 480 + + - id: plot + build: pip install -e ../../node-hub/dora-rerun + _unstable_deploy: + machine: encoder + path: dora-rerun + inputs: + image: dav1d-local/frame + # image: echo/image + #image_2: camera6/image + + - id: plot2 + build: pip install -e ../../node-hub/opencv-plot + _unstable_deploy: + machine: encoder + path: opencv-plot + inputs: + image: echo/image diff --git a/node-hub/dora-dav1d/src/main.rs b/node-hub/dora-dav1d/src/main.rs index 1486601e..f337dd41 100644 --- a/node-hub/dora-dav1d/src/main.rs +++ b/node-hub/dora-dav1d/src/main.rs @@ -1,10 +1,5 @@ -use std::time::Duration; - use dav1d::Settings; -use dora_node_api::{ - arrow::array::UInt8Array, dora_core::config::DataId, DoraNode, Event, IntoArrow, - MetadataParameters, -}; +use dora_node_api::{arrow::array::UInt8Array, DoraNode, Event, IntoArrow}; use eyre::{Context, Result}; use log::warn; @@ -44,38 +39,25 @@ fn yuv420_to_bgr( fn main() -> Result<()> { let mut settings = Settings::new(); - settings.set_n_threads(16); + // settings.set_n_threads(16); settings.set_max_frame_delay(1); - let height: usize = std::env::var("IMAGE_HEIGHT") - .unwrap_or_else(|_| "480".to_string()) - .parse() - .unwrap(); - let width: usize = std::env::var("IMAGE_WIDTH") - .unwrap_or_else(|_| "640".to_string()) - .parse() - .unwrap(); - let mut dec = dav1d::Decoder::with_settings(&settings).expect("failed to create decoder instance"); let (mut node, mut events) = DoraNode::init_from_env().context("Could not initialize dora node")?; - let mut now = std::time::Instant::now(); loop { - let time = std::time::Instant::now(); match events.recv() { Some(Event::Input { - id: _, + id, data, - metadata: _, + mut metadata, }) => { let data = data.as_any().downcast_ref::().unwrap(); let data = data.values().clone(); - // Send packet to the decoder - match dec.send_data(data, None, None, None) { Err(e) => { warn!("Error sending data to the decoder: {}", e); @@ -83,59 +65,25 @@ fn main() -> Result<()> { Ok(()) => { if let Ok(p) = dec.get_picture() { // println!("Time to decode: {:?}", time.elapsed()); - let mut y = p.plane(dav1d::PlanarImageComponent::Y); //.to_vec(); - let mut u = p.plane(dav1d::PlanarImageComponent::U); //.to_vec(); - let mut v = p.plane(dav1d::PlanarImageComponent::V); //.to_vec(); - // u.iter_mut().for_each(|e| { - // if *e < 128 { - // *e = *e + 128 - // } - // }); - // v.iter_mut().for_each(|e: &mut u8| { - // if *e < 128 { - // *e = *e + 128 - // } - // }); - - // y.append(&mut u); - // y.append(&mut v); + let y = p.plane(dav1d::PlanarImageComponent::Y); + let u = p.plane(dav1d::PlanarImageComponent::U); + let v = p.plane(dav1d::PlanarImageComponent::V); let y = yuv420_to_bgr(&y, &u, &v, width, height); let arrow = y.into_arrow(); - let mut metadata = MetadataParameters::default(); - metadata.insert( - "width".to_string(), - dora_node_api::Parameter::Integer( - width.try_into().unwrap_or_default(), - ), - ); - metadata.insert( - "height".to_string(), - dora_node_api::Parameter::Integer( - height.try_into().unwrap_or_default(), - ), - ); - metadata.insert( + metadata.parameters.insert( "encoding".to_string(), dora_node_api::Parameter::String("bgr8".to_string()), ); - node.send_output(DataId::from("frame".to_string()), metadata, arrow) - .unwrap(); - //println!("Time to decode: {:?}", now.elapsed()); - - now = std::time::Instant::now(); + node.send_output(id, metadata.parameters, arrow).unwrap(); } } } - // Handle all pending pictures before sending the next data. - // handle_pending_pictures(&mut dec, false, &mut node); } None => break, Some(_) => break, } } - // Handle all pending pictures that were not output yet. - // handle_pending_pictures(&mut dec, true, &mut node); Ok(()) } diff --git a/node-hub/dora-av1-encoder/Cargo.toml b/node-hub/dora-rav1e/Cargo.toml similarity index 94% rename from node-hub/dora-av1-encoder/Cargo.toml rename to node-hub/dora-rav1e/Cargo.toml index 188ec001..10a6bcda 100644 --- a/node-hub/dora-av1-encoder/Cargo.toml +++ b/node-hub/dora-rav1e/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dora-av1-encoder" +name = "dora-rav1e" edition = "2021" version.workspace = true description.workspace = true diff --git a/node-hub/dora-av1-encoder/src/main.rs b/node-hub/dora-rav1e/src/main.rs similarity index 84% rename from node-hub/dora-av1-encoder/src/main.rs rename to node-hub/dora-rav1e/src/main.rs index 0a177ad7..2931f3b3 100644 --- a/node-hub/dora-av1-encoder/src/main.rs +++ b/node-hub/dora-rav1e/src/main.rs @@ -7,9 +7,10 @@ // Media Patent License 1.0 was not distributed with this source code in the // PATENTS file, you can obtain it at www.aomedia.org/license/patent. +use std::env::var; + use dora_node_api::arrow::array::UInt8Array; -use dora_node_api::dora_core::config::DataId; -use dora_node_api::{DoraNode, Event, IntoArrow, MetadataParameters, Parameter}; +use dora_node_api::{DoraNode, Event, IntoArrow, Parameter}; use eyre::{Context as EyreContext, Result}; use log::warn; // Encode the same tiny blank frame 30 times @@ -77,17 +78,15 @@ fn main() -> Result<()> { .unwrap_or_else(|_| "640".to_string()) .parse() .unwrap(); - let enc = EncoderConfig { + let speed = var("RAV1E_SPEED").map(|s| s.parse().unwrap()).unwrap_or(10); + let mut enc = EncoderConfig { width, height, - speed_settings: SpeedSettings::from_preset(10), + speed_settings: SpeedSettings::from_preset(speed), low_latency: true, ..Default::default() }; - - let cfg = Config::new() - // .with_rate_control(RateControlConfig::new().with_emit_data(true)) - .with_encoder_config(enc.clone()); //.with_threads(16); + let cfg = Config::new().with_encoder_config(enc.clone()); cfg.validate()?; let (mut node, mut events) = @@ -95,7 +94,11 @@ fn main() -> Result<()> { loop { let _buffer = match events.recv() { - Some(Event::Input { id, data, metadata }) => { + Some(Event::Input { + id, + data, + mut metadata, + }) => { if let Some(Parameter::Integer(h)) = metadata.parameters.get("height") { height = *h as usize; }; @@ -109,7 +112,14 @@ fn main() -> Result<()> { } else { "bgr8" }; - + enc = EncoderConfig { + width, + height, + speed_settings: SpeedSettings::from_preset(speed), + low_latency: true, + ..Default::default() + }; + let cfg = Config::new().with_encoder_config(enc.clone()); if encoding == "bgr8" { let buffer: &UInt8Array = data.as_any().downcast_ref().unwrap(); let buffer: Vec = buffer.values().to_vec(); @@ -127,13 +137,13 @@ fn main() -> Result<()> { let mut f = ctx.new_frame(); let xdec = f.planes[0].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; + let stride = (width + xdec) >> xdec; f.planes[0].copy_from_raw_u8(&y, stride, 1); let xdec = f.planes[1].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; + let stride = (width + xdec) >> xdec; f.planes[1].copy_from_raw_u8(&u, stride, 1); let xdec = f.planes[2].cfg.xdec; - let stride = (enc.width + xdec) >> xdec; + let stride = (width + xdec) >> xdec; f.planes[2].copy_from_raw_u8(&v, stride, 1); match ctx.send_frame(f) { @@ -147,18 +157,17 @@ fn main() -> Result<()> { } }, } + metadata + .parameters + .insert("encoding".to_string(), Parameter::String("av1".to_string())); ctx.flush(); match ctx.receive_packet() { Ok(pkt) => { let data = pkt.data; let arrow = data.into_arrow(); - node.send_output( - DataId::from("frame".to_owned()), - MetadataParameters::default(), - arrow, - ) - .context("could not send output") - .unwrap(); + node.send_output(id, metadata.parameters, arrow) + .context("could not send output") + .unwrap(); } Err(e) => match e { EncoderStatus::LimitReached => {}