You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. use eyre::ContextCompat;
  2. use std::{
  3. path::{Path, PathBuf},
  4. process::Command,
  5. time::Duration,
  6. };
  7. use xshell::{cmd, Shell};
  8. fn main() -> eyre::Result<()> {
  9. // create a new shell in this folder
  10. let sh = prepare_shell()?;
  11. // build the `dora` binary (you can skip this if you use `cargo install dora-cli`)
  12. let dora = prepare_dora(&sh)?;
  13. // build the dataflow using `dora build`
  14. cmd!(sh, "{dora} build dataflow.yml").run()?;
  15. // start the dora coordinator (in background)
  16. Command::from(cmd!(sh, "{dora} coordinator")).spawn()?;
  17. // wait until coordinator is ready
  18. loop {
  19. match cmd!(sh, "{dora} list").quiet().ignore_stderr().run() {
  20. Ok(_) => {
  21. println!("coordinator connected");
  22. break;
  23. }
  24. Err(_) => {
  25. eprintln!("waiting for coordinator");
  26. std::thread::sleep(Duration::from_millis(100))
  27. }
  28. }
  29. }
  30. // start two daemons (in background)
  31. Command::from(cmd!(sh, "{dora} daemon --machine-id A")).spawn()?;
  32. Command::from(cmd!(sh, "{dora} daemon --machine-id B")).spawn()?;
  33. // wait until both daemons are connected
  34. loop {
  35. let output = cmd!(sh, "{dora} connected-machines")
  36. .quiet()
  37. .ignore_stderr()
  38. .read();
  39. match output {
  40. Ok(output) => {
  41. let connected: Vec<&str> = output.lines().collect();
  42. if connected == ["A", "B"] {
  43. println!("both daemons connected");
  44. break;
  45. } else {
  46. eprintln!("not all daemons connected yet (connected: {connected:?})");
  47. }
  48. }
  49. Err(err) => eprintln!("failed to query connected-machines: {err:?}"),
  50. }
  51. std::thread::sleep(Duration::from_millis(100));
  52. }
  53. // start running the dataflow.yml -> outputs the UUID assigned to the dataflow
  54. println!("starting dataflow");
  55. let output = cmd!(sh, "{dora} start dataflow.yml --attach").read_stderr()?;
  56. println!("dataflow finished successfully");
  57. let uuid = output.lines().next().context("no output")?;
  58. // stop the coordinator and both daemons again
  59. cmd!(sh, "{dora} destroy").run()?;
  60. // verify that the node output was written to `out`
  61. sh.change_dir("out");
  62. sh.change_dir(uuid);
  63. let sink_output = sh.read_file("log_rust-sink.txt")?;
  64. if sink_output.lines().count() < 50 {
  65. eyre::bail!("sink did not receive the expected number of messages")
  66. }
  67. Ok(())
  68. }
  69. /// Prepares a shell and set the working directory to the parent folder of this file.
  70. ///
  71. /// You can use your system shell instead (e.g. `bash`);
  72. fn prepare_shell() -> Result<Shell, eyre::Error> {
  73. let sh = Shell::new()?;
  74. let root = Path::new(env!("CARGO_MANIFEST_DIR"));
  75. sh.change_dir(root.join(file!()).parent().unwrap());
  76. Ok(sh)
  77. }
  78. /// Build the `dora` command-line executable from this repo.
  79. ///
  80. /// You can skip this step and run `cargo install dora-cli --locked` instead.
  81. fn prepare_dora(sh: &Shell) -> eyre::Result<PathBuf> {
  82. cmd!(sh, "cargo build --package dora-cli").run()?;
  83. let root = Path::new(env!("CARGO_MANIFEST_DIR"));
  84. let dora = root.join("target").join("debug").join("dora");
  85. Ok(dora)
  86. }