From 3adb41320225ba6afc80dbde414133ec4c277d01 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 4 Jun 2025 18:32:24 +0200 Subject: [PATCH] Wip --- binaries/cli/src/lib.rs | 111 ++++++++++++++++---- binaries/cli/src/session.rs | 71 +++++++++++++ libraries/message/src/cli_to_coordinator.rs | 7 ++ 3 files changed, 170 insertions(+), 19 deletions(-) create mode 100644 binaries/cli/src/session.rs diff --git a/binaries/cli/src/lib.rs b/binaries/cli/src/lib.rs index b5179844..b27dcb99 100644 --- a/binaries/cli/src/lib.rs +++ b/binaries/cli/src/lib.rs @@ -17,17 +17,19 @@ use dora_message::{ cli_to_coordinator::ControlRequest, common::LogMessage, coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus}, + BuildId, SessionId, }; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; -use eyre::{bail, Context}; +use eyre::{bail, Context, ContextCompat}; use formatting::FormatDataflowError; use std::{ env::current_dir, io::Write, net::{SocketAddr, TcpStream}, + path::Path, }; use std::{ net::{IpAddr, Ipv4Addr}, @@ -44,6 +46,7 @@ mod check; mod formatting; mod graph; mod logs; +mod session; mod template; mod up; @@ -385,8 +388,14 @@ fn run(args: Args) -> eyre::Result<()> { uv, } => { let coordinator_socket = (coordinator_addr, coordinator_port).into(); - let (_, _, mut session, uuid) = - build_dataflow(session_id, dataflow, coordinator_socket, uv)?; + let (mut session, build_id) = build_dataflow(dataflow, coordinator_socket, uv)?; + // wait until dataflow build is finished + wait_until_dataflow_built( + build_id, + &mut session, + coordinator_socket, + log::LevelFilter::Info, + )?; } Command::New { args, @@ -394,11 +403,20 @@ fn run(args: Args) -> eyre::Result<()> { } => template::create(args, internal_create_with_path_dependencies)?, Command::Run { dataflow, uv } => { let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let dataflow_session = DataflowSession::read_session(&dataflow) + .context("failed to read DataflowSession")?; + let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; - let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?; + + let result = rt.block_on(Daemon::run_dataflow( + &dataflow_path, + dataflow_session.build_id, + dataflow_session.session_id, + uv, + ))?; handle_dataflow_result(result, None)? } Command::Up { config } => { @@ -468,7 +486,6 @@ fn run(args: Args) -> eyre::Result<()> { wait_until_dataflow_started( dataflow_id, &mut session, - false, coordinator_socket, log::LevelFilter::Info, )?; @@ -552,8 +569,10 @@ fn run(args: Args) -> eyre::Result<()> { coordinator_addr ); } + let dataflow_session = + DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?; - let result = Daemon::run_dataflow(&dataflow_path, false).await?; + let result = Daemon::run_dataflow(&dataflow_path, dataflow_session.build_id, dataflow_session.session_id, false).await?; handle_dataflow_result(result, None) } None => { @@ -625,14 +644,15 @@ fn run(args: Args) -> eyre::Result<()> { } fn build_dataflow( - session_id: Uuid, dataflow: String, coordinator_socket: SocketAddr, uv: bool, -) -> Result<(PathBuf, Descriptor, Box, Uuid), eyre::Error> { +) -> eyre::Result<(Box, Uuid)> { let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; let dataflow_descriptor = Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let dataflow_session = + DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?; let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine let local_working_dir = if cli_and_daemon_on_same_machine { @@ -655,10 +675,10 @@ fn build_dataflow( let reply_raw = session .request( &serde_json::to_vec(&ControlRequest::Build { - session_id, + session_id: dataflow_session.build_id, dataflow, git_sources, - prev_git_sources, + prev_git_sources: dataflow_session.git_sources.clone(), local_working_dir, uv, }) @@ -677,12 +697,66 @@ fn build_dataflow( other => bail!("unexpected start dataflow reply: {other:?}"), } }; - Ok((dataflow, dataflow_descriptor, session, dataflow_id)) + Ok((session, dataflow_id)) +} + +fn wait_until_dataflow_built( + build_id: Uuid, + session: &mut Box, + coordinator_addr: SocketAddr, + log_level: log::LevelFilter, +) -> eyre::Result<()> { + // subscribe to log messages + let mut log_session = TcpConnection { + stream: TcpStream::connect(coordinator_addr) + .wrap_err("failed to connect to dora coordinator")?, + }; + log_session + .send( + &serde_json::to_vec(&ControlRequest::BuildLogSubscribe { + build_id, + level: log_level, + }) + .wrap_err("failed to serialize message")?, + ) + .wrap_err("failed to send build log subscribe request to coordinator")?; + std::thread::spawn(move || { + while let Ok(raw) = log_session.receive() { + let parsed: eyre::Result = + serde_json::from_slice(&raw).context("failed to parse log message"); + match parsed { + Ok(log_message) => { + print_log_message(log_message); + } + Err(err) => { + tracing::warn!("failed to parse log message: {err:?}") + } + } + } + }); + + let reply_raw = session + .request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap()) + .wrap_err("failed to send WaitForBuild message")?; + + let result: ControlRequestReply = + serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; + match result { + ControlRequestReply::DataflowBuildFinished { + build_id, + session_id, + result, + } => match result { + Ok(()) => eprintln!("dataflow build finished successfully"), + Err(err) => bail!("{err}"), + }, + ControlRequestReply::Error(err) => bail!("{err}"), + other => bail!("unexpected start dataflow reply: {other:?}"), + } + Ok(()) } fn start_dataflow( - build_id: Option, - session_id: Uuid, dataflow: String, name: Option, coordinator_socket: SocketAddr, @@ -691,6 +765,8 @@ fn start_dataflow( let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?; let dataflow_descriptor = Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?; + let dataflow_session = + DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?; let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine let local_working_dir = if cli_and_daemon_on_same_machine { @@ -741,7 +817,6 @@ fn start_dataflow( fn wait_until_dataflow_started( dataflow_id: Uuid, session: &mut Box, - build_only: bool, coordinator_addr: SocketAddr, log_level: log::LevelFilter, ) -> eyre::Result<()> { @@ -782,11 +857,7 @@ fn wait_until_dataflow_started( serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?; match result { ControlRequestReply::DataflowSpawned { uuid } => { - if build_only { - eprintln!("dataflow build finished"); - } else { - eprintln!("dataflow started: {uuid}"); - } + eprintln!("dataflow started: {uuid}"); } ControlRequestReply::Error(err) => bail!("{err}"), other => bail!("unexpected start dataflow reply: {other:?}"), @@ -944,6 +1015,8 @@ use pyo3::{ wrap_pyfunction, Bound, PyResult, Python, }; +use crate::session::DataflowSession; + #[cfg(feature = "python")] #[pyfunction] fn py_main(_py: Python) -> PyResult<()> { diff --git a/binaries/cli/src/session.rs b/binaries/cli/src/session.rs new file mode 100644 index 00000000..bc5d464a --- /dev/null +++ b/binaries/cli/src/session.rs @@ -0,0 +1,71 @@ +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, +}; + +use dora_message::{common::GitSource, id::NodeId, BuildId}; +use eyre::{Context, ContextCompat}; +use uuid::Uuid; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct DataflowSession { + pub build_id: Option, + pub session_id: Uuid, + pub git_sources: BTreeMap, +} + +impl Default for DataflowSession { + fn default() -> Self { + Self { + build_id: None, + session_id: Uuid::new_v4(), + git_sources: Default::default(), + } + } +} + +impl DataflowSession { + pub fn read_session(dataflow_path: &Path) -> eyre::Result { + let session_file = session_file_path(dataflow_path)?; + if session_file.exists() { + if let Ok(parsed) = deserialize(&session_file) { + return Ok((parsed)); + } else { + tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)"); + } + } + + let default_session = DataflowSession::default(); + default_session.write_out_for_dataflow(dataflow_path)?; + Ok(default_session) + } + + pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> { + let session_file = session_file_path(dataflow_path)?; + std::fs::write(session_file, self.serialize()?) + .context("failed to write dataflow session file"); + Ok(()) + } + + fn serialize(&self) -> eyre::Result { + Ok(serde_yaml::to_string(&self).context("failed to serialize dataflow session file")?) + } +} + +fn deserialize(session_file: &Path) -> eyre::Result { + std::fs::read_to_string(&session_file) + .context("failed to read DataflowSession file") + .and_then(|s| { + serde_yaml::from_str(&s).context("failed to deserialize DataflowSession file") + }) +} + +fn session_file_path(dataflow_path: &Path) -> eyre::Result { + let file_stem = dataflow_path + .file_stem() + .wrap_err("dataflow path has no file stem")? + .to_str() + .wrap_err("dataflow file stem is not valid utf-8")?; + let session_file = dataflow_path.with_file_name(format!("{file_stem}.dora-session.yaml")); + Ok(session_file) +} diff --git a/libraries/message/src/cli_to_coordinator.rs b/libraries/message/src/cli_to_coordinator.rs index b1f2a7bb..03c8dac3 100644 --- a/libraries/message/src/cli_to_coordinator.rs +++ b/libraries/message/src/cli_to_coordinator.rs @@ -26,6 +26,9 @@ pub enum ControlRequest { local_working_dir: Option, uv: bool, }, + WaitForBuild { + build_id: Uuid, + }, Start { build_id: Option, session_id: Uuid, @@ -73,4 +76,8 @@ pub enum ControlRequest { dataflow_id: Uuid, level: log::LevelFilter, }, + BuildLogSubscribe { + build_id: Uuid, + level: log::LevelFilter, + }, }