You can not select more than 25 topics Topics must start with a chinese character,a letter or number, can include dashes ('-') and can be up to 35 characters long.

main.cc 3.7 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #include "../build/dora-node-api.h"
  2. #include <arrow/api.h>
  3. #include <arrow/c/bridge.h>
  4. #include <iostream>
  5. #include <memory>
  6. #include <string>
  7. #include <vector>
  8. std::shared_ptr<arrow::Array> receive_and_print_input(rust::cxxbridge1::Box<DoraEvent> event) {
  9. std::cout << "Received input event" << std::endl;
  10. struct ArrowArray c_array;
  11. struct ArrowSchema c_schema;
  12. auto result = event_as_arrow_input(
  13. std::move(event),
  14. reinterpret_cast<uint8_t*>(&c_array),
  15. reinterpret_cast<uint8_t*>(&c_schema)
  16. );
  17. if (!result.error.empty()) {
  18. std::cerr << "Error getting Arrow array: " << std::endl;
  19. return nullptr;
  20. }
  21. auto result2 = arrow::ImportArray(&c_array, &c_schema);
  22. std::shared_ptr<arrow::Array> input_array = result2.ValueOrDie();
  23. std::cout << "Received Arrow array: " << input_array->ToString() << std::endl;
  24. std::cout << "Array details: type=" << input_array->type()->ToString()
  25. << ", length=" << input_array->length() << std::endl;
  26. return input_array;
  27. }
  28. // To send output
  29. bool send_output(DoraNode& dora_node, std::shared_ptr<arrow::Array> output_array) {
  30. if (!output_array) {
  31. std::cerr << "Error: Attempted to send a null Arrow array" << std::endl;
  32. return false;
  33. }
  34. struct ArrowArray out_c_array;
  35. struct ArrowSchema out_c_schema;
  36. arrow::ExportArray(*output_array, &out_c_array, &out_c_schema);
  37. auto send_result = send_arrow_output(
  38. dora_node.send_output,
  39. "counter",
  40. reinterpret_cast<uint8_t*>(&out_c_array),
  41. reinterpret_cast<uint8_t*>(&out_c_schema)
  42. );
  43. if (!send_result.error.empty()) {
  44. std::string error_message(send_result.error);
  45. std::cerr << "Error sending Arrow array: " << error_message << std::endl;
  46. return false;
  47. }
  48. return true;
  49. }
  50. int main() {
  51. try {
  52. auto dora_node = init_dora_node();
  53. std::cout << "Dora node initialized successfully" << std::endl;
  54. int counter=0;
  55. while (counter<10) {
  56. counter++;
  57. auto event = dora_node.events->next();
  58. auto type = event_type(event);
  59. if (type == DoraEventType::Stop) {
  60. std::cout << "Received stop event, exiting" << std::endl;
  61. break;
  62. }
  63. else if (type == DoraEventType::AllInputsClosed) {
  64. std::cout << "All inputs closed, exiting" << std::endl;
  65. break;
  66. }
  67. else if (type == DoraEventType::Input) {
  68. std::shared_ptr<arrow::Array> input_array = receive_and_print_input(std::move(event));
  69. std::shared_ptr<arrow::Array> output_array;
  70. arrow::Int32Builder builder;
  71. builder.Append(10);
  72. builder.Append(100);
  73. builder.Append(1000);
  74. builder.Finish(&output_array);
  75. std::cout << "Created new string array: " << output_array->ToString() << std::endl;
  76. //Printing Before sending
  77. auto str_array = std::static_pointer_cast<arrow::Int32Array>(output_array);
  78. std::cout << "Values: [";
  79. for (int i = 0; i < str_array->length(); i++) {
  80. if (i > 0) std::cout << ", ";
  81. std::cout << str_array->Value(i);
  82. }
  83. std::cout << "]" << std::endl;
  84. send_output(dora_node, output_array);
  85. }
  86. }
  87. return 0;
  88. }
  89. catch (const std::exception& e) {
  90. std::cerr << "Error: " << e.what() << std::endl;
  91. return 1;
  92. }
  93. }