diff --git a/apis/python/node/src/lib.rs b/apis/python/node/src/lib.rs index c6165f17..2d3634cd 100644 --- a/apis/python/node/src/lib.rs +++ b/apis/python/node/src/lib.rs @@ -381,7 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result { #[pyfunction] #[pyo3(signature = (dataflow_path, uv=None))] pub fn run(dataflow_path: String, uv: Option) -> eyre::Result<()> { - dora_cli::run::run(dataflow_path, uv.unwrap_or_default()) + dora_cli::command::run(dataflow_path, uv.unwrap_or_default()) } #[pymodule] diff --git a/binaries/cli/src/command/build/local.rs b/binaries/cli/src/command/build/local.rs index 1ae19ed4..66a182a5 100644 --- a/binaries/cli/src/command/build/local.rs +++ b/binaries/cli/src/command/build/local.rs @@ -1,20 +1,11 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - future::Future, - path::PathBuf, -}; +use std::{collections::BTreeMap, path::PathBuf}; use dora_core::{ build::{BuildInfo, BuildLogger, Builder, GitManager}, - descriptor::{self, Descriptor, NodeExt, ResolvedNode, SINGLE_OPERATOR_DEFAULT_ID}, -}; -use dora_message::{ - common::GitSource, - id::{NodeId, OperatorId}, - BuildId, SessionId, + descriptor::{Descriptor, DescriptorExt}, }; +use dora_message::{common::GitSource, id::NodeId}; use eyre::Context; -use futures::executor::block_on; use crate::session::DataflowSession; @@ -28,37 +19,35 @@ pub fn build_dataflow_locally( let runtime = tokio::runtime::Runtime::new()?; runtime.block_on(build_dataflow( - dataflow_session.session_id, - working_dir, - nodes, + dataflow, git_sources, - prev_git_sources, - local_nodes, + dataflow_session, + working_dir, uv, )) } async fn build_dataflow( - session_id: SessionId, + dataflow: Descriptor, + git_sources: &BTreeMap, + dataflow_session: &DataflowSession, base_working_dir: PathBuf, - nodes: BTreeMap, - git_sources: BTreeMap, - prev_git_sources: BTreeMap, - local_nodes: BTreeSet, uv: bool, ) -> eyre::Result { let builder = Builder { - session_id, + session_id: dataflow_session.session_id, base_working_dir, uv, }; + let nodes = dataflow.resolve_aliases_and_set_defaults()?; let mut git_manager = GitManager::default(); + let prev_git_sources = &dataflow_session.git_sources; let mut tasks = Vec::new(); // build nodes - for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) { + for node in nodes.into_values() { let node_id = node.id.clone(); let git_source = git_sources.get(&node_id).cloned(); let prev_git_source = prev_git_sources.get(&node_id).cloned(); @@ -108,18 +97,12 @@ struct LocalBuildLogger; impl BuildLogger for LocalBuildLogger { type Clone = Self; - fn log_message( - &mut self, - level: log::Level, - message: impl Into + Send, - ) -> impl Future + Send { - async move { - let message: String = message.into(); - println!("{level}: \t{message}"); - } + async fn log_message(&mut self, level: log::Level, message: impl Into + Send) { + let message: String = message.into(); + println!("{level}: \t{message}"); } - fn try_clone(&self) -> impl Future> + Send { - async { Ok(LocalBuildLogger) } + async fn try_clone(&self) -> eyre::Result { + Ok(LocalBuildLogger) } } diff --git a/binaries/cli/src/command/mod.rs b/binaries/cli/src/command/mod.rs index fb80c1f6..77654440 100644 --- a/binaries/cli/src/command/mod.rs +++ b/binaries/cli/src/command/mod.rs @@ -1,3 +1,8 @@ +pub use build::build; +pub use logs::logs; +pub use run::run; +pub use start::start; + use std::path::{Path, PathBuf}; use communication_layer_request_reply::TcpRequestReplyConnection; @@ -5,11 +10,11 @@ use dora_core::descriptor::Descriptor; use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply}; use eyre::{bail, Context, ContextCompat}; -pub mod build; +mod build; pub mod check; -pub mod logs; -pub mod run; -pub mod start; +mod logs; +mod run; +mod start; pub mod up; fn local_working_dir( diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index ec28062e..aeaca232 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -372,12 +372,12 @@ fn run_cli(args: Args) -> eyre::Result<()> { coordinator_port, uv, local, - } => command::build::build(dataflow, coordinator_addr, coordinator_port, uv, local)?, + } => command::build(dataflow, coordinator_addr, coordinator_port, uv, local)?, Command::New { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Run { dataflow, uv } => command::run::run(dataflow, uv)?, + Command::Run { dataflow, uv } => command::run(dataflow, uv)?, Command::Up { config } => { command::up::up(config.as_deref())?; } @@ -394,7 +394,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { if let Some(dataflow) = dataflow { let uuid = Uuid::parse_str(&dataflow).ok(); let name = if uuid.is_some() { None } else { Some(dataflow) }; - command::logs::logs(&mut *session, uuid, name, node)? + command::logs(&mut *session, uuid, name, node)? } else { let active: Vec = list.get_active(); @@ -403,7 +403,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { [uuid] => uuid.clone(), _ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?, }; - command::logs::logs(&mut *session, Some(uuid.uuid), None, node)? + command::logs(&mut *session, Some(uuid.uuid), None, node)? } } Command::Start { @@ -417,7 +417,7 @@ fn run_cli(args: Args) -> eyre::Result<()> { uv, } => { let coordinator_socket = (coordinator_addr, coordinator_port).into(); - command::start::start( + command::start( dataflow, name, coordinator_socket, diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index e31b97a6..7f5f7d3f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -827,7 +827,6 @@ async fn start_inner( } Event::DataflowBuildResult { build_id, - session_id, daemon_id, result, } => match running_builds.get_mut(&build_id) { @@ -1247,6 +1246,7 @@ async fn retrieve_logs( reply_logs.map_err(|err| eyre!(err)) } +#[allow(clippy::too_many_arguments)] #[tracing::instrument(skip(daemon_connections, clock))] async fn build_dataflow( build_id: BuildId, @@ -1283,7 +1283,6 @@ async fn build_dataflow( build_id, session_id, local_working_dir: local_working_dir.clone(), - nodes: nodes.clone(), git_sources: git_sources_by_daemon .remove(&machine.as_ref()) .unwrap_or_default(), @@ -1353,6 +1352,7 @@ async fn build_dataflow_on_machine( Ok(daemon_id) } +#[allow(clippy::too_many_arguments)] async fn start_dataflow( build_id: Option, session_id: SessionId, @@ -1468,7 +1468,6 @@ pub enum Event { }, DataflowBuildResult { build_id: BuildId, - session_id: SessionId, daemon_id: DaemonId, result: eyre::Result<()>, }, diff --git a/binaries/coordinator/src/listener.rs b/binaries/coordinator/src/listener.rs index 41516aff..ab7e3b9d 100644 --- a/binaries/coordinator/src/listener.rs +++ b/binaries/coordinator/src/listener.rs @@ -112,14 +112,9 @@ pub async fn handle_connection( break; } } - DaemonEvent::BuildResult { - build_id, - session_id, - result, - } => { + DaemonEvent::BuildResult { build_id, result } => { let event = Event::DataflowBuildResult { build_id, - session_id, daemon_id, result: result.map_err(|err| eyre::eyre!(err)), }; diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index db44f102..ba724680 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -465,7 +465,6 @@ impl Daemon { daemon_id: self.daemon_id.clone(), event: DaemonEvent::BuildResult { build_id, - session_id, result: result.map_err(|err| format!("{err:?}")), }, }, @@ -534,7 +533,6 @@ impl Daemon { build_id, session_id, local_working_dir, - nodes, git_sources, prev_git_sources, dataflow_descriptor, @@ -552,7 +550,6 @@ impl Daemon { build_id, session_id, base_working_dir, - nodes, git_sources, prev_git_sources, dataflow_descriptor, @@ -883,12 +880,12 @@ impl Daemon { } } + #[allow(clippy::too_many_arguments)] async fn build_dataflow( &mut self, build_id: BuildId, session_id: SessionId, base_working_dir: PathBuf, - nodes: BTreeMap, git_sources: BTreeMap, prev_git_sources: BTreeMap, dataflow_descriptor: Descriptor, @@ -900,6 +897,7 @@ impl Daemon { base_working_dir, uv, }; + let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?; let mut tasks = Vec::new(); @@ -979,6 +977,7 @@ impl Daemon { Ok(task) } + #[allow(clippy::too_many_arguments)] async fn spawn_dataflow( &mut self, build_id: Option, diff --git a/libraries/message/src/common.rs b/libraries/message/src/common.rs index 70850212..83591811 100644 --- a/libraries/message/src/common.rs +++ b/libraries/message/src/common.rs @@ -5,7 +5,7 @@ use aligned_vec::{AVec, ConstAlign}; use eyre::Context as _; use uuid::Uuid; -use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId, SessionId}; +use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId}; pub use log::Level as LogLevel; diff --git a/libraries/message/src/coordinator_to_daemon.rs b/libraries/message/src/coordinator_to_daemon.rs index 7a2b0e8d..69da8923 100644 --- a/libraries/message/src/coordinator_to_daemon.rs +++ b/libraries/message/src/coordinator_to_daemon.rs @@ -4,8 +4,6 @@ use std::{ time::Duration, }; -use uuid::Uuid; - use crate::{ common::{DaemonId, GitSource}, descriptor::{Descriptor, ResolvedNode}, @@ -70,7 +68,6 @@ pub struct BuildDataflowNodes { /// Note that nodes with git sources still use a subdirectory of /// the base working dir. pub local_working_dir: Option, - pub nodes: BTreeMap, pub git_sources: BTreeMap, pub prev_git_sources: BTreeMap, pub dataflow_descriptor: Descriptor, diff --git a/libraries/message/src/daemon_to_coordinator.rs b/libraries/message/src/daemon_to_coordinator.rs index 5ccdb174..ccafb0a5 100644 --- a/libraries/message/src/daemon_to_coordinator.rs +++ b/libraries/message/src/daemon_to_coordinator.rs @@ -5,7 +5,6 @@ pub use crate::common::{ }; use crate::{ common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId, - SessionId, }; #[derive(Debug, serde::Serialize, serde::Deserialize)] @@ -51,7 +50,6 @@ impl DaemonRegisterRequest { pub enum DaemonEvent { BuildResult { build_id: BuildId, - session_id: SessionId, result: Result<(), String>, }, SpawnResult {