Compare commits

...

1 Commits

Author SHA1 Message Date
  Philipp Oppermann 48ed13013b
Try to rewatch removed files in `--attach` 1 year ago
1 changed files with 45 additions and 27 deletions
Unified View
  1. +45
    -27
      binaries/cli/src/attach.rs

+ 45
- 27
binaries/cli/src/attach.rs View File

@@ -5,7 +5,7 @@ use dora_core::{
}; };
use eyre::Context; use eyre::Context;
use notify::event::ModifyKind; use notify::event::ModifyKind;
use notify::{Config, Event as NotifyEvent, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::HashMap; use std::collections::HashMap;
use std::{path::PathBuf, sync::mpsc, time::Duration}; use std::{path::PathBuf, sync::mpsc, time::Duration};
use tracing::{error, info}; use tracing::{error, info};
@@ -55,30 +55,14 @@ pub fn attach_dataflow(
} }


// Setup dataflow file watcher if reload option is set. // Setup dataflow file watcher if reload option is set.
let watcher_tx = tx.clone();
let _watcher = if hot_reload {
if hot_reload {
let (watcher_events_tx, watcher_events_rx) = mpsc::sync_channel(1);
let hash = node_path_lookup.clone(); let hash = node_path_lookup.clone();
let paths = hash.keys();
let notifier = move |event| { let notifier = move |event| {
if let Ok(NotifyEvent {
paths,
kind: EventKind::Modify(ModifyKind::Data(_data)),
..
}) = event
{
for path in paths {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(&path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
.context("Could not send reload request to the cli loop")
.unwrap();
}
if let Ok(event) = event {
if watcher_events_tx.send(event).is_err() {
tracing::warn!("failed to forward watch event");
} }
// TODO: Manage different file event
} }
}; };


@@ -87,13 +71,47 @@ pub fn attach_dataflow(
Config::default().with_poll_interval(Duration::from_secs(1)), Config::default().with_poll_interval(Duration::from_secs(1)),
)?; )?;


for path in paths {
for path in hash.keys() {
watcher.watch(path, RecursiveMode::Recursive)?; watcher.watch(path, RecursiveMode::Recursive)?;
} }
Some(watcher)
} else {
None
};

let watcher_tx = tx.clone();
std::thread::spawn(move || {
while let Ok(event) = watcher_events_rx.recv() {
let rewatch = match event.kind {
// file was modified, but still exists
EventKind::Modify(ModifyKind::Data(_data)) => false,
// file was removed and probably replaced by a new one (e.g. vim does this on save)
EventKind::Remove(_) => true,
// ignore other event types
_ => return,
};

// send reload request for modified nodes/operators
for path in hash.keys() {
if let Some((dataflow_id, node_id, operator_id)) = node_path_lookup.get(path) {
watcher_tx
.send(ControlRequest::Reload {
dataflow_id: *dataflow_id,
node_id: node_id.clone(),
operator_id: operator_id.clone(),
})
.context("Could not send reload request to the cli loop")
.unwrap();
}
}

if rewatch {
// watch paths again
for path in hash.keys() {
if let Err(err) = watcher.watch(path, RecursiveMode::Recursive) {
tracing::warn!("failed to watch `{}` again: {err} -> further modifications will be ignored", path.display());
}
}
}
}
});
}


// Setup Ctrlc Watcher to stop dataflow after ctrlc // Setup Ctrlc Watcher to stop dataflow after ctrlc
let ctrlc_tx = tx; let ctrlc_tx = tx;


Loading…
Cancel
Save