diff --git a/examples/echo/README.md b/examples/echo/README.md index 4899421d..79e2234b 100644 --- a/examples/echo/README.md +++ b/examples/echo/README.md @@ -6,7 +6,4 @@ Make sure to have, dora, pip and cargo installed. dora up dora build dataflow.yml dora start dataflow.yml - -# In another terminal -terminal-input ``` diff --git a/examples/echo/dataflow.yml b/examples/echo/dataflow.yml index 16ae8c0e..f161854f 100644 --- a/examples/echo/dataflow.yml +++ b/examples/echo/dataflow.yml @@ -1,22 +1,24 @@ nodes: - - id: terminal-input - build: pip install -e ../../node-hub/terminal-input - path: dynamic + - id: pyarrow-sender + build: pip install -e ../../node-hub/pyarrow-sender + path: pyarrow-sender outputs: - data - inputs: - echo: dora-echo/echo + env: + DATA: "[1, 2, 3, 4, 5]" - id: dora-echo build: pip install -e ../../node-hub/dora-echo path: dora-echo inputs: - echo: terminal-input/data + data: pyarrow-sender/data outputs: - - echo + - data - - id: terminal-print - build: cargo build -p terminal-print - path: dynamic + - id: pyarrow-assert + build: pip install -e ../../node-hub/pyarrow-assert + path: pyarrow-assert inputs: - echo: dora-echo/echo + data: dora-echo/data + env: + DATA: "[1, 2, 3, 4, 5]" diff --git a/examples/pyarrow-test/dataflow.yml b/examples/pyarrow-test/dataflow.yml index fbe14b8d..46b274cd 100644 --- a/examples/pyarrow-test/dataflow.yml +++ b/examples/pyarrow-test/dataflow.yml @@ -1,7 +1,7 @@ nodes: - id: pyarrow-sender build: pip install -e ../../node-hub/pyarrow-sender - path: dynamic + path: pyarrow-sender outputs: - data env: @@ -13,4 +13,4 @@ nodes: inputs: data: pyarrow-sender/data env: - DATA: "pa.array([1, 2, 3, 4, 5])" + DATA: "[1, 2, 3, 4, 5]" diff --git a/node-hub/pyarrow-assert/pyarrow_assert/main.py b/node-hub/pyarrow-assert/pyarrow_assert/main.py index d2529da9..3cc04539 100644 --- a/node-hub/pyarrow-assert/pyarrow_assert/main.py +++ b/node-hub/pyarrow-assert/pyarrow_assert/main.py @@ -2,7 +2,7 @@ import argparse import os import ast - +import pyarrow as pa from dora import Node RUNNER_CI = True if os.getenv("CI") == "true" else False @@ -36,12 +36,23 @@ def main(): args.name ) # provide the name to connect to the dataflow if dynamic node - assert_data = ast.literal_eval(data) + data = ast.literal_eval(data) + + if isinstance(data, list): + data = pa.array(data) # initialize pyarrow array + elif isinstance(data, str): + data = pa.array([data]) + elif isinstance(data, int): + data = pa.array([data]) + elif isinstance(data, float): + data = pa.array([data]) + else: + data = pa.array(data) # initialize pyarrow array for event in node: if event["type"] == "INPUT": value = event["value"] - assert value == assert_data, f"Expected {assert_data}, got {value}" + assert value == data, f"Expected {data}, got {value}" if __name__ == "__main__":