diff --git a/apis/c++/node/src/lib.rs b/apis/c++/node/src/lib.rs index 82643e74..33969e37 100644 --- a/apis/c++/node/src/lib.rs +++ b/apis/c++/node/src/lib.rs @@ -1,16 +1,15 @@ use std::{any::Any, vec}; use dora_node_api::{ - self, + self, Event, EventStream, arrow::array::{AsArray, UInt8Array}, merged::{MergeExternal, MergedEvent}, - Event, EventStream, }; use eyre::bail; #[cfg(feature = "ros2-bridge")] use dora_ros2_bridge::{_core, ros2_client}; -use futures_lite::{stream, Stream, StreamExt}; +use futures_lite::{Stream, StreamExt, stream}; #[cxx::bridge] #[allow(clippy::needless_lifetimes)] diff --git a/apis/c++/operator/src/lib.rs b/apis/c++/operator/src/lib.rs index 4ea7bab3..c68e1357 100644 --- a/apis/c++/operator/src/lib.rs +++ b/apis/c++/operator/src/lib.rs @@ -2,7 +2,7 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{ - self, register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, + self, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, register_operator, }; use ffi::DoraSendOutputResult; diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index abe5527c..64effbdb 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -11,7 +11,7 @@ use dora_node_api::dora_core::config::NodeId; use dora_node_api::dora_core::descriptor::source_is_url; use dora_node_api::merged::{MergeExternalSend, MergedEvent}; use dora_node_api::{DataflowId, DoraNode, EventStream}; -use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent}; +use dora_operator_api_python::{DelayedCleanup, NodeCleanupHandle, PyEvent, pydict_to_metadata}; use dora_ros2_bridge_python::Ros2Subscription; use eyre::Context; use futures::{Stream, StreamExt}; diff --git a/apis/python/operator/src/lib.rs b/apis/python/operator/src/lib.rs index 8856e82c..037193a4 100644 --- a/apis/python/operator/src/lib.rs +++ b/apis/python/operator/src/lib.rs @@ -5,8 +5,8 @@ use std::{ use arrow::pyarrow::ToPyArrow; use dora_node_api::{ - merged::{MergeExternalSend, MergedEvent}, DoraNode, Event, EventStream, Metadata, MetadataParameters, Parameter, StopCause, + merged::{MergeExternalSend, MergedEvent}, }; use eyre::{Context, Result}; use futures::{Stream, StreamExt}; @@ -282,7 +282,7 @@ mod tests { use aligned_vec::{AVec, ConstAlign}; use arrow::{ array::{ - ArrayData, ArrayRef, BooleanArray, Float64Array, Int32Array, Int64Array, Int8Array, + ArrayData, ArrayRef, BooleanArray, Float64Array, Int8Array, Int32Array, Int64Array, ListArray, StructArray, }, buffer::Buffer, diff --git a/apis/rust/node/src/daemon_connection/mod.rs b/apis/rust/node/src/daemon_connection/mod.rs index 6a80452d..4a5f2eea 100644 --- a/apis/rust/node/src/daemon_connection/mod.rs +++ b/apis/rust/node/src/daemon_connection/mod.rs @@ -1,10 +1,10 @@ use dora_core::{config::NodeId, uhlc::Timestamp}; use dora_message::{ + DataflowId, daemon_to_node::DaemonReply, node_to_daemon::{DaemonRequest, NodeRegisterRequest, Timestamped}, - DataflowId, }; -use eyre::{bail, eyre, Context}; +use eyre::{Context, bail, eyre}; use shared_memory_server::{ShmemClient, ShmemConf}; #[cfg(unix)] use std::os::unix::net::UnixStream; diff --git a/apis/rust/node/src/daemon_connection/tcp.rs b/apis/rust/node/src/daemon_connection/tcp.rs index 1d5e1e2b..1d4232eb 100644 --- a/apis/rust/node/src/daemon_connection/tcp.rs +++ b/apis/rust/node/src/daemon_connection/tcp.rs @@ -2,7 +2,7 @@ use dora_message::{ daemon_to_node::DaemonReply, node_to_daemon::{DaemonRequest, Timestamped}, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use std::{ io::{Read, Write}, net::TcpStream, @@ -42,21 +42,20 @@ fn receive_reply( connection: &mut TcpStream, serializer: Serializer, ) -> eyre::Result> { - let raw = match tcp_receive(connection) { - Ok(raw) => raw, - Err(err) => match err.kind() { - std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { - return Ok(None) - } - other => { - return Err(err).with_context(|| { + let raw = + match tcp_receive(connection) { + Ok(raw) => raw, + Err(err) => match err.kind() { + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { + return Ok(None); + } + other => return Err(err).with_context(|| { format!( "unexpected I/O error (kind {other:?}) while trying to receive DaemonReply" ) - }) - } - }, - }; + }), + }, + }; match serializer { Serializer::Bincode => bincode::deserialize(&raw) .wrap_err("failed to deserialize DaemonReply") diff --git a/apis/rust/node/src/daemon_connection/unix_domain.rs b/apis/rust/node/src/daemon_connection/unix_domain.rs index fd6d50d2..a3c0dcc7 100644 --- a/apis/rust/node/src/daemon_connection/unix_domain.rs +++ b/apis/rust/node/src/daemon_connection/unix_domain.rs @@ -2,7 +2,7 @@ use dora_message::{ daemon_to_node::DaemonReply, node_to_daemon::{DaemonRequest, Timestamped}, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use std::{ io::{Read, Write}, os::unix::net::UnixStream, @@ -42,21 +42,20 @@ fn receive_reply( connection: &mut UnixStream, serializer: Serializer, ) -> eyre::Result> { - let raw = match stream_receive(connection) { - Ok(raw) => raw, - Err(err) => match err.kind() { - std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { - return Ok(None) - } - other => { - return Err(err).with_context(|| { + let raw = + match stream_receive(connection) { + Ok(raw) => raw, + Err(err) => match err.kind() { + std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::ConnectionAborted => { + return Ok(None); + } + other => return Err(err).with_context(|| { format!( "unexpected I/O error (kind {other:?}) while trying to receive DaemonReply" ) - }) - } - }, - }; + }), + }, + }; match serializer { Serializer::Bincode => bincode::deserialize(&raw) .wrap_err("failed to deserialize DaemonReply") diff --git a/apis/rust/node/src/event_stream/mod.rs b/apis/rust/node/src/event_stream/mod.rs index c7a3aefc..30058068 100644 --- a/apis/rust/node/src/event_stream/mod.rs +++ b/apis/rust/node/src/event_stream/mod.rs @@ -6,18 +6,18 @@ use std::{ }; use dora_message::{ + DataflowId, daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent}, id::DataId, node_to_daemon::{DaemonRequest, Timestamped}, - DataflowId, }; pub use event::{Event, StopCause}; use futures::{ - future::{select, Either}, Stream, StreamExt, + future::{Either, select}, }; use futures_timer::Delay; -use scheduler::{Scheduler, NON_INPUT_EVENT}; +use scheduler::{NON_INPUT_EVENT, Scheduler}; use self::thread::{EventItem, EventStreamThreadHandle}; use crate::{ @@ -28,7 +28,7 @@ use dora_core::{ config::{Input, NodeId}, uhlc, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; pub use scheduler::Scheduler as EventScheduler; diff --git a/apis/rust/node/src/event_stream/thread.rs b/apis/rust/node/src/event_stream/thread.rs index dfca33e7..b360ad4c 100644 --- a/apis/rust/node/src/event_stream/thread.rs +++ b/apis/rust/node/src/event_stream/thread.rs @@ -6,7 +6,7 @@ use dora_message::{ daemon_to_node::{DaemonReply, NodeEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use flume::RecvTimeoutError; use std::{ sync::Arc, diff --git a/apis/rust/node/src/lib.rs b/apis/rust/node/src/lib.rs index 90f26621..fe3c90dd 100644 --- a/apis/rust/node/src/lib.rs +++ b/apis/rust/node/src/lib.rs @@ -84,13 +84,13 @@ pub use arrow; pub use dora_arrow_convert::*; pub use dora_core::{self, uhlc}; pub use dora_message::{ - metadata::{Metadata, MetadataParameters, Parameter}, DataflowId, + metadata::{Metadata, MetadataParameters, Parameter}, }; -pub use event_stream::{merged, Event, EventScheduler, EventStream, StopCause}; +pub use event_stream::{Event, EventScheduler, EventStream, StopCause, merged}; pub use flume::Receiver; pub use futures; -pub use node::{arrow_utils, DataSample, DoraNode, ZERO_COPY_THRESHOLD}; +pub use node::{DataSample, DoraNode, ZERO_COPY_THRESHOLD, arrow_utils}; mod daemon_connection; mod event_stream; diff --git a/apis/rust/node/src/node/control_channel.rs b/apis/rust/node/src/node/control_channel.rs index ce6d899f..75580300 100644 --- a/apis/rust/node/src/node/control_channel.rs +++ b/apis/rust/node/src/node/control_channel.rs @@ -6,12 +6,12 @@ use dora_core::{ uhlc::HLC, }; use dora_message::{ + DataflowId, daemon_to_node::{DaemonCommunication, DaemonReply}, metadata::Metadata, node_to_daemon::{DaemonRequest, DataMessage, Timestamped}, - DataflowId, }; -use eyre::{bail, eyre, Context}; +use eyre::{Context, bail, eyre}; pub(crate) struct ControlChannel { channel: DaemonChannel, diff --git a/apis/rust/node/src/node/drop_stream.rs b/apis/rust/node/src/node/drop_stream.rs index d62b0588..627131bb 100644 --- a/apis/rust/node/src/node/drop_stream.rs +++ b/apis/rust/node/src/node/drop_stream.rs @@ -3,11 +3,11 @@ use std::{sync::Arc, time::Duration}; use crate::daemon_connection::DaemonChannel; use dora_core::{config::NodeId, uhlc}; use dora_message::{ + DataflowId, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent}, node_to_daemon::{DaemonRequest, DropToken, Timestamped}, - DataflowId, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use flume::RecvTimeoutError; pub struct DropStream { diff --git a/apis/rust/node/src/node/mod.rs b/apis/rust/node/src/node/mod.rs index 46ea61ac..18798ef0 100644 --- a/apis/rust/node/src/node/mod.rs +++ b/apis/rust/node/src/node/mod.rs @@ -1,4 +1,4 @@ -use crate::{daemon_connection::DaemonChannel, EventStream}; +use crate::{EventStream, daemon_connection::DaemonChannel}; use self::{ arrow_utils::{copy_array_into_sample, required_data_size}, @@ -16,12 +16,12 @@ use dora_core::{ }; use dora_message::{ + DataflowId, daemon_to_node::{DaemonReply, NodeConfig}, metadata::{ArrowTypeInfo, Metadata, MetadataParameters}, node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped}, - DataflowId, }; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use shared_memory_extended::{Shmem, ShmemConf}; use std::{ collections::{BTreeSet, HashMap, VecDeque}, @@ -157,7 +157,9 @@ impl DoraNode { /// [`init_from_node_id`][Self::init_from_node_id]. pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> { if std::env::var("DORA_NODE_CONFIG").is_ok() { - info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"); + info!( + "Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`" + ); Self::init_from_env() } else { Self::init_from_node_id(node_id) diff --git a/apis/rust/operator/src/lib.rs b/apis/rust/operator/src/lib.rs index 262fcdd3..1b51c5c6 100644 --- a/apis/rust/operator/src/lib.rs +++ b/apis/rust/operator/src/lib.rs @@ -23,8 +23,8 @@ pub use dora_operator_api_macros::register_operator; pub use dora_operator_api_types as types; pub use types::DoraStatus; use types::{ - arrow::{self, array::Array}, Metadata, Output, SendOutput, + arrow::{self, array::Array}, }; pub mod raw; diff --git a/apis/rust/operator/src/raw.rs b/apis/rust/operator/src/raw.rs index 6634e68f..c79e64a7 100644 --- a/apis/rust/operator/src/raw.rs +++ b/apis/rust/operator/src/raw.rs @@ -1,6 +1,6 @@ use crate::{DoraOperator, DoraOutputSender, DoraStatus, Event}; use dora_operator_api_types::{ - arrow, DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, + DoraInitResult, DoraResult, OnEventResult, RawEvent, SendOutput, arrow, }; use std::ffi::c_void; diff --git a/binaries/cli/src/command/build/distributed.rs b/binaries/cli/src/command/build/distributed.rs index 1fd1ed91..07c63fb9 100644 --- a/binaries/cli/src/command/build/distributed.rs +++ b/binaries/cli/src/command/build/distributed.rs @@ -1,13 +1,13 @@ use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; use dora_core::descriptor::Descriptor; use dora_message::{ + BuildId, cli_to_coordinator::ControlRequest, common::{GitSource, LogMessage}, coordinator_to_cli::ControlRequestReply, id::NodeId, - BuildId, }; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ collections::BTreeMap, net::{SocketAddr, TcpStream}, diff --git a/binaries/cli/src/command/build/mod.rs b/binaries/cli/src/command/build/mod.rs index 04f16f55..b5dab018 100644 --- a/binaries/cli/src/command/build/mod.rs +++ b/binaries/cli/src/command/build/mod.rs @@ -52,11 +52,11 @@ use dora_core::{ descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}, }; -use dora_message::{descriptor::NodeSource, BuildId}; +use dora_message::{BuildId, descriptor::NodeSource}; use eyre::Context; use std::{collections::BTreeMap, net::IpAddr}; -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use crate::{ common::{connect_to_coordinator, local_working_dir, resolve_dataflow}, session::DataflowSession, diff --git a/binaries/cli/src/command/check.rs b/binaries/cli/src/command/check.rs index 27b5fcbd..99b8da8d 100644 --- a/binaries/cli/src/command/check.rs +++ b/binaries/cli/src/command/check.rs @@ -1,10 +1,10 @@ -use super::{default_tracing, Executable}; -use crate::{common::connect_to_coordinator, LOCALHOST}; +use super::{Executable, default_tracing}; +use crate::{LOCALHOST, common::connect_to_coordinator}; use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::descriptor::DescriptorExt; use dora_core::{descriptor::Descriptor, topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ io::{IsTerminal, Write}, net::SocketAddr, diff --git a/binaries/cli/src/command/destroy.rs b/binaries/cli/src/command/destroy.rs index 1a10222f..88aebc8c 100644 --- a/binaries/cli/src/command/destroy.rs +++ b/binaries/cli/src/command/destroy.rs @@ -1,4 +1,4 @@ -use super::{default_tracing, up, Executable}; +use super::{Executable, default_tracing, up}; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; use std::net::IpAddr; use std::path::PathBuf; diff --git a/binaries/cli/src/command/list.rs b/binaries/cli/src/command/list.rs index fce836e4..affe9549 100644 --- a/binaries/cli/src/command/list.rs +++ b/binaries/cli/src/command/list.rs @@ -1,9 +1,9 @@ use std::io::Write; -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use crate::{ - common::{connect_to_coordinator, query_running_dataflows}, LOCALHOST, + common::{connect_to_coordinator, query_running_dataflows}, }; use clap::Args; use communication_layer_request_reply::TcpRequestReplyConnection; diff --git a/binaries/cli/src/command/logs.rs b/binaries/cli/src/command/logs.rs index 06496ad6..bfa8ff96 100644 --- a/binaries/cli/src/command/logs.rs +++ b/binaries/cli/src/command/logs.rs @@ -1,11 +1,11 @@ -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use crate::common::{connect_to_coordinator, query_running_dataflows}; use bat::{Input, PrettyPrinter}; use clap::Args; use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; -use eyre::{bail, Context, Result}; +use eyre::{Context, Result, bail}; use uuid::Uuid; #[derive(Debug, Args)] @@ -80,9 +80,11 @@ pub fn logs( .grid(false) .line_numbers(false) .paging_mode(bat::PagingMode::QuitIfOneScreen) - .inputs(vec![Input::from_bytes(&logs) - .name("Logs") - .title(format!("Logs from {node}.").as_str())]) + .inputs(vec![ + Input::from_bytes(&logs) + .name("Logs") + .title(format!("Logs from {node}.").as_str()), + ]) .print() .wrap_err("Something went wrong with viewing log file")?; diff --git a/binaries/cli/src/command/new.rs b/binaries/cli/src/command/new.rs index 0500ffb5..4d3d7517 100644 --- a/binaries/cli/src/command/new.rs +++ b/binaries/cli/src/command/new.rs @@ -1,6 +1,6 @@ use clap::Args; -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; #[derive(Debug, Args)] /// Generate a new project or node. Choose the language between Rust, Python, C or C++. diff --git a/binaries/cli/src/command/run.rs b/binaries/cli/src/command/run.rs index feb5947c..8749f4e5 100644 --- a/binaries/cli/src/command/run.rs +++ b/binaries/cli/src/command/run.rs @@ -11,7 +11,7 @@ use crate::{ output::print_log_message, session::DataflowSession, }; -use dora_daemon::{flume, Daemon, LogDestination}; +use dora_daemon::{Daemon, LogDestination, flume}; use dora_tracing::TracingBuilder; use eyre::Context; use tokio::runtime::Builder; diff --git a/binaries/cli/src/command/self_.rs b/binaries/cli/src/command/self_.rs index 1914e46b..5316219b 100644 --- a/binaries/cli/src/command/self_.rs +++ b/binaries/cli/src/command/self_.rs @@ -1,6 +1,6 @@ -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use clap::Subcommand; -use eyre::{bail, Context}; +use eyre::{Context, bail}; #[derive(Debug, Subcommand)] /// Dora CLI self-management commands diff --git a/binaries/cli/src/command/start/attach.rs b/binaries/cli/src/command/start/attach.rs index bdca8bfc..09c9a047 100644 --- a/binaries/cli/src/command/start/attach.rs +++ b/binaries/cli/src/command/start/attach.rs @@ -1,5 +1,5 @@ use communication_layer_request_reply::{TcpConnection, TcpRequestReplyConnection}; -use dora_core::descriptor::{resolve_path, CoreNodeKind, Descriptor, DescriptorExt}; +use dora_core::descriptor::{CoreNodeKind, Descriptor, DescriptorExt, resolve_path}; use dora_message::cli_to_coordinator::ControlRequest; use dora_message::common::LogMessage; use dora_message::coordinator_to_cli::ControlRequestReply; diff --git a/binaries/cli/src/command/start/mod.rs b/binaries/cli/src/command/start/mod.rs index 077a67b4..22116a97 100644 --- a/binaries/cli/src/command/start/mod.rs +++ b/binaries/cli/src/command/start/mod.rs @@ -2,7 +2,7 @@ //! //! The `dora start` command does not run any build commands, nor update git dependencies or similar. Use `dora build` for that. -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use crate::{ command::start::attach::attach_dataflow, common::{connect_to_coordinator, local_working_dir, resolve_dataflow}, @@ -17,7 +17,7 @@ use dora_core::{ use dora_message::{ cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::ControlRequestReply, }; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ net::{IpAddr, SocketAddr, TcpStream}, path::PathBuf, diff --git a/binaries/cli/src/command/stop.rs b/binaries/cli/src/command/stop.rs index 01d63402..0c7549d0 100644 --- a/binaries/cli/src/command/stop.rs +++ b/binaries/cli/src/command/stop.rs @@ -1,11 +1,11 @@ -use super::{default_tracing, Executable}; +use super::{Executable, default_tracing}; use crate::common::{connect_to_coordinator, handle_dataflow_result, query_running_dataflows}; use communication_layer_request_reply::TcpRequestReplyConnection; use dora_core::topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST}; use dora_message::cli_to_coordinator::ControlRequest; use dora_message::coordinator_to_cli::ControlRequestReply; use duration_str::parse; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::net::IpAddr; use std::time::Duration; use uuid::Uuid; diff --git a/binaries/cli/src/command/up.rs b/binaries/cli/src/command/up.rs index a9acbd93..f0cccf58 100644 --- a/binaries/cli/src/command/up.rs +++ b/binaries/cli/src/command/up.rs @@ -1,9 +1,9 @@ use super::check::daemon_running; -use super::{default_tracing, Executable}; -use crate::{common::connect_to_coordinator, LOCALHOST}; +use super::{Executable, default_tracing}; +use crate::{LOCALHOST, common::connect_to_coordinator}; use dora_core::topics::DORA_COORDINATOR_PORT_CONTROL_DEFAULT; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; -use eyre::{bail, Context, ContextCompat}; +use eyre::{Context, ContextCompat, bail}; use std::path::PathBuf; use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; diff --git a/binaries/cli/src/common.rs b/binaries/cli/src/common.rs index e02a1673..23611b8c 100644 --- a/binaries/cli/src/common.rs +++ b/binaries/cli/src/common.rs @@ -1,12 +1,12 @@ use crate::formatting::FormatDataflowError; use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection}; -use dora_core::descriptor::{source_is_url, Descriptor}; +use dora_core::descriptor::{Descriptor, source_is_url}; use dora_download::download_file; use dora_message::{ cli_to_coordinator::ControlRequest, coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult}, }; -use eyre::{bail, Context, ContextCompat}; +use eyre::{Context, ContextCompat, bail}; use std::{ env::current_dir, net::SocketAddr, diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index 868d7a5a..1a8177b4 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -65,9 +65,9 @@ pub fn lib_main(args: Args) { use clap::Parser; #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs index 9a8ac5b8..a394cf21 100644 --- a/binaries/cli/src/session.rs +++ b/binaries/cli/src/session.rs @@ -4,7 +4,7 @@ use std::{ }; use dora_core::build::BuildInfo; -use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId}; +use dora_message::{BuildId, SessionId, common::GitSource, id::NodeId}; use eyre::{Context, ContextCompat}; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] @@ -33,7 +33,9 @@ impl DataflowSession { if let Ok(parsed) = deserialize(&session_file) { return Ok(parsed); } else { - tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)"); + tracing::warn!( + "failed to read dataflow session file, regenerating (you might need to run `dora build` again)" + ); } } diff --git a/binaries/cli/src/template/c/mod.rs b/binaries/cli/src/template/c/mod.rs index 879eeedb..7c5df939 100644 --- a/binaries/cli/src/template/c/mod.rs +++ b/binaries/cli/src/template/c/mod.rs @@ -1,5 +1,5 @@ use dora_node_api_c::HEADER_NODE_API; -use eyre::{bail, Context, ContextCompat}; +use eyre::{Context, ContextCompat, bail}; use std::{ fs, path::{Path, PathBuf}, diff --git a/binaries/cli/src/template/cxx/mod.rs b/binaries/cli/src/template/cxx/mod.rs index 72b7769e..36b27882 100644 --- a/binaries/cli/src/template/cxx/mod.rs +++ b/binaries/cli/src/template/cxx/mod.rs @@ -1,4 +1,4 @@ -use eyre::{bail, Context, ContextCompat}; +use eyre::{Context, ContextCompat, bail}; use std::{ fs, path::{Path, PathBuf}, diff --git a/binaries/cli/src/template/python/mod.rs b/binaries/cli/src/template/python/mod.rs index 10ae7cc0..5a856cfc 100644 --- a/binaries/cli/src/template/python/mod.rs +++ b/binaries/cli/src/template/python/mod.rs @@ -1,4 +1,4 @@ -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ fs, path::{Path, PathBuf}, diff --git a/binaries/cli/src/template/rust/mod.rs b/binaries/cli/src/template/rust/mod.rs index f2c9fd9c..09676f24 100644 --- a/binaries/cli/src/template/rust/mod.rs +++ b/binaries/cli/src/template/rust/mod.rs @@ -1,4 +1,4 @@ -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ fs, path::{Path, PathBuf}, diff --git a/binaries/coordinator/src/control.rs b/binaries/coordinator/src/control.rs index 98bb0def..0a810718 100644 --- a/binaries/coordinator/src/control.rs +++ b/binaries/coordinator/src/control.rs @@ -1,15 +1,15 @@ use crate::{ - tcp_utils::{tcp_receive, tcp_send}, Event, + tcp_utils::{tcp_receive, tcp_send}, }; use dora_message::{ - cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, BuildId, + BuildId, cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use futures::{ + FutureExt, Stream, StreamExt, future::{self, Either}, stream::FuturesUnordered, - FutureExt, Stream, StreamExt, }; use futures_concurrency::future::Race; use std::{io::ErrorKind, net::SocketAddr}; diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 67ee69a4..e97fd115 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,6 +9,7 @@ use dora_core::{ uhlc::{self, HLC}, }; use dora_message::{ + BuildId, DataflowId, SessionId, cli_to_coordinator::ControlRequest, common::{DaemonId, GitSource}, coordinator_to_cli::{ @@ -20,10 +21,9 @@ use dora_message::{ }, daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult}, descriptor::{Descriptor, ResolvedNode}, - BuildId, DataflowId, SessionId, }; -use eyre::{bail, eyre, ContextCompat, Result, WrapErr}; -use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt}; +use eyre::{ContextCompat, Result, WrapErr, bail, eyre}; +use futures::{Future, Stream, StreamExt, future::join_all, stream::FuturesUnordered}; use futures_concurrency::stream::Merge; use itertools::Itertools; use log_subscriber::LogSubscriber; @@ -122,7 +122,9 @@ fn resolve_name( Ok(*uuid) } else { // TODO: Index the archived dataflows in order to return logs based on the index. - bail!("multiple archived dataflows found with name `{name}`, Please provide the UUID instead."); + bail!( + "multiple archived dataflows found with name `{name}`, Please provide the UUID instead." + ); } } else if let [uuid] = uuids.as_slice() { Ok(*uuid) @@ -285,7 +287,9 @@ async fn start_inner( ); } Err(err) => { - tracing::warn!("failed to register daemon connection for daemon `{daemon_id}`: {err}"); + tracing::warn!( + "failed to register daemon connection for daemon `{daemon_id}`: {err}" + ); } } } @@ -341,7 +345,9 @@ async fn start_inner( } } DataflowEvent::DataflowFinishedOnDaemon { daemon_id, result } => { - tracing::debug!("coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})"); + tracing::debug!( + "coordinator received DataflowFinishedOnDaemon ({daemon_id:?}, result: {result:?})" + ); match running_dataflows.entry(uuid) { std::collections::hash_map::Entry::Occupied(mut entry) => { let dataflow = entry.get_mut(); @@ -474,7 +480,9 @@ async fn start_inner( .values() .any(|d: &RunningDataflow| d.name.as_deref() == Some(name)) { - bail!("there is already a running dataflow with name `{name}`"); + bail!( + "there is already a running dataflow with name `{name}`" + ); } } let dataflow = start_dataflow( @@ -875,7 +883,9 @@ async fn start_inner( } } None => { - tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"); + tracing::warn!( + "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map" + ); } }, Event::DataflowSpawnResult { @@ -901,7 +911,9 @@ async fn start_inner( }; } None => { - tracing::warn!("received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map"); + tracing::warn!( + "received DataflowSpawnResult, but no matching dataflow in `running_dataflows` map" + ); } }, } diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index ab7e3b9d..3d4cc5bb 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -1,4 +1,4 @@ -use crate::{tcp_utils::tcp_receive, DaemonRequest, DataflowEvent, Event}; +use crate::{DaemonRequest, DataflowEvent, Event, tcp_utils::tcp_receive}; use dora_core::uhlc::HLC; use dora_message::daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, Timestamped}; use eyre::Context; @@ -12,7 +12,7 @@ pub async fn create_listener(bind: SocketAddr) -> eyre::Result { let socket = match TcpListener::bind(bind).await { Ok(socket) => socket, Err(err) => { - return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")) + return Err(eyre::Report::new(err).wrap_err("failed to create local TCP listener")); } }; Ok(socket) diff --git a/binaries/coordinator/src/run/mod.rs b/binaries/coordinator/src/run/mod.rs index 9edcabd3..035b6c6a 100644 --- a/binaries/coordinator/src/run/mod.rs +++ b/binaries/coordinator/src/run/mod.rs @@ -1,18 +1,18 @@ use crate::{ - tcp_utils::{tcp_receive, tcp_send}, DaemonConnections, + tcp_utils::{tcp_receive, tcp_send}, }; use dora_core::{descriptor::DescriptorExt, uhlc::HLC}; use dora_message::{ + BuildId, SessionId, common::DaemonId, coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped}, daemon_to_coordinator::DaemonCoordinatorReply, descriptor::{Descriptor, ResolvedNode}, id::NodeId, - BuildId, SessionId, }; -use eyre::{bail, eyre, ContextCompat, WrapErr}; +use eyre::{ContextCompat, WrapErr, bail, eyre}; use itertools::Itertools; use std::{ collections::{BTreeMap, BTreeSet}, diff --git a/binaries/daemon/src/coordinator.rs b/binaries/daemon/src/coordinator.rs index 4e81bfa2..22f16e8f 100644 --- a/binaries/daemon/src/coordinator.rs +++ b/binaries/daemon/src/coordinator.rs @@ -1,6 +1,6 @@ use crate::{ - socket_stream_utils::{socket_stream_receive, socket_stream_send}, DaemonCoordinatorEvent, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, }; use dora_core::uhlc::HLC; use dora_message::{ @@ -8,14 +8,14 @@ use dora_message::{ coordinator_to_daemon::RegisterResult, daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest}, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use std::{io::ErrorKind, net::SocketAddr, time::Duration}; use tokio::{ net::TcpStream, sync::{mpsc, oneshot}, time::sleep, }; -use tokio_stream::{wrappers::ReceiverStream, Stream}; +use tokio_stream::{Stream, wrappers::ReceiverStream}; use tracing::warn; const DAEMON_COORDINATOR_RETRY_INTERVAL: std::time::Duration = Duration::from_secs(1); @@ -37,7 +37,9 @@ pub async fn register( .wrap_err("failed to connect to dora-coordinator") { Err(err) => { - warn!("Could not connect to: {addr}, with error: {err}. Retring in {DAEMON_COORDINATOR_RETRY_INTERVAL:#?}.."); + warn!( + "Could not connect to: {addr}, with error: {err}. Retring in {DAEMON_COORDINATOR_RETRY_INTERVAL:#?}.." + ); sleep(DAEMON_COORDINATOR_RETRY_INTERVAL).await; } Ok(stream) => { diff --git a/binaries/daemon/src/local_listener.rs b/binaries/daemon/src/local_listener.rs index 2e9f5c5a..cd9b5279 100644 --- a/binaries/daemon/src/local_listener.rs +++ b/binaries/daemon/src/local_listener.rs @@ -137,7 +137,7 @@ async fn receive_message( | ErrorKind::ConnectionReset => return Ok(None), _other => { return Err(err) - .context("unexpected I/O error while trying to receive DaemonRequest") + .context("unexpected I/O error while trying to receive DaemonRequest"); } }, }; diff --git a/binaries/daemon/src/log.rs b/binaries/daemon/src/log.rs index ce71d548..a433075d 100644 --- a/binaries/daemon/src/log.rs +++ b/binaries/daemon/src/log.rs @@ -10,9 +10,9 @@ use dora_core::{ uhlc, }; use dora_message::{ + BuildId, common::{DaemonId, LogLevel, LogMessage, Timestamped}, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent}, - BuildId, }; use eyre::Context; use flume::Sender; diff --git a/binaries/daemon/src/node_communication/mod.rs b/binaries/daemon/src/node_communication/mod.rs index 8ed9af0b..1d584fb7 100644 --- a/binaries/daemon/src/node_communication/mod.rs +++ b/binaries/daemon/src/node_communication/mod.rs @@ -5,13 +5,13 @@ use dora_core::{ uhlc, }; use dora_message::{ + DataflowId, common::{DropToken, Timestamped}, daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent, NodeEvent}, node_to_daemon::DaemonRequest, - DataflowId, }; -use eyre::{eyre, Context}; -use futures::{future, task, Future}; +use eyre::{Context, eyre}; +use futures::{Future, future, task}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ collections::{BTreeMap, VecDeque}, @@ -50,7 +50,7 @@ pub async fn spawn_listener_loop( Err(err) => { return Err( eyre::Report::new(err).wrap_err("failed to create local TCP listener") - ) + ); } }; let socket_addr = socket @@ -157,7 +157,7 @@ pub async fn spawn_listener_loop( Ok(socket) => socket, Err(err) => { return Err(eyre::Report::new(err) - .wrap_err("failed to create local Unix domain socket")) + .wrap_err("failed to create local Unix domain socket")); } }; diff --git a/binaries/daemon/src/node_communication/tcp.rs b/binaries/daemon/src/node_communication/tcp.rs index a2114a3e..ddbfb510 100644 --- a/binaries/daemon/src/node_communication/tcp.rs +++ b/binaries/daemon/src/node_communication/tcp.rs @@ -2,8 +2,8 @@ use std::{collections::BTreeMap, io::ErrorKind, sync::Arc}; use super::{Connection, Listener}; use crate::{ - socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, }; use dora_core::{config::DataId, uhlc::HLC}; use dora_message::{ @@ -70,7 +70,7 @@ impl Connection for TcpConnection { | ErrorKind::ConnectionReset => return Ok(None), _other => { return Err(err) - .context("unexpected I/O error while trying to receive DaemonRequest") + .context("unexpected I/O error while trying to receive DaemonRequest"); } }, }; diff --git a/binaries/daemon/src/node_communication/unix_domain.rs b/binaries/daemon/src/node_communication/unix_domain.rs index 5905e1e6..1e5408fa 100644 --- a/binaries/daemon/src/node_communication/unix_domain.rs +++ b/binaries/daemon/src/node_communication/unix_domain.rs @@ -11,8 +11,8 @@ use tokio::{ }; use crate::{ - socket_stream_utils::{socket_stream_receive, socket_stream_send}, Event, + socket_stream_utils::{socket_stream_receive, socket_stream_send}, }; use super::{Connection, Listener}; @@ -68,7 +68,7 @@ impl Connection for UnixConnection { | ErrorKind::ConnectionReset => return Ok(None), _other => { return Err(err) - .context("unexpected I/O error while trying to receive DaemonRequest") + .context("unexpected I/O error while trying to receive DaemonRequest"); } }, }; diff --git a/binaries/daemon/src/pending.rs b/binaries/daemon/src/pending.rs index 757a858d..bf90a6e4 100644 --- a/binaries/daemon/src/pending.rs +++ b/binaries/daemon/src/pending.rs @@ -2,18 +2,18 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use dora_core::{ config::NodeId, - uhlc::{Timestamp, HLC}, + uhlc::{HLC, Timestamp}, }; use dora_message::{ + DataflowId, common::DaemonId, daemon_to_coordinator::{CoordinatorRequest, DaemonEvent, LogLevel, LogMessage, Timestamped}, daemon_to_node::DaemonReply, - DataflowId, }; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use tokio::{net::TcpStream, sync::oneshot}; -use crate::{log::DataflowLogger, socket_stream_utils::socket_stream_send, CascadingErrorCauses}; +use crate::{CascadingErrorCauses, log::DataflowLogger, socket_stream_utils::socket_stream_send}; pub struct PendingNodes { dataflow_id: DataflowId, diff --git a/binaries/runtime/src/lib.rs b/binaries/runtime/src/lib.rs index 7a42142a..a210eda2 100644 --- a/binaries/runtime/src/lib.rs +++ b/binaries/runtime/src/lib.rs @@ -8,10 +8,10 @@ use dora_message::daemon_to_node::{NodeConfig, RuntimeConfig}; use dora_metrics::run_metrics_monitor; use dora_node_api::{DoraNode, Event}; use dora_tracing::TracingBuilder; -use eyre::{bail, Context, Result}; +use eyre::{Context, Result, bail}; use futures::{Stream, StreamExt}; use futures_concurrency::stream::Merge; -use operator::{run_operator, OperatorEvent, StopReason}; +use operator::{OperatorEvent, StopReason, run_operator}; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, @@ -211,7 +211,9 @@ async fn run( OperatorEvent::AllocateOutputSample { len, sample: tx } => { let sample = node.allocate_data_sample(len); if tx.send(sample).is_err() { - tracing::warn!("output sample requested, but operator {operator_id} exited already"); + tracing::warn!( + "output sample requested, but operator {operator_id} exited already" + ); } } OperatorEvent::Output { @@ -309,7 +311,10 @@ async fn run( open_inputs.remove(&input_id); if open_inputs.is_empty() { // all inputs of the node were closed -> close its event channel - tracing::trace!("all inputs of operator {}/{operator_id} were closed -> closing event channel", node.id()); + tracing::trace!( + "all inputs of operator {}/{operator_id} were closed -> closing event channel", + node.id() + ); open_operator_inputs.remove(&operator_id); operator_channels.remove(&operator_id); } diff --git a/binaries/runtime/src/operator/channel.rs b/binaries/runtime/src/operator/channel.rs index eac20b73..3cecaa12 100644 --- a/binaries/runtime/src/operator/channel.rs +++ b/binaries/runtime/src/operator/channel.rs @@ -1,8 +1,8 @@ use dora_core::config::DataId; use dora_node_api::Event; use futures::{ - future::{self, FusedFuture}, FutureExt, + future::{self, FusedFuture}, }; use std::collections::{BTreeMap, VecDeque}; diff --git a/binaries/runtime/src/operator/python.rs b/binaries/runtime/src/operator/python.rs index 6a250579..4e04e5ab 100644 --- a/binaries/runtime/src/operator/python.rs +++ b/binaries/runtime/src/operator/python.rs @@ -3,21 +3,20 @@ use super::{OperatorEvent, StopReason}; use dora_core::{ config::{NodeId, OperatorId}, - descriptor::{source_is_url, Descriptor, PythonSource}, + descriptor::{Descriptor, PythonSource, source_is_url}, }; use dora_download::download_file; -use dora_node_api::{merged::MergedEvent, Event, Parameter}; +use dora_node_api::{Event, Parameter, merged::MergedEvent}; use dora_operator_api_python::PyEvent; use dora_operator_api_types::DoraStatus; -use eyre::{bail, eyre, Context, Result}; +use eyre::{Context, Result, bail, eyre}; use pyo3::ffi::c_str; use pyo3::{ - pyclass, + Py, PyAny, Python, pyclass, types::{IntoPyDict, PyAnyMethods, PyDict, PyDictMethods, PyTracebackMethods}, - Py, PyAny, Python, }; use std::{ - panic::{catch_unwind, AssertUnwindSafe}, + panic::{AssertUnwindSafe, catch_unwind}, path::Path, }; use tokio::sync::{mpsc::Sender, oneshot}; @@ -295,16 +294,15 @@ mod callback_impl { use dora_core::metadata::ArrowTypeInfoExt; use dora_message::metadata::ArrowTypeInfo; use dora_node_api::{ - arrow_utils::{copy_array_into_sample, required_data_size}, ZERO_COPY_THRESHOLD, + arrow_utils::{copy_array_into_sample, required_data_size}, }; use dora_operator_api_python::pydict_to_metadata; use dora_tracing::telemetry::deserialize_context; - use eyre::{eyre, Context, Result}; + use eyre::{Context, Result, eyre}; use pyo3::{ - pymethods, + Bound, PyObject, Python, pymethods, types::{PyBytes, PyBytesMethods, PyDict}, - Bound, PyObject, Python, }; use tokio::sync::oneshot; use tracing::{field, span}; diff --git a/binaries/runtime/src/operator/shared_lib.rs b/binaries/runtime/src/operator/shared_lib.rs index 021243b6..f0505692 100644 --- a/binaries/runtime/src/operator/shared_lib.rs +++ b/binaries/runtime/src/operator/shared_lib.rs @@ -7,19 +7,19 @@ use dora_core::{ }; use dora_download::download_file; use dora_node_api::{ - arrow_utils::{copy_array_into_sample, required_data_size}, Event, Parameter, + arrow_utils::{copy_array_into_sample, required_data_size}, }; use dora_operator_api_types::{ - safer_ffi::closure::ArcDynFn1, DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, - DoraResult, DoraStatus, Metadata, OnEventResult, Output, SendOutput, + DoraDropOperator, DoraInitOperator, DoraInitResult, DoraOnEvent, DoraResult, DoraStatus, + Metadata, OnEventResult, Output, SendOutput, safer_ffi::closure::ArcDynFn1, }; -use eyre::{bail, eyre, Context, Result}; +use eyre::{Context, Result, bail, eyre}; use libloading::Symbol; use std::{ collections::BTreeMap, ffi::c_void, - panic::{catch_unwind, AssertUnwindSafe}, + panic::{AssertUnwindSafe, catch_unwind}, path::Path, sync::Arc, }; diff --git a/examples/benchmark/node/src/main.rs b/examples/benchmark/node/src/main.rs index e8deca6b..eb2849a4 100644 --- a/examples/benchmark/node/src/main.rs +++ b/examples/benchmark/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode}; +use dora_node_api::{self, DoraNode, dora_core::config::DataId}; use eyre::Context; use rand::RngCore; use std::time::Duration; diff --git a/examples/benchmark/run.rs b/examples/benchmark/run.rs index b6bed6fe..9ce36580 100644 --- a/examples/benchmark/run.rs +++ b/examples/benchmark/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/c++-arrow-dataflow/run.rs b/examples/c++-arrow-dataflow/run.rs index a77c4d78..314dc5e1 100644 --- a/examples/c++-arrow-dataflow/run.rs +++ b/examples/c++-arrow-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{env::consts::EXE_SUFFIX, path::Path, process::Command}; struct ArrowConfig { @@ -77,7 +77,9 @@ fn find_arrow_config() -> eyre::Result { .wrap_err("Failed to run pkg-config. Make sure Arrow C++ is installed")?; if !output.status.success() { - bail!("Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH"); + bail!( + "Arrow C++ not found via pkg-config. Make sure it's installed and in your PKG_CONFIG_PATH" + ); } let cflags = String::from_utf8(output.stdout)?.trim().to_string(); diff --git a/examples/c++-dataflow/run.rs b/examples/c++-dataflow/run.rs index dd88900a..8183fcc0 100644 --- a/examples/c++-dataflow/run.rs +++ b/examples/c++-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, path::Path, diff --git a/examples/c++-ros2-dataflow/run.rs b/examples/c++-ros2-dataflow/run.rs index 4af3cd31..fd0cc1e1 100644 --- a/examples/c++-ros2-dataflow/run.rs +++ b/examples/c++-ros2-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{env::consts::EXE_SUFFIX, path::Path}; #[tokio::main] diff --git a/examples/c-dataflow/run.rs b/examples/c-dataflow/run.rs index e71d802b..fd548712 100644 --- a/examples/c-dataflow/run.rs +++ b/examples/c-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX, EXE_SUFFIX}, path::Path, diff --git a/examples/camera/run.rs b/examples/camera/run.rs index 94988261..9a30cea0 100644 --- a/examples/camera/run.rs +++ b/examples/camera/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/cmake-dataflow/run.rs b/examples/cmake-dataflow/run.rs index b4530f26..34e828de 100644 --- a/examples/cmake-dataflow/run.rs +++ b/examples/cmake-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/multiple-daemons/node/src/main.rs b/examples/multiple-daemons/node/src/main.rs index bf1cd424..4a4cc3fd 100644 --- a/examples/multiple-daemons/node/src/main.rs +++ b/examples/multiple-daemons/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use dora_node_api::{self, DoraNode, Event, IntoArrow, dora_core::config::DataId}; fn main() -> eyre::Result<()> { println!("hello"); diff --git a/examples/multiple-daemons/operator/src/lib.rs b/examples/multiple-daemons/operator/src/lib.rs index d18138c5..c7d1696a 100644 --- a/examples/multiple-daemons/operator/src/lib.rs +++ b/examples/multiple-daemons/operator/src/lib.rs @@ -1,7 +1,7 @@ #![warn(unsafe_op_in_unsafe_fn)] use dora_operator_api::{ - register_operator, DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, + DoraOperator, DoraOutputSender, DoraStatus, Event, IntoArrow, register_operator, }; register_operator!(ExampleOperator); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index cb558af3..f49922ec 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,7 +1,7 @@ use dora_cli::session::DataflowSession; use dora_coordinator::{ControlEvent, Event}; use dora_core::{ - descriptor::{read_as_descriptor, DescriptorExt}, + descriptor::{DescriptorExt, read_as_descriptor}, topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT}, }; use dora_message::{ @@ -10,7 +10,7 @@ use dora_message::{ coordinator_to_cli::{ControlRequestReply, DataflowIdAndName}, }; use dora_tracing::TracingBuilder; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::{ collections::BTreeSet, diff --git a/examples/multiple-daemons/sink/src/main.rs b/examples/multiple-daemons/sink/src/main.rs index feb01395..03453f28 100644 --- a/examples/multiple-daemons/sink/src/main.rs +++ b/examples/multiple-daemons/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, DoraNode, Event}; -use eyre::{bail, Context}; +use eyre::{Context, bail}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -16,7 +16,9 @@ fn main() -> eyre::Result<()> { TryFrom::try_from(&data).context("expected string message")?; println!("sink received message: {received_string}"); if !received_string.starts_with("operator received random value ") { - bail!("unexpected message format (should start with 'operator received random value')") + bail!( + "unexpected message format (should start with 'operator received random value')" + ) } if !received_string.ends_with(" ticks") { bail!("unexpected message format (should end with 'ticks')") diff --git a/examples/python-dataflow/run.rs b/examples/python-dataflow/run.rs index de96795d..15e5a674 100644 --- a/examples/python-dataflow/run.rs +++ b/examples/python-dataflow/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/python-operator-dataflow/run.rs b/examples/python-operator-dataflow/run.rs index 94988261..9a30cea0 100644 --- a/examples/python-operator-dataflow/run.rs +++ b/examples/python-operator-dataflow/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/python-ros2-dataflow/run.rs b/examples/python-ros2-dataflow/run.rs index de96795d..15e5a674 100644 --- a/examples/python-ros2-dataflow/run.rs +++ b/examples/python-ros2-dataflow/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/rerun-viewer/run.rs b/examples/rerun-viewer/run.rs index 243db076..1f77bfab 100644 --- a/examples/rerun-viewer/run.rs +++ b/examples/rerun-viewer/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/rust-dataflow-git/run.rs b/examples/rust-dataflow-git/run.rs index 855eb85b..58345bf9 100644 --- a/examples/rust-dataflow-git/run.rs +++ b/examples/rust-dataflow-git/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/rust-dataflow-url/run.rs b/examples/rust-dataflow-url/run.rs index 158e8ed9..59d46c4e 100644 --- a/examples/rust-dataflow-url/run.rs +++ b/examples/rust-dataflow-url/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/rust-dataflow/node/src/main.rs b/examples/rust-dataflow/node/src/main.rs index bf1cd424..4a4cc3fd 100644 --- a/examples/rust-dataflow/node/src/main.rs +++ b/examples/rust-dataflow/node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use dora_node_api::{self, DoraNode, Event, IntoArrow, dora_core::config::DataId}; fn main() -> eyre::Result<()> { println!("hello"); diff --git a/examples/rust-dataflow/run.rs b/examples/rust-dataflow/run.rs index 855eb85b..58345bf9 100644 --- a/examples/rust-dataflow/run.rs +++ b/examples/rust-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/rust-dataflow/sink-dynamic/src/main.rs b/examples/rust-dataflow/sink-dynamic/src/main.rs index 5bda47f6..eb9fdc71 100644 --- a/examples/rust-dataflow/sink-dynamic/src/main.rs +++ b/examples/rust-dataflow/sink-dynamic/src/main.rs @@ -1,5 +1,5 @@ -use dora_node_api::{self, dora_core::config::NodeId, DoraNode, Event}; -use eyre::{bail, Context}; +use dora_node_api::{self, DoraNode, Event, dora_core::config::NodeId}; +use eyre::{Context, bail}; fn main() -> eyre::Result<()> { let (_node, mut events) = @@ -17,7 +17,9 @@ fn main() -> eyre::Result<()> { TryFrom::try_from(&data).context("expected string message")?; println!("sink received message: {received_string}"); if !received_string.starts_with("operator received random value ") { - bail!("unexpected message format (should start with 'operator received random value')") + bail!( + "unexpected message format (should start with 'operator received random value')" + ) } if !received_string.ends_with(" ticks") { bail!("unexpected message format (should end with 'ticks')") diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index feb01395..03453f28 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, DoraNode, Event}; -use eyre::{bail, Context}; +use eyre::{Context, bail}; fn main() -> eyre::Result<()> { let (_node, mut events) = DoraNode::init_from_env()?; @@ -16,7 +16,9 @@ fn main() -> eyre::Result<()> { TryFrom::try_from(&data).context("expected string message")?; println!("sink received message: {received_string}"); if !received_string.starts_with("operator received random value ") { - bail!("unexpected message format (should start with 'operator received random value')") + bail!( + "unexpected message format (should start with 'operator received random value')" + ) } if !received_string.ends_with(" ticks") { bail!("unexpected message format (should end with 'ticks')") diff --git a/examples/rust-dataflow/status-node/src/main.rs b/examples/rust-dataflow/status-node/src/main.rs index 99be97c6..1deaec1a 100644 --- a/examples/rust-dataflow/status-node/src/main.rs +++ b/examples/rust-dataflow/status-node/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use dora_node_api::{self, DoraNode, Event, IntoArrow, dora_core::config::DataId}; use eyre::Context; fn main() -> eyre::Result<()> { diff --git a/examples/rust-ros2-dataflow/node/src/main.rs b/examples/rust-ros2-dataflow/node/src/main.rs index 32ad5970..f6c48235 100644 --- a/examples/rust-ros2-dataflow/node/src/main.rs +++ b/examples/rust-ros2-dataflow/node/src/main.rs @@ -1,10 +1,9 @@ use std::time::Duration; use dora_node_api::{ - self, + self, DoraNode, Event, dora_core::config::DataId, merged::{MergeExternal, MergedEvent}, - DoraNode, Event, }; use dora_ros2_bridge::{ messages::{ @@ -12,10 +11,10 @@ use dora_ros2_bridge::{ geometry_msgs::msg::{Twist, Vector3}, turtlesim::msg::Pose, }, - ros2_client::{self, ros2, NodeOptions}, + ros2_client::{self, NodeOptions, ros2}, rustdds::{self, policy}, }; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use futures::task::SpawnExt; fn main() -> eyre::Result<()> { diff --git a/examples/rust-ros2-dataflow/run.rs b/examples/rust-ros2-dataflow/run.rs index f81a25d5..5ba8796c 100644 --- a/examples/rust-ros2-dataflow/run.rs +++ b/examples/rust-ros2-dataflow/run.rs @@ -1,5 +1,5 @@ use dora_tracing::set_up_tracing; -use eyre::{bail, Context}; +use eyre::{Context, bail}; use std::path::Path; #[tokio::main] diff --git a/examples/vlm/run.rs b/examples/vlm/run.rs index 742c3818..273da6b7 100644 --- a/examples/vlm/run.rs +++ b/examples/vlm/run.rs @@ -1,6 +1,6 @@ use dora_core::{get_uv_path, run}; use dora_tracing::set_up_tracing; -use eyre::{bail, WrapErr}; +use eyre::{WrapErr, bail}; use std::path::Path; #[tokio::main] diff --git a/libraries/arrow-convert/src/from_impls.rs b/libraries/arrow-convert/src/from_impls.rs index 1f00ffc1..adf2ea5a 100644 --- a/libraries/arrow-convert/src/from_impls.rs +++ b/libraries/arrow-convert/src/from_impls.rs @@ -274,7 +274,7 @@ where #[cfg(test)] mod tests { - use arrow::array::{make_array, PrimitiveArray}; + use arrow::array::{PrimitiveArray, make_array}; use crate::ArrowData; diff --git a/libraries/arrow-convert/src/into_impls.rs b/libraries/arrow-convert/src/into_impls.rs index b2174146..668f7a7e 100644 --- a/libraries/arrow-convert/src/into_impls.rs +++ b/libraries/arrow-convert/src/into_impls.rs @@ -1,8 +1,8 @@ use crate::IntoArrow; use arrow::array::{PrimitiveArray, StringArray, TimestampNanosecondArray}; use arrow::datatypes::{ - ArrowPrimitiveType, ArrowTimestampType, Float16Type, Float32Type, Float64Type, Int16Type, - Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + ArrowPrimitiveType, ArrowTimestampType, Float16Type, Float32Type, Float64Type, Int8Type, + Int16Type, Int32Type, Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type, }; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use half::f16; diff --git a/libraries/arrow-convert/src/lib.rs b/libraries/arrow-convert/src/lib.rs index 32185a3f..d92d8324 100644 --- a/libraries/arrow-convert/src/lib.rs +++ b/libraries/arrow-convert/src/lib.rs @@ -3,11 +3,11 @@ #![warn(missing_docs)] use arrow::array::{ - Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, UInt16Array, - UInt32Array, UInt8Array, + Array, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, UInt8Array, + UInt16Array, UInt32Array, }; use arrow::datatypes::DataType; -use eyre::{eyre, ContextCompat, Result}; +use eyre::{ContextCompat, Result, eyre}; use num::NumCast; use std::ops::{Deref, DerefMut}; diff --git a/libraries/communication-layer/pub-sub/src/zenoh.rs b/libraries/communication-layer/pub-sub/src/zenoh.rs index b185376d..01cc2a8a 100644 --- a/libraries/communication-layer/pub-sub/src/zenoh.rs +++ b/libraries/communication-layer/pub-sub/src/zenoh.rs @@ -4,7 +4,7 @@ use super::{CommunicationLayer, Publisher, Subscriber}; use crate::{BoxError, ReceivedSample}; use std::{borrow::Cow, sync::Arc, time::Duration}; use zenoh::{ - prelude::{sync::SyncResolve, Config, Priority, SessionDeclarations, SplitBuffer}, + prelude::{Config, Priority, SessionDeclarations, SplitBuffer, sync::SyncResolve}, publication::CongestionControl, }; diff --git a/libraries/communication-layer/request-reply/src/lib.rs b/libraries/communication-layer/request-reply/src/lib.rs index 83f12699..d3506699 100644 --- a/libraries/communication-layer/request-reply/src/lib.rs +++ b/libraries/communication-layer/request-reply/src/lib.rs @@ -28,10 +28,10 @@ pub trait RequestReplyLayer: Send + Sync { Item = Result< Box< dyn ListenConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, >, Self::Error, >, @@ -47,10 +47,10 @@ pub trait RequestReplyLayer: Send + Sync { ) -> Result< Box< dyn RequestReplyConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, >, Self::Error, >; diff --git a/libraries/communication-layer/request-reply/src/tcp.rs b/libraries/communication-layer/request-reply/src/tcp.rs index 27dd59cf..563eb09e 100644 --- a/libraries/communication-layer/request-reply/src/tcp.rs +++ b/libraries/communication-layer/request-reply/src/tcp.rs @@ -37,10 +37,10 @@ impl RequestReplyLayer for TcpLayer { Item = Result< Box< dyn crate::ListenConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, >, Self::Error, >, @@ -56,10 +56,10 @@ impl RequestReplyLayer for TcpLayer { r.map(|stream| { let connection: Box< dyn ListenConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, > = Box::new(TcpConnection { stream }); connection }) @@ -74,20 +74,20 @@ impl RequestReplyLayer for TcpLayer { ) -> Result< Box< dyn crate::RequestReplyConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, >, Self::Error, > { TcpStream::connect(addr).map(|s| { let connection: Box< dyn RequestReplyConnection< - RequestData = Self::RequestData, - ReplyData = Self::ReplyData, - Error = Self::Error, - >, + RequestData = Self::RequestData, + ReplyData = Self::ReplyData, + Error = Self::Error, + >, > = Box::new(TcpConnection { stream: s }); connection }) diff --git a/libraries/core/src/build/build_command.rs b/libraries/core/src/build/build_command.rs index a43c0a53..aed83e89 100644 --- a/libraries/core/src/build/build_command.rs +++ b/libraries/core/src/build/build_command.rs @@ -6,7 +6,7 @@ use std::{ }; use dora_message::descriptor::EnvValue; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; pub fn run_build_command( build: &str, diff --git a/libraries/core/src/build/git.rs b/libraries/core/src/build/git.rs index 128bd156..29fcd9a0 100644 --- a/libraries/core/src/build/git.rs +++ b/libraries/core/src/build/git.rs @@ -1,6 +1,6 @@ use crate::build::{BuildLogger, PrevGitSource}; -use dora_message::{common::LogLevel, DataflowId, SessionId}; -use eyre::{bail, ContextCompat, WrapErr}; +use dora_message::{DataflowId, SessionId, common::LogLevel}; +use eyre::{ContextCompat, WrapErr, bail}; use git2::FetchOptions; use itertools::Itertools; use std::{ diff --git a/libraries/core/src/descriptor/mod.rs b/libraries/core/src/descriptor/mod.rs index c3cd910a..1c7336f5 100644 --- a/libraries/core/src/descriptor/mod.rs +++ b/libraries/core/src/descriptor/mod.rs @@ -3,7 +3,7 @@ use dora_message::{ descriptor::{GitRepoRev, NodeSource}, id::{DataId, NodeId, OperatorId}, }; -use eyre::{bail, Context, OptionExt, Result}; +use eyre::{Context, OptionExt, Result, bail}; use std::{ collections::{BTreeMap, HashMap}, env::consts::EXE_EXTENSION, @@ -14,9 +14,9 @@ use tokio::process::Command; // reexport for compatibility pub use dora_message::descriptor::{ - CoreNodeKind, CustomNode, Descriptor, Node, OperatorConfig, OperatorDefinition, OperatorSource, - PythonSource, ResolvedNode, RuntimeNode, SingleOperatorDefinition, DYNAMIC_SOURCE, - SHELL_SOURCE, + CoreNodeKind, CustomNode, DYNAMIC_SOURCE, Descriptor, Node, OperatorConfig, OperatorDefinition, + OperatorSource, PythonSource, ResolvedNode, RuntimeNode, SHELL_SOURCE, + SingleOperatorDefinition, }; pub use validate::ResolvedNodeExt; pub use visualize::collect_dora_timers; @@ -165,7 +165,9 @@ fn node_kind_mut(node: &mut Node) -> eyre::Result { (None, Some(tag), None) => Some(GitRepoRev::Tag(tag.clone())), (None, None, Some(rev)) => Some(GitRepoRev::Rev(rev.clone())), other @ (_, _, _) => { - eyre::bail!("only one of `branch`, `tag`, and `rev` are allowed (got {other:?})") + eyre::bail!( + "only one of `branch`, `tag`, and `rev` are allowed (got {other:?})" + ) } }; NodeSource::GitBranch { diff --git a/libraries/core/src/descriptor/visualize.rs b/libraries/core/src/descriptor/visualize.rs index 890958f0..e4ad9535 100644 --- a/libraries/core/src/descriptor/visualize.rs +++ b/libraries/core/src/descriptor/visualize.rs @@ -1,5 +1,5 @@ use dora_message::{ - config::{format_duration, Input, InputMapping, UserInputMapping}, + config::{Input, InputMapping, UserInputMapping, format_duration}, descriptor::{CoreNodeKind, OperatorDefinition}, id::{DataId, NodeId}, }; diff --git a/libraries/core/src/lib.rs b/libraries/core/src/lib.rs index c45ec613..25084575 100644 --- a/libraries/core/src/lib.rs +++ b/libraries/core/src/lib.rs @@ -1,4 +1,4 @@ -use eyre::{bail, eyre, Context}; +use eyre::{Context, bail, eyre}; use std::{ env::consts::{DLL_PREFIX, DLL_SUFFIX}, ffi::OsStr, diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/action.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/action.rs index c2ebf56a..d381dbf9 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/action.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/action.rs @@ -61,7 +61,7 @@ mod test { use std::path::PathBuf; use super::*; - use crate::types::{primitives::*, sequences::*, MemberType}; + use crate::types::{MemberType, primitives::*, sequences::*}; fn parse_action_def(srv_name: &str) -> Result { let path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/constant.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/constant.rs index 08cffdbc..cc5affbe 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/constant.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/constant.rs @@ -1,4 +1,4 @@ -use anyhow::{ensure, Result}; +use anyhow::{Result, ensure}; use nom::{ bytes::complete::is_not, character::complete::{char, space0, space1}, @@ -8,7 +8,7 @@ use nom::{ }; use super::{error::RclMsgError, ident, literal, types}; -use crate::types::{primitives::PrimitiveType, Constant, ConstantType}; +use crate::types::{Constant, ConstantType, primitives::PrimitiveType}; fn validate_value(r#type: ConstantType, value: &str) -> Result> { match r#type { diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/ident.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/ident.rs index 76e59a8f..34c83709 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/ident.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/ident.rs @@ -1,10 +1,10 @@ use nom::{ + IResult, branch::alt, character::complete::{alphanumeric0, char, one_of}, combinator::{opt, recognize}, multi::{many1, separated_list0, separated_list1}, sequence::{pair, tuple}, - IResult, }; fn upperalpha(s: &str) -> IResult<&str, char> { diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/literal.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/literal.rs index 03ed7450..8e1dbdec 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/literal.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/literal.rs @@ -1,6 +1,7 @@ use std::convert::TryFrom; use nom::{ + IResult, branch::alt, bytes::complete::{is_not, tag, tag_no_case, take_while}, character::complete::{anychar, char, digit1, hex_digit1, none_of, oct_digit1, one_of, space0}, @@ -8,7 +9,6 @@ use nom::{ multi::{many0, separated_list1}, number::complete::recognize_float, sequence::{delimited, pair, tuple}, - IResult, }; use crate::types::primitives::{BasicType, GenericString}; diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/member.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/member.rs index 57c4a5db..c50a4ecf 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/member.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/member.rs @@ -1,4 +1,4 @@ -use anyhow::{ensure, Result}; +use anyhow::{Result, ensure}; use nom::{ bytes::complete::is_not, character::complete::{space0, space1}, @@ -8,7 +8,7 @@ use nom::{ }; use super::{error::RclMsgError, ident, literal, types}; -use crate::types::{primitives::NestableType, Member, MemberType}; +use crate::types::{Member, MemberType, primitives::NestableType}; fn nestable_type_default(nestable_type: NestableType, default: &str) -> Result> { match nestable_type { diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/package.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/package.rs index 8900a722..27e663a6 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/package.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/package.rs @@ -51,9 +51,9 @@ fn get_ros_msgs_each_package>(root_dir: P) -> Result continue; } else if visited_files.contains(&(package.clone(), file_name.clone())) { warn!( - "found two versions of package: {:?}, message: {:?}. will skip the one in: {:#?}", - package, file_name, path - ); + "found two versions of package: {:?}, message: {:?}. will skip the one in: {:#?}", + package, file_name, path + ); continue; } else { visited_files.push((package.clone(), file_name.clone())); diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/parser/types.rs b/libraries/extensions/ros2-bridge/msg-gen/src/parser/types.rs index 4561598c..3831392e 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/parser/types.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/parser/types.rs @@ -1,11 +1,11 @@ use anyhow::anyhow; use nom::{ + IResult, branch::alt, bytes::complete::tag, character::complete::{char, space1}, combinator::{eof, map, map_res, opt, peek}, sequence::{delimited, pair, preceded, tuple}, - IResult, }; use super::{ @@ -13,9 +13,9 @@ use super::{ literal::usize_literal, }; use crate::types::{ + ConstantType, MemberType, primitives::*, sequences::{Array, BoundedSequence, PrimitiveArray, Sequence}, - ConstantType, MemberType, }; pub fn parse_member_type(s: &str) -> IResult<&str, MemberType> { diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/action.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/action.rs index 6e2a9219..e0be2aa5 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/action.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/action.rs @@ -1,8 +1,8 @@ use heck::SnakeCase; -use quote::{format_ident, quote, ToTokens}; +use quote::{ToTokens, format_ident, quote}; use syn::Ident; -use super::{primitives::*, Member, Message, Service}; +use super::{Member, Message, Service, primitives::*}; /// An action definition #[derive(Debug, Clone)] diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/constant.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/constant.rs index 0db06617..47ff0dc6 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/constant.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/constant.rs @@ -1,4 +1,4 @@ -use quote::{quote, ToTokens}; +use quote::{ToTokens, quote}; use super::{ primitives::{BasicType, GenericUnboundedString, PrimitiveType}, diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/member.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/member.rs index c9069a8d..41e7dddf 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/member.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/member.rs @@ -1,4 +1,4 @@ -use quote::{quote, ToTokens}; +use quote::{ToTokens, quote}; use super::{primitives::*, sequences::*}; diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs index e7bda16e..b90a9dcb 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/message.rs @@ -1,7 +1,7 @@ -use quote::{format_ident, quote, ToTokens}; +use quote::{ToTokens, format_ident, quote}; use syn::Ident; -use super::{primitives::*, sequences::Array, ConstantType, MemberType}; +use super::{ConstantType, MemberType, primitives::*, sequences::Array}; /// A member of a structure #[derive(Debug, Clone)] diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs index 2e3fe5b1..a04577b3 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/package.rs @@ -1,5 +1,5 @@ use proc_macro2::Span; -use quote::{quote, ToTokens}; +use quote::{ToTokens, quote}; use syn::Ident; use crate::types::{Action, Message, Service}; diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/primitives.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/primitives.rs index c6da532c..4bacc1f5 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/primitives.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/primitives.rs @@ -1,7 +1,7 @@ use std::fmt; use proc_macro2::{Ident, Literal, Span}; -use quote::{format_ident, quote, ToTokens}; +use quote::{ToTokens, format_ident, quote}; macro_rules! define_enum_from { ($into_t:ty, $from_t:ty, $path:path) => { diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/sequences.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/sequences.rs index 2c4c1872..94a62792 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/sequences.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/sequences.rs @@ -1,4 +1,4 @@ -use quote::{quote, ToTokens}; +use quote::{ToTokens, quote}; use super::primitives::*; diff --git a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs index 18929578..ff4a478b 100644 --- a/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs +++ b/libraries/extensions/ros2-bridge/msg-gen/src/types/service.rs @@ -1,5 +1,5 @@ use heck::SnakeCase; -use quote::{format_ident, quote, ToTokens}; +use quote::{ToTokens, format_ident, quote}; use syn::Ident; use super::Message; diff --git a/libraries/extensions/ros2-bridge/python/src/lib.rs b/libraries/extensions/ros2-bridge/python/src/lib.rs index 14984245..9bc04add 100644 --- a/libraries/extensions/ros2-bridge/python/src/lib.rs +++ b/libraries/extensions/ros2-bridge/python/src/lib.rs @@ -7,19 +7,19 @@ use std::{ use ::dora_ros2_bridge::{ros2_client, rustdds}; use arrow::{ - array::{make_array, ArrayData}, + array::{ArrayData, make_array}, pyarrow::{FromPyArrow, ToPyArrow}, }; use dora_ros2_bridge_msg_gen::types::Message; -use eyre::{eyre, Context, ContextCompat, Result}; +use eyre::{Context, ContextCompat, Result, eyre}; use futures::{Stream, StreamExt}; use pyo3::{ + Bound, PyAny, PyObject, PyResult, Python, prelude::{pyclass, pymethods}, types::{PyAnyMethods, PyDict, PyList, PyModule, PyModuleMethods}, - Bound, PyAny, PyObject, PyResult, Python, }; use pyo3_special_method_derive::{Dict, Dir, Repr, Str}; -use typed::{deserialize::StructDeserializer, TypeInfo, TypedValue}; +use typed::{TypeInfo, TypedValue, deserialize::StructDeserializer}; pub mod qos; pub mod typed; @@ -176,12 +176,17 @@ impl Ros2Node { message_type: String, qos: qos::Ros2QosPolicies, ) -> eyre::Result { - let (namespace_name, message_name) = - match (message_type.split_once('/'), message_type.split_once("::")) { - (Some(msg), None) => msg, - (None, Some(msg)) => msg, - _ => eyre::bail!("Expected message type in the format `namespace/message` or `namespace::message`, such as `std_msgs/UInt8` but got: {}", message_type), - }; + let (namespace_name, message_name) = match ( + message_type.split_once('/'), + message_type.split_once("::"), + ) { + (Some(msg), None) => msg, + (None, Some(msg)) => msg, + _ => eyre::bail!( + "Expected message type in the format `namespace/message` or `namespace::message`, such as `std_msgs/UInt8` but got: {}", + message_type + ), + }; let message_type_name = ros2_client::MessageTypeName::new(namespace_name, message_name); let topic_name = ros2_client::Name::parse(name) diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs index 715431cc..a189b0bd 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/mod.rs @@ -1,6 +1,6 @@ -use super::{TypeInfo, DUMMY_STRUCT_NAME}; +use super::{DUMMY_STRUCT_NAME, TypeInfo}; use arrow::{ - array::{make_array, ArrayData, StructArray}, + array::{ArrayData, StructArray, make_array}, datatypes::Field, }; use core::fmt; diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/primitive.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/primitive.rs index 1765546a..f6a52a81 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/primitive.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/primitive.rs @@ -1,7 +1,7 @@ use arrow::array::{ - ArrayData, BooleanBuilder, Float32Builder, Float64Builder, Int16Builder, Int32Builder, - Int64Builder, Int8Builder, NullArray, UInt16Builder, UInt32Builder, UInt64Builder, - UInt8Builder, + ArrayData, BooleanBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, + Int32Builder, Int64Builder, NullArray, UInt8Builder, UInt16Builder, UInt32Builder, + UInt64Builder, }; use core::fmt; use dora_ros2_bridge_msg_gen::types::primitives::BasicType; diff --git a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs index a5592196..16a665d4 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/deserialize/sequence.rs @@ -12,7 +12,7 @@ use std::{borrow::Cow, ops::Deref, sync::Arc}; use crate::typed::TypeInfo; -use super::{error, StructDeserializer}; +use super::{StructDeserializer, error}; pub struct SequenceDeserializer<'a> { pub item_type: &'a NestableType, diff --git a/libraries/extensions/ros2-bridge/python/src/typed/mod.rs b/libraries/extensions/ros2-bridge/python/src/typed/mod.rs index 7f05aa72..a6e0452c 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/mod.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/mod.rs @@ -27,10 +27,10 @@ const DUMMY_STRUCT_NAME: &str = "struct"; mod tests { use std::path::PathBuf; + use crate::Ros2Context; + use crate::typed::TypeInfo; use crate::typed::deserialize::StructDeserializer; use crate::typed::serialize; - use crate::typed::TypeInfo; - use crate::Ros2Context; use arrow::array::make_array; use arrow::pyarrow::FromPyArrow; @@ -47,8 +47,8 @@ mod tests { use pyo3::types::PyTuple; use pyo3::Python; - use serde::de::DeserializeSeed; use serde::Serialize; + use serde::de::DeserializeSeed; use serde_assert::Serializer; use serialize::TypedValue; diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs index 3b2bf889..84a98c7f 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/array.rs @@ -12,7 +12,7 @@ use serde::ser::SerializeTuple; use crate::typed::TypeInfo; -use super::{error, TypedValue}; +use super::{TypedValue, error}; /// Serialize an array with known size as tuple. pub struct ArraySerializeWrapper<'a> { diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/defaults.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/defaults.rs index 2cc1e6b7..1d0e021f 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/defaults.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/defaults.rs @@ -1,16 +1,16 @@ use arrow::{ array::{ - make_array, Array, ArrayData, BooleanArray, Float32Array, Float64Array, Int16Array, - Int32Array, Int64Array, Int8Array, ListArray, StringArray, StructArray, UInt16Array, - UInt32Array, UInt64Array, UInt8Array, + Array, ArrayData, BooleanArray, Float32Array, Float64Array, Int8Array, Int16Array, + Int32Array, Int64Array, ListArray, StringArray, StructArray, UInt8Array, UInt16Array, + UInt32Array, UInt64Array, make_array, }, buffer::{OffsetBuffer, ScalarBuffer}, compute::concat, datatypes::Field, }; use dora_ros2_bridge_msg_gen::types::{ - primitives::{BasicType, NestableType}, MemberType, Message, + primitives::{BasicType, NestableType}, }; use eyre::{Context, ContextCompat, Result}; use std::{collections::HashMap, sync::Arc, vec}; @@ -112,51 +112,73 @@ fn default_for_nestable_type( fn preset_default_for_basic_type(t: &NestableType, preset: &str) -> Result { Ok(match t { NestableType::BasicType(t) => match t { - BasicType::I8 => Int8Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::I8 => Int8Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::I16 => Int16Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::I16 => Int16Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::I32 => Int32Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::I32 => Int32Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::I64 => Int64Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::I64 => Int64Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::U8 => UInt8Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::U8 => UInt8Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::U16 => UInt16Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::U16 => UInt16Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::U32 => UInt32Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::U32 => UInt32Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::U64 => UInt64Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::U64 => UInt64Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::F32 => Float32Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::F32 => Float32Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), - BasicType::F64 => Float64Array::from(vec![preset - .parse::() - .context("Could not parse preset default value")?]) + BasicType::F64 => Float64Array::from(vec![ + preset + .parse::() + .context("Could not parse preset default value")?, + ]) .into(), BasicType::Char => StringArray::from(vec![preset]).into(), BasicType::Byte => UInt8Array::from(preset.as_bytes().to_owned()).into(), - BasicType::Bool => BooleanArray::from(vec![preset - .parse::() - .context("could not parse preset default value")?]) + BasicType::Bool => BooleanArray::from(vec![ + preset + .parse::() + .context("could not parse preset default value")?, + ]) .into(), }, NestableType::GenericString(_) => StringArray::from(vec![preset]).into(), diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs index 6bebe463..7ed922f9 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/mod.rs @@ -5,13 +5,13 @@ use arrow::{ error, }; use dora_ros2_bridge_msg_gen::types::{ - primitives::{GenericString, NestableType}, MemberType, + primitives::{GenericString, NestableType}, }; use eyre::Context; use serde::ser::SerializeTupleStruct; -use super::{TypeInfo, DUMMY_STRUCT_NAME}; +use super::{DUMMY_STRUCT_NAME, TypeInfo}; mod array; mod defaults; diff --git a/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs b/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs index d42d45fb..95a52c40 100644 --- a/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs +++ b/libraries/extensions/ros2-bridge/python/src/typed/serialize/sequence.rs @@ -9,7 +9,7 @@ use serde::ser::{SerializeSeq, SerializeTuple}; use crate::typed::TypeInfo; -use super::{error, TypedValue}; +use super::{TypedValue, error}; /// Serialize a variable-sized sequence. pub struct SequenceSerializeWrapper<'a> { diff --git a/libraries/extensions/ros2-bridge/src/_core/traits.rs b/libraries/extensions/ros2-bridge/src/_core/traits.rs index 92aeab72..c2d6b33c 100644 --- a/libraries/extensions/ros2-bridge/src/_core/traits.rs +++ b/libraries/extensions/ros2-bridge/src/_core/traits.rs @@ -8,11 +8,11 @@ pub trait MessageT: Default + Send + Sync { type RawRef: FFIFromRust; unsafe fn from_raw(from: &Self::Raw) -> Self { - unsafe{ from.to_rust() } + unsafe { from.to_rust() } } unsafe fn to_raw_ref(&self) -> Self::RawRef { - unsafe {Self::RawRef::from_rust(self)} + unsafe { Self::RawRef::from_rust(self) } } } @@ -84,9 +84,7 @@ where unsafe fn to_rust(&self) -> ::Target { self.iter() - .map(|v| unsafe { - v.to_rust() - }) + .map(|v| unsafe { v.to_rust() }) .collect::>() .try_into() .unwrap() diff --git a/libraries/extensions/telemetry/metrics/src/lib.rs b/libraries/extensions/telemetry/metrics/src/lib.rs index 933c79a6..88637b70 100644 --- a/libraries/extensions/telemetry/metrics/src/lib.rs +++ b/libraries/extensions/telemetry/metrics/src/lib.rs @@ -11,7 +11,7 @@ //! [`opentelemetry-rust`]: https://github.com/open-telemetry/opentelemetry-rust use eyre::Result; -use opentelemetry::{global, InstrumentationScope}; +use opentelemetry::{InstrumentationScope, global}; use opentelemetry_otlp::MetricExporter; use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_system_metrics::init_process_observer; diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index 6aa1ca49..f57bb756 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -8,7 +8,7 @@ use std::path::Path; use eyre::Context as EyreContext; use tracing::metadata::LevelFilter; use tracing_subscriber::{ - filter::FilterExt, prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Layer, + EnvFilter, Layer, filter::FilterExt, prelude::__tracing_subscriber_SubscriberExt, }; use tracing_subscriber::Registry; diff --git a/libraries/extensions/telemetry/tracing/src/telemetry.rs b/libraries/extensions/telemetry/tracing/src/telemetry.rs index 1d4e6772..8bfaf12c 100644 --- a/libraries/extensions/telemetry/tracing/src/telemetry.rs +++ b/libraries/extensions/telemetry/tracing/src/telemetry.rs @@ -1,7 +1,7 @@ use opentelemetry::propagation::Extractor; use opentelemetry::sdk::{propagation::TraceContextPropagator, trace as sdktrace}; use opentelemetry::trace::TraceError; -use opentelemetry::{global, Context}; +use opentelemetry::{Context, global}; use std::collections::HashMap; struct MetadataMap<'a>(HashMap<&'a str, &'a str>); diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index bf3d3a03..3bd76f2c 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -3,10 +3,10 @@ use std::{collections::BTreeMap, path::PathBuf, time::Duration}; use uuid::Uuid; use crate::{ + BuildId, SessionId, common::GitSource, descriptor::Descriptor, id::{NodeId, OperatorId}, - BuildId, SessionId, }; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index d48f1308..f6b4edb8 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -5,7 +5,7 @@ use aligned_vec::{AVec, ConstAlign}; use eyre::Context as _; use uuid::Uuid; -use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId}; +use crate::{BuildId, DataflowId, daemon_to_daemon::InterDaemonEvent, id::NodeId}; pub use log::Level as LogLevel; @@ -70,7 +70,10 @@ impl std::fmt::Display for NodeError { other => other.to_string().into(), }; if matches!(self.cause, NodeErrorCause::GraceDuration) { - write!(f, "node was killed by dora because it didn't react to a stop message in time ({signal_str})") + write!( + f, + "node was killed by dora because it didn't react to a stop message in time ({signal_str})" + ) } else { write!(f, "exited because of signal {signal_str}") } @@ -79,7 +82,7 @@ impl std::fmt::Display for NodeError { }?; match &self.cause { - NodeErrorCause::GraceDuration => {}, // handled above + NodeErrorCause::GraceDuration => {} // handled above NodeErrorCause::Cascading { caused_by_node } => write!( f, ". This error occurred because node `{caused_by_node}` exited before connecting to dora." @@ -90,7 +93,7 @@ impl std::fmt::Display for NodeError { let line: &str = "---------------------------------------------------------------------------------\n"; let stderr = stderr.trim_end(); write!(f, " with stderr output:\n{line}{stderr}\n{line}")? - }, + } } Ok(()) diff --git a/libraries/message/src/config.rs b/libraries/message/src/config.rs index 3b8bdc7b..d11d1632 100644 --- a/libraries/message/src/config.rs +++ b/libraries/message/src/config.rs @@ -180,7 +180,7 @@ impl<'de> Deserialize<'de> for InputMapping { other => { return Err(serde::de::Error::custom(format!( "timer unit must be either secs or millis (got `{other}`" - ))) + ))); } }; Self::Timer { interval } @@ -188,7 +188,7 @@ impl<'de> Deserialize<'de> for InputMapping { Some((other, _)) => { return Err(serde::de::Error::custom(format!( "unknown dora input `{other}`" - ))) + ))); } None => return Err(serde::de::Error::custom("dora input has invalid format")), }, diff --git a/libraries/message/src/coordinator_to_cli.rs b/libraries/message/src/coordinator_to_cli.rs index 02243468..77baf9a0 100644 --- a/libraries/message/src/coordinator_to_cli.rs +++ b/libraries/message/src/coordinator_to_cli.rs @@ -6,7 +6,7 @@ use std::{ use uuid::Uuid; pub use crate::common::{LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus}; -use crate::{common::DaemonId, id::NodeId, BuildId}; +use crate::{BuildId, common::DaemonId, id::NodeId}; #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub enum ControlRequestReply { diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 69da8923..02871f96 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -5,10 +5,10 @@ use std::{ }; use crate::{ + BuildId, DataflowId, SessionId, common::{DaemonId, GitSource}, descriptor::{Descriptor, ResolvedNode}, id::{NodeId, OperatorId}, - BuildId, DataflowId, SessionId, }; pub use crate::common::Timestamped; diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs index ccafb0a5..83d11149 100644 --- a/libraries/message/src/daemon_to_coordinator.rs +++ b/libraries/message/src/daemon_to_coordinator.rs @@ -4,7 +4,7 @@ pub use crate::common::{ DataMessage, LogLevel, LogMessage, NodeError, NodeErrorCause, NodeExitStatus, Timestamped, }; use crate::{ - common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId, + BuildId, DataflowId, common::DaemonId, current_crate_version, id::NodeId, versions_compatible, }; #[derive(Debug, serde::Serialize, serde::Deserialize)] diff --git a/libraries/message/src/daemon_to_daemon.rs b/libraries/message/src/daemon_to_daemon.rs index b1452a9a..cb4b70e6 100644 --- a/libraries/message/src/daemon_to_daemon.rs +++ b/libraries/message/src/daemon_to_daemon.rs @@ -1,9 +1,9 @@ use aligned_vec::{AVec, ConstAlign}; use crate::{ + DataflowId, id::{DataId, NodeId}, metadata::Metadata, - DataflowId, }; #[derive(Debug, serde::Deserialize, serde::Serialize)] diff --git a/libraries/message/src/daemon_to_node.rs b/libraries/message/src/daemon_to_node.rs index 7d520f42..805223e5 100644 --- a/libraries/message/src/daemon_to_node.rs +++ b/libraries/message/src/daemon_to_node.rs @@ -1,11 +1,11 @@ use std::{net::SocketAddr, path::PathBuf}; use crate::{ + DataflowId, config::NodeRunConfig, descriptor::OperatorDefinition, id::{DataId, NodeId, OperatorId}, metadata::Metadata, - DataflowId, }; pub use crate::common::{DataMessage, DropToken, SharedMemoryId, Timestamped}; diff --git a/libraries/message/src/node_to_daemon.rs b/libraries/message/src/node_to_daemon.rs index bb5a0850..d8d22e7f 100644 --- a/libraries/message/src/node_to_daemon.rs +++ b/libraries/message/src/node_to_daemon.rs @@ -2,10 +2,10 @@ pub use crate::common::{ DataMessage, DropToken, LogLevel, LogMessage, SharedMemoryId, Timestamped, }; use crate::{ - current_crate_version, + DataflowId, current_crate_version, id::{DataId, NodeId}, metadata::Metadata, - versions_compatible, DataflowId, + versions_compatible, }; #[derive(Debug, serde::Serialize, serde::Deserialize)] diff --git a/libraries/shared-memory-server/src/bin/bench.rs b/libraries/shared-memory-server/src/bin/bench.rs index 1392b7e6..6cce4081 100644 --- a/libraries/shared-memory-server/src/bin/bench.rs +++ b/libraries/shared-memory-server/src/bin/bench.rs @@ -3,7 +3,7 @@ use std::{ time::{Duration, Instant}, }; -use eyre::{eyre, Context, ContextCompat}; +use eyre::{Context, ContextCompat, eyre}; use shared_memory_server::{ShmemClient, ShmemConf, ShmemServer}; fn main() -> eyre::Result<()> { diff --git a/libraries/shared-memory-server/src/channel.rs b/libraries/shared-memory-server/src/channel.rs index 8065d3c6..1d1d341f 100644 --- a/libraries/shared-memory-server/src/channel.rs +++ b/libraries/shared-memory-server/src/channel.rs @@ -1,4 +1,4 @@ -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use raw_sync_2::events::{Event, EventImpl, EventInit, EventState}; use serde::{Deserialize, Serialize}; use shared_memory_extended::Shmem; diff --git a/libraries/shared-memory-server/src/lib.rs b/libraries/shared-memory-server/src/lib.rs index 9a703a6e..9d5f72f8 100644 --- a/libraries/shared-memory-server/src/lib.rs +++ b/libraries/shared-memory-server/src/lib.rs @@ -1,7 +1,7 @@ #![allow(clippy::missing_safety_doc)] use self::channel::ShmemChannel; -use eyre::{eyre, Context}; +use eyre::{Context, eyre}; use serde::{Deserialize, Serialize}; pub use shared_memory_extended::{Shmem, ShmemConf}; use std::marker::PhantomData; @@ -18,9 +18,7 @@ pub struct ShmemServer { impl ShmemServer { pub unsafe fn new(memory: Shmem) -> eyre::Result { Ok(Self { - channel: unsafe { - ShmemChannel::new_server(memory)? - }, + channel: unsafe { ShmemChannel::new_server(memory)? }, reply_expected: false, phantom: PhantomData, }) @@ -59,9 +57,7 @@ pub struct ShmemClient { impl ShmemClient { pub unsafe fn new(memory: Shmem, timeout: Option) -> eyre::Result { Ok(Self { - channel: unsafe { - ShmemChannel::new_client(memory)? - }, + channel: unsafe { ShmemChannel::new_client(memory)? }, timeout, phantom: PhantomData, }) diff --git a/node-hub/dora-dav1d/src/lib.rs b/node-hub/dora-dav1d/src/lib.rs index 45e8477d..d8f283a4 100644 --- a/node-hub/dora-dav1d/src/lib.rs +++ b/node-hub/dora-dav1d/src/lib.rs @@ -1,7 +1,7 @@ use std::env::var; use dav1d::Settings; -use dora_node_api::{arrow::array::UInt8Array, DoraNode, Event, IntoArrow}; +use dora_node_api::{DoraNode, Event, IntoArrow, arrow::array::UInt8Array}; use eyre::{Context, Result}; use log::warn; @@ -198,9 +198,9 @@ pub fn lib_main() -> Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/dora-kit-car/src/lib.rs b/node-hub/dora-kit-car/src/lib.rs index 2e883074..a77369ac 100644 --- a/node-hub/dora-kit-car/src/lib.rs +++ b/node-hub/dora-kit-car/src/lib.rs @@ -9,7 +9,7 @@ mod json_data; use std::{env, io::Write, time::Duration}; -use dora_node_api::{arrow::array::Float64Array, DoraNode, Event}; +use dora_node_api::{DoraNode, Event, arrow::array::Float64Array}; use error::Error; use eyre::Context; use serial::SerialPort; @@ -73,9 +73,9 @@ pub fn lib_main() -> eyre::Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/dora-mistral-rs/src/main.rs b/node-hub/dora-mistral-rs/src/main.rs index 6bb623c9..d2da8034 100644 --- a/node-hub/dora-mistral-rs/src/main.rs +++ b/node-hub/dora-mistral-rs/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{dora_core::config::DataId, DoraNode, Event, IntoArrow}; +use dora_node_api::{DoraNode, Event, IntoArrow, dora_core::config::DataId}; use eyre::{Context, Report}; use mistralrs::{TextMessageRole, TextMessages, TextModelBuilder}; diff --git a/node-hub/dora-object-to-pose/src/lib.rs b/node-hub/dora-object-to-pose/src/lib.rs index eaec70ae..2adfa8ed 100644 --- a/node-hub/dora-object-to-pose/src/lib.rs +++ b/node-hub/dora-object-to-pose/src/lib.rs @@ -1,11 +1,11 @@ use core::f32; use dora_node_api::{ + DoraNode, Event, IntoArrow, Parameter, arrow::{ - array::{AsArray, UInt16Array, UInt8Array}, + array::{AsArray, UInt8Array, UInt16Array}, datatypes::{Float32Type, Int64Type}, }, dora_core::config::DataId, - DoraNode, Event, IntoArrow, Parameter, }; use eyre::Result; use std::collections::HashMap; @@ -88,7 +88,7 @@ pub fn lib_main() -> Result<()> { .unwrap(); let cos_theta = camera_pitch.cos(); // np.cos(np.deg2rad(180-38)) let sin_theta = camera_pitch.sin(); // np.sin(np.deg2rad(180-38)) - // (0.32489833, -0.25068134, 0.4761387) + // (0.32489833, -0.25068134, 0.4761387) while let Some(event) = events.recv() { if let Event::Input { id, metadata, data } = event { @@ -288,9 +288,9 @@ pub fn lib_main() -> Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/dora-rav1e/src/lib.rs b/node-hub/dora-rav1e/src/lib.rs index 9ce9f1fd..a90e85cf 100644 --- a/node-hub/dora-rav1e/src/lib.rs +++ b/node-hub/dora-rav1e/src/lib.rs @@ -11,7 +11,7 @@ use std::env::var; use std::vec; use dora_node_api::arrow::array::AsArray; -use dora_node_api::arrow::datatypes::{UInt16Type, UInt8Type}; +use dora_node_api::arrow::datatypes::{UInt8Type, UInt16Type}; use dora_node_api::dora_core::config::DataId; use dora_node_api::{DoraNode, Event, IntoArrow, Metadata, MetadataParameters, Parameter}; use eyre::{Context as EyreContext, Result}; @@ -490,9 +490,9 @@ pub fn lib_main() -> Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/dora-record/src/main.rs b/node-hub/dora-record/src/main.rs index f0ab232f..13823c6f 100644 --- a/node-hub/dora-record/src/main.rs +++ b/node-hub/dora-record/src/main.rs @@ -1,15 +1,14 @@ use chrono::{DateTime, Utc}; use dora_node_api::{ - self, + self, DoraNode, Event, Metadata, arrow::{ array::{ - make_array, Array, ListArray, StringArray, TimestampMillisecondArray, UInt64Array, + Array, ListArray, StringArray, TimestampMillisecondArray, UInt64Array, make_array, }, buffer::{OffsetBuffer, ScalarBuffer}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, }, - DoraNode, Event, Metadata, }; use dora_tracing::telemetry::deserialize_to_hashmap; use eyre::{Context, ContextCompat}; diff --git a/node-hub/dora-rerun/src/boxes2d.rs b/node-hub/dora-rerun/src/boxes2d.rs index ea1e34bd..7484ec02 100644 --- a/node-hub/dora-rerun/src/boxes2d.rs +++ b/node-hub/dora-rerun/src/boxes2d.rs @@ -1,4 +1,5 @@ use dora_node_api::{ + ArrowData, Metadata, Parameter, arrow::{ array::AsArray, datatypes::{ @@ -6,7 +7,6 @@ use dora_node_api::{ }, }, dora_core::config::DataId, - ArrowData, Metadata, Parameter, }; use eyre::{Context, ContextCompat, Result}; use rerun::{RecordingStream, Text}; @@ -75,7 +75,7 @@ pub fn update_boxes2d( _ => { return Err(eyre::eyre!( "Could not deserialize bbox as float32, float64, int32 or int64" - )) + )); } }; diff --git a/node-hub/dora-rerun/src/lib.rs b/node-hub/dora-rerun/src/lib.rs index 62105173..5571bf47 100644 --- a/node-hub/dora-rerun/src/lib.rs +++ b/node-hub/dora-rerun/src/lib.rs @@ -3,18 +3,19 @@ use std::{collections::HashMap, env::VarError, path::Path}; use dora_node_api::{ + DoraNode, Event, Parameter, arrow::{ - array::{Array, AsArray, Float64Array, StringArray, UInt16Array, UInt8Array}, + array::{Array, AsArray, Float64Array, StringArray, UInt8Array, UInt16Array}, datatypes::Float32Type, }, dora_core::config::DataId, - into_vec, DoraNode, Event, Parameter, + into_vec, }; -use eyre::{bail, eyre, Context, Result}; +use eyre::{Context, Result, bail, eyre}; use pinyin::ToPinyin; use rerun::{ - components::ImageBuffer, external::log::warn, ImageFormat, Points2D, Points3D, SpawnOptions, + ImageFormat, Points2D, Points3D, SpawnOptions, components::ImageBuffer, external::log::warn, }; pub mod boxes2d; pub mod series; @@ -257,7 +258,7 @@ pub fn lib_main() -> Result<()> { if let Some(z) = z { let z = z as f32 / 1000.0; // Convert to meters - // Skip points that have empty depth or is too far away + // Skip points that have empty depth or is too far away if z == 0. || z > 8.0 { points.push((0., 0., 0.)); return; @@ -463,9 +464,9 @@ pub fn lib_main() -> Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/dora-rerun/src/series.rs b/node-hub/dora-rerun/src/series.rs index ea81d003..925df8ac 100644 --- a/node-hub/dora-rerun/src/series.rs +++ b/node-hub/dora-rerun/src/series.rs @@ -1,4 +1,4 @@ -use dora_node_api::{dora_core::config::DataId, into_vec, ArrowData}; +use dora_node_api::{ArrowData, dora_core::config::DataId, into_vec}; use eyre::{Context, Result}; use rerun::RecordingStream; diff --git a/node-hub/dora-rerun/src/urdf.rs b/node-hub/dora-rerun/src/urdf.rs index 4bc68525..365b0bf2 100644 --- a/node-hub/dora-rerun/src/urdf.rs +++ b/node-hub/dora-rerun/src/urdf.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, path::PathBuf}; use eyre::{Context, ContextCompat, Result}; -use k::{nalgebra::Quaternion, Chain, Translation3, UnitQuaternion}; +use k::{Chain, Translation3, UnitQuaternion, nalgebra::Quaternion}; use rerun::{RecordingStream, Vec3D}; pub struct MyIntersperse { iterator: I, @@ -89,7 +89,9 @@ pub fn init_urdf(rec: &RecordingStream) -> Result>> { if PathBuf::from(&urdf_path).file_name() != PathBuf::from(&path).file_name() { return Err(eyre::eyre!( - "URDF filename should be the same as the environment variable name and replacing the dot with a dash. Got {:#?} instead of {}", urdf_path, path + "URDF filename should be the same as the environment variable name and replacing the dot with a dash. Got {:#?} instead of {}", + urdf_path, + path )); } rec.log_file_from_path(&urdf_path, None, true) diff --git a/node-hub/dora-rustypot/src/lib.rs b/node-hub/dora-rustypot/src/lib.rs index 85f21152..292c5f5f 100644 --- a/node-hub/dora-rustypot/src/lib.rs +++ b/node-hub/dora-rustypot/src/lib.rs @@ -1,5 +1,5 @@ use dora_node_api::dora_core::config::DataId; -use dora_node_api::{into_vec, DoraNode, Event, IntoArrow, Parameter}; +use dora_node_api::{DoraNode, Event, IntoArrow, Parameter, into_vec}; use eyre::{Context, Result}; use rustypot::servo::feetech::sts3215::Sts3215Controller; use std::collections::BTreeMap; @@ -80,9 +80,9 @@ pub fn lib_main() -> Result<()> { #[cfg(feature = "python")] use pyo3::{ - pyfunction, pymodule, + Bound, PyResult, Python, pyfunction, pymodule, types::{PyModule, PyModuleMethods}, - wrap_pyfunction, Bound, PyResult, Python, + wrap_pyfunction, }; #[cfg(feature = "python")] diff --git a/node-hub/openai-proxy-server/src/main.rs b/node-hub/openai-proxy-server/src/main.rs index 7689dad4..89e54bbc 100644 --- a/node-hub/openai-proxy-server/src/main.rs +++ b/node-hub/openai-proxy-server/src/main.rs @@ -1,22 +1,21 @@ use dora_node_api::{ - self, + self, DoraNode, Event, arrow::array::{AsArray, StringArray}, dora_core::config::DataId, merged::MergeExternalSend, - DoraNode, Event, }; use eyre::{Context, ContextCompat}; use futures::{ - channel::oneshot::{self, Canceled}, TryStreamExt, + channel::oneshot::{self, Canceled}, }; use hyper::{ - body::{to_bytes, Body, HttpBody}, + Request, Response, Server, StatusCode, + body::{Body, HttpBody, to_bytes}, header, server::conn::AddrStream, service::{make_service_fn, service_fn}, - Request, Response, Server, StatusCode, }; use message::{ ChatCompletionObject, ChatCompletionObjectChoice, ChatCompletionObjectMessage, @@ -142,7 +141,9 @@ async fn main() -> eyre::Result<()> { }; if reply_channel.send(Ok(data)).is_err() { - tracing::warn!("failed to send chat completion reply because channel closed early"); + tracing::warn!( + "failed to send chat completion reply because channel closed early" + ); } } _ => eyre::bail!("unexpected input id: {}", id), diff --git a/node-hub/openai-proxy-server/src/message.rs b/node-hub/openai-proxy-server/src/message.rs index 4c9eb99f..d3734209 100644 --- a/node-hub/openai-proxy-server/src/message.rs +++ b/node-hub/openai-proxy-server/src/message.rs @@ -3,8 +3,8 @@ use std::collections::HashMap; use indexmap::IndexMap; use serde::{ - de::{self, MapAccess, Visitor}, Deserialize, Deserializer, Serialize, + de::{self, MapAccess, Visitor}, }; use serde_json::Value; diff --git a/node-hub/terminal-print/src/main.rs b/node-hub/terminal-print/src/main.rs index 49f1a94a..26ceffbb 100644 --- a/node-hub/terminal-print/src/main.rs +++ b/node-hub/terminal-print/src/main.rs @@ -1,4 +1,4 @@ -use dora_node_api::{self, dora_core::config::NodeId, DoraNode, Event}; +use dora_node_api::{self, DoraNode, Event, dora_core::config::NodeId}; use eyre::Context; fn main() -> eyre::Result<()> { diff --git a/tests/queue_size_latest_data_rust/receive_data/src/main.rs b/tests/queue_size_latest_data_rust/receive_data/src/main.rs index 52b21411..8008c94d 100644 --- a/tests/queue_size_latest_data_rust/receive_data/src/main.rs +++ b/tests/queue_size_latest_data_rust/receive_data/src/main.rs @@ -1,12 +1,11 @@ use std::{thread::sleep, time::Duration}; use dora_node_api::{ - self, + self, DoraNode, arrow::{ array::{AsArray, PrimitiveArray}, datatypes::UInt64Type, }, - DoraNode, }; fn main() -> eyre::Result<()> {