@@ -33,3 +33,12 @@ dora run cuda_bench.yml
cat benchmark_data.csv
```
## To run the demo code
```bash
dora up
dora start demo_bench.yml --detach
python demo_receiver.py
dora destroy
```
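In `demo_bench.yml` (below), `node_2` is declared with `path: dynamic`, so `dora start --detach` only launches the sender; the receiver joins the running dataflow when you start `python demo_receiver.py` by hand, and `dora destroy` tears the daemon down afterwards.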
@@ -0,0 +1,17 @@ demo_bench.yml
nodes:
  - id: node_1
    inputs:
      next: node_2/next
    outputs:
      - latency
    path: demo_sender.py
    env:
      DEVICE: cpu
  - id: node_2
    path: dynamic # demo_receiver.py, attached manually
    inputs:
      next: node_1/latency
    outputs:
      - next
    env:
      DEVICE: cpu
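The two nodes form a ping-pong loop: `node_1`'s `latency` output feeds `node_2`'s `next` input, and `node_2`'s `next` output feeds back into `node_1`. The sender therefore waits for an acknowledgment before emitting another packet, keeping exactly one packet in flight so each latency sample measures a single transfer.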
@@ -0,0 +1,89 @@ demo_receiver.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time

import numpy as np
import pyarrow as pa
import pyarrow.cuda
import torch
from tqdm import tqdm

from dora import Node
from dora.cuda import ipc_buffer_to_ipc_handle, cudabuffer_to_torch
from helper import record_results

# Warm up torch and pyarrow so context creation is not part of the measurement
torch.tensor([], device="cuda")
pa.array([])
ctx = pa.cuda.Context()

node = Node("node_2")

current_size = 8
n = 0
i = 0
latencies = []
mean_cpu = mean_cuda = 0
DEVICE = os.getenv("DEVICE", "cuda")
NAME = f"dora torch {DEVICE}"

print("")
print("Receiving 40MB packets using default dora-rs")
while True:
    event = node.next()
    if event["type"] == "INPUT":
        if i == 0:
            pbar = tqdm(total=100)
        elif i == 100:
            print("vs")
            print("Receiving 40MB packets using dora-rs CUDA->CUDA")
            pbar = tqdm(total=100)
        t_send = event["metadata"]["time"]
        if event["metadata"]["device"] != "cuda":
            # BEFORE: the payload arrived on the CPU and must be copied back to the GPU
            handle = event["value"].to_numpy()
            torch_tensor = torch.tensor(handle, device="cuda")
        else:
            # AFTER: reopen the shared CUDA buffer; the data never left the GPU.
            # The CUDA storage needs to be opened in the same file where it is used.
            ipc_handle = ipc_buffer_to_ipc_handle(event["value"])
            cudabuffer = ctx.open_ipc_buffer(ipc_handle)
            torch_tensor = cudabuffer_to_torch(cudabuffer, event["metadata"])  # on cuda
    else:
        break
    t_received = time.perf_counter_ns()
    pbar.update(1)
    latencies.append((t_received - t_send) / 1000)  # ns -> us
    node.send_output("next", pa.array([]))
    i += 1
    if i == 100:
        # End of the first pass (default CPU transport)
        pbar.close()
        mean_cpu = np.array(latencies).mean()
        latencies = []
        n += 1

# The remaining latencies belong to the CUDA->CUDA pass
mean_cuda = np.array(latencies).mean()
pbar.close()
time.sleep(2)
print("")
print("----")
print(f"Node communication duration with default dora-rs: {mean_cpu/1000:.1f}ms")
print(f"Node communication duration with dora CUDA->CUDA: {mean_cuda/1000:.1f}ms")
print("----")
print(f"Speed Up: {mean_cpu/mean_cuda:.0f}x")
record_results(NAME, current_size, latencies)
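`demo_receiver.py` imports `record_results` from a local `helper` module that is not part of this diff. A minimal sketch of a compatible implementation, assuming it simply appends one summary row to the `benchmark_data.csv` that the README inspects (the column layout is a guess):

```python
# helper.py -- hypothetical sketch; the real module is not included in this diff.
import csv

import numpy as np


def record_results(name, current_size, latencies):
    """Append one summary row (name, size, mean latency in us) to the CSV."""
    # Assumed schema: name, size, mean latency -- adjust to the real benchmark_data.csv
    with open("benchmark_data.csv", "a", newline="") as f:
        csv.writer(f).writerow([name, current_size, np.array(latencies).mean()])
```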
@@ -0,0 +1,74 @@ demo_sender.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time

import numpy as np
import pyarrow as pa
import torch

from dora import Node
from dora.cuda import torch_to_ipc_buffer

# Warm up torch and pyarrow so context creation is not part of the measurement
torch.tensor([], device="cuda")
pa.array([])

SIZES = [10000 * 512]  # 5,120,000 int64 values = ~40MB per packet
DEVICE = os.getenv("DEVICE", "cuda")

node = Node()
time.sleep(1)

# First pass: DEVICE comes from the dataflow env (cpu in demo_bench.yml),
# so each tensor is copied back to the CPU before being sent.
for size in SIZES:
    for _ in range(0, 100):
        random_data = np.random.randint(1000, size=size, dtype=np.int64)
        torch_tensor = torch.tensor(random_data, dtype=torch.int64, device="cuda")
        t_send = time.perf_counter_ns()
        if DEVICE == "cpu":
            # BEFORE: copy the tensor to the CPU and send it as an Arrow array
            torch_tensor = torch_tensor.to("cpu")
            metadata = {}
            metadata["time"] = t_send
            metadata["device"] = "cpu"
            node.send_output("latency", pa.array(torch_tensor.numpy()), metadata)
        else:
            # AFTER: share the CUDA buffer through an IPC handle; no device copy
            ipc_buffer, metadata = torch_to_ipc_buffer(torch_tensor)
            metadata["time"] = t_send
            metadata["device"] = "cuda"
            node.send_output("latency", ipc_buffer, metadata)
        # Wait for the receiver's "next" acknowledgment before sending again
        node.next()

# Second pass: force the CUDA->CUDA path for the same 100 packets
DEVICE = "cuda"
time.sleep(1)
for size in SIZES:
    for _ in range(0, 100):
        random_data = np.random.randint(1000, size=size, dtype=np.int64)
        torch_tensor = torch.tensor(random_data, dtype=torch.int64, device="cuda")
        t_send = time.perf_counter_ns()
        # AFTER: share the CUDA buffer through an IPC handle; no device copy
        ipc_buffer, metadata = torch_to_ipc_buffer(torch_tensor)
        metadata["time"] = t_send
        metadata["device"] = "cuda"
        node.send_output("latency", ipc_buffer, metadata)
        # Wait for the receiver's "next" acknowledgment before sending again
        node.next()
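Two details worth noting: the random tensor is created before `t_send` is captured, so only the send path (CPU copy plus transport, or IPC handle exchange) is timed; and comparing `time.perf_counter_ns()` across the sender and receiver processes assumes a shared clock, which holds on Linux (where the counter is backed by the system-wide `CLOCK_MONOTONIC`), and both processes must run on the same machine anyway for CUDA IPC to work.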