diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index 7276b6bf..bff906b8 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -188,13 +188,13 @@ impl EventStream { } pub async fn recv_async_timeout(&mut self, dur: Duration) -> Option { - let next_event = match select(Delay::new(dur), pin!(self.recv_async())).await { + + match select(Delay::new(dur), pin!(self.recv_async())).await { Either::Left((_elapsed, _)) => Some(Self::convert_event_item(EventItem::TimeoutError( eyre!("Receiver timed out"), ))), Either::Right((event, _)) => event, - }; - next_event + } } fn convert_event_item(item: EventItem) -> Event { diff --git a/apis/rust/node/src/event_stream/scheduler.rs b/apis/rust/node/src/event_stream/scheduler.rs index c6e15abe..89165678 100644 --- a/apis/rust/node/src/event_stream/scheduler.rs +++ b/apis/rust/node/src/event_stream/scheduler.rs @@ -27,7 +27,6 @@ impl Scheduler { let topic = VecDeque::from_iter( event_queues .keys() - .into_iter() .filter(|t| **t != DataId::from(NON_INPUT_EVENT.to_string())) .cloned(), ); diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index b28aa0a2..1a83449d 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -1,7 +1,7 @@ use clap::Parser; use dora_cli::Args; -fn main() -> () { +fn main() { let args = Args::parse(); dora_cli::lib_main(args); } diff --git a/binaries/cli/src/template/python/mod.rs b/binaries/cli/src/template/python/mod.rs index b4dd92fd..3baf8b49 100644 --- a/binaries/cli/src/template/python/mod.rs +++ b/binaries/cli/src/template/python/mod.rs @@ -31,7 +31,7 @@ pub fn create(args: crate::CommandNew) -> eyre::Result<()> { fn replace_space(file: &str, name: &str) -> String { let mut file = file.replace("__node-name__", &name.replace(" ", "-")); file = file.replace("__node_name__", &name.replace("-", "_").replace(" ", "_")); - file.replace("Node Name", &name) + file.replace("Node Name", name) } fn create_custom_node( name: String, @@ -47,7 +47,7 @@ fn create_custom_node( fs::create_dir(&module_path) .with_context(|| format!("failed to create module directory `{}`", &root.display()))?; - fs::create_dir(&root.join("tests")) + fs::create_dir(root.join("tests")) .with_context(|| format!("failed to create tests directory `{}`", &root.display()))?; // PYPROJECT.toml diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index af326530..975f9060 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -111,7 +111,7 @@ pub async fn spawn_node( let resolved_path = if source_is_url(source) { // try to download the shared library let target_dir = Path::new("build"); - download_file(source, &target_dir) + download_file(source, target_dir) .await .wrap_err("failed to download custom node")? } else { @@ -153,8 +153,8 @@ pub async fn spawn_node( ), ) .await; - let cmd = tokio::process::Command::new(python); - cmd + + tokio::process::Command::new(python) }; // Force python to always flush stdout/stderr buffer cmd.arg("-u"); @@ -170,7 +170,7 @@ pub async fn spawn_node( ) .await; if uv { - let mut cmd = tokio::process::Command::new(&"uv"); + let mut cmd = tokio::process::Command::new("uv"); cmd.arg("run"); cmd.arg(&resolved_path); cmd @@ -290,8 +290,8 @@ pub async fn spawn_node( "spawning: python -uc import dora; dora.start_runtime() # {}", node.id ); - let cmd = tokio::process::Command::new(python); - cmd + + tokio::process::Command::new(python) }; // Force python to always flush stdout/stderr buffer cmd.args([ diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 23cbea6c..fd5e0b66 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -47,7 +47,7 @@ pub fn run( let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(download_file(&python_source.source, &target_path)) + rt.block_on(download_file(&python_source.source, target_path)) .wrap_err("failed to download Python operator")? } else { Path::new(&python_source.source).to_owned() diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index e7ec3068..b7795470 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -40,7 +40,7 @@ pub fn run( let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; - rt.block_on(download_file(source, &target_path)) + rt.block_on(download_file(source, target_path)) .wrap_err("failed to download shared library operator")? } else { adjust_shared_library_path(Path::new(source))? diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs index aca713ca..7ad9715f 100644 --- a/examples/benchmark/node/src/main.rs +++ b/examples/benchmark/node/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, dora_core::config::DataId, DoraNode}; -use eyre::{Context, ContextCompat, Error}; +use eyre::{Context, ContextCompat}; use rand::Rng; use std::collections::HashMap; use std::time::Duration; diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index b16bd555..02415e40 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -20,7 +20,7 @@ pub fn visualize_nodes(nodes: &BTreeMap) -> String { all_nodes.insert(&node.id, node); } - let dora_timers = collect_dora_timers(&nodes); + let dora_timers = collect_dora_timers(nodes); if !dora_timers.is_empty() { writeln!(flowchart, "subgraph ___dora___ [dora]").unwrap(); writeln!(flowchart, " subgraph ___timer_timer___ [timer]").unwrap(); diff --git a/libraries/extensions/ros2-bridge/python/src/lib.rs b/libraries/extensions/ros2-bridge/python/src/lib.rs index 730fbb65..b767ab78 100644 --- a/libraries/extensions/ros2-bridge/python/src/lib.rs +++ b/libraries/extensions/ros2-bridge/python/src/lib.rs @@ -18,7 +18,7 @@ use pyo3::{ types::{PyAnyMethods, PyDict, PyList, PyModule, PyModuleMethods}, Bound, PyAny, PyObject, PyResult, Python, }; -//// use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; +/// use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; use typed::{deserialize::StructDeserializer, TypeInfo, TypedValue}; pub mod qos; diff --git a/libraries/message/src/lib.rs b/libraries/message/src/lib.rs index 989e5a72..0d1720ee 100644 --- a/libraries/message/src/lib.rs +++ b/libraries/message/src/lib.rs @@ -29,8 +29,8 @@ pub type DataflowId = uuid::Uuid; fn current_crate_version() -> semver::Version { let crate_version_raw = env!("CARGO_PKG_VERSION"); - let crate_version = semver::Version::parse(crate_version_raw).unwrap(); - crate_version + + semver::Version::parse(crate_version_raw).unwrap() } fn versions_compatible( @@ -46,6 +46,6 @@ fn versions_compatible( "failed to parse specified dora version `{specified_version}` as `VersionReq`: {error}", ) })?; - let matches = req.matches(&specified_version) || specified_dora_req.matches(crate_version); + let matches = req.matches(specified_version) || specified_dora_req.matches(crate_version); Ok(matches) } diff --git a/node-hub/dora-object-to-pose/src/lib.rs b/node-hub/dora-object-to-pose/src/lib.rs index b7445367..e9c8122c 100644 --- a/node-hub/dora-object-to-pose/src/lib.rs +++ b/node-hub/dora-object-to-pose/src/lib.rs @@ -61,7 +61,7 @@ fn points_to_pose(points: &[(f32, f32, f32)]) -> Vec { let std_y = (sum_y2 / n - mean_y * mean_y).sqrt(); let corr = cov / (std_x * std_y); - return vec![mean_x, mean_y, mean_z, 0., 0., corr * f32::consts::PI / 2.]; + vec![mean_x, mean_y, mean_z, 0., 0., corr * f32::consts::PI / 2.] } pub fn lib_main() -> Result<()> { @@ -82,66 +82,130 @@ pub fn lib_main() -> Result<()> { // (0.32489833, -0.25068134, 0.4761387) while let Some(event) = events.recv() { - match event { - Event::Input { id, metadata, data } => match id.as_str() { - "image" => { - let buffer: &UInt8Array = data.as_any().downcast_ref().unwrap(); - image_cache.insert(id.clone(), buffer.values().to_vec()); - } - "depth" => { - height = if let Some(Parameter::Integer(height)) = - metadata.parameters.get("height") - { - *height + if let Event::Input { id, metadata, data } = event { match id.as_str() { + "image" => { + let buffer: &UInt8Array = data.as_any().downcast_ref().unwrap(); + image_cache.insert(id.clone(), buffer.values().to_vec()); + } + "depth" => { + height = if let Some(Parameter::Integer(height)) = + metadata.parameters.get("height") + { + *height + } else { + height + }; + width = + if let Some(Parameter::Integer(width)) = metadata.parameters.get("width") { + *width } else { - height + width }; - width = - if let Some(Parameter::Integer(width)) = metadata.parameters.get("width") { - *width + focal_length = if let Some(Parameter::ListInt(focals)) = + metadata.parameters.get("focal") + { + focals.to_vec() + } else { + vec![605, 605] + }; + resolution = if let Some(Parameter::ListInt(resolution)) = + metadata.parameters.get("resolution") + { + resolution.to_vec() + } else { + vec![640, 480] + }; + let buffer: &Float64Array = data.as_any().downcast_ref().unwrap(); + depth_frame = Some(buffer.clone()); + } + "masks" => { + let masks = if let Some(data) = data.as_primitive_opt::() { + let data = data + .iter() + .map(|x| if let Some(x) = x { x > 0. } else { false }) + .collect::>(); + data + } else if let Some(data) = data.as_boolean_opt() { + let data = data + .iter() + .map(|x| x.unwrap_or_default()) + .collect::>(); + data + } else { + println!("Got unexpected data type: {}", data.data_type()); + continue; + }; + + let outputs: Vec> = masks + .chunks(height as usize * width as usize) + .filter_map(|data| { + let mut points = vec![]; + let mut z_total = 0.; + let mut n = 0.; + + if let Some(depth_frame) = &depth_frame { + depth_frame.iter().enumerate().for_each(|(i, z)| { + let u = i as f32 % width as f32; // Calculate x-coordinate (u) + let v = i as f32 / width as f32; // Calculate y-coordinate (v) + + if let Some(z) = z { + let z = z as f32; + // Skip points that have empty depth or is too far away + if z == 0. || z > 20.0 { + return; + } + if data[i] { + let y = (u - resolution[0] as f32) * z + / focal_length[0] as f32; + let x = (v - resolution[1] as f32) * z + / focal_length[1] as f32; + let new_x = sin_theta * z + cos_theta * x; + let new_y = -y; + let new_z = cos_theta * z - sin_theta * x; + + points.push((new_x, new_y, new_z)); + z_total += new_z; + n += 1.; + } + } + }); } else { - width - }; - focal_length = if let Some(Parameter::ListInt(focals)) = - metadata.parameters.get("focal") - { - focals.to_vec() - } else { - vec![605, 605] - }; - resolution = if let Some(Parameter::ListInt(resolution)) = - metadata.parameters.get("resolution") - { - resolution.to_vec() - } else { - vec![640, 480] - }; - let buffer: &Float64Array = data.as_any().downcast_ref().unwrap(); - depth_frame = Some(buffer.clone()); - } - "masks" => { - let masks = if let Some(data) = data.as_primitive_opt::() { - let data = data - .iter() - .map(|x| if let Some(x) = x { x > 0. } else { false }) - .collect::>(); - data - } else if let Some(data) = data.as_boolean_opt() { - let data = data - .iter() - .map(|x| if let Some(x) = x { x } else { false }) - .collect::>(); - data - } else { - println!("Got unexpected data type: {}", data.data_type()); - continue; - }; + println!("No depth frame found"); + return None; + } + if points.is_empty() { + println!("No points in mask found"); + return None; + } + Some(points_to_pose(&points)) + }) + .collect(); + let flatten_data = outputs.into_iter().flatten().collect::>(); + let mut metadata = metadata.parameters.clone(); + metadata.insert( + "encoding".to_string(), + Parameter::String("xyzrpy".to_string()), + ); + println!("Got data: {:?}", flatten_data); - let outputs: Vec> = masks - .chunks(height as usize * width as usize) - .into_iter() - .map(|data| { + node.send_output( + DataId::from("pose".to_string()), + metadata, + flatten_data.into_arrow(), + )?; + } + "boxes2d" => { + if let Some(data) = data.as_primitive_opt::() { + let values = data.values(); + let outputs: Vec> = values + .chunks(4) + .filter_map(|data| { + let x_min = data[0] as f32; + let y_min = data[1] as f32; + let x_max = data[2] as f32; + let y_max = data[3] as f32; let mut points = vec![]; + let mut z_min = 100.; let mut z_total = 0.; let mut n = 0.; @@ -153,10 +217,10 @@ pub fn lib_main() -> Result<()> { if let Some(z) = z { let z = z as f32; // Skip points that have empty depth or is too far away - if z == 0. || z > 20.0 { + if z == 0. || z > 5.0 { return; } - if data[i] { + if u > x_min && u < x_max && v > y_min && v < y_max { let y = (u - resolution[0] as f32) * z / focal_length[0] as f32; let x = (v - resolution[1] as f32) * z @@ -164,7 +228,9 @@ pub fn lib_main() -> Result<()> { let new_x = sin_theta * z + cos_theta * x; let new_y = -y; let new_z = cos_theta * z - sin_theta * x; - + if new_z < z_min { + z_min = new_z; + } points.push((new_x, new_y, new_z)); z_total += new_z; n += 1.; @@ -176,13 +242,16 @@ pub fn lib_main() -> Result<()> { return None; } if points.is_empty() { - println!("No points in mask found"); return None; } + let raw_mean_z = z_total / n as f32; + let threshold = (raw_mean_z + z_min) / 2.; + let points = points + .into_iter() + .filter(|(_x, _y, z)| z > &threshold) + .collect::>(); Some(points_to_pose(&points)) }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) .collect(); let flatten_data = outputs.into_iter().flatten().collect::>(); let mut metadata = metadata.parameters.clone(); @@ -190,7 +259,6 @@ pub fn lib_main() -> Result<()> { "encoding".to_string(), Parameter::String("xyzrpy".to_string()), ); - println!("Got data: {:?}", flatten_data); node.send_output( DataId::from("pose".to_string()), @@ -198,86 +266,9 @@ pub fn lib_main() -> Result<()> { flatten_data.into_arrow(), )?; } - "boxes2d" => { - if let Some(data) = data.as_primitive_opt::() { - let values = data.values(); - let outputs: Vec> = values - .chunks(4) - .into_iter() - .map(|data| { - let x_min = data[0] as f32; - let y_min = data[1] as f32; - let x_max = data[2] as f32; - let y_max = data[3] as f32; - let mut points = vec![]; - let mut z_min = 100.; - let mut z_total = 0.; - let mut n = 0.; - - if let Some(depth_frame) = &depth_frame { - depth_frame.iter().enumerate().for_each(|(i, z)| { - let u = i as f32 % width as f32; // Calculate x-coordinate (u) - let v = i as f32 / width as f32; // Calculate y-coordinate (v) - - if let Some(z) = z { - let z = z as f32; - // Skip points that have empty depth or is too far away - if z == 0. || z > 5.0 { - return; - } - if u > x_min && u < x_max && v > y_min && v < y_max { - let y = (u - resolution[0] as f32) * z - / focal_length[0] as f32; - let x = (v - resolution[1] as f32) * z - / focal_length[1] as f32; - let new_x = sin_theta * z + cos_theta * x; - let new_y = -y; - let new_z = cos_theta * z - sin_theta * x; - if new_z < z_min { - z_min = new_z; - } - points.push((new_x, new_y, new_z)); - z_total += new_z; - n += 1.; - } - } - }); - } else { - println!("No depth frame found"); - return None; - } - if points.is_empty() { - return None; - } - let raw_mean_z = z_total / n as f32; - let threshold = (raw_mean_z + z_min) / 2.; - let points = points - .into_iter() - .filter(|(_x, _y, z)| z > &threshold) - .collect::>(); - Some(points_to_pose(&points)) - }) - .filter(|x| x.is_some()) - .map(|x| x.unwrap()) - .collect(); - let flatten_data = outputs.into_iter().flatten().collect::>(); - let mut metadata = metadata.parameters.clone(); - metadata.insert( - "encoding".to_string(), - Parameter::String("xyzrpy".to_string()), - ); - - node.send_output( - DataId::from("pose".to_string()), - metadata, - flatten_data.into_arrow(), - )?; - } - } - other => eprintln!("Received input `{other}`"), - }, - _ => {} - } + } + other => eprintln!("Received input `{other}`"), + } } } Ok(()) diff --git a/node-hub/dora-rerun/src/boxes2d.rs b/node-hub/dora-rerun/src/boxes2d.rs index c3e028e5..f07280ae 100644 --- a/node-hub/dora-rerun/src/boxes2d.rs +++ b/node-hub/dora-rerun/src/boxes2d.rs @@ -79,7 +79,7 @@ pub fn update_boxes2d( } }; - if bbox.len() == 0 { + if bbox.is_empty() { rec.log(id.as_str(), &rerun::Clear::flat()) .wrap_err("Could not log Boxes2D")?; return Ok(()); @@ -116,7 +116,7 @@ pub fn update_boxes2d( } }); } - if bbox.len() == 0 { + if bbox.is_empty() { rec.log(id.as_str(), &rerun::Clear::flat()) .wrap_err("Could not log Boxes2D")?; return Ok(()); diff --git a/node-hub/dora-rerun/src/lib.rs b/node-hub/dora-rerun/src/lib.rs index 1a6a7797..671f4804 100644 --- a/node-hub/dora-rerun/src/lib.rs +++ b/node-hub/dora-rerun/src/lib.rs @@ -45,40 +45,40 @@ pub fn lib_main() -> Result<()> { let rec = match std::env::var("OPERATING_MODE").as_deref() { Ok("SPAWN") => { - let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + + rerun::RecordingStreamBuilder::new("dora-rerun") .spawn_opts(&options, None) - .context("Could not spawn rerun visualization")?; - rec + .context("Could not spawn rerun visualization")? } Ok("CONNECT") => { let opt = std::env::var("RERUN_SERVER_ADDR").unwrap_or("127.0.0.1:9876".to_string()); - let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + + rerun::RecordingStreamBuilder::new("dora-rerun") .connect_tcp_opts(std::net::SocketAddr::V4(opt.parse()?), None) - .context("Could not connect to rerun visualization")?; - rec + .context("Could not connect to rerun visualization")? } Ok("SAVE") => { let id = node.dataflow_id(); let path = Path::new("out") .join(id.to_string()) .join(format!("archive-{}.rerun", id)); - let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + + rerun::RecordingStreamBuilder::new("dora-rerun") .save(path) - .context("Could not save rerun visualization")?; - rec + .context("Could not save rerun visualization")? } Ok(_) => { warn!("Invalid operating mode, defaulting to SPAWN mode."); - let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + + rerun::RecordingStreamBuilder::new("dora-rerun") .spawn_opts(&options, None) - .context("Could not spawn rerun visualization")?; - rec + .context("Could not spawn rerun visualization")? } Err(_) => { - let rec = rerun::RecordingStreamBuilder::new("dora-rerun") + + rerun::RecordingStreamBuilder::new("dora-rerun") .spawn_opts(&options, None) - .context("Could not spawn rerun visualization")?; - rec + .context("Could not spawn rerun visualization")? } }; @@ -241,7 +241,7 @@ pub fn lib_main() -> Result<()> { } } - update_visualization(&rec, &chain, &id, &positions)?; + update_visualization(&rec, chain, &id, &positions)?; } else { println!("Could not find chain for {}", id); } diff --git a/node-hub/dora-rerun/src/series.rs b/node-hub/dora-rerun/src/series.rs index d2d36806..7b65452d 100644 --- a/node-hub/dora-rerun/src/series.rs +++ b/node-hub/dora-rerun/src/series.rs @@ -31,7 +31,7 @@ pub fn update_series(rec: &RecordingStream, id: DataId, data: ArrowData) -> Resu for (i, value) in series.iter().enumerate() { rec.log( format!("{}_{}", id.as_str(), i), - &rerun::Scalar::new(*value as f64), + &rerun::Scalar::new(*value), ) .wrap_err("could not log series")?; } diff --git a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs index c859e953..6892e935 100644 --- a/tests/queue_size_latest_data_rust/receive_data/src/main.rs +++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs @@ -16,23 +16,20 @@ fn main() -> eyre::Result<()> { sleep(Duration::from_secs(5)); while let Some(event) = events.recv() { - match event { - dora_node_api::Event::Input { + if let dora_node_api::Event::Input { id: _, metadata, data, - } => { - let data: &PrimitiveArray = data.as_primitive(); - let _time: u64 = data.values()[0]; - let time_metadata = metadata.timestamp(); - let duration_metadata = time_metadata.get_time().to_system_time().elapsed()?; - println!("Latency duration: {:?}", duration_metadata); - assert!( - duration_metadata < Duration::from_millis(500), - "Time difference should be less than 500ms" - ); - } - _ => {} + } = event { + let data: &PrimitiveArray = data.as_primitive(); + let _time: u64 = data.values()[0]; + let time_metadata = metadata.timestamp(); + let duration_metadata = time_metadata.get_time().to_system_time().elapsed()?; + println!("Latency duration: {:?}", duration_metadata); + assert!( + duration_metadata < Duration::from_millis(500), + "Time difference should be less than 500ms" + ); } } Ok(())