You cannot select more than 25 topics. Topics must start with a Chinese character, a letter, or a number, can include dashes ('-'), and can be up to 35 characters long.

blocking_queue.cc 4.6 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "runtime/device/gpu/blocking_queue.h"
  17. #include <chrono>
  18. #include "runtime/device/gpu/gpu_common.h"
  19. #include "utils/ms_utils.h"
  20. namespace mindspore {
  21. namespace device {
  22. GpuQueue::GpuQueue(void *addr, const std::vector<size_t> &shape, const size_t &capacity)
  23. : buffer_(addr),
  24. head_(0),
  25. tail_(0),
  26. shape_(shape),
  27. len_(0),
  28. size_(0),
  29. capacity_(capacity),
  30. stream_(0),
  31. node_info_(nullptr) {
  32. CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
  33. node_info_ = std::make_unique<NodeInfo[]>(capacity);
  34. for (auto item : shape) {
  35. len_ += item;
  36. }
  37. }
  38. GpuQueue::~GpuQueue() { buffer_ = nullptr; }
  39. BlockQueueStatus_T GpuQueue::Push(const std::vector<DataItemGpu> &data) {
  40. int offset = 0;
  41. for (size_t i = 0; i < data.size(); i++) {
  42. auto item = data[i];
  43. if (item.data_ptr_ == nullptr || item.data_len_ != shape_[i]) {
  44. MS_LOG(ERROR) << "Invalid Input: ptr: " << item.data_ptr_ << ", len: " << item.data_len_;
  45. return ERROR_INPUT;
  46. }
  47. void *addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * len_ + offset;
  48. CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(addr, item.data_ptr_, item.data_len_, cudaMemcpyHostToDevice, stream_),
  49. "Cuda Memcpy Error");
  50. offset += item.data_len_;
  51. }
  52. node_info_[tail_].event_.reset(new cudaEvent_t());
  53. CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
  54. node_info_[tail_].data_ = data;
  55. tail_ = (tail_ + 1) % (capacity_);
  56. ++size_;
  57. return SUCCESS;
  58. }
  59. BlockQueueStatus_T GpuQueue::Front(void **addr, size_t *len) const {
  60. CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
  61. CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
  62. *addr = (unsigned char *)buffer_ + head_ * len_;
  63. *len = len_;
  64. for (auto item : node_info_[head_].data_) {
  65. host_release_(item.data_ptr_);
  66. }
  67. return SUCCESS;
  68. }
  69. BlockQueueStatus_T GpuQueue::Pop() {
  70. head_ = (head_ + 1) % (capacity_);
  71. --size_;
  72. return SUCCESS;
  73. }
  74. bool GpuQueue::Destroy() {
  75. if (stream_ != nullptr) {
  76. auto ret = cudaStreamDestroy(stream_);
  77. if (ret == cudaSuccess) {
  78. return true;
  79. } else {
  80. return false;
  81. }
  82. } else {
  83. return true;
  84. }
  85. }
  86. BlockQueueStatus_T BlockingQueue::Create(void *addr, const std::vector<size_t> &shape, const size_t &capacity) {
  87. if (addr == nullptr) {
  88. MS_LOG(ERROR) << "addr is nullptr";
  89. return INTERNAL_ERROR;
  90. }
  91. queue_ = std::make_shared<GpuQueue>(addr, shape, capacity);
  92. return SUCCESS;
  93. }
  94. void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
  95. BlockQueueStatus_T BlockingQueue::Push(const std::vector<DataItemGpu> &data, unsigned int) {
  96. std::unique_lock<std::mutex> locker(mutex_);
  97. if (queue_->IsFull()) {
  98. if (not_full_cond_.wait_for(locker, std::chrono::microseconds(100)) == std::cv_status::timeout) {
  99. return TIMEOUT;
  100. }
  101. }
  102. auto ret = queue_->Push(data);
  103. if (ret) {
  104. return ret;
  105. }
  106. not_empty_cond_.notify_one();
  107. return SUCCESS;
  108. }
  109. BlockQueueStatus_T BlockingQueue::Front(void **addr, size_t *len) {
  110. std::unique_lock<std::mutex> locker(mutex_);
  111. bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
  112. if (!timeout) {
  113. return TIMEOUT;
  114. }
  115. return queue_->Front(addr, len);
  116. }
  117. BlockQueueStatus_T BlockingQueue::Pop() {
  118. std::unique_lock<std::mutex> locker(mutex_);
  119. not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
  120. auto ret = queue_->Pop();
  121. if (ret) {
  122. return ret;
  123. }
  124. not_full_cond_.notify_one();
  125. return SUCCESS;
  126. }
  127. bool BlockingQueue::Destroy() {
  128. if (queue_ != nullptr) {
  129. return queue_->Destroy();
  130. } else {
  131. return true;
  132. }
  133. }
  134. } // namespace device
  135. } // namespace mindspore