You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

blocking_queue.cc 4.5 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "device/gpu/blocking_queue.h"
  17. #include <chrono>
  18. #include "device/gpu/gpu_common.h"
  19. #include "common/utils.h"
  20. namespace mindspore {
  21. namespace device {
  22. GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
  23. : buffer_(addr), head_(0), tail_(0), shape_(shape), len_(0), capacity_(capacity), stream_(0), node_info_(nullptr) {
  24. CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
  25. node_info_ = std::make_unique<NodeInfo[]>(capacity);
  26. for (auto item : shape) {
  27. len_ += item;
  28. }
  29. }
  30. GpuQueue::~GpuQueue() { buffer_ = nullptr; }
  31. BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
  32. int offset = 0;
  33. for (size_t i = 0; i < data.size(); i++) {
  34. auto item = data[i];
  35. if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
  36. MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
  37. return ERROR_INPUT;
  38. }
  39. void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
  40. CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
  41. "Cuda Memcpy Error");
  42. offset += item.data_len_;
  43. }
  44. node_info_[tail_].event_.reset(new cudaEvent_t());
  45. CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
  46. node_info_[tail_].data_ = data;
  47. tail_ = (tail_ + 1) % (capacity_);
  48. return SUCCESS;
  49. }
  50. BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
  51. CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
  52. CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
  53. *addr = (unsigned char *)buffer_ + head_ * len_;
  54. *len = len_;
  55. for (auto item : node_info_[head_].data_) {
  56. host_release_(item.data_ptr_);
  57. }
  58. return SUCCESS;
  59. }
  60. BlockQueueStatus_T GpuQueue::Pop() {
  61. head_ = (head_ + 1) % (capacity_);
  62. return SUCCESS;
  63. }
  64. bool GpuQueue::Destroy() {
  65. if (stream_ != nullptr) {
  66. auto ret = cudaStreamDestroy(stream_);
  67. if (ret == cudaSuccess) {
  68. return true;
  69. } else {
  70. return false;
  71. }
  72. } else {
  73. return true;
  74. }
  75. }
  76. BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
  77. if (addr == nullptr) {
  78. MS_LOG(ERROR) << "addr is nullptr";
  79. return INTERNAL_ERROR;
  80. }
  81. queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
  82. return SUCCESS;
  83. }
  84. void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
  85. BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int timeout_in_sec) {
  86. std::unique_lock<std::mutex> locker(mutex_);
  87. if (queue_->IsFull()) {
  88. if (not_full_cond_.wait_for(locker, std::chrono::seconds(timeout_in_sec)) == std::cv_status::timeout) {
  89. return TIMEOUT;
  90. }
  91. }
  92. auto ret = queue_->Push(data);
  93. if (ret) {
  94. return ret;
  95. }
  96. not_empty_cond_.notify_one();
  97. return SUCCESS;
  98. }
  99. BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
  100. std::unique_lock<std::mutex> locker(mutex_);
  101. bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
  102. if (!timeout) {
  103. return TIMEOUT;
  104. }
  105. return queue_->Front(addr, len);
  106. }
  107. BlockQueueStatus_T BlockingQueue::Pop() {
  108. std::unique_lock<std::mutex> locker(mutex_);
  109. not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
  110. auto ret = queue_->Pop();
  111. if (ret) {
  112. return ret;
  113. }
  114. not_full_cond_.notify_one();
  115. return SUCCESS;
  116. }
  117. bool BlockingQueue::Destroy() {
  118. if (queue_ != nullptr) {
  119. return queue_->Destroy();
  120. } else {
  121. return true;
  122. }
  123. }
  124. } // namespace device
  125. } // namespace mindspore