diff --git a/examples/rust-dataflow/sink/src/main.rs b/examples/rust-dataflow/sink/src/main.rs index 6c827a25..0bd3929f 100644 --- a/examples/rust-dataflow/sink/src/main.rs +++ b/examples/rust-dataflow/sink/src/main.rs @@ -1,5 +1,5 @@ use dora_node_api::{self, DoraNode}; -use eyre::bail; +use eyre::{bail, Context}; use futures::StreamExt; use std::time::Duration; @@ -19,7 +19,15 @@ async fn main() -> eyre::Result<()> { match input.id.as_str() { "message" => { - println!("received message: {}", String::from_utf8_lossy(&input.data)); + let received_string = String::from_utf8(input.data) + .wrap_err("received message was not utf8-encoded")?; + println!("received message: {}", received_string); + if !received_string.starts_with("operator received random value ") { + bail!("unexpected message format (should start with 'operator received random value')") + } + if !received_string.ends_with(" ticks") { + bail!("unexpected message format (should end with 'ticks')") + } } other => eprintln!("Ignoring unexpected input `{other}`"), }