Philipp Oppermann 7 months ago
parent
commit
3adb413202
Failed to extract signature
3 changed files with 170 additions and 19 deletions
  1. +92
    -19
      binaries/cli/src/lib.rs
  2. +71
    -0
      binaries/cli/src/session.rs
  3. +7
    -0
      libraries/message/src/cli_to_coordinator.rs

+ 92
- 19
binaries/cli/src/lib.rs View File

@@ -17,17 +17,19 @@ use dora_message::{
cli_to_coordinator::ControlRequest,
common::LogMessage,
coordinator_to_cli::{ControlRequestReply, DataflowList, DataflowResult, DataflowStatus},
BuildId, SessionId,
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::{set_up_tracing_opts, FileLogging};
use duration_str::parse;
use eyre::{bail, Context};
use eyre::{bail, Context, ContextCompat};
use formatting::FormatDataflowError;
use std::{
env::current_dir,
io::Write,
net::{SocketAddr, TcpStream},
path::Path,
};
use std::{
net::{IpAddr, Ipv4Addr},
@@ -44,6 +46,7 @@ mod check;
mod formatting;
mod graph;
mod logs;
mod session;
mod template;
mod up;

@@ -385,8 +388,14 @@ fn run(args: Args) -> eyre::Result<()> {
uv,
} => {
let coordinator_socket = (coordinator_addr, coordinator_port).into();
let (_, _, mut session, uuid) =
build_dataflow(session_id, dataflow, coordinator_socket, uv)?;
let (mut session, build_id) = build_dataflow(dataflow, coordinator_socket, uv)?;
// wait until dataflow build is finished
wait_until_dataflow_built(
build_id,
&mut session,
coordinator_socket,
log::LevelFilter::Info,
)?;
}
Command::New {
args,
@@ -394,11 +403,20 @@ fn run(args: Args) -> eyre::Result<()> {
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow, uv } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_session = DataflowSession::read_session(&dataflow)
.context("failed to read DataflowSession")?;

let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path, uv))?;

let result = rt.block_on(Daemon::run_dataflow(
&dataflow_path,
dataflow_session.build_id,
dataflow_session.session_id,
uv,
))?;
handle_dataflow_result(result, None)?
}
Command::Up { config } => {
@@ -468,7 +486,6 @@ fn run(args: Args) -> eyre::Result<()> {
wait_until_dataflow_started(
dataflow_id,
&mut session,
false,
coordinator_socket,
log::LevelFilter::Info,
)?;
@@ -552,8 +569,10 @@ fn run(args: Args) -> eyre::Result<()> {
coordinator_addr
);
}
let dataflow_session =
DataflowSession::read_session(&dataflow_path).context("failed to read DataflowSession")?;

let result = Daemon::run_dataflow(&dataflow_path, false).await?;
let result = Daemon::run_dataflow(&dataflow_path, dataflow_session.build_id, dataflow_session.session_id, false).await?;
handle_dataflow_result(result, None)
}
None => {
@@ -625,14 +644,15 @@ fn run(args: Args) -> eyre::Result<()> {
}

fn build_dataflow(
session_id: Uuid,
dataflow: String,
coordinator_socket: SocketAddr,
uv: bool,
) -> Result<(PathBuf, Descriptor, Box<TcpRequestReplyConnection>, Uuid), eyre::Error> {
) -> eyre::Result<(Box<TcpRequestReplyConnection>, Uuid)> {
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let dataflow_session =
DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;

let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine
let local_working_dir = if cli_and_daemon_on_same_machine {
@@ -655,10 +675,10 @@ fn build_dataflow(
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Build {
session_id,
session_id: dataflow_session.build_id,
dataflow,
git_sources,
prev_git_sources,
prev_git_sources: dataflow_session.git_sources.clone(),
local_working_dir,
uv,
})
@@ -677,12 +697,66 @@ fn build_dataflow(
other => bail!("unexpected start dataflow reply: {other:?}"),
}
};
Ok((dataflow, dataflow_descriptor, session, dataflow_id))
Ok((session, dataflow_id))
}

fn wait_until_dataflow_built(
build_id: Uuid,
session: &mut Box<TcpRequestReplyConnection>,
coordinator_addr: SocketAddr,
log_level: log::LevelFilter,
) -> eyre::Result<()> {
// subscribe to log messages
let mut log_session = TcpConnection {
stream: TcpStream::connect(coordinator_addr)
.wrap_err("failed to connect to dora coordinator")?,
};
log_session
.send(
&serde_json::to_vec(&ControlRequest::BuildLogSubscribe {
build_id,
level: log_level,
})
.wrap_err("failed to serialize message")?,
)
.wrap_err("failed to send build log subscribe request to coordinator")?;
std::thread::spawn(move || {
while let Ok(raw) = log_session.receive() {
let parsed: eyre::Result<LogMessage> =
serde_json::from_slice(&raw).context("failed to parse log message");
match parsed {
Ok(log_message) => {
print_log_message(log_message);
}
Err(err) => {
tracing::warn!("failed to parse log message: {err:?}")
}
}
}
});

let reply_raw = session
.request(&serde_json::to_vec(&ControlRequest::WaitForBuild { build_id }).unwrap())
.wrap_err("failed to send WaitForBuild message")?;

let result: ControlRequestReply =
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowBuildFinished {
build_id,
session_id,
result,
} => match result {
Ok(()) => eprintln!("dataflow build finished successfully"),
Err(err) => bail!("{err}"),
},
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
}
Ok(())
}

fn start_dataflow(
build_id: Option<Uuid>,
session_id: Uuid,
dataflow: String,
name: Option<String>,
coordinator_socket: SocketAddr,
@@ -691,6 +765,8 @@ fn start_dataflow(
let dataflow = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
let dataflow_session =
DataflowSession::read_session(&dataflow).context("failed to read DataflowSession")?;

let cli_and_daemon_on_same_machine = false; // TODO FIXME set to true if on same machine
let local_working_dir = if cli_and_daemon_on_same_machine {
@@ -741,7 +817,6 @@ fn start_dataflow(
fn wait_until_dataflow_started(
dataflow_id: Uuid,
session: &mut Box<TcpRequestReplyConnection>,
build_only: bool,
coordinator_addr: SocketAddr,
log_level: log::LevelFilter,
) -> eyre::Result<()> {
@@ -782,11 +857,7 @@ fn wait_until_dataflow_started(
serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
match result {
ControlRequestReply::DataflowSpawned { uuid } => {
if build_only {
eprintln!("dataflow build finished");
} else {
eprintln!("dataflow started: {uuid}");
}
eprintln!("dataflow started: {uuid}");
}
ControlRequestReply::Error(err) => bail!("{err}"),
other => bail!("unexpected start dataflow reply: {other:?}"),
@@ -944,6 +1015,8 @@ use pyo3::{
wrap_pyfunction, Bound, PyResult, Python,
};

use crate::session::DataflowSession;

#[cfg(feature = "python")]
#[pyfunction]
fn py_main(_py: Python) -> PyResult<()> {


+ 71
- 0
binaries/cli/src/session.rs View File

@@ -0,0 +1,71 @@
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
};

use dora_message::{common::GitSource, id::NodeId, BuildId};
use eyre::{Context, ContextCompat};
use uuid::Uuid;

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct DataflowSession {
pub build_id: Option<BuildId>,
pub session_id: Uuid,
pub git_sources: BTreeMap<NodeId, GitSource>,
}

impl Default for DataflowSession {
fn default() -> Self {
Self {
build_id: None,
session_id: Uuid::new_v4(),
git_sources: Default::default(),
}
}
}

impl DataflowSession {
pub fn read_session(dataflow_path: &Path) -> eyre::Result<Self> {
let session_file = session_file_path(dataflow_path)?;
if session_file.exists() {
if let Ok(parsed) = deserialize(&session_file) {
return Ok((parsed));
} else {
tracing::warn!("failed to read dataflow session file, regenerating (you might need to run `dora build` again)");
}
}

let default_session = DataflowSession::default();
default_session.write_out_for_dataflow(dataflow_path)?;
Ok(default_session)
}

pub fn write_out_for_dataflow(&self, dataflow_path: &Path) -> eyre::Result<()> {
let session_file = session_file_path(dataflow_path)?;
std::fs::write(session_file, self.serialize()?)
.context("failed to write dataflow session file");
Ok(())
}

fn serialize(&self) -> eyre::Result<String> {
Ok(serde_yaml::to_string(&self).context("failed to serialize dataflow session file")?)
}
}

fn deserialize(session_file: &Path) -> eyre::Result<DataflowSession> {
std::fs::read_to_string(&session_file)
.context("failed to read DataflowSession file")
.and_then(|s| {
serde_yaml::from_str(&s).context("failed to deserialize DataflowSession file")
})
}

fn session_file_path(dataflow_path: &Path) -> eyre::Result<PathBuf> {
let file_stem = dataflow_path
.file_stem()
.wrap_err("dataflow path has no file stem")?
.to_str()
.wrap_err("dataflow file stem is not valid utf-8")?;
let session_file = dataflow_path.with_file_name(format!("{file_stem}.dora-session.yaml"));
Ok(session_file)
}

+ 7
- 0
libraries/message/src/cli_to_coordinator.rs View File

@@ -26,6 +26,9 @@ pub enum ControlRequest {
local_working_dir: Option<PathBuf>,
uv: bool,
},
WaitForBuild {
build_id: Uuid,
},
Start {
build_id: Option<Uuid>,
session_id: Uuid,
@@ -73,4 +76,8 @@ pub enum ControlRequest {
dataflow_id: Uuid,
level: log::LevelFilter,
},
BuildLogSubscribe {
build_id: Uuid,
level: log::LevelFilter,
},
}

Loading…
Cancel
Save