|
|
|
@@ -5,7 +5,7 @@ use communication_layer_request_reply::{ |
|
|
|
}; |
|
|
|
use dora_coordinator::Event; |
|
|
|
use dora_core::{ |
|
|
|
descriptor::{source_is_url, Descriptor, DescriptorExt}, |
|
|
|
descriptor::{source_is_url, CoreNodeKind, CustomNode, Descriptor, DescriptorExt}, |
|
|
|
topics::{ |
|
|
|
DORA_COORDINATOR_PORT_CONTROL_DEFAULT, DORA_COORDINATOR_PORT_DEFAULT, |
|
|
|
DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, |
|
|
|
@@ -17,6 +17,7 @@ use dora_message::{ |
|
|
|
cli_to_coordinator::ControlRequest, |
|
|
|
common::LogMessage, |
|
|
|
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, |
|
|
|
descriptor::NodeSource, |
|
|
|
BuildId, |
|
|
|
}; |
|
|
|
#[cfg(feature = "tracing")] |
|
|
|
@@ -26,6 +27,7 @@ use duration_str::parse; |
|
|
|
use eyre::{bail, Context}; |
|
|
|
use formatting::FormatDataflowError; |
|
|
|
use std::{ |
|
|
|
collections::BTreeMap, |
|
|
|
env::current_dir, |
|
|
|
io::Write, |
|
|
|
net::{SocketAddr, TcpStream}, |
|
|
|
@@ -43,6 +45,7 @@ use uuid::Uuid; |
|
|
|
mod attach; |
|
|
|
mod check; |
|
|
|
mod formatting; |
|
|
|
mod git; |
|
|
|
mod graph; |
|
|
|
mod logs; |
|
|
|
mod session; |
|
|
|
@@ -647,17 +650,33 @@ fn build_dataflow( |
|
|
|
coordinator_socket: SocketAddr, |
|
|
|
uv: bool, |
|
|
|
) -> eyre::Result<(Box<TcpRequestReplyConnection>, BuildId)> { |
|
|
|
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; |
|
|
|
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; |
|
|
|
let dataflow_descriptor = |
|
|
|
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; |
|
|
|
let dataflow_session = |
|
|
|
DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?; |
|
|
|
Descriptor::blocking_read(&dataflow_path).wrap_err("Failed to read yaml dataflow")?; |
|
|
|
let mut dataflow_session = |
|
|
|
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; |
|
|
|
|
|
|
|
let mut git_sources = BTreeMap::new(); |
|
|
|
let resolved_nodes = dataflow_descriptor |
|
|
|
.resolve_aliases_and_set_defaults() |
|
|
|
.context("failed to resolve nodes")?; |
|
|
|
for (node_id, node) in resolved_nodes { |
|
|
|
if let CoreNodeKind::Custom(CustomNode { |
|
|
|
source: NodeSource::GitBranch { repo, rev }, |
|
|
|
.. |
|
|
|
}) = node.kind |
|
|
|
{ |
|
|
|
let source = git::fetch_commit_hash(repo, rev) |
|
|
|
.with_context(|| format!("failed to find commit hash for `{node_id}`"))?; |
|
|
|
git_sources.insert(node_id, source); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine |
|
|
|
let local_working_dir = if cli_and_daemon_on_same_machine { |
|
|
|
// use dataflow dir as base working dir |
|
|
|
Some( |
|
|
|
dunce::canonicalize(&dataflow) |
|
|
|
dunce::canonicalize(&dataflow_path) |
|
|
|
.context("failed to canonicalize dataflow path")? |
|
|
|
.parent() |
|
|
|
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? |
|
|
|
@@ -676,7 +695,7 @@ fn build_dataflow( |
|
|
|
&serde_json::to_vec(&ControlRequest::Build { |
|
|
|
session_id: dataflow_session.session_id, |
|
|
|
dataflow, |
|
|
|
git_sources, |
|
|
|
git_sources: git_sources.clone(), |
|
|
|
prev_git_sources: dataflow_session.git_sources.clone(), |
|
|
|
local_working_dir, |
|
|
|
uv, |
|
|
|
@@ -690,6 +709,10 @@ fn build_dataflow( |
|
|
|
match result { |
|
|
|
ControlRequestReply::DataflowBuildTriggered { build_id } => { |
|
|
|
eprintln!("dataflow build triggered: {build_id}"); |
|
|
|
dataflow_session.git_sources = git_sources; |
|
|
|
dataflow_session |
|
|
|
.write_out_for_dataflow(&dataflow_path) |
|
|
|
.context("failed to write out dataflow session file")?; |
|
|
|
build_id |
|
|
|
} |
|
|
|
ControlRequestReply::Error(err) => bail!("{err}"), |
|
|
|
|