You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

lib.rs 6.8 kB

3 years ago
3 years ago
3 years ago
3 years ago
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. #![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]
  2. use arrow::pyarrow::{FromPyArrow, ToPyArrow};
  3. use dora_node_api::merged::{MergeExternalSend, MergedEvent};
  4. use dora_node_api::{DoraNode, EventStream};
  5. use dora_operator_api_python::{pydict_to_metadata, PyEvent};
  6. use dora_ros2_bridge_python::Ros2Subscription;
  7. use eyre::Context;
  8. use futures::{Stream, StreamExt};
  9. use pyo3::prelude::*;
  10. use pyo3::types::{PyBytes, PyDict};
  11. /// The custom node API lets you integrate `dora` into your application.
  12. /// It allows you to retrieve input and send output in any fashion you want.
  13. ///
  14. /// Use with:
  15. ///
  16. /// ```python
  17. /// from dora import Node
  18. ///
  19. /// node = Node()
  20. /// ```
  21. ///
  22. #[pyclass]
  23. pub struct Node {
  24. events: Events,
  25. node: DoraNode,
  26. }
  27. #[pymethods]
  28. impl Node {
  29. #[new]
  30. pub fn new() -> eyre::Result<Self> {
  31. let (node, events) = DoraNode::init_from_env()?;
  32. Ok(Node {
  33. events: Events::Dora(events),
  34. node,
  35. })
  36. }
  37. /// `.next()` gives you the next input that the node has received.
  38. /// It blocks until the next event becomes available.
  39. /// It will return `None` when all senders has been dropped.
  40. ///
  41. /// ```python
  42. /// event = node.next()
  43. /// ```
  44. ///
  45. /// You can also iterate over the event stream with a loop
  46. ///
  47. /// ```python
  48. /// for event in node:
  49. /// match event["type"]:
  50. /// case "INPUT":
  51. /// match event["id"]:
  52. /// case "image":
  53. /// ```
  54. #[allow(clippy::should_implement_trait)]
  55. pub fn next(&mut self, py: Python, timeout: Option<f32>) -> PyResult<Option<PyEvent>> {
  56. let event = py.allow_threads(|| self.events.recv(timeout));
  57. Ok(event)
  58. }
  59. pub fn __next__(&mut self, py: Python) -> PyResult<Option<PyEvent>> {
  60. let event = py.allow_threads(|| self.events.recv(None));
  61. Ok(event)
  62. }
  63. fn __iter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
  64. slf
  65. }
  66. /// `send_output` send data from the node.
  67. ///
  68. /// ```python
  69. /// Args:
  70. /// output_id: str,
  71. /// data: Bytes|Arrow,
  72. /// metadata: Option[Dict],
  73. /// ```
  74. ///
  75. /// ```python
  76. /// node.send_output("string", b"string", {"open_telemetry_context": "7632e76"})
  77. /// ```
  78. ///
  79. pub fn send_output(
  80. &mut self,
  81. output_id: String,
  82. data: PyObject,
  83. metadata: Option<&PyDict>,
  84. py: Python,
  85. ) -> eyre::Result<()> {
  86. let parameters = pydict_to_metadata(metadata)?;
  87. if let Ok(py_bytes) = data.downcast::<PyBytes>(py) {
  88. let data = py_bytes.as_bytes();
  89. self.node
  90. .send_output_bytes(output_id.into(), parameters, data.len(), data)
  91. .wrap_err("failed to send output")?;
  92. } else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow(data.as_ref(py)) {
  93. self.node.send_output(
  94. output_id.into(),
  95. parameters,
  96. arrow::array::make_array(arrow_array),
  97. )?;
  98. } else {
  99. eyre::bail!("invalid `data` type, must by `PyBytes` or arrow array")
  100. }
  101. Ok(())
  102. }
  103. /// Returns the full dataflow descriptor that this node is part of.
  104. ///
  105. /// This method returns the parsed dataflow YAML file.
  106. pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
  107. pythonize::pythonize(py, self.node.dataflow_descriptor())
  108. }
  109. pub fn merge_external_events(
  110. &mut self,
  111. subscription: &mut Ros2Subscription,
  112. ) -> eyre::Result<()> {
  113. let subscription = subscription.into_stream()?;
  114. let stream = futures::stream::poll_fn(move |cx| {
  115. let s = subscription.as_stream().map(|item| {
  116. match item.context("failed to read ROS2 message") {
  117. Ok((value, _info)) => Python::with_gil(|py| {
  118. value
  119. .to_pyarrow(py)
  120. .context("failed to convert value to pyarrow")
  121. .unwrap_or_else(|err| PyErr::from(err).to_object(py))
  122. }),
  123. Err(err) => Python::with_gil(|py| PyErr::from(err).to_object(py)),
  124. }
  125. });
  126. futures::pin_mut!(s);
  127. s.poll_next_unpin(cx)
  128. });
  129. // take out the event stream and temporarily replace it with a dummy
  130. let events = std::mem::replace(
  131. &mut self.events,
  132. Events::Merged(Box::new(futures::stream::empty())),
  133. );
  134. // update self.events with the merged stream
  135. self.events = Events::Merged(events.merge_external_send(Box::pin(stream)));
  136. Ok(())
  137. }
  138. }
  139. enum Events {
  140. Dora(EventStream),
  141. Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
  142. }
  143. impl Events {
  144. fn recv(&mut self, timeout: Option<f32>) -> Option<PyEvent> {
  145. match self {
  146. Events::Dora(events) => match timeout {
  147. Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),
  148. None => events.recv().map(PyEvent::from),
  149. },
  150. Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
  151. }
  152. }
  153. }
  154. impl<'a> MergeExternalSend<'a, PyObject> for Events {
  155. type Item = MergedEvent<PyObject>;
  156. fn merge_external_send(
  157. self,
  158. external_events: impl Stream<Item = PyObject> + Unpin + Send + 'a,
  159. ) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
  160. match self {
  161. Events::Dora(events) => events.merge_external_send(external_events),
  162. Events::Merged(events) => {
  163. let merged = events.merge_external_send(external_events);
  164. Box::new(merged.map(|event| match event {
  165. MergedEvent::Dora(e) => MergedEvent::Dora(e),
  166. MergedEvent::External(e) => MergedEvent::External(e.flatten()),
  167. }))
  168. }
  169. }
  170. }
  171. }
  172. impl Node {
  173. pub fn id(&self) -> String {
  174. self.node.id().to_string()
  175. }
  176. }
  177. /// Start a runtime for Operators
  178. #[pyfunction]
  179. pub fn start_runtime() -> eyre::Result<()> {
  180. dora_runtime::main().wrap_err("Dora Runtime raised an error.")
  181. }
  182. #[pymodule]
  183. fn dora(py: Python, m: &PyModule) -> PyResult<()> {
  184. m.add_function(wrap_pyfunction!(start_runtime, m)?)?;
  185. m.add_class::<Node>().unwrap();
  186. let ros2_bridge = PyModule::new(py, "ros2_bridge")?;
  187. dora_ros2_bridge_python::create_dora_ros2_bridge_module(ros2_bridge)?;
  188. let experimental = PyModule::new(py, "experimental")?;
  189. experimental.add_submodule(ros2_bridge)?;
  190. m.add_submodule(experimental)?;
  191. Ok(())
  192. }