| @@ -76,6 +76,45 @@ class MockGroupClient final : public opr::GroupClient { | |||
| TEST(TestOprCollectiveComm, AllReduce) { | |||
| REQUIRE_GPU(2); | |||
| auto run_mode = [](const Mode mode) { | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| auto host_x1 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto graph = ComputingGraph::make(); | |||
| auto x0 = opr::Host2DeviceCopy::make(*graph, host_x0, cn0); | |||
| auto x1 = opr::Host2DeviceCopy::make(*graph, host_x1, cn0); | |||
| auto x1c = opr::Copy::make(x1, cn1); | |||
| auto y0 = opr::CollectiveComm::make({x0}, graph.get(), "all_reduce", | |||
| 2, 0, 0, client, {mode}, dtype::Float32(), "nccl")[0]; | |||
| auto y1 = opr::CollectiveComm::make({x1c}, graph.get(), "all_reduce", | |||
| 2, 1, 0, client, {mode}, dtype::Float32(), "nccl")[0]; | |||
| auto y_expect = make_all_reduce_output(mode, {x0, x1}); | |||
| auto func = graph->compile({make_callback_copy(y0, host_y0), | |||
| make_callback_copy(y1, host_y1), | |||
| make_callback_copy(y_expect, host_y_expect)}); | |||
| func->execute(); | |||
| MGB_ASSERT_TENSOR_EQ(host_y_expect, host_y0); | |||
| MGB_ASSERT_TENSOR_EQ(host_y_expect, host_y1); | |||
| }; | |||
| run_mode(Mode::ALL_REDUCE_MAX); | |||
| run_mode(Mode::ALL_REDUCE_MIN); | |||
| run_mode(Mode::ALL_REDUCE_SUM); | |||
| } | |||
| TEST(TestOprCollectiveComm, AllReduceMultiThread) { | |||
| REQUIRE_GPU(2); | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| @@ -227,6 +266,38 @@ TEST(TestOprCollectiveComm, AllGather) { | |||
| auto host_x1 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto graph = ComputingGraph::make(); | |||
| auto x0 = opr::Host2DeviceCopy::make(*graph, host_x0, cn0); | |||
| auto x1 = opr::Host2DeviceCopy::make(*graph, host_x1, cn0); | |||
| auto x1c = opr::Copy::make(x1, cn1); | |||
| auto y0 = opr::CollectiveComm::make({x0}, graph.get(), "all_gather", | |||
| 2, 0, 0, client, {Mode::ALL_GATHER}, dtype::Float32(), "nccl")[0]; | |||
| auto y1 = opr::CollectiveComm::make({x1c}, graph.get(), "all_gather", | |||
| 2, 1, 0, client, {Mode::ALL_GATHER}, dtype::Float32(), "nccl")[0]; | |||
| auto y_expect = opr::Concat::make({x0, x1}, 0); | |||
| auto func = graph->compile({make_callback_copy(y0, host_y0), | |||
| make_callback_copy(y1, host_y1), | |||
| make_callback_copy(y_expect, host_y_expect)}); | |||
| func->execute(); | |||
| MGB_ASSERT_TENSOR_EQ(host_y_expect, host_y0); | |||
| MGB_ASSERT_TENSOR_EQ(host_y_expect, host_y1); | |||
| } | |||
| TEST(TestOprCollectiveComm, AllGatherMultiThread) { | |||
| REQUIRE_GPU(2); | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| auto host_x1 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto run_0 = [&]() { // rank 0 | |||
| @@ -360,6 +431,39 @@ TEST(TestOprCollectiveComm, ReduceScatterSum) { | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| auto host_x1 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y0_expect, host_y1_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto graph = ComputingGraph::make(); | |||
| auto x0 = opr::Host2DeviceCopy::make(*graph, host_x0, cn0); | |||
| auto x1 = opr::Host2DeviceCopy::make(*graph, host_x1, cn0); | |||
| auto x1c = opr::Copy::make(x1, cn1); | |||
| auto y0 = opr::CollectiveComm::make({x0}, graph.get(), "reduce_scatter_sum", | |||
| 2, 0, 0, client, {Mode::REDUCE_SCATTER_SUM}, dtype::Float32(), "nccl")[0]; | |||
| auto y1 = opr::CollectiveComm::make({x1c}, graph.get(), "reduce_scatter_sum", | |||
| 2, 1, 0, client, {Mode::REDUCE_SCATTER_SUM}, dtype::Float32(), "nccl")[0]; | |||
| auto y_expect = make_reduce_scatter_sum_output({x0, x1}); | |||
| auto func = graph->compile({make_callback_copy(y0, host_y0), | |||
| make_callback_copy(y1, host_y1), | |||
| make_callback_copy(y_expect[0], host_y0_expect), | |||
| make_callback_copy(y_expect[1], host_y1_expect)}); | |||
| func->execute(); | |||
| MGB_ASSERT_TENSOR_EQ(host_y0_expect, host_y0); | |||
| MGB_ASSERT_TENSOR_EQ(host_y1_expect, host_y1); | |||
| } | |||
| TEST(TestOprCollectiveComm, ReduceScatterSumMultiThread) { | |||
| REQUIRE_GPU(2); | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({8}); | |||
| auto host_x1 = gen({8}); | |||
| @@ -499,6 +603,37 @@ TEST(TestOprCollectiveComm, ReduceSum) { | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| auto host_x1 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto graph = ComputingGraph::make(); | |||
| auto x0 = opr::Host2DeviceCopy::make(*graph, host_x0, cn0); | |||
| auto x1 = opr::Host2DeviceCopy::make(*graph, host_x1, cn0); | |||
| auto x1c = opr::Copy::make(x1, cn1); | |||
| auto y0 = opr::CollectiveComm::make({x0}, graph.get(), "reduce_sum", | |||
| 2, 0, 0, client, {Mode::REDUCE_SUM}, dtype::Float32(), "nccl")[0]; | |||
| auto y1 = opr::CollectiveComm::make({x1c}, graph.get(), "reduce_sum", | |||
| 2, 1, 0, client, {Mode::REDUCE_SUM}, dtype::Float32(), "nccl")[0]; | |||
| auto y_expect = x0 + x1; | |||
| auto func = graph->compile({make_callback_copy(y0, host_y0), | |||
| make_callback_copy(y1, host_y1), | |||
| make_callback_copy(y_expect, host_y_expect)}); | |||
| func->execute(); | |||
| MGB_ASSERT_TENSOR_EQ(host_y_expect, host_y0); | |||
| } | |||
| TEST(TestOprCollectiveComm, ReduceSumMultiThread) { | |||
| REQUIRE_GPU(2); | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| auto host_x1 = gen({28, 28}); | |||
| @@ -623,6 +758,36 @@ TEST(TestOprCollectiveComm, Broadcast) { | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1, host_y_expect; | |||
| auto client = std::make_shared<MockGroupClient>(); | |||
| auto graph = ComputingGraph::make(); | |||
| auto x0 = opr::Host2DeviceCopy::make(*graph, host_x0, cn0); | |||
| auto y0 = opr::CollectiveComm::make({x0}, graph.get(), "broadcast", | |||
| 2, 0, 0, client, {Mode::BROADCAST}, dtype::Float32(), "nccl")[0]; | |||
| auto y_dev = std::make_shared<DeviceTensorND>(DeviceTensorND() | |||
| .comp_node(cn1) | |||
| .dtype(dtype::Float32()) | |||
| .resize(host_x0->shape())); | |||
| auto y1 = opr::CollectiveComm::make({}, graph.get(), "broadcast", 2, 1, 0, | |||
| client, {y_dev}, {Mode::BROADCAST}, dtype::Float32(), "nccl", {cn1})[0]; | |||
| auto func = graph->compile({make_callback_copy(y0, host_y0), | |||
| make_callback_copy(y1, host_y1)}); | |||
| func->execute(); | |||
| MGB_ASSERT_TENSOR_EQ(*host_x0, host_y0); | |||
| MGB_ASSERT_TENSOR_EQ(*host_x0, host_y1); | |||
| } | |||
| TEST(TestOprCollectiveComm, BroadcastMultiThread) { | |||
| REQUIRE_GPU(2); | |||
| auto cn0 = CompNode::load("gpu0"); | |||
| auto cn1 = CompNode::load("gpu1"); | |||
| HostTensorGenerator<> gen; | |||
| auto host_x0 = gen({28, 28}); | |||
| HostTensorND host_y0, host_y1; | |||