| @@ -29,66 +29,22 @@ | |||
| #include "utils/profile.h" | |||
| namespace mindspore::device::ascend { | |||
| void AscendBucket::AllocateAllReduceAddr() { | |||
| // Check bucket is full | |||
| if (grad_tensor_list_.size() != bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "grad tensor list size:" << grad_tensor_list_.size() | |||
| << " is not equal to bucket size:" << bucket_size_; | |||
| } | |||
| size_t total_size = 0; | |||
| std::vector<size_t> origin_size_list; | |||
| for (auto &tensor : grad_tensor_list_) { | |||
| MS_EXCEPTION_IF_NULL(tensor); | |||
| tensor_type_list_.emplace_back(tensor->data_type()); | |||
| DeviceAddressPtr device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address()); | |||
| MS_EXCEPTION_IF_NULL(device_address); | |||
| auto origin_size = device_address->GetSize(); | |||
| auto align_size = MemoryManager::GetCommonAlignSize(origin_size); | |||
| origin_size_list.emplace_back(origin_size); | |||
| (void)align_size_list_.emplace_back(align_size); | |||
| total_size += align_size; | |||
| memcpy_input_addrs_.emplace_back(std::make_shared<kernel::Address>( | |||
| static_cast<uint8_t *>(device_address->GetMutablePtr()), device_address->GetSize())); | |||
| } | |||
| total_size_ = total_size; | |||
| // Builds an AscendDeviceAddress for `size` with a null device pointer. | |||
| // NOTE(review): presumably the pointer is filled in later by AllocateContinousMemory - confirm. | |||
| DeviceAddressPtr AscendBucket::CreateDeviceAddress(size_t size) const { | |||
| return std::make_shared<AscendDeviceAddress>(nullptr, size); | |||
| } | |||
| // Returns the aligned size for `size`, as computed by MemoryManager::GetCommonAlignSize. | |||
| size_t AscendBucket::GetAlignSize(size_t size) const { return MemoryManager::GetCommonAlignSize(size); } | |||
| void AscendBucket::AllocateContinousMemory(const std::vector<DeviceAddressPtr> &to_allocate_address, size_t total_size, | |||
| const std::vector<size_t> &size_list) const { | |||
| auto runtime_instance = device::KernelRuntimeManager::Instance().GetCurrentKernelRuntime(); | |||
| MS_EXCEPTION_IF_NULL(runtime_instance); | |||
| // AllReduce input and output addresses need to be cleared to zero | |||
| ar_input_addr_ = runtime_instance->MallocCommunicationMemFromMemPool(total_size); | |||
| ar_output_addr_ = runtime_instance->MallocCommunicationMemFromMemPool(total_size); | |||
| // generate memcpy output addr | |||
| uint8_t *memcpy_output = ar_input_addr_; | |||
| if (origin_size_list.size() < bucket_size_ || align_size_list_.size() < bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "Invalid bucket_size_:" << bucket_size_ << " origin_size_list.size:" << origin_size_list.size() | |||
| << " align_size_list.size:" << align_size_list_.size(); | |||
| } | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| memcpy_output_addrs_.emplace_back(std::make_shared<kernel::Address>(memcpy_output, origin_size_list[i])); | |||
| memcpy_output += align_size_list_[i]; | |||
| if (!runtime_instance->MallocContinuousMemFromMemPool(to_allocate_address, total_size, size_list)) { | |||
| MS_LOG(EXCEPTION) << "Allocate memory for AllReduce input failed"; | |||
| } | |||
| } | |||
| // Hands `dev_ptr` to AscendMemoryPool::FreeTensorMem; the pointer is passed through unchecked. | |||
| void AscendBucket::FreeDeviceMem(void *dev_ptr) { AscendMemoryPool::GetInstance().FreeTensorMem(dev_ptr); } | |||
| void AscendBucket::FreeAllDeviceMem() { | |||
| if (ar_input_addr_ != nullptr) { | |||
| uint8_t *origin_dev_addr = ar_input_addr_ - kMemAlignSize; | |||
| FreeDeviceMem(origin_dev_addr); | |||
| ar_input_addr_ = nullptr; | |||
| } | |||
| if (ar_output_addr_ != nullptr) { | |||
| uint8_t *origin_dev_addr = ar_output_addr_ - kMemAlignSize; | |||
| FreeDeviceMem(origin_dev_addr); | |||
| ar_output_addr_ = nullptr; | |||
| } | |||
| // clear launch mul device Memory | |||
| if (launch_mul_ != nullptr) { | |||
| launch_mul_->FreeLaunchDeviceMem(); | |||
| } | |||
| // clear launch atomic clean device Memory | |||
| if (launch_atomic_clean_ != nullptr) { | |||
| launch_atomic_clean_->FreeLaunchDeviceMem(); | |||
| @@ -149,8 +105,15 @@ void AscendBucket::LaunchAllReduce() { | |||
| auto hccl_count = total_size_ / type_size; | |||
| HcclReduceOp op_type = HcclReduceOp::HCCL_REDUCE_SUM; | |||
| auto hccl_result = hccl::HcclAdapter::GetInstance().HcclAllReduce(ar_input_addr_, ar_output_addr_, hccl_count, | |||
| iter->second, op_type, stream_); | |||
| if (ar_input_address_list_.empty() || ar_output_address_list_.empty()) { | |||
| MS_LOG(EXCEPTION) << "Fused AllReduce input address size is:" << ar_input_address_list_.size() | |||
| << " output address size is:" << ar_output_address_list_.size(); | |||
| } | |||
| MS_EXCEPTION_IF_NULL(ar_input_address_list_[0]); | |||
| MS_EXCEPTION_IF_NULL(ar_output_address_list_[0]); | |||
| auto hccl_result = hccl::HcclAdapter::GetInstance().HcclAllReduce(ar_input_address_list_[0]->GetMutablePtr(), | |||
| ar_output_address_list_[0]->GetMutablePtr(), | |||
| hccl_count, iter->second, op_type, stream_); | |||
| if (hccl_result != HCCL_SUCCESS) { | |||
| MS_LOG(EXCEPTION) << "HCCL AllReduce failed, ret:" << hccl_result; | |||
| } | |||
| @@ -162,20 +125,15 @@ void AscendBucket::CleanAllReduceInputAddr() { | |||
| MS_EXCEPTION_IF_NULL(launch_atomic_clean_); | |||
| } | |||
| // set atomic clean input addr | |||
| launch_atomic_clean_->SetInputAddr(ar_input_addr_); | |||
| if (ar_input_address_list_.empty()) { | |||
| MS_LOG(EXCEPTION) << "AllReduce input address not found"; | |||
| } | |||
| MS_EXCEPTION_IF_NULL(ar_input_address_list_[0]); | |||
| launch_atomic_clean_->SetInputAddr(static_cast<uint8_t *>(ar_input_address_list_[0]->GetMutablePtr())); | |||
| // launch atomic clean | |||
| launch_atomic_clean_->LaunchOpKernel(); | |||
| } | |||
| // Creates the AscendLaunchMul launcher for this bucket from stream_, the first recorded | |||
| // tensor data type and total_size_. | |||
| // Raises an exception when no tensor type has been recorded yet. | |||
| std::shared_ptr<LaunchKernel> AscendBucket::CreateLaunchMul() { | |||
| if (tensor_type_list_.empty()) { | |||
| // Must stop here: reading tensor_type_list_[0] below on an empty vector is undefined behavior. | |||
| MS_LOG(EXCEPTION) << "tensor_type_list_ is empty"; | |||
| } | |||
| auto launch_mul = std::make_shared<AscendLaunchMul>(stream_, tensor_type_list_[0], total_size_); | |||
| MS_EXCEPTION_IF_NULL(launch_mul); | |||
| return launch_mul; | |||
| } | |||
| std::shared_ptr<LaunchKernel> AscendBucket::CreateLaunchAtomicClean() { | |||
| if (tensor_type_list_.empty()) { | |||
| MS_LOG(ERROR) << "tensor_type_list_ is empty"; | |||
| @@ -24,20 +24,21 @@ | |||
| namespace mindspore::device::ascend { | |||
| class AscendBucket : public Bucket { | |||
| public: | |||
| AscendBucket(uint32_t id, uint32_t bucket_size) : Bucket(id, bucket_size) {} | |||
| AscendBucket(uint32_t id, uint32_t bucket_size) : Bucket(id, bucket_size, kHcclWorldGroup) {} | |||
| ~AscendBucket() override = default; | |||
| void Init(const std::vector<void *> &compute_streams, const std::vector<void *> &communication_streams) override; | |||
| private: | |||
| void AllocateAllReduceAddr() override; | |||
| protected: | |||
| void FreeAllDeviceMem() override; | |||
| void FreeDeviceMem(void *dev_ptr) override; | |||
| void CopyTensorToContiguousMemory() override; | |||
| void LaunchAllReduce() override; | |||
| std::shared_ptr<LaunchKernel> CreateLaunchMul() override; | |||
| std::shared_ptr<LaunchKernel> CreateLaunchAtomicClean(); | |||
| void CleanAllReduceInputAddr(); | |||
| DeviceAddressPtr CreateDeviceAddress(size_t size) const override; | |||
| size_t GetAlignSize(size_t size) const override; | |||
| void AllocateContinousMemory(const std::vector<DeviceAddressPtr> &to_allocate_address, size_t total_size, | |||
| const std::vector<size_t> &size_list) const override; | |||
| }; | |||
| } // namespace mindspore::device::ascend | |||
| #endif // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_DEVICE_ASCEND_ASCEND_BUCKET_H_ | |||
| @@ -46,13 +46,11 @@ void Bucket::Launch() { | |||
| MS_LOG(INFO) << "Bucket is full, start to launch AllReduce"; | |||
| MS_EXCEPTION_IF_NULL(pre_event_); | |||
| MS_EXCEPTION_IF_NULL(post_event_); | |||
| AllocateAllReduceAddr(); | |||
| AllocateAllReduceMemory(); | |||
| CopyTensorToContiguousMemory(); | |||
| pre_event_->RecordEvent(); | |||
| pre_event_->WaitEvent(); | |||
| LaunchAllReduce(); | |||
| // mul fusion | |||
| CalculateMean(); | |||
| post_event_->RecordEvent(); | |||
| UpdateTensorAddr(); | |||
| // pass event to the tensor | |||
| @@ -63,64 +61,63 @@ void Bucket::Launch() { | |||
| MS_LOG(INFO) << "Bucket launch cost:" << (GetTime() - start) * 1e6 << " us"; | |||
| } | |||
| void Bucket::UpdateTensorAddr() { | |||
| if (grad_tensor_list_.size() != bucket_size_ || new_tensor_output_addrs_.size() != bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "grad_tensor_list size:" << grad_tensor_list_.size() | |||
| << " tensor output addr size:" << new_tensor_output_addrs_.size() | |||
| << " bucket size:" << bucket_size_; | |||
| void Bucket::AllocateAllReduceMemory() { | |||
| // Check bucket is full | |||
| if (grad_tensor_list_.size() != bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "Grad tensor list size:" << grad_tensor_list_.size() | |||
| << " is not equal to bucket size:" << bucket_size_; | |||
| } | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| auto &tensor = grad_tensor_list_[i]; | |||
| size_t total_size = 0; | |||
| std::vector<size_t> origin_size_list; | |||
| for (auto &tensor : grad_tensor_list_) { | |||
| MS_EXCEPTION_IF_NULL(tensor); | |||
| auto device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address()); | |||
| // release old addr and manage addr by this Bucket. | |||
| tensor_type_list_.emplace_back(tensor->data_type()); | |||
| DeviceAddressPtr device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address()); | |||
| MS_EXCEPTION_IF_NULL(device_address); | |||
| auto origin_dev_ptr = device_address->GetMutablePtr(); | |||
| tensor_old_addr_list_.emplace_back(origin_dev_ptr); | |||
| device_address->from_mem_pool_ = false; | |||
| device_address->set_ptr(new_tensor_output_addrs_[i]); | |||
| } | |||
| } | |||
| auto origin_size = device_address->GetSize(); | |||
| auto align_size = MemoryManager::GetCommonAlignSize(origin_size); | |||
| origin_size_list.emplace_back(origin_size); | |||
| align_size_list_.emplace_back(align_size); | |||
| total_size += align_size; | |||
| memcpy_input_addrs_.emplace_back(std::make_shared<kernel::Address>( | |||
| static_cast<uint8_t *>(device_address->GetMutablePtr()), device_address->GetSize())); | |||
| void Bucket::CalculateMean() { | |||
| auto parallel_context = parallel::ParallelContext::GetInstance(); | |||
| MS_EXCEPTION_IF_NULL(parallel_context); | |||
| auto grad_mean = parallel_context->gradients_mean(); | |||
| if (!grad_mean) { | |||
| UpdateTensorOutputAddr(ar_output_addr_); | |||
| return; | |||
| ar_input_address_list_.emplace_back(CreateDeviceAddress(origin_size)); | |||
| ar_output_address_list_.emplace_back(CreateDeviceAddress(origin_size)); | |||
| } | |||
| if (launch_mul_ == nullptr) { | |||
| launch_mul_ = CreateLaunchMul(); | |||
| MS_EXCEPTION_IF_NULL(launch_mul_); | |||
| total_size_ = total_size; | |||
| AllocateContinousMemory(ar_input_address_list_, total_size, align_size_list_); | |||
| AllocateContinousMemory(ar_output_address_list_, total_size, align_size_list_); | |||
| // generate memcpy output addr | |||
| if (origin_size_list.size() != ar_input_address_list_.size()) { | |||
| MS_LOG(EXCEPTION) << "Invalid ar_input_address_list size:" << ar_input_address_list_.size() | |||
| << " origin_size_list size:" << origin_size_list.size(); | |||
| } | |||
| // set mul input1 addr | |||
| launch_mul_->SetInputAddr(ar_output_addr_); | |||
| // launch mean | |||
| launch_mul_->LaunchOpKernel(); | |||
| // store tensor output addr | |||
| auto launch_output = launch_mul_->GetKernelOutputAddr(); | |||
| if (launch_output.size() != 1) { | |||
| MS_LOG(EXCEPTION) << "launch mul outputs should have one output"; | |||
| size_t item_index = 0; | |||
| for (const auto &ar_input_address_item : ar_input_address_list_) { | |||
| MS_EXCEPTION_IF_NULL(ar_input_address_item); | |||
| memcpy_output_addrs_.emplace_back( | |||
| std::make_shared<kernel::Address>(ar_input_address_item->GetMutablePtr(), origin_size_list[item_index])); | |||
| ++item_index; | |||
| } | |||
| UpdateTensorOutputAddr(launch_output[0]); | |||
| } | |||
| void Bucket::UpdateTensorOutputAddr(uint8_t *addr) { | |||
| uint8_t *tensor_output = addr; | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| (void)new_tensor_output_addrs_.emplace_back(tensor_output); | |||
| tensor_output += align_size_list_[i]; | |||
| void Bucket::UpdateTensorAddr() { | |||
| if (grad_tensor_list_.size() != bucket_size_ || ar_output_address_list_.size() != bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "grad_tensor_list_ size:" << grad_tensor_list_.size() | |||
| << " ar_output_address_list_ size:" << ar_output_address_list_.size() | |||
| << " bucket size:" << bucket_size_; | |||
| } | |||
| } | |||
| void Bucket::LazyDeleteOldAddr() { | |||
| MS_LOG(INFO) << "Lazy delete old grad address"; | |||
| for (auto old_addr : tensor_old_addr_list_) { | |||
| FreeDeviceMem(old_addr); | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| auto &tensor = grad_tensor_list_[i]; | |||
| MS_EXCEPTION_IF_NULL(tensor); | |||
| tensor->set_device_address(ar_output_address_list_[i]); | |||
| } | |||
| tensor_old_addr_list_.clear(); | |||
| } | |||
| void Bucket::Release() { | |||
| @@ -131,7 +128,8 @@ void Bucket::Release() { | |||
| memcpy_input_addrs_.clear(); | |||
| memcpy_output_addrs_.clear(); | |||
| tensor_type_list_.clear(); | |||
| LazyDeleteOldAddr(); | |||
| ar_input_address_list_.clear(); | |||
| ar_output_address_list_.clear(); | |||
| FreeAllDeviceMem(); | |||
| full_ = false; | |||
| } | |||
| @@ -30,7 +30,7 @@ | |||
| namespace mindspore::device { | |||
| class Bucket { | |||
| public: | |||
| Bucket(uint32_t id, uint32_t bucket_size) | |||
| Bucket(uint32_t id, uint32_t bucket_size, std::string group) | |||
| : id_(id), | |||
| bucket_size_(bucket_size), | |||
| full_(false), | |||
| @@ -38,11 +38,9 @@ class Bucket { | |||
| compute_stream_(nullptr), | |||
| pre_event_(nullptr), | |||
| post_event_(nullptr), | |||
| launch_mul_(nullptr), | |||
| launch_atomic_clean_(nullptr), | |||
| total_size_(0), | |||
| ar_input_addr_(nullptr), | |||
| ar_output_addr_(nullptr) {} | |||
| group_(std::move(group)) {} | |||
| virtual ~Bucket() = default; | |||
| uint32_t id() const { return id_; } | |||
| @@ -61,12 +59,11 @@ class Bucket { | |||
| std::shared_ptr<DeviceEvent> pre_event_; | |||
| std::shared_ptr<DeviceEvent> post_event_; | |||
| std::shared_ptr<LaunchKernel> launch_mul_; | |||
| std::shared_ptr<LaunchKernel> launch_atomic_clean_; | |||
| size_t total_size_; | |||
| uint8_t *ar_input_addr_; | |||
| uint8_t *ar_output_addr_; | |||
| std::vector<DeviceAddressPtr> ar_input_address_list_; | |||
| std::vector<DeviceAddressPtr> ar_output_address_list_; | |||
| std::string group_; | |||
| std::vector<size_t> align_size_list_; | |||
| std::vector<tensor::TensorPtr> grad_tensor_list_; | |||
| @@ -74,18 +71,16 @@ class Bucket { | |||
| std::vector<kernel::AddressPtr> memcpy_input_addrs_; | |||
| std::vector<kernel::AddressPtr> memcpy_output_addrs_; | |||
| std::vector<TypeId> tensor_type_list_; | |||
| std::vector<void *> tensor_old_addr_list_; | |||
| virtual void AllocateAllReduceAddr() = 0; | |||
| void UpdateTensorAddr(); | |||
| void CalculateMean(); | |||
| virtual std::shared_ptr<LaunchKernel> CreateLaunchMul() = 0; | |||
| void AllocateAllReduceMemory(); | |||
| virtual void FreeAllDeviceMem() {} | |||
| virtual void LaunchAllReduce() = 0; | |||
| virtual void FreeAllDeviceMem() = 0; | |||
| virtual void FreeDeviceMem(void *dev_ptr) = 0; | |||
| virtual void CopyTensorToContiguousMemory() = 0; | |||
| void UpdateTensorOutputAddr(uint8_t *addr); | |||
| void LazyDeleteOldAddr(); | |||
| virtual DeviceAddressPtr CreateDeviceAddress(size_t size) const = 0; | |||
| virtual size_t GetAlignSize(size_t size) const = 0; | |||
| virtual void AllocateContinousMemory(const std::vector<DeviceAddressPtr> &to_allocate_address, size_t total_size, | |||
| const std::vector<size_t> &size_list) const = 0; | |||
| }; | |||
| } // namespace mindspore::device | |||
| @@ -22,6 +22,7 @@ | |||
| #include <memory> | |||
| #include "abstract/utils.h" | |||
| #include "runtime/device/gpu/gpu_event.h" | |||
| #include "runtime/device/gpu/gpu_device_address.h" | |||
| #include "runtime/device/gpu/gpu_memory_allocator.h" | |||
| #include "runtime/device/gpu/gpu_device_manager.h" | |||
| #include "runtime/device/kernel_runtime_manager.h" | |||
| @@ -29,6 +30,7 @@ | |||
| #include "runtime/device/gpu/gpu_launch_mul.h" | |||
| #include "backend/kernel_compiler/gpu/nccl/nccl_gpu_kernel.h" | |||
| #include "runtime/device/gpu/gpu_common.h" | |||
| #include "runtime/hardware/device_context_manager.h" | |||
| namespace { | |||
| const size_t kCommunicationMemAlignSize = 16; | |||
| @@ -40,71 +42,36 @@ size_t AlignMemorySize(size_t size) { | |||
| } | |||
| } // namespace | |||
| namespace mindspore::device::gpu { | |||
| // Constructs a GPU bucket through the two-argument Bucket constructor, starts with a null | |||
| // collective_handle_, and assigns kNcclWorldGroup to group_ in the body. | |||
| GPUBucket::GPUBucket(uint32_t id, uint32_t bucket_size) : Bucket(id, bucket_size), collective_handle_(nullptr) { | |||
| group_ = kNcclWorldGroup; | |||
| } | |||
| void GPUBucket::AllocateAllReduceAddr() { | |||
| MS_LOG(INFO) << "start"; | |||
| if (grad_tensor_list_.size() != bucket_size_) { | |||
| MS_LOG(EXCEPTION) << "grad tensor list size:" << grad_tensor_list_.size() | |||
| << " is not equal to bucket size:" << bucket_size_; | |||
| } | |||
| auto total_size = 0; | |||
| std::vector<size_t> size_list; | |||
| for (auto &tensor : grad_tensor_list_) { | |||
| MS_EXCEPTION_IF_NULL(tensor); | |||
| tensor_type_list_.emplace_back(tensor->data_type()); | |||
| DeviceAddressPtr device_address = std::dynamic_pointer_cast<DeviceAddress>(tensor->device_address()); | |||
| MS_EXCEPTION_IF_NULL(device_address); | |||
| auto origin_size = device_address->GetSize(); | |||
| auto align_size = AlignMemorySize(origin_size); | |||
| size_list.emplace_back(origin_size); | |||
| align_size_list_.emplace_back(align_size); | |||
| total_size += align_size; | |||
| memcpy_input_addrs_.emplace_back( | |||
| std::make_shared<kernel::Address>(static_cast<uint8_t *>(device_address->GetMutablePtr()), origin_size)); | |||
| } | |||
| total_size_ = total_size; | |||
| // Constructs a GPU bucket, passing kNcclWorldGroup to the Bucket constructor as the group. | |||
| // collective_handle_ starts null, device_name_ is "GPU", and device_id_ starts at 0; | |||
| // Init() later overwrites device_id_ from the MS_CTX_DEVICE_ID context parameter. | |||
| GPUBucket::GPUBucket(uint32_t id, uint32_t bucket_size) | |||
| : Bucket(id, bucket_size, kNcclWorldGroup), collective_handle_(nullptr), device_name_("GPU"), device_id_(0) {} | |||
| ar_input_addr_ = static_cast<uint8_t *>(GPUMemoryAllocator::GetInstance().AllocTensorMem(total_size)); | |||
| ar_output_addr_ = static_cast<uint8_t *>(GPUMemoryAllocator::GetInstance().AllocTensorMem(total_size)); | |||
| uint8_t *memcpy_output = ar_input_addr_; | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| memcpy_output_addrs_.emplace_back(std::make_shared<kernel::Address>(memcpy_output, size_list[i])); | |||
| memcpy_output += align_size_list_[i]; | |||
| } | |||
| MS_LOG(INFO) << "end"; | |||
| // Builds a GPUDeviceAddress for `size` with a null device pointer. | |||
| // NOTE(review): presumably the pointer is filled in later by AllocateContinousMemory - confirm. | |||
| DeviceAddressPtr GPUBucket::CreateDeviceAddress(size_t size) const { | |||
| return std::make_shared<GPUDeviceAddress>(nullptr, size); | |||
| } | |||
| // Hands `dev_ptr` to GPUMemoryAllocator::FreeTensorMem; the pointer is passed through unchecked. | |||
| void GPUBucket::FreeDeviceMem(void *dev_ptr) { GPUMemoryAllocator::GetInstance().FreeTensorMem(dev_ptr); } | |||
| // Returns the aligned size for `size`, as computed by the file-local AlignMemorySize helper. | |||
| size_t GPUBucket::GetAlignSize(size_t size) const { return AlignMemorySize(size); } | |||
| void GPUBucket::FreeAllDeviceMem() { | |||
| MS_LOG(INFO) << "start"; | |||
| if (ar_input_addr_ != nullptr) { | |||
| FreeDeviceMem(ar_input_addr_); | |||
| ar_input_addr_ = nullptr; | |||
| void GPUBucket::AllocateContinousMemory(const std::vector<DeviceAddressPtr> &to_allocate_address, size_t total_size, | |||
| const std::vector<size_t> &size_list) const { | |||
| const auto &device_context = | |||
| device::DeviceContextManager::GetInstance().GetOrCreateDeviceContext({device_name_, device_id_}); | |||
| MS_EXCEPTION_IF_NULL(device_context); | |||
| if (!device_context->AllocateContinuousMemory(to_allocate_address, total_size, size_list)) { | |||
| MS_LOG(EXCEPTION) << "Allocate memory for AllReduce input failed"; | |||
| } | |||
| if (ar_output_addr_ != nullptr) { | |||
| FreeDeviceMem(ar_output_addr_); | |||
| ar_output_addr_ = nullptr; | |||
| } | |||
| // clear launch mul device memory | |||
| if (launch_mul_ != nullptr) { | |||
| launch_mul_->FreeLaunchDeviceMem(); | |||
| } | |||
| MS_LOG(INFO) << "end"; | |||
| } | |||
| void GPUBucket::CopyTensorToContiguousMemory() { | |||
| MS_LOG(INFO) << "start"; | |||
| MS_EXCEPTION_IF_NULL(compute_stream_); | |||
| if (ar_input_address_list_.empty()) { | |||
| MS_LOG(EXCEPTION) << "AllReduce input address not found."; | |||
| } | |||
| MS_EXCEPTION_IF_NULL(ar_input_address_list_[0]); | |||
| // Clean allreduce input | |||
| CHECK_CUDA_RET_WITH_EXCEPT_NOTRACE( | |||
| cudaMemsetAsync(ar_input_addr_, 0, total_size_, static_cast<cudaStream_t>(compute_stream_)), | |||
| "Call cudaMemsetAsync failed"); | |||
| CHECK_CUDA_RET_WITH_EXCEPT_NOTRACE(cudaMemsetAsync(ar_input_address_list_[0]->GetMutablePtr(), 0, total_size_, | |||
| static_cast<cudaStream_t>(compute_stream_)), | |||
| "Call cudaMemsetAsync failed"); | |||
| for (size_t i = 0; i < bucket_size_; ++i) { | |||
| MS_EXCEPTION_IF_NULL(memcpy_output_addrs_[i]); | |||
| @@ -146,9 +113,16 @@ void GPUBucket::LaunchAllReduce() { | |||
| MS_LOG(EXCEPTION) << "Invalid type:" << type; | |||
| } | |||
| auto nccl_result = | |||
| (*all_reduce_funcptr)(ar_input_addr_, ar_output_addr_, total_size_ / type_size, nccl_data_type_iter->second, | |||
| ncclRedOp_t::ncclSum, static_cast<cudaStream_t>(stream_), group_); | |||
| if (ar_input_address_list_.empty() || ar_output_address_list_.empty()) { | |||
| MS_LOG(EXCEPTION) << "fusion AllReduce input address size is:" << ar_input_address_list_.size() | |||
| << " output address size is:" << ar_output_address_list_.size(); | |||
| } | |||
| MS_EXCEPTION_IF_NULL(ar_input_address_list_[0]); | |||
| MS_EXCEPTION_IF_NULL(ar_output_address_list_[0]); | |||
| auto nccl_result = (*all_reduce_funcptr)( | |||
| ar_input_address_list_[0]->GetMutablePtr(), ar_output_address_list_[0]->GetMutablePtr(), total_size_ / type_size, | |||
| nccl_data_type_iter->second, ncclRedOp_t::ncclSum, static_cast<cudaStream_t>(stream_), group_); | |||
| if (nccl_result != ncclSuccess) { | |||
| MS_LOG(EXCEPTION) << "AllReduce failed, ret:" << nccl_result; | |||
| } | |||
| @@ -156,15 +130,6 @@ void GPUBucket::LaunchAllReduce() { | |||
| MS_LOG(INFO) << "end"; | |||
| } | |||
| // Creates the GPULaunchMul launcher for this bucket from stream_, the first recorded | |||
| // tensor data type and total_size_. | |||
| // Raises an exception when no tensor type has been recorded yet. | |||
| std::shared_ptr<LaunchKernel> GPUBucket::CreateLaunchMul() { | |||
| if (tensor_type_list_.empty()) { | |||
| // Must stop here: reading tensor_type_list_[0] below on an empty vector is undefined behavior. | |||
| MS_LOG(EXCEPTION) << "tensor_type_list_ is empty"; | |||
| } | |||
| auto launch_mul = std::make_shared<GPULaunchMul>(stream_, tensor_type_list_[0], total_size_); | |||
| MS_EXCEPTION_IF_NULL(launch_mul); | |||
| return launch_mul; | |||
| } | |||
| void GPUBucket::Init(const std::vector<void *> &compute_streams, const std::vector<void *> &communication_streams) { | |||
| pre_event_ = std::make_shared<GpuEvent>(); | |||
| post_event_ = std::make_shared<GpuEvent>(); | |||
| @@ -184,5 +149,9 @@ void GPUBucket::Init(const std::vector<void *> &compute_streams, const std::vect | |||
| pre_event_->set_wait_stream(stream_); | |||
| post_event_->set_record_stream(stream_); | |||
| post_event_->set_wait_stream(compute_stream_); | |||
| auto ms_context = MsContext::GetInstance(); | |||
| MS_EXCEPTION_IF_NULL(ms_context); | |||
| device_id_ = ms_context->get_param<uint32_t>(MS_CTX_DEVICE_ID); | |||
| } | |||
| } // namespace mindspore::device::gpu | |||
| @@ -19,6 +19,7 @@ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include <string> | |||
| #include "runtime/device/bucket.h" | |||
| namespace mindspore::device::gpu { | |||
| @@ -29,14 +30,17 @@ class GPUBucket : public Bucket { | |||
| void Init(const std::vector<void *> &compute_streams, const std::vector<void *> &communication_streams) override; | |||
| private: | |||
| void AllocateAllReduceAddr() override; | |||
| void FreeAllDeviceMem() override; | |||
| void FreeDeviceMem(void *dev_ptr) override; | |||
| protected: | |||
| void CopyTensorToContiguousMemory() override; | |||
| void LaunchAllReduce() override; | |||
| std::shared_ptr<LaunchKernel> CreateLaunchMul() override; | |||
| DeviceAddressPtr CreateDeviceAddress(size_t size) const override; | |||
| size_t GetAlignSize(size_t size) const override; | |||
| void AllocateContinousMemory(const std::vector<DeviceAddressPtr> &to_allocate_address, size_t total_size, | |||
| const std::vector<size_t> &size_list) const override; | |||
| const void *collective_handle_; | |||
| std::string device_name_; | |||
| uint32_t device_id_; | |||
| }; | |||
| } // namespace mindspore::device::gpu | |||
| #endif // MINDSPORE_MINDSPORE_CCSRC_RUNTIME_DEVICE_GPU_GPU_BUCKET_H_ | |||
| @@ -88,6 +88,10 @@ class KernelRuntime { | |||
| // Forwards the request for `size` to mem_manager_ and returns its result unchanged. | |||
| // mem_manager_ is dereferenced without a null check. | |||
| uint8_t *MallocCommunicationMemFromMemPool(size_t size) { | |||
| return mem_manager_->MallocCommunicationMemFromMemPool(size); | |||
| } | |||
| // Forwards addr_list, total_size and size_list to mem_manager_ and returns its result unchanged. | |||
| // mem_manager_ is dereferenced without a null check. | |||
| // NOTE(review): size_list is taken by value, so every call copies the vector; a const reference | |||
| // may be enough if the memory manager's signature allows it - confirm. | |||
| bool MallocContinuousMemFromMemPool(const DeviceAddressPtrList &addr_list, size_t total_size, | |||
| std::vector<size_t> size_list) { | |||
| return mem_manager_->MallocContinuousMemFromMemPool(addr_list, total_size, size_list); | |||
| } | |||
| static void GenLaunchArgs(const mindspore::kernel::KernelMod &kernel_mod, const AnfNodePtr &kernel, | |||
| KernelLaunchInfo *kernel_launch_info); | |||
| @@ -825,11 +825,6 @@ void MindRTBackend::RunGraphBySingleOp(const std::vector<KernelGraphPtr> &graphs | |||
| } | |||
| graph_compiler_->CalculateForwardOpOutputCount(graph, &forward_op_output_tensor_id_); | |||
| // Clear bucket resources every step | |||
| if (graph->is_bprop()) { | |||
| graph_compiler_->ClearAllBucket(graph->graph_id()); | |||
| } | |||
| for (const auto &kernel : graph->execution_order()) { | |||
| InputTensorInfo input_tensor_info; | |||
| VectorRef op_outputs; | |||
| @@ -862,6 +857,10 @@ void MindRTBackend::RunGraphBySingleOp(const std::vector<KernelGraphPtr> &graphs | |||
| } | |||
| } | |||
| SyncLazyTasks(); | |||
| // Clear bucket resources every step | |||
| if (graph->is_bprop()) { | |||
| graph_compiler_->ClearAllBucket(graph->graph_id()); | |||
| } | |||
| } | |||
| } | |||
| @@ -412,7 +412,7 @@ class DistributedGradReducer(Cell): | |||
| datatypes = self.map_(F.partial(_get_datatype), grads) | |||
| grads = self.map_(F.partial(_cast_datatype, mstype.float32), grads) | |||
| if self.mode == context.PYNATIVE_MODE: | |||
| new_grad = grads | |||
| new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean), self.allreduce_filter, grads) | |||
| elif self.split_fusion: | |||
| if self.enable_parameter_server: | |||
| new_grad = self.map_(F.partial(reduce_opt, self.degree, self.mean, self.allgather), | |||