@@ -5,7 +5,7 @@ use dora_core::{
};
};
use eyre::Context;
use eyre::Context;
use notify::event::ModifyKind;
use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, Event Kind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap;
use std::collections::HashMap;
use std::{path::PathBuf, sync::mpsc, time::Duration};
use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info};
use tracing::{error, info};
@@ -55,30 +55,14 @@ pub fn attach_dataflow(
}
}
// Setup dataflow file watcher if reload option is set.
// Setup dataflow file watcher if reload option is set.
let watcher_tx = tx.clone();
let _watcher = if hot_reload {
if hot_reload {
let (watcher_events_tx, watcher_events_rx) = mpsc::sync_channel(1);
let hash = node_path_lookup.clone();
let hash = node_path_lookup.clone();
let paths = hash.keys();
let notifier = move |event| {
let notifier = move |event| {
if let Ok(NotifyEvent {
paths,
kind: EventKind::Modify(ModifyKind::Data(_data)),
..
}) = event
{
for path in paths {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
.context("Could not send reload request to the cli loop")
.unwrap();
}
if let Ok(event) = event {
if watcher_events_tx.send(event).is_err() {
tracing::warn!("failed to forward watch event");
}
}
// TODO: Manage different file event
}
}
};
};
@@ -87,13 +71,47 @@ pub fn attach_dataflow(
Config::default().with_poll_interval(Duration::from_secs(1)),
Config::default().with_poll_interval(Duration::from_secs(1)),
)?;
)?;
for path in paths {
for path in hash.keys() {
watcher.watch(path, RecursiveMode::Recursive)?;
watcher.watch(path, RecursiveMode::Recursive)?;
}
}
Some(watcher)
} else {
None
};
let watcher_tx = tx.clone();
std::thread::spawn(move || {
while let Ok(event) = watcher_events_rx.recv() {
let rewatch = match event.kind {
// file was modified, but still exists
EventKind::Modify(ModifyKind::Data(_data)) => false,
// file was removed and probably replaced by a new one (e.g. vim does this on save)
EventKind::Remove(_) => true,
// ignore other event types
_ => return,
};
// send reload request for modified nodes/operators
for path in hash.keys() {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
.context("Could not send reload request to the cli loop")
.unwrap();
}
}
if rewatch {
// watch paths again
for path in hash.keys() {
if let Err(err) = watcher.watch(path, RecursiveMode::Recursive) {
tracing::warn!("failed to watch `{}` again: {err} -> further modifications will be ignored", path.display());
}
}
}
}
});
}
// Setup Ctrlc Watcher to stop dataflow after ctrlc
// Setup Ctrlc Watcher to stop dataflow after ctrlc
let ctrlc_tx = tx;
let ctrlc_tx = tx;