Philipp Oppermann · 7 months ago
commit effff21173
18 changed files with 1096 additions and 689 deletions
1. .gitignore (+2 -2)
2. binaries/cli/src/lib.rs (+2 -2)
3. binaries/coordinator/src/lib.rs (+156 -24)
4. binaries/coordinator/src/run/mod.rs (+6 -14)
5. binaries/daemon/src/build/git.rs (+345 -0)
6. binaries/daemon/src/build/mod.rs (+347 -0)
7. binaries/daemon/src/lib.rs (+147 -15)
8. binaries/daemon/src/log.rs (+34 -6)
9. binaries/daemon/src/spawn/git.rs (+0 -286)
10. binaries/daemon/src/spawn/mod.rs (+2 -330)
11. examples/rust-dataflow-git/dataflow.yml (+2 -2)
12. libraries/message/src/cli_to_coordinator.rs (+8 -1)
13. libraries/message/src/common.rs (+3 -2)
14. libraries/message/src/coordinator_to_cli.rs (+20 -4)
15. libraries/message/src/coordinator_to_daemon.rs (+14 -1)
16. libraries/message/src/daemon_to_coordinator.rs (+1 -0)
17. libraries/message/src/descriptor.rs (+6 -0)
18. libraries/message/src/lib.rs (+1 -0)

.gitignore (+2 -2)

@@ -34,7 +34,7 @@ __pycache__/

# Distribution / packaging
.Python
build/
/build/
develop-eggs/
dist/
downloads/
@@ -179,4 +179,4 @@ out/
#Miscellaneous
yolo.yml

~*
~*

binaries/cli/src/lib.rs (+2 -2)

@@ -633,11 +633,11 @@ fn run(args: Args) -> eyre::Result<()> {
}

fn start_dataflow(
build_id: Option<Uuid>,
dataflow: String,
name: Option<String>,
coordinator_socket: SocketAddr,
uv: bool,
build_only: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
@@ -656,11 +656,11 @@ fn start_dataflow(
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
build_id,
dataflow,
name,
local_working_dir: working_dir,
uv,
build_only,
})
.unwrap(),
)
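The CLI now threads an optional `build_id` through `start_dataflow` in place of the removed `build_only` flag. A minimal sketch of the resulting two-step flow, assuming the same `session.request`/`serde_json` plumbing as in the hunks above (the `build_then_start` helper and its error handling are illustrative, not part of this commit):

```rust
use dora_message::{
    cli_to_coordinator::ControlRequest, coordinator_to_cli::ControlRequestReply,
    descriptor::Descriptor,
};
use std::path::PathBuf;

// Hypothetical helper: trigger a build, then start the dataflow with the
// returned build id. `session.request` is assumed to take serialized bytes
// and return the serialized reply, as in the CLI code above
// (`TcpRequestReplyConnection` as in the `start_dataflow` signature; import omitted).
fn build_then_start(
    session: &mut TcpRequestReplyConnection,
    dataflow: Descriptor,
    working_dir: PathBuf,
) -> eyre::Result<()> {
    // step 1: ask the coordinator to build the dataflow
    let reply_raw = session.request(&serde_json::to_vec(&ControlRequest::Build {
        dataflow: dataflow.clone(),
        local_working_dir: working_dir.clone(),
        uv: false,
    })?)?;
    let build_id = match serde_json::from_slice(&reply_raw)? {
        ControlRequestReply::DataflowBuildTriggered { build_id } => build_id,
        _ => eyre::bail!("unexpected reply to build request"),
    };
    // step 2: start the dataflow, referencing the triggered build
    session.request(&serde_json::to_vec(&ControlRequest::Start {
        build_id: Some(build_id),
        dataflow,
        name: None,
        local_working_dir: working_dir,
        uv: false,
    })?)?;
    Ok(())
}
```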


binaries/coordinator/src/lib.rs (+156 -24)

@@ -5,6 +5,7 @@ use crate::{
pub use control::ControlEvent;
use dora_core::{
config::{NodeId, OperatorId},
descriptor::DescriptorExt,
uhlc::{self, HLC},
};
use dora_message::{
@@ -14,17 +15,24 @@ use dora_message::{
ControlRequestReply, DataflowIdAndName, DataflowList, DataflowListEntry, DataflowResult,
DataflowStatus, LogLevel, LogMessage,
},
coordinator_to_daemon::{DaemonCoordinatorEvent, RegisterResult, Timestamped},
coordinator_to_daemon::{
BuildDataflowNodes, DaemonCoordinatorEvent, RegisterResult, Timestamped,
},
daemon_to_coordinator::{DaemonCoordinatorReply, DataflowDaemonResult},
descriptor::{Descriptor, ResolvedNode},
BuildId,
};
use eyre::{bail, eyre, ContextCompat, Result, WrapErr};
use futures::{future::join_all, stream::FuturesUnordered, Future, Stream, StreamExt};
use futures_concurrency::stream::Merge;
use itertools::Itertools;
use log_subscriber::LogSubscriber;
use run::SpawnedDataflow;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
collections::{
btree_map::{Entry, OccupiedEntry},
BTreeMap, BTreeSet, HashMap,
},
net::SocketAddr,
path::PathBuf,
sync::Arc,
@@ -200,6 +208,8 @@ async fn start_inner(
let mut archived_dataflows: HashMap<Uuid, ArchivedDataflow> = HashMap::new();
let mut daemon_connections = DaemonConnections::default();

let mut build_log_subscribers: BTreeMap<BuildId, LogSubscriber> = Default::default();

while let Some(event) = events.next().await {
// used below for measuring the event handling duration
let start = Instant::now();
@@ -353,7 +363,8 @@ async fn start_inner(
send_log_message(
&mut finished_dataflow,
&LogMessage {
dataflow_id,
build_id: None,
dataflow_id: Some(dataflow_id),
node_id: None,
daemon_id: None,
level: LogLevel::Info,
@@ -399,12 +410,40 @@ async fn start_inner(
reply_sender,
} => {
match request {
ControlRequest::Build {
dataflow,
local_working_dir,
uv,
} => {
// assign a random build id
let build_id = BuildId::new_v4();

let result = build_dataflow(
build_id,
dataflow,
local_working_dir,
&clock,
uv,
&mut daemon_connections,
)
.await;
match result {
Ok(()) => {
let _ = reply_sender.send(Ok(
ControlRequestReply::DataflowBuildTriggered { build_id },
));
}
Err(err) => {
let _ = reply_sender.send(Err(err));
}
}
}
ControlRequest::Start {
build_id,
dataflow,
name,
local_working_dir,
uv,
build_only,
} => {
let name = name.or_else(|| names::Generator::default().next());

@@ -419,13 +458,13 @@ async fn start_inner(
}
}
let dataflow = start_dataflow(
build_id,
dataflow,
local_working_dir,
name,
&mut daemon_connections,
&clock,
uv,
build_only,
)
.await?;
Ok(dataflow)
@@ -721,8 +760,15 @@ async fn start_inner(
}
}
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
send_log_message(dataflow, &message).await;
if let Some(dataflow_id) = &message.dataflow_id {
if let Some(dataflow) = running_dataflows.get_mut(dataflow_id) {
send_log_message(dataflow, &message).await;
}
}
if let Some(build_id) = message.build_id {
if let Entry::Occupied(subscriber) = build_log_subscribers.entry(build_id) {
send_log_message_to_subscriber(&message, subscriber).await;
}
}
}
Event::DaemonExit { daemon_id } => {
@@ -739,21 +785,10 @@ async fn start_inner(
match result {
Ok(()) => {
if dataflow.pending_spawn_results.is_empty() {
tracing::info!(
"successfully {} dataflow `{dataflow_id}`",
if dataflow.build_only {
"built"
} else {
"spawned"
}
);
tracing::info!("successfully spawned dataflow `{dataflow_id}`",);
dataflow.spawn_result.set_result(Ok(
ControlRequestReply::DataflowSpawned { uuid: dataflow_id },
));

if dataflow.build_only {
running_dataflows.remove(&dataflow_id);
}
}
}
Err(err) => {
@@ -795,6 +830,24 @@ async fn send_log_message(dataflow: &mut RunningDataflow, message: &LogMessage)
dataflow.log_subscribers.retain(|s| !s.is_closed());
}

async fn send_log_message_to_subscriber(
message: &LogMessage,
mut subscriber: OccupiedEntry<'_, BuildId, LogSubscriber>,
) {
let send_result = tokio::time::timeout(
Duration::from_millis(100),
subscriber.get_mut().send_message(message),
);

if send_result.await.is_err() {
subscriber.get_mut().close();
}

if subscriber.get_mut().is_closed() {
subscriber.remove();
}
}

fn dataflow_result(
results: &BTreeMap<DaemonId, DataflowDaemonResult>,
dataflow_uuid: Uuid,
@@ -875,8 +928,6 @@ struct RunningDataflow {
log_subscribers: Vec<LogSubscriber>,

pending_spawn_results: BTreeSet<DaemonId>,

build_only: bool,
}

pub enum SpawnResult {
@@ -1123,26 +1174,108 @@ async fn retrieve_logs(
reply_logs.map_err(|err| eyre!(err))
}

#[tracing::instrument(skip(daemon_connections, clock))]
async fn build_dataflow(
build_id: BuildId,
dataflow: Descriptor,
working_dir: PathBuf,
clock: &HLC,
uv: bool,
daemon_connections: &mut DaemonConnections,
) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;

let nodes_by_daemon = nodes.values().into_group_map_by(|n| &n.deploy.machine);

let mut daemons = BTreeSet::new();
for (machine, nodes_on_machine) in &nodes_by_daemon {
let nodes_on_machine = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
tracing::debug!(
"Running dataflow build `{build_id}` on machine `{machine:?}` (nodes: {nodes_on_machine:?})"
);

let build_command = BuildDataflowNodes {
build_id,
working_dir: working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
nodes_on_machine,
uv,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Build(build_command),
timestamp: clock.new_timestamp(),
})?;

let daemon_id = build_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
.await
.wrap_err_with(|| format!("failed to build dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!("successfully triggered dataflow build `{build_id}`",);

Ok(())
}

async fn build_dataflow_on_machine(
daemon_connections: &mut DaemonConnections,
machine: Option<&str>,
message: &[u8],
) -> Result<DaemonId, eyre::ErrReport> {
let daemon_id = match machine {
Some(machine) => daemon_connections
.get_matching_daemon_id(machine)
.wrap_err_with(|| format!("no matching daemon for machine id {machine:?}"))?
.clone(),
None => daemon_connections
.unnamed()
.next()
.wrap_err("no unnamed daemon connections")?
.clone(),
};

let daemon_connection = daemon_connections
.get_mut(&daemon_id)
.wrap_err_with(|| format!("no daemon connection for daemon `{daemon_id}`"))?;
tcp_send(&mut daemon_connection.stream, message)
.await
.wrap_err("failed to send build message to daemon")?;

let reply_raw = tcp_receive(&mut daemon_connection.stream)
.await
.wrap_err("failed to receive build reply from daemon")?;
match serde_json::from_slice(&reply_raw)
.wrap_err("failed to deserialize build reply from daemon")?
{
DaemonCoordinatorReply::TriggerBuildResult(result) => result
.map_err(|e| eyre!(e))
.wrap_err("daemon returned an error")?,
_ => bail!("unexpected reply"),
}
Ok(daemon_id)
}

async fn start_dataflow(
build_id: Option<Uuid>,
dataflow: Descriptor,
working_dir: PathBuf,
name: Option<String>,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
build_only: bool,
) -> eyre::Result<RunningDataflow> {
let SpawnedDataflow {
uuid,
daemons,
nodes,
} = spawn_dataflow(
build_id,
dataflow,
working_dir,
daemon_connections,
clock,
uv,
build_only,
)
.await?;
Ok(RunningDataflow {
@@ -1160,7 +1293,6 @@ async fn start_dataflow(
stop_reply_senders: Vec::new(),
log_subscribers: Vec::new(),
pending_spawn_results: daemons,
build_only,
})
}
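Build log routing is the counterpart to the per-dataflow `log_subscribers`: a message that carries a `build_id` is forwarded to the matching entry in `build_log_subscribers`, and a subscriber that times out or closes is removed from the map. A hedged sketch of how a client could filter such messages once received (the delivery mechanism itself is not shown):

```rust
use dora_message::{common::LogMessage, BuildId};

// Illustrative filter: keep only the log lines that belong to one build.
// Build logs set `build_id` and leave `dataflow_id` as `None`.
fn build_logs(
    messages: impl Iterator<Item = LogMessage>,
    build_id: BuildId,
) -> impl Iterator<Item = LogMessage> {
    messages.filter(move |m| m.build_id == Some(build_id))
}
```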



binaries/coordinator/src/run/mod.rs (+6 -14)

@@ -10,6 +10,7 @@ use dora_message::{
daemon_to_coordinator::DaemonCoordinatorReply,
descriptor::{Descriptor, ResolvedNode},
id::NodeId,
BuildId,
};
use eyre::{bail, eyre, ContextCompat, WrapErr};
use itertools::Itertools;
@@ -21,12 +22,12 @@ use uuid::{NoContext, Timestamp, Uuid};

#[tracing::instrument(skip(daemon_connections, clock))]
pub(super) async fn spawn_dataflow(
build_id: Option<BuildId>,
dataflow: Descriptor,
working_dir: PathBuf,
daemon_connections: &mut DaemonConnections,
clock: &HLC,
uv: bool,
build_only: bool,
) -> eyre::Result<SpawnedDataflow> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
@@ -37,18 +38,17 @@ pub(super) async fn spawn_dataflow(
for (machine, nodes_on_machine) in &nodes_by_daemon {
let spawn_nodes = nodes_on_machine.iter().map(|n| n.id.clone()).collect();
tracing::debug!(
"{} dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})",
if build_only { "Building" } else { "Spawning" }
"Spawning dataflow `{uuid}` on machine `{machine:?}` (nodes: {spawn_nodes:?})"
);

let spawn_command = SpawnDataflowNodes {
build_id,
dataflow_id: uuid,
working_dir: working_dir.clone(),
nodes: nodes.clone(),
dataflow_descriptor: dataflow.clone(),
spawn_nodes,
uv,
build_only,
};
let message = serde_json::to_vec(&Timestamped {
inner: DaemonCoordinatorEvent::Spawn(spawn_command),
@@ -57,19 +57,11 @@ pub(super) async fn spawn_dataflow(

let daemon_id = spawn_dataflow_on_machine(daemon_connections, machine.as_deref(), &message)
.await
.wrap_err_with(|| {
format!(
"failed to {} dataflow on machine `{machine:?}`",
if build_only { "build" } else { "spawn" }
)
})?;
.wrap_err_with(|| format!("failed to spawn dataflow on machine `{machine:?}`"))?;
daemons.insert(daemon_id);
}

tracing::info!(
"successfully triggered dataflow {} `{uuid}`",
if build_only { "build" } else { "spawn" }
);
tracing::info!("successfully triggered dataflow spawn `{uuid}`",);

Ok(SpawnedDataflow {
uuid,


binaries/daemon/src/build/git.rs (+345 -0)

@@ -0,0 +1,345 @@
use crate::log::NodeLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, BuildId, DataflowId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
};
use url::Url;
use uuid::Uuid;

#[derive(Default)]
pub struct GitManager {
/// Directories that are currently in use by running dataflows.
clones_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
/// Builds that are prepared, but not done yet.
prepared_builds: BTreeMap<BuildId, PreparedBuild>,
reuse_for: BTreeMap<PathBuf, PathBuf>,
}

#[derive(Default)]
struct PreparedBuild {
/// Clone dirs that will be created during the build process.
///
/// This allows subsequent nodes to reuse the dirs.
planned_clone_dirs: BTreeSet<PathBuf>,
}

impl GitManager {
pub fn choose_clone_dir(
&mut self,
build_id: uuid::Uuid,
previous_build: Option<uuid::Uuid>,
repo_addr: String,
commit_hash: String,
target_dir: &Path,
) -> eyre::Result<GitFolder> {
let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?;
let clone_dir = Self::clone_dir_path(&target_dir, build_id, &repo_url, &commit_hash)?;

if self.clones_in_use.contains_key(&clone_dir) {
// The directory is currently in use by another dataflow. This should never
// happen as we generate a new build ID on every build.
eyre::bail!("clone_dir is already in use by other dataflow")
}

let reuse = if self.clone_dir_ready(build_id, &clone_dir) {
// The directory already contains a checkout of the commit we're interested in.
// So we can simply reuse the directory without doing any additional git
// operations.
ReuseOptions::Reuse {
dir: clone_dir.clone(),
}
} else if let Some(previous_build_id) = previous_build {
// we might be able to update a previous clone
let prev_clone_dir =
Self::clone_dir_path(&target_dir, previous_build_id, &repo_url, &commit_hash)?;

if self
.clones_in_use
.get(&prev_clone_dir)
.map(|ids| !ids.is_empty())
.unwrap_or(false)
{
// previous clone is still in use -> we cannot rename it, but we can copy it
ReuseOptions::CopyAndFetch {
from: prev_clone_dir,
target_dir: clone_dir.clone(),
commit_hash,
}
} else if prev_clone_dir.exists() {
// there is a previous clone that is no longer in use -> rename it
ReuseOptions::RenameAndFetch {
from: prev_clone_dir,
target_dir: clone_dir.clone(),
commit_hash,
}
} else {
// no existing clone associated with previous build id
ReuseOptions::NewClone {
target_dir: clone_dir.clone(),
repo_url,
commit_hash,
}
}
} else {
// no previous build that we can reuse
ReuseOptions::NewClone {
target_dir: clone_dir.clone(),
repo_url,
commit_hash,
}
};
self.register_ready_clone_dir(build_id, clone_dir);

Ok(GitFolder { reuse })
}

pub fn in_use(&self, dir: &Path) -> bool {
self.clones_in_use
.get(dir)
.map(|ids| !ids.is_empty())
.unwrap_or(false)
}

pub fn clone_dir_ready(&self, build_id: BuildId, dir: &Path) -> bool {
self.prepared_builds
.get(&build_id)
.map(|p| p.planned_clone_dirs.contains(dir))
.unwrap_or(false)
|| dir.exists()
}

pub fn register_ready_clone_dir(&mut self, build_id: BuildId, dir: PathBuf) -> bool {
self.prepared_builds
.entry(build_id)
.or_default()
.planned_clone_dirs
.insert(dir)
}

fn clone_dir_path(
base_dir: &Path,
build_id: BuildId,
repo_url: &Url,
commit_hash: &String,
) -> eyre::Result<PathBuf> {
let mut path = base_dir
.join(&build_id.to_string())
.join(repo_url.host_str().context("git URL has no hostname")?);
path.extend(repo_url.path_segments().context("no path in git URL")?);
let path = path.join(commit_hash);
Ok(dunce::simplified(&path).to_owned())
}
}

pub struct GitFolder {
/// Specifies whether an existing repo should be reused.
reuse: ReuseOptions,
}

impl GitFolder {
pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result<PathBuf> {
let GitFolder { reuse } = self;

let clone_dir = match reuse {
ReuseOptions::NewClone {
target_dir,
repo_url,
commit_hash,
} => {
let repository = clone_into(repo_url, &target_dir, logger).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::CopyAndFetch {
from,
target_dir,
commit_hash,
} => {
tokio::fs::copy(&from, &target_dir)
.await
.context("failed to copy repo clone")?;

logger
.log(
LogLevel::Info,
None,
format!("fetching changes after copying {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::RenameAndFetch {
from,
target_dir,
commit_hash,
} => {
tokio::fs::rename(&from, &target_dir)
.await
.context("failed to rename repo clone")?;

logger
.log(
LogLevel::Info,
None,
format!("fetching changes after renaming {}", from.display()),
)
.await;

let repository = fetch_changes(&target_dir, None).await?;
checkout_tree(&repository, &commit_hash)?;
target_dir
}
ReuseOptions::Reuse { dir } => {
logger
.log(
LogLevel::Info,
None,
format!("reusing up-to-date {}", dir.display()),
)
.await;
dir
}
};
Ok(clone_dir)
}
}

fn used_by_other_dataflow(
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}

enum ReuseOptions {
/// Create a new clone of the repository.
NewClone {
target_dir: PathBuf,
repo_url: Url,
commit_hash: String,
},
/// Reuse an existing up-to-date clone of the repository.
Reuse { dir: PathBuf },
/// Copy an older clone of the repository and fetch changes, then reuse it.
CopyAndFetch {
from: PathBuf,
target_dir: PathBuf,
commit_hash: String,
},
/// Rename an older clone of the repository and fetch changes, then reuse it.
RenameAndFetch {
from: PathBuf,
target_dir: PathBuf,
commit_hash: String,
},
}

fn rev_str(rev: &Option<GitRepoRev>) -> String {
match rev {
Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"),
Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"),
Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"),
None => String::new(),
}
}

async fn clone_into(
repo_addr: Url,
clone_dir: &Path,
logger: &mut NodeLogger<'_>,
) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create parent directory for git clone")?;
}

logger
.log(
LogLevel::Info,
None,
format!("cloning {repo_addr} into {}", clone_dir.display()),
)
.await;
let clone_dir = clone_dir.to_owned();
let task = tokio::task::spawn_blocking(move || {
let mut builder = git2::build::RepoBuilder::new();
let mut fetch_options = git2::FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
builder.fetch_options(fetch_options);
builder
.clone(repo_addr.as_str(), &clone_dir)
.context("failed to clone repo")
});
let repo = task.await??;
Ok(repo)
}

async fn fetch_changes(
repo_dir: &Path,
refname: Option<String>,
) -> Result<git2::Repository, eyre::Error> {
let repo_dir = repo_dir.to_owned();
let fetch_changes = tokio::task::spawn_blocking(move || {
let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;

{
let mut remote = repository
.find_remote("origin")
.context("failed to find remote `origin` in repo")?;
remote
.connect(git2::Direction::Fetch)
.context("failed to connect to remote")?;
let default_branch = remote
.default_branch()
.context("failed to get default branch for remote")?;
let fetch = match &refname {
Some(refname) => refname,
None => default_branch
.as_str()
.context("failed to read default branch as string")?,
};
let mut fetch_options = FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
remote
.fetch(&[&fetch], Some(&mut fetch_options), None)
.context("failed to fetch from git repo")?;
}
Result::<_, eyre::Error>::Ok(repository)
});
let repository = fetch_changes.await??;
Ok(repository)
}

fn checkout_tree(repository: &git2::Repository, commit_hash: &str) -> eyre::Result<()> {
let (object, reference) = repository
.revparse_ext(&commit_hash)
.context("failed to parse ref")?;
repository
.checkout_tree(&object, None)
.context("failed to checkout ref")?;
match reference {
Some(reference) => repository
.set_head(reference.name().context("failed to get reference_name")?)
.context("failed to set head")?,
None => repository
.set_head_detached(object.id())
.context("failed to set detached head")?,
}

Ok(())
}

fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap<PathBuf, BTreeSet<Uuid>>) -> bool {
repos_in_use.contains_key(dir) || dir.exists()
}
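The four `ReuseOptions` variants correspond to a small decision tree inside `choose_clone_dir`. A condensed sketch of that logic (names and the `PrevClone` helper are illustrative; the real method also registers the chosen directory via `register_ready_clone_dir`):

```rust
// Illustrative summary of the reuse decision in `choose_clone_dir`.
enum Strategy {
    Reuse,          // exact checkout for this build is already there
    CopyAndFetch,   // previous clone still in use by a running dataflow
    RenameAndFetch, // previous clone exists but is idle
    NewClone,       // nothing reusable: fresh `git clone`
}

struct PrevClone {
    exists: bool, // clone dir from the previous build is on disk
    in_use: bool, // still referenced by a running dataflow
}

fn pick_strategy(dir_ready: bool, prev: Option<PrevClone>) -> Strategy {
    if dir_ready {
        return Strategy::Reuse;
    }
    match prev {
        Some(p) if p.in_use => Strategy::CopyAndFetch,
        Some(p) if p.exists => Strategy::RenameAndFetch,
        _ => Strategy::NewClone,
    }
}
```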

binaries/daemon/src/build/mod.rs (+347 -0)

@@ -0,0 +1,347 @@
pub use git::GitManager;

use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
path::PathBuf,
sync::Arc,
};

use dora_core::{
build::run_build_command,
descriptor::{CustomNode, Descriptor, ResolvedNode},
uhlc::HLC,
};
use dora_message::{
common::{LogLevel, Timestamped},
daemon_to_node::NodeConfig,
descriptor::{EnvValue, GitRepoRev, ResolvedNodeSource},
id::NodeId,
BuildId, DataflowId,
};
use eyre::Context;
use tokio::sync::mpsc;

use crate::{build::git::GitFolder, log::DaemonLogger, Event};

mod git;

#[derive(Clone)]
pub struct Builder {
pub build_id: BuildId,
pub prev_build_id: Option<BuildId>,
pub working_dir: PathBuf,
pub daemon_tx: mpsc::Sender<Timestamped<Event>>,
pub dataflow_descriptor: Descriptor,
/// clock is required for generating timestamps when dropping messages early because queue is full
pub clock: Arc<HLC>,
pub uv: bool,
}

impl Builder {
pub async fn prepare_node(
self,
node: ResolvedNode,
logger: &mut DaemonLogger,
git_manager: &mut GitManager,
) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
let build_id = self.build_id;
let node_id = node.id.clone();
logger
.log_build(
build_id,
LogLevel::Debug,
Some(node.id.clone()),
"building node",
)
.await;

let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom {
source: ResolvedNodeSource::GitCommit { repo, commit_hash },
..
} = &node.kind
{
let target_dir = self.working_dir.join("build");
let git_folder = git_manager.choose_clone_dir(
self.build_id,
self.prev_build_id,
repo.clone(),
commit_hash.clone(),
&target_dir,
)?;
Some(git_folder)
} else {
None
};

let mut logger = logger
.try_clone()
.await
.wrap_err("failed to clone logger")?;
let task = async move {
self.prepare_node_inner(node, &mut logger, build_id, prepared_git)
.await
};
Ok(task)
}

async fn prepare_node_inner(
mut self,
node: ResolvedNode,
logger: &mut DaemonLogger,
build_id: uuid::Uuid,
git_folder: Option<GitFolder>,
) -> eyre::Result<PreparedNode> {
let (command, error_msg) = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let build_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.working_dir.clone(),
};

if let Some(build) = &n.build {
self.build_node(logger, &node.env, build_dir.clone(), build)
.await?;
}
let mut command = if self.build_only {
None
} else {
path_spawn_command(&build_dir, self.uv, logger, n, true).await?
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);
command.stdin(Stdio::null());

command.env(
"DORA_NODE_CONFIG",
serde_yaml::to_string(&node_config.clone())
.wrap_err("failed to serialize node config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
if let Some(envs) = &n.envs {
// node has some inner env variables -> add them too
for (key, value) in envs {
command.env(key, value.to_string());
}
}

// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command.env("PYTHONUNBUFFERED", "1");
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};

let error_msg = format!(
"failed to run `{}` with args `{}`",
n.path,
n.args.as_deref().unwrap_or_default(),
);
(command, error_msg)
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
self.build_node(logger, &node.env, self.working_dir.clone(), build)
.await?;
}
}

let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
.filter(|x| matches!(x.config.source, OperatorSource::Python { .. }))
.collect();

let other_operators = n
.operators
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if self.build_only {
None
} else if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator

// TODO: Handle multi-operator runtime once sub-interpreter is supported
if python_operators.len() > 2 {
eyre::bail!(
"Runtime currently only support one Python Operator.
This is because pyo4 sub-interpreter is not yet available.
See: https://github.com/PyO4/pyo3/issues/576"
);
}

let python_operator = python_operators
.first()
.context("Runtime had no operators definition.")?;

if let OperatorSource::Python(PythonSource {
source: _,
conda_env: Some(conda_env),
}) = &python_operator.config.source
{
let conda = which::which("conda").context(
"failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.",
)?;
let mut command = tokio::process::Command::new(conda);
command.args([
"run",
"-n",
conda_env,
"python",
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(command)
} else {
let mut cmd = if self.uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg("python");
tracing::info!(
"spawning: uv run python -uc import dora; dora.start_runtime() # {}",
node.id
);
cmd
} else {
let python = get_python_path()
.wrap_err("Could not find python path when spawning custom node")?;
tracing::info!(
"spawning: python -uc import dora; dora.start_runtime() # {}",
node.id
);

tokio::process::Command::new(python)
};
// Force python to always flush stdout/stderr buffer
cmd.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(cmd)
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe()
.wrap_err("failed to get current executable path")?,
);
cmd.arg("runtime");
Some(cmd)
} else {
eyre::bail!("Runtime can not mix Python Operator with other type of operator.");
};

let runtime_config = RuntimeConfig {
node: node_config.clone(),
operators: n.operators.clone(),
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);

command.env(
"DORA_RUNTIME_CONFIG",
serde_yaml::to_string(&runtime_config)
.wrap_err("failed to serialize runtime config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};
let error_msg = format!(
"failed to run runtime {}/{}",
runtime_config.node.dataflow_id, runtime_config.node.node_id
);
(command, error_msg)
}
};
Ok(PreparedNode {
command,
spawn_error_msg: error_msg,
working_dir: self.working_dir,
dataflow_id,
node,
node_config,
clock: self.clock,
daemon_tx: self.daemon_tx,
})
}
}

pub async fn build_node(
node_id: NodeId,
logger: &mut DaemonLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
uv: bool,
) -> eyre::Result<()> {
logger
.log(
LogLevel::Info,
None,
Some(node_id),
Some("build".to_owned()),
format!("running build command: `{build}"),
)
.await;
let build = build.to_owned();
let node_env = node_env.clone();
let mut logger = logger.try_clone().await.context("failed to clone logger")?;
let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
.context("build command failed")
});
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log(
LogLevel::Info,
None,
Some(node_id),
Some("build command".into()),
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
.await;
}
});
task.await??;
Ok(())
}

pub struct PreparedNode {
command: Option<tokio::process::Command>,
spawn_error_msg: String,
working_dir: PathBuf,
dataflow_id: DataflowId,
node: ResolvedNode,
node_config: NodeConfig,
clock: Arc<HLC>,
daemon_tx: mpsc::Sender<Timestamped<Event>>,
}
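`build_node` above off-loads the blocking `run_build_command` to `spawn_blocking` and streams its output line by line over an mpsc channel while the build runs. The same pattern in isolation, with a plain closure standing in for the build command and `println!` for the logger (a sketch, not the actual daemon code):

```rust
use tokio::sync::mpsc;

// Run blocking work on a worker thread while forwarding its line output
// asynchronously. `work` stands in for `run_build_command`.
async fn run_streaming<F>(work: F) -> eyre::Result<()>
where
    F: FnOnce(mpsc::Sender<String>) -> eyre::Result<()> + Send + 'static,
{
    let (stdout_tx, mut stdout) = mpsc::channel(10);
    let build = tokio::task::spawn_blocking(move || work(stdout_tx));
    let forward = tokio::spawn(async move {
        while let Some(line) = stdout.recv().await {
            println!("{line}"); // the daemon forwards this via `logger.log(...)`
        }
    });
    build.await??; // surfaces both join errors and build failures
    let _ = forward.await; // channel closes once `work` drops the sender
    Ok(())
}
```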

binaries/daemon/src/lib.rs (+147 -15)

@@ -15,7 +15,7 @@ use dora_message::{
DaemonId, DataMessage, DropToken, LogLevel, NodeError, NodeErrorCause, NodeExitStatus,
},
coordinator_to_cli::DataflowResult,
coordinator_to_daemon::{DaemonCoordinatorEvent, SpawnDataflowNodes},
coordinator_to_daemon::{BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes},
daemon_to_coordinator::{
CoordinatorRequest, DaemonCoordinatorReply, DaemonEvent, DataflowDaemonResult,
},
@@ -60,6 +60,7 @@ use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use tracing::{error, warn};
use uuid::{NoContext, Timestamp, Uuid};

mod build;
mod coordinator;
mod local_listener;
mod log;
@@ -73,7 +74,7 @@ use dora_tracing::telemetry::serialize_context;
#[cfg(feature = "telemetry")]
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::pending::DataflowStatus;
use crate::{build::GitManager, pending::DataflowStatus};

const STDERR_LOG_LINES: usize = 10;

@@ -101,7 +102,7 @@ pub struct Daemon {

logger: DaemonLogger,

repos_in_use: BTreeMap<PathBuf, BTreeSet<DataflowId>>,
git_manager: GitManager,
}

type DaemonRunResult = BTreeMap<Uuid, BTreeMap<NodeId, Result<(), NodeError>>>;
@@ -167,13 +168,13 @@ impl Daemon {

let dataflow_id = Uuid::new_v7(Timestamp::now(NoContext));
let spawn_command = SpawnDataflowNodes {
build_id,
dataflow_id,
working_dir,
spawn_nodes: nodes.keys().cloned().collect(),
nodes,
dataflow_descriptor: descriptor,
uv,
build_only: false,
};

let clock = Arc::new(HLC::default());
@@ -298,7 +299,7 @@ impl Daemon {
clock,
zenoh_session,
remote_daemon_events_tx,
repos_in_use: Default::default(),
git_manager: Default::default(),
};

let dora_events = ReceiverStream::new(dora_events_rx);
@@ -421,11 +422,7 @@ impl Daemon {
Event::SpawnDataflowResult {
dataflow_id,
result,
build_only,
} => {
if build_only {
self.running.remove(&dataflow_id);
}
if let Some(connection) = &mut self.coordinator_connection {
let msg = serde_json::to_vec(&Timestamped {
inner: CoordinatorRequest::Event {
@@ -476,14 +473,76 @@ impl Daemon {
reply_tx: Sender<Option<DaemonCoordinatorReply>>,
) -> eyre::Result<RunStatus> {
let status = match event {
DaemonCoordinatorEvent::Build(BuildDataflowNodes {
build_id,
working_dir,
nodes,
dataflow_descriptor,
nodes_on_machine,
uv,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
}

// Use the given working directory if it exists; otherwise fall back to the daemon's current working directory
let working_dir = if working_dir.exists() {
working_dir
} else {
std::env::current_dir().wrap_err("failed to get current working dir")?
};

let result = self
.build_dataflow(
build_id,
working_dir,
nodes,
dataflow_descriptor,
nodes_on_machine,
uv,
)
.await;
let (trigger_result, result_task) = match result {
Ok(result_task) => (Ok(()), Some(result_task)),
Err(err) => (Err(format!("{err:?}")), None),
};
let reply = DaemonCoordinatorReply::TriggerBuildResult(trigger_result);
let _ = reply_tx.send(Some(reply)).map_err(|_| {
error!("could not send `TriggerBuildResult` reply from daemon to coordinator")
});

let result_tx = self.events_tx.clone();
let clock = self.clock.clone();
if let Some(result_task) = result_task {
tokio::spawn(async move {
let message = Timestamped {
inner: Event::BuildDataflowResult {
build_id,
result: result_task.await,
},
timestamp: clock.new_timestamp(),
};
let _ = result_tx
.send(message)
.map_err(|_| {
error!(
"could not send `BuildResult` reply from daemon to coordinator"
)
})
.await;
});
}

RunStatus::Continue
}
DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
build_id,
dataflow_id,
working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
uv,
build_only,
}) => {
match dataflow_descriptor.communication.remote {
dora_core::config::RemoteCommunicationConfig::Tcp => {}
@@ -498,13 +557,13 @@ impl Daemon {

let result = self
.spawn_dataflow(
build_id,
dataflow_id,
working_dir,
nodes,
dataflow_descriptor,
spawn_nodes,
uv,
build_only,
)
.await;
let (trigger_result, result_task) = match result {
@@ -524,7 +583,6 @@ impl Daemon {
inner: Event::SpawnDataflowResult {
dataflow_id,
result: result_task.await,
build_only,
},
timestamp: clock.new_timestamp(),
};
@@ -770,15 +828,87 @@ impl Daemon {
}
}

async fn build_dataflow(
&mut self,
build_id: uuid::Uuid,
working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
dataflow_descriptor: Descriptor,
local_nodes: BTreeSet<NodeId>,
uv: bool,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
let mut tasks = Vec::new();

// prepare a build task for each node assigned to this daemon
for node in nodes.into_values().filter(|n| local_nodes.contains(&n.id)) {
let dynamic_node = node.kind.dynamic();

let node_id = node.id.clone();
self.logger
.log_build(build_id, LogLevel::Info, None, "building")
.await;
match spawner
.clone()
.prepare_node(
node,
node_stderr_most_recent,
&mut logger,
&mut self.repos_in_use,
)
.await
.wrap_err_with(|| format!("failed to spawn node `{node_id}`"))
{
Ok(result) => {
tasks.push(NodePrepareTask {
node_id,
task: result,
dynamic_node,
});
}
Err(err) => {
logger
.log(LogLevel::Error, Some("daemon".into()), format!("{err:?}"))
.await;
self.dataflow_node_results
.entry(dataflow_id)
.or_default()
.insert(
node_id.clone(),
Err(NodeError {
timestamp: self.clock.new_timestamp(),
cause: NodeErrorCause::FailedToSpawn(format!("{err:?}")),
exit_status: NodeExitStatus::Unknown,
}),
);
stopped.push((node_id.clone(), dynamic_node));
}
}
}
for (node_id, dynamic) in stopped {
self.handle_node_stop(dataflow_id, &node_id, dynamic)
.await?;
}

let spawn_result = Self::spawn_prepared_nodes(
dataflow_id,
logger,
tasks,
self.events_tx.clone(),
self.clock.clone(),
);

Ok(spawn_result)
}

async fn spawn_dataflow(
&mut self,
build_id: Option<uuid::Uuid>,
dataflow_id: uuid::Uuid,
working_dir: PathBuf,
nodes: BTreeMap<NodeId, ResolvedNode>,
dataflow_descriptor: Descriptor,
spawn_nodes: BTreeSet<NodeId>,
uv: bool,
build_only: bool,
) -> eyre::Result<impl Future<Output = eyre::Result<()>>> {
let mut logger = self
.logger
@@ -843,7 +973,6 @@ impl Daemon {
dataflow_descriptor,
clock: self.clock.clone(),
uv,
build_only,
};

let mut tasks = Vec::new();
@@ -2272,10 +2401,13 @@ pub enum Event {
dynamic_node: bool,
result: Result<RunningNode, NodeError>,
},
BuildDataflowResult {
build_id: Uuid,
result: eyre::Result<()>,
},
SpawnDataflowResult {
dataflow_id: Uuid,
result: eyre::Result<()>,
build_only: bool,
},
}
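Both new handlers follow the same two-phase shape: an immediate `TriggerBuildResult`/`TriggerSpawnResult` reply on the request channel, then a deferred `BuildDataflowResult`/`SpawnDataflowResult` event once the background task finishes. The pattern in isolation (channel types are illustrative; the daemon wraps the final event in `Timestamped` with a fresh HLC timestamp):

```rust
use std::future::Future;
use tokio::sync::{mpsc, oneshot};

// Phase 1: acknowledge that the work was triggered (or report why not).
// Phase 2: run the work in the background and deliver its final result.
async fn trigger_two_phase<F>(
    work: eyre::Result<F>,
    reply_tx: oneshot::Sender<Result<(), String>>,
    result_tx: mpsc::Sender<eyre::Result<()>>,
) where
    F: Future<Output = eyre::Result<()>> + Send + 'static,
{
    match work {
        Ok(task) => {
            let _ = reply_tx.send(Ok(())); // "triggered"
            tokio::spawn(async move {
                let _ = result_tx.send(task.await).await; // final outcome
            });
        }
        Err(err) => {
            let _ = reply_tx.send(Err(format!("{err:?}"))); // triggering failed
        }
    }
}
```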



binaries/daemon/src/log.rs (+34 -6)

@@ -8,6 +8,7 @@ use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DaemonId, LogLevel, LogMessage, Timestamped},
daemon_to_coordinator::{CoordinatorRequest, DaemonEvent},
BuildId,
};
use eyre::Context;
use tokio::net::TcpStream;
@@ -81,7 +82,7 @@ impl<'a> DataflowLogger<'a> {
message: impl Into<String>,
) {
self.logger
.log(level, self.dataflow_id, node_id, target, message)
.log(level, Some(self.dataflow_id), node_id, target, message)
.await
}

@@ -113,12 +114,13 @@ impl DaemonLogger {
pub async fn log(
&mut self,
level: LogLevel,
dataflow_id: Uuid,
dataflow_id: Option<Uuid>,
node_id: Option<NodeId>,
target: Option<String>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: None,
daemon_id: Some(self.daemon_id.clone()),
dataflow_id,
node_id,
@@ -132,6 +134,28 @@ impl DaemonLogger {
self.logger.log(message).await
}

pub async fn log_build(
&mut self,
build_id: BuildId,
level: LogLevel,
node_id: Option<NodeId>,
message: impl Into<String>,
) {
let message = LogMessage {
build_id: Some(build_id),
daemon_id: Some(self.daemon_id.clone()),
dataflow_id: None,
node_id,
level,
target: Some("build".into()),
module_path: None,
file: None,
line: None,
message: message.into(),
};
self.logger.log(message).await
}

pub(crate) fn daemon_id(&self) -> &DaemonId {
&self.daemon_id
}
@@ -181,7 +205,8 @@ impl Logger {
match message.level {
LogLevel::Error => {
tracing::error!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -193,7 +218,8 @@ impl Logger {
}
LogLevel::Warn => {
tracing::warn!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -205,7 +231,8 @@ impl Logger {
}
LogLevel::Info => {
tracing::info!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
@@ -217,7 +244,8 @@ impl Logger {
}
LogLevel::Debug => {
tracing::debug!(
dataflow_id = message.dataflow_id.to_string(),
build_id = ?message.build_id.map(|id| id.to_string()),
dataflow_id = ?message.dataflow_id.map(|id| id.to_string()),
node_id = ?message.node_id.map(|id| id.to_string()),
target = message.target,
module_path = message.module_path,
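With `LogMessage` now carrying both optional ids, build output and dataflow output stay distinguishable: `log_build` sets `build_id` and leaves `dataflow_id` empty, while the regular `log` does the inverse. A usage sketch (assuming a `DaemonLogger` value in scope; the type is internal to the daemon crate):

```rust
use dora_core::config::NodeId;
use dora_message::{common::LogLevel, BuildId, DataflowId};

async fn log_examples(
    logger: &mut DaemonLogger,
    build_id: BuildId,
    dataflow_id: DataflowId,
    node_id: NodeId,
) {
    // build-scoped: `build_id` is set, `dataflow_id` stays `None`
    logger
        .log_build(build_id, LogLevel::Info, Some(node_id.clone()), "compiling node")
        .await;
    // dataflow-scoped: `dataflow_id` is set, `build_id` stays `None`
    logger
        .log(LogLevel::Info, Some(dataflow_id), Some(node_id), None, "node started")
        .await;
}
```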


binaries/daemon/src/spawn/git.rs (+0 -286)

@@ -1,286 +0,0 @@
use crate::log::NodeLogger;
use dora_message::{common::LogLevel, descriptor::GitRepoRev, DataflowId};
use eyre::{ContextCompat, WrapErr};
use git2::FetchOptions;
use std::{
collections::{BTreeMap, BTreeSet},
path::{Path, PathBuf},
};
use url::Url;
use uuid::Uuid;

pub struct GitFolder {
/// The URL of the git repository.
repo_addr: String,
/// The branch, tag, or git revision to checkout.
rev: Option<GitRepoRev>,
/// The directory that should contain the checked-out repository.
clone_dir: PathBuf,
/// Specifies whether an existing repo should be reused.
reuse: ReuseOptions,
}

impl GitFolder {
pub fn choose_clone_dir(
dataflow_id: uuid::Uuid,
repo_addr: String,
rev: Option<GitRepoRev>,
target_dir: &Path,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<Self> {
let repo_url = Url::parse(&repo_addr).context("failed to parse git repository URL")?;

let base_dir = {
let base = {
let mut path =
target_dir.join(repo_url.host_str().context("git URL has no hostname")?);

path.extend(repo_url.path_segments().context("no path in git URL")?);
path
};
match &rev {
None => base,
Some(rev) => match rev {
GitRepoRev::Branch(branch) => base.join("branch").join(branch),
GitRepoRev::Tag(tag) => base.join("tag").join(tag),
GitRepoRev::Rev(rev) => base.join("rev").join(rev),
},
}
};
let clone_dir = if clone_dir_exists(&base_dir, repos_in_use) {
let used_by_other = used_by_other_dataflow(dataflow_id, &base_dir, repos_in_use);
if used_by_other {
// don't reuse, choose new directory
// (TODO reuse if still up to date)

let dir_name = base_dir.file_name().unwrap().to_str().unwrap();
let mut i = 1;
loop {
let new_path = base_dir.with_file_name(format!("{dir_name}-{i}"));
if clone_dir_exists(&new_path, repos_in_use)
&& used_by_other_dataflow(dataflow_id, &new_path, repos_in_use)
{
i += 1;
} else {
break new_path;
}
}
} else {
base_dir
}
} else {
base_dir
};
let clone_dir = dunce::simplified(&clone_dir).to_owned();

let reuse = if clone_dir_exists(&clone_dir, repos_in_use) {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(&clone_dir).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
if used_by_other_dataflow {
// The directory is currently in use by another dataflow. We currently don't
// support reusing the same clone across multiple dataflow runs. Above, we
// choose a new directory if we detect such a case. So this `if` branch
// should never be reached.
eyre::bail!("clone_dir is already in use by other dataflow")
} else if in_use.is_empty() {
// The cloned repo is not used by any dataflow, so we can safely reuse it. However,
// the clone might be still on an older commit, so we need to do a `git fetch`
// before we reuse it.
ReuseOptions::ReuseAfterFetch
} else {
// This clone is already used for another node of this dataflow. We will do a
// `git fetch` operation for the first node of this dataflow, so we don't need
// to do it again for other nodes of the dataflow. So we can simply reuse the
// directory without doing any additional git operations.
ReuseOptions::Reuse
}
} else {
ReuseOptions::NewClone
};
repos_in_use
.entry(clone_dir.clone())
.or_default()
.insert(dataflow_id);

Ok(GitFolder {
clone_dir,
reuse,
repo_addr,
rev,
})
}

pub async fn prepare(self, logger: &mut NodeLogger<'_>) -> eyre::Result<PathBuf> {
let GitFolder {
clone_dir,
reuse,
repo_addr,
rev,
} = self;

let rev_str = rev_str(&rev);
let refname = rev.clone().map(|rev| match rev {
GitRepoRev::Branch(branch) => format!("refs/remotes/origin/{branch}"),
GitRepoRev::Tag(tag) => format!("refs/tags/{tag}"),
GitRepoRev::Rev(rev) => rev,
});

match reuse {
ReuseOptions::NewClone => {
let repository = clone_into(&repo_addr, &rev, &clone_dir, logger).await?;
checkout_tree(&repository, refname)?;
}
ReuseOptions::ReuseAfterFetch => {
logger
.log(
LogLevel::Info,
None,
format!("fetching changes and reusing {repo_addr}{rev_str}"),
)
.await;
let refname_cloned = refname.clone();
let clone_dir = clone_dir.clone();
let repository = fetch_changes(clone_dir, refname_cloned).await?;
checkout_tree(&repository, refname)?;
}
ReuseOptions::Reuse => {
logger
.log(
LogLevel::Info,
None,
format!("reusing up-to-date {repo_addr}{rev_str}"),
)
.await;
}
};
Ok(clone_dir)
}
}

fn used_by_other_dataflow(
dataflow_id: uuid::Uuid,
clone_dir_base: &PathBuf,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> bool {
let empty = BTreeSet::new();
let in_use = repos_in_use.get(clone_dir_base).unwrap_or(&empty);
let used_by_other_dataflow = in_use.iter().any(|&id| id != dataflow_id);
used_by_other_dataflow
}

enum ReuseOptions {
/// Create a new clone of the repository.
NewClone,
/// Reuse an existing up-to-date clone of the repository.
Reuse,
/// Update an older clone of the repository, then reuse it.
ReuseAfterFetch,
}

fn rev_str(rev: &Option<GitRepoRev>) -> String {
match rev {
Some(GitRepoRev::Branch(branch)) => format!(" (branch {branch})"),
Some(GitRepoRev::Tag(tag)) => format!(" (tag {tag})"),
Some(GitRepoRev::Rev(rev)) => format!(" (rev {rev})"),
None => String::new(),
}
}

async fn clone_into(
repo_addr: &String,
rev: &Option<GitRepoRev>,
clone_dir: &Path,
logger: &mut NodeLogger<'_>,
) -> eyre::Result<git2::Repository> {
if let Some(parent) = clone_dir.parent() {
tokio::fs::create_dir_all(parent)
.await
.context("failed to create parent directory for git clone")?;
}

let rev_str = rev_str(rev);
logger
.log(
LogLevel::Info,
None,
format!("cloning {repo_addr}{rev_str} into {}", clone_dir.display()),
)
.await;
let rev: Option<GitRepoRev> = rev.clone();
let clone_into = clone_dir.to_owned();
let repo_addr = repo_addr.clone();
let task = tokio::task::spawn_blocking(move || {
let mut builder = git2::build::RepoBuilder::new();
let mut fetch_options = git2::FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
builder.fetch_options(fetch_options);
if let Some(GitRepoRev::Branch(branch)) = &rev {
builder.branch(branch);
}
builder
.clone(&repo_addr, &clone_into)
.context("failed to clone repo")
});
let repo = task.await??;
Ok(repo)
}

async fn fetch_changes(
repo_dir: PathBuf,
refname: Option<String>,
) -> Result<git2::Repository, eyre::Error> {
let fetch_changes = tokio::task::spawn_blocking(move || {
let repository = git2::Repository::open(&repo_dir).context("failed to open git repo")?;

{
let mut remote = repository
.find_remote("origin")
.context("failed to find remote `origin` in repo")?;
remote
.connect(git2::Direction::Fetch)
.context("failed to connect to remote")?;
let default_branch = remote
.default_branch()
.context("failed to get default branch for remote")?;
let fetch = match &refname {
Some(refname) => refname,
None => default_branch
.as_str()
.context("failed to read default branch as string")?,
};
let mut fetch_options = FetchOptions::new();
fetch_options.download_tags(git2::AutotagOption::All);
remote
.fetch(&[&fetch], Some(&mut fetch_options), None)
.context("failed to fetch from git repo")?;
}
Result::<_, eyre::Error>::Ok(repository)
});
let repository = fetch_changes.await??;
Ok(repository)
}

fn checkout_tree(repository: &git2::Repository, refname: Option<String>) -> eyre::Result<()> {
if let Some(refname) = refname {
let (object, reference) = repository
.revparse_ext(&refname)
.context("failed to parse ref")?;
repository
.checkout_tree(&object, None)
.context("failed to checkout ref")?;
match reference {
Some(reference) => repository
.set_head(reference.name().context("failed to get reference_name")?)
.context("failed to set head")?,
None => repository
.set_head_detached(object.id())
.context("failed to set detached head")?,
}
}
Ok(())
}

fn clone_dir_exists(dir: &PathBuf, repos_in_use: &BTreeMap<PathBuf, BTreeSet<Uuid>>) -> bool {
repos_in_use.contains_key(dir) || dir.exists()
}

binaries/daemon/src/spawn/mod.rs (+2 -330)

@@ -46,8 +46,6 @@ use tokio::{
};
use tracing::error;

mod git;

#[derive(Clone)]
pub struct Spawner {
pub dataflow_id: DataflowId,
@@ -57,333 +55,6 @@ pub struct Spawner {
/// clock is required for generating timestamps when dropping messages early because queue is full
pub clock: Arc<HLC>,
pub uv: bool,
pub build_only: bool,
}

impl Spawner {
pub async fn prepare_node(
self,
node: ResolvedNode,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
logger: &mut NodeLogger<'_>,
repos_in_use: &mut BTreeMap<PathBuf, BTreeSet<DataflowId>>,
) -> eyre::Result<impl Future<Output = eyre::Result<PreparedNode>>> {
let dataflow_id = self.dataflow_id;
let node_id = node.id.clone();
logger
.log(
LogLevel::Debug,
Some("daemon::spawner".into()),
"spawning node",
)
.await;

let queue_sizes = node_inputs(&node)
.into_iter()
.map(|(k, v)| (k, v.queue_size.unwrap_or(10)))
.collect();
let daemon_communication = spawn_listener_loop(
&dataflow_id,
&node_id,
&self.daemon_tx,
self.dataflow_descriptor.communication.local,
queue_sizes,
self.clock.clone(),
)
.await?;

let node_config = NodeConfig {
dataflow_id,
node_id: node_id.clone(),
run_config: node.kind.run_config(),
daemon_communication,
dataflow_descriptor: self.dataflow_descriptor.clone(),
dynamic: node.kind.dynamic(),
};

let prepared_git = if let dora_core::descriptor::CoreNodeKind::Custom(CustomNode {
source: dora_message::descriptor::NodeSource::GitBranch { repo, rev },
..
}) = &node.kind
{
let target_dir = self.working_dir.join("build");
let git_folder = GitFolder::choose_clone_dir(
self.dataflow_id,
repo.clone(),
rev.clone(),
&target_dir,
repos_in_use,
)?;
Some(git_folder)
} else {
None
};

let mut logger = logger
.try_clone()
.await
.wrap_err("failed to clone logger")?;
let task = async move {
self.prepare_node_inner(
node,
&mut logger,
dataflow_id,
node_config,
prepared_git,
node_stderr_most_recent,
)
.await
};
Ok(task)
}

async fn prepare_node_inner(
mut self,
node: ResolvedNode,
logger: &mut NodeLogger<'_>,
dataflow_id: uuid::Uuid,
node_config: NodeConfig,
git_folder: Option<GitFolder>,
node_stderr_most_recent: Arc<ArrayQueue<String>>,
) -> eyre::Result<PreparedNode> {
let (command, error_msg) = match &node.kind {
dora_core::descriptor::CoreNodeKind::Custom(n) => {
let build_dir = match git_folder {
Some(git_folder) => git_folder.prepare(logger).await?,
None => self.working_dir.clone(),
};

if let Some(build) = &n.build {
self.build_node(logger, &node.env, build_dir.clone(), build)
.await?;
}
let mut command = if self.build_only {
None
} else {
path_spawn_command(&build_dir, self.uv, logger, n, true).await?
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);
command.stdin(Stdio::null());

command.env(
"DORA_NODE_CONFIG",
serde_yaml::to_string(&node_config.clone())
.wrap_err("failed to serialize node config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
if let Some(envs) = &n.envs {
// node has some inner env variables -> add them too
for (key, value) in envs {
command.env(key, value.to_string());
}
}

// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command.env("PYTHONUNBUFFERED", "1");
command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};

let error_msg = format!(
"failed to run `{}` with args `{}`",
n.path,
n.args.as_deref().unwrap_or_default(),
);
(command, error_msg)
}
dora_core::descriptor::CoreNodeKind::Runtime(n) => {
// run build commands
for operator in &n.operators {
if let Some(build) = &operator.config.build {
self.build_node(logger, &node.env, self.working_dir.clone(), build)
.await?;
}
}

let python_operators: Vec<&OperatorDefinition> = n
.operators
.iter()
.filter(|x| matches!(x.config.source, OperatorSource::Python { .. }))
.collect();

let other_operators = n
.operators
.iter()
.any(|x| !matches!(x.config.source, OperatorSource::Python { .. }));

let mut command = if self.build_only {
None
} else if !python_operators.is_empty() && !other_operators {
// Use python to spawn runtime if there is a python operator

// TODO: Handle multi-operator runtime once sub-interpreter is supported
if python_operators.len() > 2 {
eyre::bail!(
"Runtime currently only support one Python Operator.
This is because pyo4 sub-interpreter is not yet available.
See: https://github.com/PyO4/pyo3/issues/576"
);
}

let python_operator = python_operators
.first()
.context("Runtime had no operators definition.")?;

if let OperatorSource::Python(PythonSource {
source: _,
conda_env: Some(conda_env),
}) = &python_operator.config.source
{
let conda = which::which("conda").context(
"failed to find `conda`, yet a `conda_env` was defined. Make sure that `conda` is available.",
)?;
let mut command = tokio::process::Command::new(conda);
command.args([
"run",
"-n",
conda_env,
"python",
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(command)
} else {
let mut cmd = if self.uv {
let mut cmd = tokio::process::Command::new("uv");
cmd.arg("run");
cmd.arg("python");
tracing::info!(
"spawning: uv run python -uc import dora; dora.start_runtime() # {}",
node.id
);
cmd
} else {
let python = get_python_path()
.wrap_err("Could not find python path when spawning custom node")?;
tracing::info!(
"spawning: python -uc import dora; dora.start_runtime() # {}",
node.id
);

tokio::process::Command::new(python)
};
// Force python to always flush stdout/stderr buffer
cmd.args([
"-c",
format!("import dora; dora.start_runtime() # {}", node.id).as_str(),
]);
Some(cmd)
}
} else if python_operators.is_empty() && other_operators {
let mut cmd = tokio::process::Command::new(
std::env::current_exe()
.wrap_err("failed to get current executable path")?,
);
cmd.arg("runtime");
Some(cmd)
} else {
eyre::bail!("Runtime can not mix Python Operator with other type of operator.");
};

let runtime_config = RuntimeConfig {
node: node_config.clone(),
operators: n.operators.clone(),
};

if let Some(command) = &mut command {
command.current_dir(&self.working_dir);

command.env(
"DORA_RUNTIME_CONFIG",
serde_yaml::to_string(&runtime_config)
.wrap_err("failed to serialize runtime config")?,
);
// Injecting the env variable defined in the `yaml` into
// the node runtime.
if let Some(envs) = &node.env {
for (key, value) in envs {
command.env(key, value.to_string());
}
}
// Set the process group to 0 to ensure that the spawned process does not exit immediately on CTRL-C
#[cfg(unix)]
command.process_group(0);

command
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
};
let error_msg = format!(
"failed to run runtime {}/{}",
runtime_config.node.dataflow_id, runtime_config.node.node_id
);
(command, error_msg)
}
};
Ok(PreparedNode {
command,
spawn_error_msg: error_msg,
working_dir: self.working_dir,
dataflow_id,
node,
node_config,
clock: self.clock,
daemon_tx: self.daemon_tx,
node_stderr_most_recent,
})
}

async fn build_node(
&mut self,
logger: &mut NodeLogger<'_>,
node_env: &Option<BTreeMap<String, EnvValue>>,
working_dir: PathBuf,
build: &String,
) -> Result<(), eyre::Error> {
logger
.log(
LogLevel::Info,
None,
format!("running build command: `{build}"),
)
.await;
let build = build.to_owned();
let uv = self.uv;
let node_env = node_env.clone();
let mut logger = logger.try_clone().await.context("failed to clone logger")?;
let (stdout_tx, mut stdout) = tokio::sync::mpsc::channel(10);
let task = tokio::task::spawn_blocking(move || {
run_build_command(&build, &working_dir, uv, &node_env, stdout_tx)
.context("build command failed")
});
tokio::spawn(async move {
while let Some(line) = stdout.recv().await {
logger
.log(
LogLevel::Info,
Some("build command".into()),
line.unwrap_or_else(|err| format!("io err: {}", err.kind())),
)
.await;
}
});
task.await??;
Ok(())
}
}

pub struct PreparedNode {
@@ -642,7 +313,8 @@ impl PreparedNode {
cloned_logger
.log(LogMessage {
daemon_id: Some(daemon_id.clone()),
dataflow_id,
dataflow_id: Some(dataflow_id),
build_id: None,
level: LogLevel::Info,
node_id: Some(node_id.clone()),
target: Some("stdout".into()),


examples/rust-dataflow-git/dataflow.yml (+2 -2)

@@ -2,8 +2,8 @@ nodes:
- id: rust-node
git: https://github.com/dora-rs/dora.git
rev: e31b2a34 # pinned commit, update this when changing the message crate
build: cargo build -p rust-dataflow-example-node
path: target/debug/rust-dataflow-example-node
build: cargo build --manifest-path build/github.com/dora-rs/dora.git/e31b2a34/Cargo.toml -p rust-dataflow-example-node
path: build/github.com/dora-rs/dora.git/e31b2a34/target/debug/rust-dataflow-example-node
inputs:
tick: dora/timer/millis/10
outputs:


libraries/message/src/cli_to_coordinator.rs (+8 -1)

@@ -9,14 +9,21 @@ use crate::{

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub enum ControlRequest {
Build {
dataflow: Descriptor,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
uv: bool,
},
Start {
build_id: Option<Uuid>,
dataflow: Descriptor,
name: Option<String>,
// TODO: remove this once we figure out deploying of node/operator
// binaries from CLI to coordinator/daemon
local_working_dir: PathBuf,
uv: bool,
build_only: bool,
},
WaitForSpawn {
dataflow_id: Uuid,


libraries/message/src/common.rs (+3 -2)

@@ -5,14 +5,15 @@ use aligned_vec::{AVec, ConstAlign};
use eyre::Context as _;
use uuid::Uuid;

use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, DataflowId};
use crate::{daemon_to_daemon::InterDaemonEvent, id::NodeId, BuildId, DataflowId};

pub use log::Level as LogLevel;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[must_use]
pub struct LogMessage {
pub dataflow_id: DataflowId,
pub build_id: Option<BuildId>,
pub dataflow_id: Option<DataflowId>,
pub node_id: Option<NodeId>,
pub daemon_id: Option<DaemonId>,
pub level: LogLevel,


libraries/message/src/coordinator_to_cli.rs (+20 -4)

@@ -9,10 +9,26 @@ use crate::{common::DaemonId, id::NodeId};
pub enum ControlRequestReply {
Error(String),
CoordinatorStopped,
DataflowStartTriggered { uuid: Uuid },
DataflowSpawned { uuid: Uuid },
DataflowReloaded { uuid: Uuid },
DataflowStopped { uuid: Uuid, result: DataflowResult },
DataflowBuildTriggered {
build_id: Uuid,
},
DataflowBuildFinished {
build_id: Uuid,
result: Result<(), String>,
},
DataflowStartTriggered {
uuid: Uuid,
},
DataflowSpawned {
uuid: Uuid,
},
DataflowReloaded {
uuid: Uuid,
},
DataflowStopped {
uuid: Uuid,
result: DataflowResult,
},
DataflowList(DataflowList),
DestroyOk,
DaemonConnected(bool),


libraries/message/src/coordinator_to_daemon.rs (+14 -1)

@@ -4,6 +4,8 @@ use std::{
time::Duration,
};

use uuid::Uuid;

use crate::{
common::DaemonId,
descriptor::{Descriptor, ResolvedNode},
@@ -33,6 +35,7 @@ impl RegisterResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorEvent {
Build(BuildDataflowNodes),
Spawn(SpawnDataflowNodes),
AllNodesReady {
dataflow_id: DataflowId,
@@ -55,13 +58,23 @@ pub enum DaemonCoordinatorEvent {
Heartbeat,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct BuildDataflowNodes {
pub build_id: Uuid,
pub working_dir: PathBuf,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub nodes_on_machine: BTreeSet<NodeId>,
pub uv: bool,
}

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub struct SpawnDataflowNodes {
pub build_id: Option<Uuid>,
pub dataflow_id: DataflowId,
pub working_dir: PathBuf,
pub nodes: BTreeMap<NodeId, ResolvedNode>,
pub dataflow_descriptor: Descriptor,
pub spawn_nodes: BTreeSet<NodeId>,
pub uv: bool,
pub build_only: bool,
}
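On the daemon side both structs arrive wrapped in a `Timestamped<DaemonCoordinatorEvent>`. A minimal deserialize-and-dispatch sketch (reply handling omitted; the printouts stand in for the real handlers):

```rust
use dora_message::coordinator_to_daemon::{
    BuildDataflowNodes, DaemonCoordinatorEvent, SpawnDataflowNodes, Timestamped,
};

fn dispatch(raw: &[u8]) -> eyre::Result<()> {
    let event: Timestamped<DaemonCoordinatorEvent> = serde_json::from_slice(raw)?;
    match event.inner {
        DaemonCoordinatorEvent::Build(BuildDataflowNodes {
            build_id, nodes_on_machine, ..
        }) => println!("build {build_id}: {} local node(s)", nodes_on_machine.len()),
        DaemonCoordinatorEvent::Spawn(SpawnDataflowNodes {
            dataflow_id, spawn_nodes, ..
        }) => println!("spawn {dataflow_id}: {} local node(s)", spawn_nodes.len()),
        _ => { /* heartbeat, stop, log subscription, ... */ }
    }
    Ok(())
}
```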

libraries/message/src/daemon_to_coordinator.rs (+1 -0)

@@ -77,6 +77,7 @@ impl DataflowDaemonResult {

#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub enum DaemonCoordinatorReply {
TriggerBuildResult(Result<(), String>),
TriggerSpawnResult(Result<(), String>),
ReloadResult(Result<(), String>),
StopResult(Result<(), String>),


libraries/message/src/descriptor.rs (+6 -0)

@@ -253,6 +253,12 @@ pub enum NodeSource {
},
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum ResolvedNodeSource {
Local,
GitCommit { repo: String, commit_hash: String },
}

#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum GitRepoRev {
Branch(String),

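`ResolvedNodeSource` is what `Builder::prepare_node` branches on to decide whether a git checkout is needed before building (see `binaries/daemon/src/build/mod.rs` above). A small illustrative helper:

```rust
use dora_message::descriptor::ResolvedNodeSource;

// Returns the (repo, commit) pair for nodes that need a git checkout;
// local nodes build directly in the dataflow's working directory.
fn git_checkout_target(source: &ResolvedNodeSource) -> Option<(&str, &str)> {
    match source {
        ResolvedNodeSource::Local => None,
        ResolvedNodeSource::GitCommit { repo, commit_hash } => {
            Some((repo.as_str(), commit_hash.as_str()))
        }
    }
}
```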

libraries/message/src/lib.rs (+1 -0)

@@ -26,6 +26,7 @@ pub use arrow_data;
pub use arrow_schema;

pub type DataflowId = uuid::Uuid;
pub type BuildId = uuid::Uuid;

fn current_crate_version() -> semver::Version {
let crate_version_raw = env!("CARGO_PKG_VERSION");

