
Add attach logic and hot-reloading logic to the cli

tags/v0.2.2-rc
haixuanTao, 2 years ago
commit 8f05ec79a1
8 changed files with 202 additions and 25 deletions
  1. Cargo.lock (+4, -1)
  2. apis/rust/node/src/daemon_connection/event_stream.rs (+1, -6)
  3. apis/rust/node/src/event.rs (+1, -1)
  4. binaries/cli/Cargo.toml (+8, -0)
  5. binaries/cli/src/attach.rs (+143, -0)
  6. binaries/cli/src/main.rs (+38, -5)
  7. binaries/daemon/src/lib.rs (+1, -11)
  8. binaries/runtime/src/lib.rs (+6, -1)

Cargo.lock (+4, -1)

@@ -822,15 +822,19 @@ dependencies = [
"atty",
"clap 4.0.3",
"communication-layer-request-reply",
"ctrlc",
"dora-core",
"dora-node-api-c",
"dora-operator-api-c",
"dora-tracing",
"eyre",
"inquire",
"notify",
"serde",
"serde_json",
"serde_yaml 0.9.19",
"termcolor",
"tracing",
"uuid",
"webbrowser",
]
@@ -889,7 +893,6 @@ dependencies = [
"flume",
"futures",
"futures-concurrency",
"notify",
"serde",
"serde_json",
"serde_yaml 0.8.23",


apis/rust/node/src/daemon_connection/event_stream.rs (+1, -6)

@@ -171,12 +171,7 @@ impl EventStream {
        let event = match event {
            EventItem::NodeEvent { event, ack_channel } => match event {
                NodeEvent::Stop => Event::Stop,
                NodeEvent::Reload {
                    operator_id: Some(operator_id),
                } => Event::Reload { operator_id },
                NodeEvent::Reload { operator_id: None } => {
                    Event::Error(format!("Received reload event without operator id"))
                }
                NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
                NodeEvent::InputClosed { id } => Event::InputClosed { id },
                NodeEvent::Input { id, metadata, data } => {
                    let data = match data {


apis/rust/node/src/event.rs (+1, -1)

@@ -12,7 +12,7 @@ use shared_memory::{Shmem, ShmemConf};
pub enum Event<'a> {
    Stop,
    Reload {
        operator_id: OperatorId,
        operator_id: Option<OperatorId>,
    },
    Input {
        id: DataId,

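With `operator_id` now an `Option`, code built on the node API has to handle both shapes of the reload event. A minimal sketch, not part of this commit, assuming `Event` is the `dora_node_api` enum shown above and that the id types derive `Debug`:

use dora_node_api::Event;

fn handle_reload(event: Event<'_>) {
    match event {
        Event::Reload {
            operator_id: Some(operator_id),
        } => {
            // a runtime node knows exactly which operator to hot-reload
            println!("reload requested for operator {operator_id:?}");
        }
        Event::Reload { operator_id: None } => {
            // custom nodes receive a reload without an operator id; the
            // runtime change below turns this case into an error instead
        }
        _ => {}
    }
}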

binaries/cli/Cargo.toml (+8, -0)

@@ -12,6 +12,10 @@ license.workspace = true
name = "dora-cli"
path = "src/main.rs"

[features]
default = ["tracing"]
tracing = ["dep:dora-tracing"]

[dependencies]
clap = { version = "4.0.3", features = ["derive"] }
eyre = "0.6.8"
@@ -27,3 +31,7 @@ atty = "0.2.14"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
inquire = "0.5.2"
communication-layer-request-reply = { workspace = true }
notify = "5.1.0"
ctrlc = "3.2.5"
tracing = "0.1.36"
dora-tracing = { workspace = true, optional = true }

binaries/cli/src/attach.rs (+143, -0)

@@ -0,0 +1,143 @@
use communication_layer_request_reply::TcpRequestReplyConnection;
use dora_core::{
    descriptor::{resolve_path, CoreNodeKind, Descriptor},
    topics::{ControlRequest, ControlRequestReply},
};
use eyre::Context;
use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use uuid::Uuid;

use crate::control_connection;

pub fn attach_dataflow(
    dataflow: Descriptor,
    dataflow_path: PathBuf,
    dataflow_id: Uuid,
    session: &mut Option<Box<TcpRequestReplyConnection>>,
    hot_reload: bool,
) -> Result<(), eyre::ErrReport> {
    let (tx, rx) = mpsc::sync_channel(2);
    // Generate path hashmap
    let mut node_path_lookup = HashMap::new();

    let nodes = dataflow.resolve_aliases();

    let working_dir = dataflow_path
        .canonicalize()
        .context("failed to canoncialize dataflow path")?
        .parent()
        .ok_or_else(|| eyre::eyre!("canonicalized dataflow path has no parent"))?
        .to_owned();

    for node in nodes {
        match node.kind {
            CoreNodeKind::Custom(_cn) => (), // TODO: Reloading for custom node
            CoreNodeKind::Runtime(rn) => {
                for op in rn.operators.iter() {
                    match &op.config.source {
                        dora_core::descriptor::OperatorSource::Python(source) => {
                            let path = resolve_path(&source, &working_dir).wrap_err_with(|| {
                                format!("failed to resolve node source `{}`", source)
                            })?;
                            node_path_lookup
                                .insert(path, (dataflow_id, node.id.clone(), Some(op.id.clone())));
                        }
                        _ => (), // TODO: Reloading for non-Python Operator
                    }
                }
            }
        }
    }

    // Setup dataflow file watcher if reload option is set.
    let watcher_tx = tx.clone();
    let _watcher = if hot_reload {
        let hash = node_path_lookup.clone();
        let paths = hash.keys();
        let notifier = move |event| match event {
            Ok(NotifyEvent {
                paths,
                kind: EventKind::Modify(ModifyKind::Data(_data)),
                ..
            }) => {
                for path in paths {
                    let (dataflow_id, node_id, operator_id) =
                        node_path_lookup.get(&path).unwrap().clone();
                    watcher_tx
                        .send(ControlRequest::Reload {
                            dataflow_id,
                            node_id,
                            operator_id,
                        })
                        .unwrap();
                }
            }
            _ => (), // TODO: Manage different file event
        };

        let mut watcher = RecommendedWatcher::new(
            notifier,
            Config::default().with_poll_interval(Duration::from_secs(1)),
        )?;

        for path in paths {
            watcher.watch(path, RecursiveMode::Recursive)?;
        }
        Some(watcher)
    } else {
        None
    };

    // Setup Ctrlc Watcher to stop dataflow after ctrlc
    let ctrlc_tx = tx.clone();
    let mut ctrlc_sent = false;
    ctrlc::set_handler(move || {
        if ctrlc_sent {
            std::process::abort();
        } else {
            if ctrlc_tx
                .send(ControlRequest::Stop {
                    dataflow_uuid: dataflow_id,
                })
                .is_err()
            {
                // bail!("failed to report ctrl-c event to dora-daemon");
            }
            ctrlc_sent = true;
        }
    })
    .wrap_err("failed to set ctrl-c handler")?;

    loop {
        let control_request = match rx.recv_timeout(Duration::from_secs(1)) {
            Err(_err) => ControlRequest::Check {
                dataflow_uuid: dataflow_id,
            },
            Ok(reload_event) => reload_event,
        };

        let reply_raw = control_connection(session)?
            .request(&serde_json::to_vec(&control_request)?)
            .wrap_err("failed to send request message to coordinator")?;
        let result: ControlRequestReply =
            serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
        match result {
            ControlRequestReply::DataflowStarted { uuid } => info!("dataflow {uuid} started"),
            ControlRequestReply::DataflowStopped { uuid } => {
                info!("dataflow {uuid} stopped");
                break;
            }
            ControlRequestReply::DataflowReloaded { uuid } => {
                info!("dataflow {uuid} reloaded")
            }
            other => error!("Received unexpected Coordinator Reply: {:#?}", other),
        };
    }

    Ok(())
}

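The hot-reload half of attach.rs boils down to a notify watcher feeding the same sync channel that the control loop polls. A stripped-down, self-contained sketch of that pattern, assuming a made-up watched file `operator.py` and minimal error handling:

use notify::event::ModifyKind;
use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::{path::Path, sync::mpsc, time::Duration};

fn main() -> notify::Result<()> {
    let (tx, rx) = mpsc::sync_channel(2);

    // Forward data-modification events into the channel, mirroring how
    // attach_dataflow turns them into ControlRequest::Reload messages.
    let mut watcher = RecommendedWatcher::new(
        move |res: notify::Result<Event>| {
            if let Ok(Event {
                kind: EventKind::Modify(ModifyKind::Data(_)),
                paths,
                ..
            }) = res
            {
                for path in paths {
                    let _ = tx.send(path);
                }
            }
        },
        Config::default().with_poll_interval(Duration::from_secs(1)),
    )?;
    watcher.watch(Path::new("operator.py"), RecursiveMode::Recursive)?;

    while let Ok(path) = rx.recv() {
        println!("would request a reload for {}", path.display());
    }
    Ok(())
}
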
binaries/cli/src/main.rs (+38, -5)

@@ -1,13 +1,18 @@
use std::path::PathBuf;

use attach::attach_dataflow;
use clap::Parser;
use communication_layer_request_reply::{RequestReplyLayer, TcpLayer, TcpRequestReplyConnection};
use dora_core::{
    descriptor::Descriptor,
    topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId},
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use eyre::{bail, Context};
use std::path::PathBuf;
use uuid::Uuid;

mod attach;
mod build;
mod check;
mod graph;
@@ -67,6 +72,10 @@ enum Command {
        dataflow: PathBuf,
        #[clap(long)]
        name: Option<String>,
        #[clap(long, action)]
        attach: bool,
        #[clap(long, action)]
        hot_reload: bool,
    },
    /// Stop the given dataflow UUID. If no id is provided, you will be able to choose between the running dataflows.
    Stop {
@@ -111,6 +120,8 @@ enum Lang {
}

fn main() -> eyre::Result<()> {
    #[cfg(feature = "tracing")]
    set_up_tracing().context("failed to set up tracing subscriber")?;
    let args = Args::parse();

    let mut session = None;
@@ -149,7 +160,29 @@ fn main() -> eyre::Result<()> {
            coordinator_path.as_deref(),
            daemon_path.as_deref(),
        )?,
        Command::Start { dataflow, name } => start_dataflow(dataflow, name, &mut session)?,
        Command::Start {
            dataflow,
            name,
            attach,
            hot_reload,
        } => {
            let dataflow_description =
                Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
            dataflow_description
                .check(&dataflow, None)
                .wrap_err("Could not validate yaml")?;
            let dataflow_id = start_dataflow(dataflow.clone(), name, &mut session)?;

            if attach {
                attach_dataflow(
                    dataflow_description,
                    dataflow,
                    dataflow_id,
                    &mut session,
                    hot_reload,
                )?
            }
        }
        Command::List => list(&mut session)?,
        Command::Stop { uuid, name } => match (uuid, name) {
            (Some(uuid), _) => stop_dataflow(uuid, &mut session)?,
@@ -166,7 +199,7 @@ fn start_dataflow(
    dataflow: PathBuf,
    name: Option<String>,
    session: &mut Option<Box<TcpRequestReplyConnection>>,
) -> Result<(), eyre::ErrReport> {
) -> Result<Uuid, eyre::ErrReport> {
    let canonicalized = dataflow
        .canonicalize()
        .wrap_err("given dataflow file does not exist")?;
@@ -184,8 +217,8 @@ fn start_dataflow(
        serde_json::from_slice(&reply_raw).wrap_err("failed to parse reply")?;
    match result {
        ControlRequestReply::DataflowStarted { uuid } => {
            println!("{uuid}");
            Ok(())
            eprintln!("{uuid}");
            Ok(uuid)
        }
        ControlRequestReply::Error(err) => bail!("{err}"),
        other => bail!("unexpected start dataflow reply: {other:?}"),

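The new flags land on the `start` subcommand, with clap mapping `hot_reload` to `--hot-reload`. A hypothetical check, using clap's `parse_from` with a placeholder `dora` binary name:

#[test]
fn start_accepts_attach_flags() {
    // equivalent to: dora start dataflow.yml --attach --hot-reload
    let _args = Args::parse_from(["dora", "start", "dataflow.yml", "--attach", "--hot-reload"]);
}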

binaries/daemon/src/lib.rs (+1, -11)

@@ -27,10 +27,7 @@ use std::{
};
use tcp_utils::tcp_receive;
use tokio::sync::mpsc::UnboundedSender;
use tokio::{
    fs,
    sync::{mpsc, oneshot},
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
use uuid::Uuid;

@@ -1062,10 +1059,3 @@ enum RunStatus {
    Continue,
    Exit,
}

pub async fn read_descriptor(file: &Path) -> eyre::Result<Descriptor> {
    let descriptor_file = fs::read(file).await.wrap_err("failed to open given file")?;
    let descriptor: Descriptor =
        serde_yaml::from_slice(&descriptor_file).context("failed to parse given descriptor")?;
    Ok(descriptor)
}

binaries/runtime/src/lib.rs (+6, -1)

@@ -140,7 +140,12 @@ async fn run(
    let daemon_events = Box::pin(futures::stream::unfold(daemon_events, |mut stream| async {
        let event = stream.recv_async().await.map(|event| match event {
            dora_node_api::Event::Stop => Event::Stop,
            dora_node_api::Event::Reload { operator_id } => Event::Reload { operator_id },
            dora_node_api::Event::Reload {
                operator_id: Some(operator_id),
            } => Event::Reload { operator_id },
            dora_node_api::Event::Reload { operator_id: None } => Event::Error(
                "Dora runtime node received reload event without operator id".to_string(),
            ),
            dora_node_api::Event::Input { id, metadata, data } => Event::Input {
                id,
                metadata,

