diff --git a/examples/cuda-benchmark/README.md b/examples/cuda-benchmark/README.md
index 224f84bd..a6d87101 100644
--- a/examples/cuda-benchmark/README.md
+++ b/examples/cuda-benchmark/README.md
@@ -33,3 +33,14 @@
 dora run cuda_bench.yml
 cat benchmark_data.csv
 ```
+
+## Run the demo
+
+`node_2` is a dynamic node, so start it manually with `python demo_receiver.py`:
+
+```bash
+dora up
+dora start demo_bench.yml --detach
+python demo_receiver.py
+dora destroy
+```
diff --git a/examples/cuda-benchmark/demo_bench.yml b/examples/cuda-benchmark/demo_bench.yml
new file mode 100644
index 00000000..d39e0ee4
--- /dev/null
+++ b/examples/cuda-benchmark/demo_bench.yml
@@ -0,0 +1,17 @@
+nodes:
+  - id: node_1
+    inputs:
+      next: node_2/next
+    outputs:
+      - latency
+    path: demo_sender.py
+    env:
+      DEVICE: cpu
+  - id: node_2
+    path: dynamic # demo_receiver.py, started manually
+    inputs:
+      next: node_1/latency
+    outputs:
+      - next
+    env:
+      DEVICE: cpu
diff --git a/examples/cuda-benchmark/demo_receiver.py b/examples/cuda-benchmark/demo_receiver.py
new file mode 100644
index 00000000..1e207c2b
--- /dev/null
+++ b/examples/cuda-benchmark/demo_receiver.py
@@ -0,0 +1,82 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import time
+
+import numpy as np
+import pyarrow as pa
+import torch
+from tqdm import tqdm
+
+from dora import Node
+from dora.cuda import cudabuffer_to_torch, ipc_buffer_to_ipc_handle
+from helper import record_results
+
+# Warm up the CUDA context and pyarrow before measuring.
+torch.tensor([], device="cuda")
+pa.array([])
+
+ctx = pa.cuda.Context()
+node = Node("node_2")
+
+current_size = 8
+i = 0
+latencies = []
+mean_cpu = mean_cuda = 0
+DEVICE = os.getenv("DEVICE", "cuda")
+NAME = f"dora torch {DEVICE}"
+
+print("")
+print("Receiving 40MB packets using default dora-rs")
+
+while True:
+    event = node.next()
+
+    if event["type"] == "INPUT":
+        if i == 0:
+            pbar = tqdm(total=100)
+        elif i == 100:
+            print("vs")
+            print("Receiving 40MB packets using dora-rs CUDA->CUDA")
+            pbar = tqdm(total=100)
+        t_send = event["metadata"]["time"]
+
+        if event["metadata"]["device"] != "cuda":
+            # BEFORE: the packet travelled through host memory, so it has
+            # to be copied back onto the GPU.
+            handle = event["value"].to_numpy()
+            torch_tensor = torch.tensor(handle, device="cuda")
+        else:
+            # AFTER: reopen the sender's CUDA buffer from its IPC handle.
+            # Note: the IPC buffer must be opened in the same file where
+            # it is consumed.
+            ipc_handle = ipc_buffer_to_ipc_handle(event["value"])
+            cudabuffer = ctx.open_ipc_buffer(ipc_handle)
+            torch_tensor = cudabuffer_to_torch(cudabuffer, event["metadata"])  # on cuda
+    else:
+        break
+
+    t_received = time.perf_counter_ns()
+    pbar.update(1)
+    latencies.append((t_received - t_send) / 1000)
+    node.send_output("next", pa.array([]))
+
+    i += 1
+    if i == 100:
+        pbar.close()
+        mean_cpu = np.array(latencies).mean()
+        latencies = []
+
+mean_cuda = np.array(latencies).mean()
+pbar.close()
+
+time.sleep(2)
+
+print("")
+print("----")
+print(f"Node communication duration with default dora-rs: {mean_cpu / 1000:.1f}ms")
+print(f"Node communication duration with dora CUDA->CUDA: {mean_cuda / 1000:.1f}ms")
+print("----")
+print(f"Speed up: {mean_cpu / mean_cuda:.0f}x")
+record_results(NAME, current_size, latencies)
diff --git a/examples/cuda-benchmark/demo_sender.py b/examples/cuda-benchmark/demo_sender.py
new file mode 100644
index 00000000..11868462
--- /dev/null
+++ b/examples/cuda-benchmark/demo_sender.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import os
+import time
+
+import numpy as np
+import pyarrow as pa
+import torch
+
+from dora import Node
+from dora.cuda import torch_to_ipc_buffer
+
+# Warm up the CUDA context and pyarrow before measuring.
+torch.tensor([], device="cuda")
+pa.array([])
+
+SIZES = [10000 * 512]  # 10000 * 512 int64 values, i.e. ~40MB per packet
+DEVICE = os.getenv("DEVICE", "cuda")
+
+node = Node()
+
+time.sleep(1)
+
+
+def send_packets(device):
+    for size in SIZES:
+        for _ in range(0, 100):
+            random_data = np.random.randint(1000, size=size, dtype=np.int64)
+            torch_tensor = torch.tensor(random_data, dtype=torch.int64, device="cuda")
+            t_send = time.perf_counter_ns()
+            if device == "cpu":
+                # BEFORE: copy the tensor back to the host and send it
+                # through shared memory.
+                torch_tensor = torch_tensor.to("cpu")
+                metadata = {"time": t_send, "device": "cpu"}
+                node.send_output("latency", pa.array(torch_tensor.numpy()), metadata)
+            else:
+                # AFTER: share the CUDA buffer directly via its IPC handle,
+                # without any device-to-host copy.
+                ipc_buffer, metadata = torch_to_ipc_buffer(torch_tensor)
+                metadata["time"] = t_send
+                metadata["device"] = "cuda"
+                node.send_output("latency", ipc_buffer, metadata)
+
+            # Wait for the receiver's ack before sending the next packet.
+            node.next()
+
+
+# First pass: send through host memory (DEVICE is set to cpu in demo_bench.yml).
+send_packets(DEVICE)
+
+time.sleep(1)
+
+# Second pass: send CUDA->CUDA via IPC.
+send_packets("cuda")
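For context on what the dora helpers used above (`torch_to_ipc_buffer`, `ipc_buffer_to_ipc_handle`, `cudabuffer_to_torch`) are built on: they wrap the CUDA IPC machinery exposed by `pyarrow.cuda`. Below is a minimal sketch of that raw handoff in isolation, assuming a CUDA-capable GPU on Linux; it is illustrative only and not part of the benchmark, and the function and variable names are made up for the example.

```python
import multiprocessing as mp

import pyarrow as pa
import pyarrow.cuda as cuda


def receiver(handle_bytes):
    # A CUDA IPC handle can only be opened in a *different* process,
    # which is why the demo ships it between dora nodes.
    ctx = cuda.Context()
    handle = cuda.IpcMemHandle.from_buffer(pa.py_buffer(handle_bytes))
    shared = ctx.open_ipc_buffer(handle)  # zero-copy view of the sender's buffer
    print(f"receiver sees {shared.size} bytes on the GPU")


if __name__ == "__main__":
    ctx = cuda.Context()
    cudabuf = ctx.buffer_from_data(b"\x00" * 64)  # 64-byte example payload
    # Only the tiny serialized handle crosses the process boundary,
    # never the 40MB payload itself.
    handle_bytes = cudabuf.export_for_ipc().serialize().to_pybytes()
    proc = mp.get_context("spawn").Process(target=receiver, args=(handle_bytes,))
    proc.start()
    proc.join()
```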