|
|
|
@@ -237,13 +237,11 @@ where |
|
|
|
.unwrap_or_else(|| panic!("no input event found in drop iteration {i}")); |
|
|
|
|
|
|
|
// remove that event |
|
|
|
if let Some(event) = self.queue.remove(index) { |
|
|
|
if let NodeEvent::Input { |
|
|
|
data: Some(data), .. |
|
|
|
} = event |
|
|
|
{ |
|
|
|
drop_tokens.push(data.drop_token); |
|
|
|
} |
|
|
|
if let Some(NodeEvent::Input { |
|
|
|
data: Some(data), .. |
|
|
|
}) = self.queue.remove(index) |
|
|
|
{ |
|
|
|
drop_tokens.push(data.drop_token); |
|
|
|
} |
|
|
|
} |
|
|
|
self.report_drop_tokens(drop_tokens).await?; |
|
|
|
@@ -255,7 +253,9 @@ where |
|
|
|
match message { |
|
|
|
DaemonRequest::Register { .. } => { |
|
|
|
let reply = DaemonReply::Result(Err("unexpected register message".into())); |
|
|
|
self.send_reply(reply).await?; |
|
|
|
self.send_reply(reply) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send register reply")?; |
|
|
|
} |
|
|
|
DaemonRequest::Stopped => self.process_daemon_event(DaemonNodeEvent::Stopped).await?, |
|
|
|
DaemonRequest::CloseOutputs(outputs) => { |
|
|
|
@@ -281,7 +281,9 @@ where |
|
|
|
.await |
|
|
|
.wrap_err("failed to receive prepare output reply")?; |
|
|
|
// tracing::debug!("prepare latency: {:?}", start.elapsed()?); |
|
|
|
self.send_reply(reply).await?; |
|
|
|
self.send_reply(reply) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send PrepareOutputMessage reply")?; |
|
|
|
} |
|
|
|
DaemonRequest::SendPreparedMessage { id } => { |
|
|
|
let (reply_sender, reply) = oneshot::channel(); |
|
|
|
@@ -290,7 +292,7 @@ where |
|
|
|
self.send_reply( |
|
|
|
reply |
|
|
|
.await |
|
|
|
.wrap_err("failed to receive send output reply")?, |
|
|
|
.wrap_err("failed to receive SendPreparedMessage reply")?, |
|
|
|
) |
|
|
|
.await?; |
|
|
|
} |
|
|
|
@@ -311,7 +313,9 @@ where |
|
|
|
.send_daemon_event(event) |
|
|
|
.await |
|
|
|
.map_err(|_| "failed to receive send_empty_message reply".to_owned()); |
|
|
|
self.send_reply(DaemonReply::Result(result)).await?; |
|
|
|
self.send_reply(DaemonReply::Result(result)) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send SendEmptyMessage reply")?; |
|
|
|
} |
|
|
|
DaemonRequest::Subscribe => { |
|
|
|
let (tx, rx) = flume::bounded(100); |
|
|
|
@@ -342,7 +346,9 @@ where |
|
|
|
} |
|
|
|
}; |
|
|
|
|
|
|
|
self.send_reply(reply).await?; |
|
|
|
self.send_reply(reply) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send NextEvent reply")?; |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(()) |
|
|
|
@@ -385,7 +391,7 @@ where |
|
|
|
self.connection |
|
|
|
.send_reply(reply) |
|
|
|
.await |
|
|
|
.wrap_err("failed to send reply to node") |
|
|
|
.wrap_err_with(|| format!("failed to send reply to node `{}`", self.node_id)) |
|
|
|
} |
|
|
|
|
|
|
|
async fn send_shared_memory_event( |
|
|
|
|