Philipp Oppermann 7 months ago
parent
commit
94b1d02ca7
Failed to extract signature
10 changed files with 40 additions and 64 deletions
  1. +1
    -1
      apis/python/node/src/lib.rs
  2. +18
    -35
      binaries/cli/src/command/build/local.rs
  3. +9
    -4
      binaries/cli/src/command/mod.rs
  4. +5
    -5
      binaries/cli/src/lib.rs
  5. +2
    -3
      binaries/coordinator/src/lib.rs
  6. +1
    -6
      binaries/coordinator/src/listener.rs
  7. +3
    -4
      binaries/daemon/src/lib.rs
  8. +1
    -1
      libraries/message/src/common.rs
  9. +0
    -3
      libraries/message/src/coordinator_to_daemon.rs
  10. +0
    -2
      libraries/message/src/daemon_to_coordinator.rs

+ 1
- 1
apis/python/node/src/lib.rs View File

@@ -381,7 +381,7 @@ pub fn resolve_dataflow(dataflow: String) -> eyre::Result<PathBuf> {
#[pyfunction]
#[pyo3(signature = (dataflow_path, uv=None))]
pub fn run(dataflow_path: String, uv: Option<bool>) -> eyre::Result<()> {
dora_cli::run::run(dataflow_path, uv.unwrap_or_default())
dora_cli::command::run(dataflow_path, uv.unwrap_or_default())
}

#[pymodule]


+ 18
- 35
binaries/cli/src/command/build/local.rs View File

@@ -1,20 +1,11 @@
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
path::PathBuf,
};
use std::{collections::BTreeMap, path::PathBuf};

use dora_core::{
build::{BuildInfo, BuildLogger, Builder, GitManager},
descriptor::{self, Descriptor, NodeExt, ResolvedNode, SINGLE_OPERATOR_DEFAULT_ID},
};
use dora_message::{
common::GitSource,
id::{NodeId, OperatorId},
BuildId, SessionId,
descriptor::{Descriptor, DescriptorExt},
};
use dora_message::{common::GitSource, id::NodeId};
use eyre::Context;
use futures::executor::block_on;

use crate::session::DataflowSession;

@@ -28,37 +19,35 @@ pub fn build_dataflow_locally(
let runtime = tokio::runtime::Runtime::new()?;

runtime.block_on(build_dataflow(
dataflow_session.session_id,
working_dir,
nodes,
dataflow,
git_sources,
prev_git_sources,
local_nodes,
dataflow_session,
working_dir,
uv,
))
}

async fn build_dataflow(
session_id: SessionId,
dataflow: Descriptor,
git_sources: &BTreeMap<NodeId, GitSource>,
dataflow_session: &DataflowSession,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
local_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<BuildInfo> {
let builder = Builder {
session_id,
session_id: dataflow_session.session_id,
base_working_dir,
uv,
};
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let mut git_manager = GitManager::default();
let prev_git_sources = &dataflow_session.git_sources;

let mut tasks = Vec::new();

// build nodes
for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
for node in nodes.into_values() {
let node_id = node.id.clone();
let git_source = git_sources.get(&node_id).cloned();
let prev_git_source = prev_git_sources.get(&node_id).cloned();
@@ -108,18 +97,12 @@ struct LocalBuildLogger;
impl BuildLogger for LocalBuildLogger {
type Clone = Self;

fn log_message(
&mut self,
level: log::Level,
message: impl Into<String> + Send,
) -> impl Future<Output = ()> + Send {
async move {
let message: String = message.into();
println!("{level}: \t{message}");
}
async fn log_message(&mut self, level: log::Level, message: impl Into<String> + Send) {
let message: String = message.into();
println!("{level}: \t{message}");
}

fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send {
async { Ok(LocalBuildLogger) }
async fn try_clone(&self) -> eyre::Result<Self::Clone> {
Ok(LocalBuildLogger)
}
}

+ 9
- 4
binaries/cli/src/command/mod.rs View File

@@ -1,3 +1,8 @@
pub use build::build;
pub use logs::logs;
pub use run::run;
pub use start::start;

use std::path::{Path, PathBuf};

use communication_layer_request_reply::TcpRequestReplyConnection;
@@ -5,11 +10,11 @@ use dora_core::descriptor::Descriptor;
use dora_message::{cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply};
use eyre::{bail, Context, ContextCompat};

pub mod build;
mod build;
pub mod check;
pub mod logs;
pub mod run;
pub mod start;
mod logs;
mod run;
mod start;
pub mod up;

fn local_working_dir(


+ 5
- 5
binaries/cli/src/lib.rs View File

@@ -372,12 +372,12 @@ fn run_cli(args: Args) -> eyre::Result<()> {
coordinator_port,
uv,
local,
} => command::build::build(dataflow, coordinator_addr, coordinator_port, uv, local)?,
} => command::build(dataflow, coordinator_addr, coordinator_port, uv, local)?,
Command::New {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow, uv } => command::run::run(dataflow, uv)?,
Command::Run { dataflow, uv } => command::run(dataflow, uv)?,
Command::Up { config } => {
command::up::up(config.as_deref())?;
}
@@ -394,7 +394,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
if let Some(dataflow) = dataflow {
let uuid = Uuid::parse_str(&dataflow).ok();
let name = if uuid.is_some() { None } else { Some(dataflow) };
command::logs::logs(&mut *session, uuid, name, node)?
command::logs(&mut *session, uuid, name, node)?
} else {
let active: Vec<dora_message::coordinator_to_cli::DataflowIdAndName> =
list.get_active();
@@ -403,7 +403,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
[uuid] => uuid.clone(),
_ => inquire::Select::new("Choose dataflow to show logs:", active).prompt()?,
};
command::logs::logs(&mut *session, Some(uuid.uuid), None, node)?
command::logs(&mut *session, Some(uuid.uuid), None, node)?
}
}
Command::Start {
@@ -417,7 +417,7 @@ fn run_cli(args: Args) -> eyre::Result<()> {
uv,
} => {
let coordinator_socket = (coordinator_addr, coordinator_port).into();
command::start::start(
command::start(
dataflow,
name,
coordinator_socket,


+ 2
- 3
binaries/coordinator/src/lib.rs View File

@@ -827,7 +827,6 @@ async fn start_inner(
}
Event::DataflowBuildResult {
build_id,
session_id,
daemon_id,
result,
} => match running_builds.get_mut(&build_id) {
@@ -1247,6 +1246,7 @@ async fn retrieve_logs(
reply_logs.map_err(|err| eyre!(err))
}

#[allow(clippy::too_many_arguments)]
#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
build_id: BuildId,
@@ -1283,7 +1283,6 @@ async fn build_dataflow(
build_id,
session_id,
local_working_dir: local_working_dir.clone(),
nodes: nodes.clone(),
git_sources: git_sources_by_daemon
.remove(&machine.as_ref())
.unwrap_or_default(),
@@ -1353,6 +1352,7 @@ async fn build_dataflow_on_machine(
Ok(daemon_id)
}

#[allow(clippy::too_many_arguments)]
async fn start_dataflow(
build_id: Option<BuildId>,
session_id: SessionId,
@@ -1468,7 +1468,6 @@ pub enum Event {
},
DataflowBuildResult {
build_id: BuildId,
session_id: SessionId,
daemon_id: DaemonId,
result: eyre::Result<()>,
},


+ 1
- 6
binaries/coordinator/src/listener.rs View File

@@ -112,14 +112,9 @@ pub async fn handle_connection(
break;
}
}
DaemonEvent::BuildResult {
build_id,
session_id,
result,
} => {
DaemonEvent::BuildResult { build_id, result } => {
let event = Event::DataflowBuildResult {
build_id,
session_id,
daemon_id,
result: result.map_err(|err| eyre::eyre!(err)),
};


+ 3
- 4
binaries/daemon/src/lib.rs View File

@@ -465,7 +465,6 @@ impl Daemon {
daemon_id: self.daemon_id.clone(),
event: DaemonEvent::BuildResult {
build_id,
session_id,
result: result.map_err(|err| format!("{err:?}")),
},
},
@@ -534,7 +533,6 @@ impl Daemon {
build_id,
session_id,
local_working_dir,
nodes,
git_sources,
prev_git_sources,
dataflow_descriptor,
@@ -552,7 +550,6 @@ impl Daemon {
build_id,
session_id,
base_working_dir,
nodes,
git_sources,
prev_git_sources,
dataflow_descriptor,
@@ -883,12 +880,12 @@ impl Daemon {
}
}

#[allow(clippy::too_many_arguments)]
async fn build_dataflow(
&mut self,
build_id: BuildId,
session_id: SessionId,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
dataflow_descriptor: Descriptor,
@@ -900,6 +897,7 @@ impl Daemon {
base_working_dir,
uv,
};
let nodes = dataflow_descriptor.resolve_aliases_and_set_defaults()?;

let mut tasks = Vec::new();

@@ -979,6 +977,7 @@ impl Daemon {
Ok(task)
}

#[allow(clippy::too_many_arguments)]
async fn spawn_dataflow(
&mut self,
build_id: Option<BuildId>,


+ 1
- 1
libraries/message/src/common.rs View File

@@ -5,7 +5,7 @@ use aligned_vec::{AVec, ConstAlign};
use eyre::Context as _;
use uuid::Uuid;

use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId, SessionId};
use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};

pub use log::Level as LogLevel;



+ 0
- 3
libraries/message/src/coordinator_to_daemon.rs View File

@@ -4,8 +4,6 @@ use std::{
time::Duration,
};

use uuid::Uuid;

use crate::{
common::{DaemonId, GitSource},
descriptor::{Descriptor, ResolvedNode},
@@ -70,7 +68,6 @@ pub struct BuildDataflowNodes {
/// Note that nodes with git sources still use a subdirectory of
/// the base working dir.
pub local_working_dir: Option<PathBuf>,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub prev_git_sources: BTreeMap<NodeId, GitSource>,
pub dataflow_descriptor: Descriptor,


+ 0
- 2
libraries/message/src/daemon_to_coordinator.rs View File

@@ -5,7 +5,6 @@ pub use crate::common::{
};
use crate::{
common::DaemonId, current_crate_version, id::NodeId, versions_compatible, BuildId, DataflowId,
SessionId,
};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
@@ -51,7 +50,6 @@ impl DaemonRegisterRequest {
pub enum DaemonEvent {
BuildResult {
build_id: BuildId,
session_id: SessionId,
result: Result<(), String>,
},
SpawnResult {


Loading…
Cancel
Save