Browse Source

Close log subscriber connection if send fails (instead of erroring)

tags/v0.3.5-rc0
Philipp Oppermann 1 year ago
parent
commit
1f32b29afe
Failed to extract signature
2 changed files with 22 additions and 4 deletions
  1. +4
    -1
      binaries/coordinator/src/lib.rs
  2. +18
    -3
      binaries/coordinator/src/log_subscriber.rs

+ 4
- 1
binaries/coordinator/src/lib.rs View File

@@ -581,8 +581,11 @@ async fn start_inner(
Event::Log(message) => {
if let Some(dataflow) = running_dataflows.get_mut(&message.dataflow_id) {
for subscriber in &mut dataflow.log_subscribers {
subscriber.send_message(&message).await?;
if subscriber.send_message(&message).await.is_err() {
subscriber.close();
}
}
dataflow.log_subscribers.retain(|s| !s.is_closed());
}
}
}


+ 18
- 3
binaries/coordinator/src/log_subscriber.rs View File

@@ -1,15 +1,19 @@
use dora_core::coordinator_messages::LogMessage;
use eyre::{Context, ContextCompat};

use crate::tcp_utils::tcp_send;

pub struct LogSubscriber {
pub level: log::LevelFilter,
connection: tokio::net::TcpStream,
connection: Option<tokio::net::TcpStream>,
}

impl LogSubscriber {
pub fn new(level: log::LevelFilter, connection: tokio::net::TcpStream) -> Self {
Self { level, connection }
Self {
level,
connection: Some(connection),
}
}

pub async fn send_message(&mut self, message: &LogMessage) -> eyre::Result<()> {
@@ -17,7 +21,18 @@ impl LogSubscriber {
return Ok(());
}
let message = serde_json::to_vec(&message)?;
tcp_send(&mut self.connection, &message).await?;
let connection = self.connection.as_mut().context("connection is closed")?;
tcp_send(connection, &message)
.await
.context("failed to send message")?;
Ok(())
}

pub fn is_closed(&self) -> bool {
self.connection.is_none()
}

pub fn close(&mut self) {
self.connection = None;
}
}

Loading…
Cancel
Save