Philipp Oppermann 7 months ago
parent
commit
389059fd85
Failed to extract signature
15 changed files with 210 additions and 121 deletions
  1. +3
    -0
      Cargo.lock
  2. +100
    -61
      binaries/cli/src/command/build/local.rs
  3. +12
    -3
      binaries/cli/src/command/build/mod.rs
  4. +1
    -0
      binaries/cli/src/command/run.rs
  5. +2
    -1
      binaries/cli/src/lib.rs
  6. +3
    -0
      binaries/cli/src/session.rs
  7. +23
    -13
      binaries/daemon/src/lib.rs
  8. +18
    -6
      binaries/daemon/src/log.rs
  9. +4
    -1
      libraries/core/Cargo.toml
  10. +0
    -0
      libraries/core/src/build/build_command.rs
  11. +8
    -8
      libraries/core/src/build/git.rs
  12. +15
    -0
      libraries/core/src/build/logger.rs
  13. +20
    -28
      libraries/core/src/build/mod.rs
  14. +0
    -0
      libraries/core/src/git.rs
  15. +1
    -0
      libraries/core/src/lib.rs

+ 3
- 0
Cargo.lock View File

@@ -3006,6 +3006,8 @@ dependencies = [
"dora-message",
"dunce",
"eyre",
"git2",
"itertools 0.14.0",
"log",
"once_cell",
"schemars",
@@ -3015,6 +3017,7 @@ dependencies = [
"serde_yaml 0.9.34+deprecated",
"tokio",
"tracing",
"url",
"uuid 1.16.0",
"which",
]


+ 100
- 61
binaries/cli/src/command/build/local.rs View File

@@ -1,14 +1,20 @@
use std::{collections::BTreeMap, path::PathBuf};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
path::PathBuf,
};

use dora_core::{
build::run_build_command,
descriptor::{Descriptor, NodeExt, SINGLE_OPERATOR_DEFAULT_ID},
build::{BuildInfo, BuildLogger, Builder, GitManager},
descriptor::{self, Descriptor, NodeExt, ResolvedNode, SINGLE_OPERATOR_DEFAULT_ID},
};
use dora_message::{
common::GitSource,
id::{NodeId, OperatorId},
BuildId, SessionId,
};
use eyre::Context;
use futures::executor::block_on;

use crate::session::DataflowSession;

@@ -18,69 +24,102 @@ pub fn build_dataflow_locally(
dataflow_session: &DataflowSession,
working_dir: PathBuf,
uv: bool,
) -> eyre::Result<()> {
let default_op_id = OperatorId::from(SINGLE_OPERATOR_DEFAULT_ID.to_string());
let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel::<std::io::Result<String>>(10);
) -> eyre::Result<BuildInfo> {
let runtime = tokio::runtime::Runtime::new()?;

tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
println!(
"{}",
line.unwrap_or_else(|err| format!("io err: {}", err.kind()))
);
}
});
runtime.block_on(build_dataflow(
dataflow_session.session_id,
working_dir,
nodes,
git_sources,
prev_git_sources,
local_nodes,
uv,
))
}

for node in dataflow.nodes {
match node.kind()? {
dora_core::descriptor::NodeKind::Standard(_) => {
let Some(build) = node.build.as_deref() else {
continue;
};
run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone())
.with_context(|| {
format!("build command failed for standard node `{}`", node.id)
})?
}
dora_core::descriptor::NodeKind::Runtime(runtime_node) => {
for operator in &runtime_node.operators {
let Some(build) = operator.config.build.as_deref() else {
continue;
};
run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone())
.with_context(|| {
format!(
"build command failed for operator `{}/{}`",
node.id, operator.id
)
})?;
}
}
dora_core::descriptor::NodeKind::Custom(custom_node) => {
let Some(build) = custom_node.build.as_deref() else {
continue;
};
run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone())
.with_context(|| {
format!("build command failed for custom node `{}`", node.id)
})?
async fn build_dataflow(
session_id: SessionId,
base_working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
git_sources: BTreeMap<NodeId, GitSource>,
prev_git_sources: BTreeMap<NodeId, GitSource>,
local_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<BuildInfo> {
let builder = Builder {
session_id,
base_working_dir,
uv,
};

let mut git_manager = GitManager::default();

let mut tasks = Vec::new();

// build nodes
for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
let node_id = node.id.clone();
let git_source = git_sources.get(&node_id).cloned();
let prev_git_source = prev_git_sources.get(&node_id).cloned();

let task = builder
.clone()
.build_node(
node,
git_source,
prev_git_source,
LocalBuildLogger,
&mut git_manager,
)
.await
.wrap_err_with(|| format!("failed to build node `{node_id}`"))?;
tasks.push((node_id, task));
}

let mut info = BuildInfo {
node_working_dirs: Default::default(),
};
let mut errors = Vec::new();
for (node_id, task) in tasks {
match task.await {
Ok(node) => {
info.node_working_dirs
.insert(node_id, node.node_working_dir);
}
dora_core::descriptor::NodeKind::Operator(operator) => {
let Some(build) = operator.config.build.as_deref() else {
continue;
};
run_build_command(build, &working_dir, uv, &node.env, stdout_tx.clone())
.with_context(|| {
format!(
"build command failed for operator `{}/{}`",
node.id,
operator.id.as_ref().unwrap_or(&default_op_id)
)
})?;
Err(err) => {
errors.push((node_id, err));
}
}
}
std::mem::drop(stdout_tx);
if errors.is_empty() {
Ok(info)
} else {
let mut message = "failed to build dataflow:\n".to_owned();
for (node_id, err) in errors {
message.push_str(&format!("- {node_id}: {err:?}\n-------------------\n\n"));
}
Err(eyre::eyre!(message))
}
}

struct LocalBuildLogger;

impl BuildLogger for LocalBuildLogger {
type Clone = Self;

fn log_message(
&mut self,
level: log::Level,
message: impl Into<String> + Send,
) -> impl Future<Output = ()> + Send {
async move {
let message: String = message.into();
println!("{level}: \t{message}");
}
}

Ok(())
fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send {
async { Ok(LocalBuildLogger) }
}
}

+ 12
- 3
binaries/cli/src/command/build/mod.rs View File

@@ -3,7 +3,7 @@ use dora_core::{
descriptor::{CoreNodeKind, CustomNode, Descriptor, DescriptorExt},
topics::{DORA_COORDINATOR_PORT_CONTROL_DEFAULT, LOCALHOST},
};
use dora_message::descriptor::NodeSource;
use dora_message::{descriptor::NodeSource, BuildId};
use eyre::Context;
use std::collections::BTreeMap;

@@ -79,7 +79,7 @@ pub fn build(
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
build_dataflow_locally(
let build_info = build_dataflow_locally(
dataflow_descriptor,
&git_sources,
&dataflow_session,
@@ -88,6 +88,9 @@ pub fn build(
)?;

dataflow_session.git_sources = git_sources;
// generate a random BuildId and store the associated build info
dataflow_session.build_id = Some(BuildId::generate());
dataflow_session.local_build = Some(build_info);
dataflow_session
.write_out_for_dataflow(&dataflow_path)
.context("failed to write out dataflow session file")?;
@@ -122,8 +125,14 @@ pub fn build(
coordinator_socket(coordinator_addr, coordinator_port),
log::LevelFilter::Info,
)?;

dataflow_session.build_id = Some(build_id);
dataflow_session.local_build = None;
dataflow_session
.write_out_for_dataflow(&dataflow_path)
.context("failed to write out dataflow session file")?;
}
}
};

Ok(())
}


+ 1
- 0
binaries/cli/src/command/run.rs View File

@@ -15,6 +15,7 @@ pub fn run(dataflow: String, uv: bool) -> Result<(), eyre::Error> {
let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
dataflow_session.local_build,
dataflow_session.session_id,
uv,
))?;


+ 2
- 1
binaries/cli/src/lib.rs View File

@@ -508,7 +508,8 @@ fn run_cli(args: Args) -> eyre::Result<()> {
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

let result = Daemon::run_dataflow(&dataflow_path, dataflow_session.build_id, dataflow_session.session_id, false).await?;
let result = Daemon::run_dataflow(&dataflow_path,
dataflow_session.build_id, dataflow_session.local_build, dataflow_session.session_id, false).await?;
handle_dataflow_result(result, None)
}
None => {


+ 3
- 0
binaries/cli/src/session.rs View File

@@ -3,6 +3,7 @@ use std::{
path::{Path, PathBuf},
};

use dora_core::build::BuildInfo;
use dora_message::{common::GitSource, id::NodeId, BuildId, SessionId};
use eyre::{Context, ContextCompat};

@@ -11,6 +12,7 @@ pub struct DataflowSession {
pub build_id: Option<BuildId>,
pub session_id: SessionId,
pub git_sources: BTreeMap<NodeId, GitSource>,
pub local_build: Option<BuildInfo>,
}

impl Default for DataflowSession {
@@ -19,6 +21,7 @@ impl Default for DataflowSession {
build_id: None,
session_id: SessionId::generate(),
git_sources: Default::default(),
local_build: Default::default(),
}
}
}


+ 23
- 13
binaries/daemon/src/lib.rs View File

@@ -2,6 +2,7 @@ use aligned_vec::{AVec, ConstAlign};
use coordinator::CoordinatorEvent;
use crossbeam::queue::ArrayQueue;
use dora_core::{
build::{self, BuildInfo, GitManager},
config::{DataId, Input, InputMapping, NodeId, NodeRunConfig, OperatorId},
descriptor::{
read_as_descriptor, CoreNodeKind, Descriptor, DescriptorExt, ResolvedNode, RuntimeNode,
@@ -62,7 +63,6 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

mod build;
mod coordinator;
mod local_listener;
mod log;
@@ -76,10 +76,7 @@ use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::{
build::{BuildInfo, GitManager},
pending::DataflowStatus,
};
use crate::pending::DataflowStatus;

const STDERR_LOG_LINES: usize = 10;

@@ -156,6 +153,7 @@ impl Daemon {
None,
clock,
Some(remote_daemon_events_tx),
Default::default(),
)
.await
.map(|_| ())
@@ -164,6 +162,7 @@ impl Daemon {
pub async fn run_dataflow(
dataflow_path: &Path,
build_id: Option<BuildId>,
local_build: Option<BuildInfo>,
session_id: SessionId,
uv: bool,
) -> eyre::Result<DataflowResult> {
@@ -218,6 +217,16 @@ impl Daemon {
Some(exit_when_done),
clock.clone(),
None,
if let Some(local_build) = local_build {
let Some(build_id) = build_id else {
bail!("no build_id, but local_build set")
};
let mut builds = BTreeMap::new();
builds.insert(build_id, local_build);
builds
} else {
Default::default()
},
);

let spawn_result = reply_rx
@@ -249,6 +258,7 @@ impl Daemon {
exit_when_done: Option<BTreeSet<(Uuid, NodeId)>>,
clock: Arc<HLC>,
remote_daemon_events_tx: Option<flume::Sender<eyre::Result<Timestamped<InterDaemonEvent>>>>,
builds: BTreeMap<BuildId, BuildInfo>,
) -> eyre::Result<DaemonRunResult> {
let coordinator_connection = match coordinator_addr {
Some(addr) => {
@@ -313,7 +323,7 @@ impl Daemon {
zenoh_session,
remote_daemon_events_tx,
git_manager: Default::default(),
builds: Default::default(),
builds,
sessions: Default::default(),
};

@@ -888,9 +898,6 @@ impl Daemon {
let builder = build::Builder {
session_id,
base_working_dir,
daemon_tx: self.events_tx.clone(),
dataflow_descriptor,
clock: self.clock.clone(),
uv,
};

@@ -906,13 +913,18 @@ impl Daemon {
let git_source = git_sources.get(&node_id).cloned();
let prev_git_source = prev_git_sources.get(&node_id).cloned();

let logger_cloned = logger
.try_clone_impl()
.await
.wrap_err("failed to clone logger")?;

match builder
.clone()
.build_node(
node,
git_source,
prev_git_source,
&mut logger,
logger_cloned,
&mut self.git_manager,
)
.await
@@ -926,9 +938,7 @@ impl Daemon {
});
}
Err(err) => {
self.logger
.log_build(build_id, LogLevel::Error, Some(node_id), format!("{err:?}"))
.await;
logger.log(LogLevel::Error, format!("{err:?}")).await;
return Err(err);
}
}


+ 18
- 6
binaries/daemon/src/log.rs View File

@@ -4,7 +4,7 @@ use std::{
sync::Arc,
};

use dora_core::{config::NodeId, uhlc};
use dora_core::{build::BuildLogger, config::NodeId, uhlc};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
@@ -101,17 +101,13 @@ pub struct NodeBuildLogger<'a> {
}

impl NodeBuildLogger<'_> {
pub fn inner(&self) -> &DaemonLogger {
&self.logger
}

pub async fn log(&mut self, level: LogLevel, message: impl Into<String>) {
self.logger
.log_build(self.build_id, level, Some(self.node_id.clone()), message)
.await
}

pub async fn try_clone(&self) -> eyre::Result<NodeBuildLogger<'static>> {
pub async fn try_clone_impl(&self) -> eyre::Result<NodeBuildLogger<'static>> {
Ok(NodeBuildLogger {
build_id: self.build_id,
node_id: self.node_id.clone(),
@@ -120,6 +116,22 @@ impl NodeBuildLogger<'_> {
}
}

impl BuildLogger for NodeBuildLogger<'_> {
type Clone = NodeBuildLogger<'static>;

fn log_message(
&mut self,
level: LogLevel,
message: impl Into<String> + Send,
) -> impl std::future::Future<Output = ()> + Send {
self.log(level, message)
}

fn try_clone(&self) -> impl std::future::Future<Output = eyre::Result<Self::Clone>> + Send {
self.try_clone_impl()
}
}

pub struct DaemonLogger {
daemon_id: DaemonId,
logger: Logger,


+ 4
- 1
libraries/core/Cargo.toml View File

@@ -19,8 +19,11 @@ which = "5.0.0"
uuid = { version = "1.7", features = ["serde", "v7"] }
tracing = "0.1"
serde-with-expand-env = "1.1.0"
tokio = { version = "1.24.1", features = ["fs", "process", "sync"] }
tokio = { version = "1.24.1", features = ["fs", "process", "sync", "rt"] }
schemars = "0.8.19"
serde_json = "1.0.117"
log = { version = "0.4.21", features = ["serde"] }
dunce = "1.0.5"
url = "2.5.4"
git2 = { version = "0.18.0", features = ["vendored-openssl"] }
itertools = "0.14"

libraries/core/src/build.rs → libraries/core/src/build/build_command.rs View File


binaries/daemon/src/build/git.rs → libraries/core/src/build/git.rs View File

@@ -1,4 +1,4 @@
use crate::log::NodeBuildLogger;
use crate::build::BuildLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId, SessionId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
@@ -142,7 +142,7 @@ pub struct GitFolder {
}

impl GitFolder {
pub async fn prepare(self, logger: &mut NodeBuildLogger<'_>) -> eyre::Result<PathBuf> {
pub async fn prepare(self, logger: &mut impl BuildLogger) -> eyre::Result<PathBuf> {
let GitFolder { reuse } = self;

let clone_dir = match reuse {
@@ -165,7 +165,7 @@ impl GitFolder {
.context("failed to copy repo clone")?;

logger
.log(
.log_message(
LogLevel::Info,
format!("fetching changes after copying {}", from.display()),
)
@@ -185,7 +185,7 @@ impl GitFolder {
.context("failed to rename repo clone")?;

logger
.log(
.log_message(
LogLevel::Info,
format!("fetching changes after renaming {}", from.display()),
)
@@ -197,7 +197,7 @@ impl GitFolder {
}
ReuseOptions::Reuse { dir } => {
logger
.log(
.log_message(
LogLevel::Info,
format!("reusing up-to-date {}", dir.display()),
)
@@ -244,7 +244,7 @@ fn rev_str(rev: &Option<GitRepoRev>) -> String {
async fn clone_into(
repo_addr: Url,
clone_dir: &Path,
logger: &mut NodeBuildLogger<'_>,
logger: &mut impl BuildLogger,
) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
tokio::fs::create_dir_all(parent)
@@ -253,7 +253,7 @@ async fn clone_into(
}

logger
.log(
.log_message(
LogLevel::Info,
format!("cloning {repo_addr} into {}", clone_dir.display()),
)
@@ -310,7 +310,7 @@ async fn fetch_changes(

fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> {
let (object, reference) = repository
.revparse_ext(&commit_hash)
.revparse_ext(commit_hash)
.context("failed to parse ref")?;
repository
.checkout_tree(&object, None)

+ 15
- 0
libraries/core/src/build/logger.rs View File

@@ -0,0 +1,15 @@
use std::future::Future;

use dora_message::common::LogLevel;

pub trait BuildLogger: Send {
type Clone: BuildLogger + 'static;

fn log_message(
&mut self,
level: LogLevel,
message: impl Into<String> + Send,
) -> impl Future<Output = ()> + Send;

fn try_clone(&self) -> impl Future<Output = eyre::Result<Self::Clone>> + Send;
}

binaries/daemon/src/build/mod.rs → libraries/core/src/build/mod.rs View File

@@ -1,34 +1,30 @@
pub use git::GitManager;
pub use logger::BuildLogger;

use url::Url;

use std::{collections::BTreeMap, future::Future, path::PathBuf, sync::Arc};
use std::{collections::BTreeMap, future::Future, path::PathBuf};

use dora_core::{
build::run_build_command,
descriptor::{Descriptor, ResolvedNode},
uhlc::HLC,
};
use crate::descriptor::ResolvedNode;
use dora_message::{
common::{GitSource, LogLevel, Timestamped},
descriptor::EnvValue,
common::{GitSource, LogLevel},
descriptor::{CoreNodeKind, EnvValue},
id::NodeId,
SessionId,
};
use eyre::Context;
use tokio::sync::mpsc;

use crate::{build::git::GitFolder, log::NodeBuildLogger, Event};
use build_command::run_build_command;
use git::GitFolder;

mod build_command;
mod git;
mod logger;

#[derive(Clone)]
pub struct Builder {
pub session_id: SessionId,
pub base_working_dir: PathBuf,
pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
pub dataflow_descriptor: Descriptor,
/// clock is required for generating timestamps when dropping messages early because queue is full
pub clock: Arc<HLC>,
pub uv: bool,
}

@@ -38,10 +34,10 @@ impl Builder {
node: ResolvedNode,
git: Option<GitSource>,
prev_git: Option<GitSource>,
logger: &mut NodeBuildLogger<'_>,
mut logger: impl BuildLogger,
git_manager: &mut GitManager,
) -> eyre::Result<impl Future<Output = eyre::Result<BuiltNode>>> {
logger.log(LogLevel::Debug, "building node").await;
logger.log_message(LogLevel::Debug, "building node").await;

let prepared_git = if let Some(GitSource { repo, commit_hash }) = git {
let repo_url = Url::parse(&repo).context("failed to parse git repository URL")?;
@@ -59,10 +55,6 @@ impl Builder {
None
};

let mut logger = logger
.try_clone()
.await
.wrap_err("failed to clone logger")?;
let task = async move { self.build_node_inner(node, &mut logger, prepared_git).await };
Ok(task)
}
@@ -70,11 +62,11 @@ impl Builder {
async fn build_node_inner(
self,
node: ResolvedNode,
logger: &mut NodeBuildLogger<'_>,
logger: &mut impl BuildLogger,
git_folder: Option<GitFolder>,
) -> eyre::Result<BuiltNode> {
let node_working_dir = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
CoreNodeKind::Custom(n) => {
let node_working_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.base_working_dir,
@@ -85,7 +77,7 @@ impl Builder {
}
node_working_dir
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
@@ -106,15 +98,15 @@ impl Builder {
}
}

pub async fn build_node(
logger: &mut NodeBuildLogger<'_>,
async fn build_node(
logger: &mut impl BuildLogger,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
uv: bool,
) -> eyre::Result<()> {
logger
.log(LogLevel::Info, format!("running build command: `{build}"))
.log_message(LogLevel::Info, format!("running build command: `{build}"))
.await;
let build = build.to_owned();
let node_env = node_env.clone();
@@ -127,7 +119,7 @@ pub async fn build_node(
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log(
.log_message(
LogLevel::Info,
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
@@ -142,7 +134,7 @@ pub struct BuiltNode {
pub node_working_dir: PathBuf,
}

#[derive(Debug, Clone)]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct BuildInfo {
pub node_working_dirs: BTreeMap<NodeId, PathBuf>,
}

+ 0
- 0
libraries/core/src/git.rs View File


+ 1
- 0
libraries/core/src/lib.rs View File

@@ -9,6 +9,7 @@ pub use dora_message::{config, uhlc};

pub mod build;
pub mod descriptor;
pub mod git;
pub mod metadata;
pub mod topics;



Loading…
Cancel
Save