You cannot select more than 25 topics. Topics must start with a Chinese character, a letter or a number; they can include dashes ('-') and can be up to 35 characters long.

blocking_queue.cc 5.8 kB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. /**
  2. * Copyright 2019 Huawei Technologies Co., Ltd
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. #include "device/gpu/blocking_queue.h"
  17. #include <chrono>
  18. #include "device/gpu/gpu_common.h"
  19. #include "common/utils.h"
  20. namespace mindspore {
  21. namespace device {
  22. GpuQueue::GpuQueue(void *addr, size_t feature_size, size_t label_size, size_t capacity)
  23. : buffer_(addr),
  24. head_(0),
  25. tail_(0),
  26. feature_size_(feature_size),
  27. label_size_(label_size),
  28. capacity_(capacity),
  29. stream_(0),
  30. node_info_(nullptr) {
  31. CHECK_CUDA_RET_WITH_ERROR(cudaStreamCreate(&stream_), "Cuda Create Stream Failed");
  32. node_info_ = std::make_unique<NodeInfo[]>(capacity);
  33. }
// The device buffer is owned by the caller who passed it to the constructor,
// so the destructor only clears the non-owning pointer. The CUDA stream is
// released separately via Destroy().
GpuQueue::~GpuQueue() { buffer_ = nullptr; }
  35. BlockQueueStatus_T GpuQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size) {
  36. if ((feature_addr == nullptr) || (label_addr == nullptr)) {
  37. MS_LOG(ERROR) << "input nullptr";
  38. return ERROR_INPUT;
  39. }
  40. if ((feature_size != feature_size_) || (label_size != label_size_)) {
  41. MS_LOG(ERROR) << "Data input error. Input data size: (" << feature_size << ", " << label_size << "), with ("
  42. << feature_size_ << ", " << label_size_ << ") expect";
  43. return ERROR_INPUT;
  44. }
  45. void *feature_start_addr = reinterpret_cast<unsigned char *>(buffer_) + tail_ * (feature_size + label_size);
  46. if (feature_start_addr == nullptr) {
  47. MS_LOG(ERROR) << "feature start addr is nullptr";
  48. return INTERNAL_ERROR;
  49. }
  50. CHECK_CUDA_RET_WITH_ERROR(
  51. cudaMemcpyAsync(feature_start_addr, feature_addr, feature_size, cudaMemcpyHostToDevice, stream_),
  52. "Cuda Memcpy Error");
  53. void *label_start_addr = reinterpret_cast<unsigned char *>(feature_start_addr) + feature_size;
  54. if (label_start_addr == nullptr) {
  55. MS_LOG(ERROR) << "label start addr is nullptr";
  56. return INTERNAL_ERROR;
  57. }
  58. CHECK_CUDA_RET_WITH_ERROR(cudaMemcpyAsync(label_start_addr, label_addr, label_size, cudaMemcpyHostToDevice, stream_),
  59. "Cuda Memcpy Error");
  60. node_info_[tail_].event_.reset(new cudaEvent_t());
  61. CHECK_CUDA_RET_WITH_ERROR(cudaEventCreate(&(*(node_info_[tail_].event_))), "Cuda Create Event Failed");
  62. node_info_[tail_].host_feature_addr_ = feature_addr;
  63. node_info_[tail_].host_label_addr_ = label_addr;
  64. tail_ = (tail_ + 1) % (capacity_);
  65. return SUCCESS;
  66. }
  67. BlockQueueStatus_T GpuQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
  68. size_t *label_size) const {
  69. CHECK_CUDA_RET_WITH_ERROR(cudaEventSynchronize(*(node_info_[head_].event_)), "Cuda Event Syn Failed");
  70. CHECK_CUDA_RET_WITH_ERROR(cudaEventDestroy(*(node_info_[head_].event_)), "Cuda Destroy Event Failed");
  71. *feature_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_);
  72. *feature_size = feature_size_;
  73. *label_addr = (unsigned char *)buffer_ + head_ * (feature_size_ + label_size_) + feature_size_;
  74. *label_size = label_size_;
  75. host_release_(node_info_[head_].host_feature_addr_);
  76. host_release_(node_info_[head_].host_label_addr_);
  77. return SUCCESS;
  78. }
  79. BlockQueueStatus_T GpuQueue::Pop() {
  80. head_ = (head_ + 1) % (capacity_);
  81. return SUCCESS;
  82. }
  83. bool GpuQueue::Destroy() {
  84. if (stream_ != nullptr) {
  85. auto ret = cudaStreamDestroy(stream_);
  86. if (ret == cudaSuccess) {
  87. return true;
  88. } else {
  89. return false;
  90. }
  91. } else {
  92. return true;
  93. }
  94. }
  95. BlockQueueStatus_T BlockingQueue::Create(void *addr, size_t feature_size, size_t label_size, size_t capacity) {
  96. if (addr == nullptr) {
  97. MS_LOG(ERROR) << "addr is nullptr";
  98. return INTERNAL_ERROR;
  99. }
  100. queue_ = std::make_shared<GpuQueue>(addr, feature_size, label_size, capacity);
  101. return SUCCESS;
  102. }
// Forward the host-buffer release callback to the underlying GpuQueue; it is
// invoked from Front() to return staging buffers to the producer.
void BlockingQueue::RegisterRelease(const std::function<void(void *)> &func) { queue_->RegisterRelease(func); }
  104. BlockQueueStatus_T BlockingQueue::Push(void *feature_addr, size_t feature_size, void *label_addr, size_t label_size,
  105. unsigned int timeout_in_sec) {
  106. std::unique_lock<std::mutex> locker(mutex_);
  107. if (queue_->IsFull()) {
  108. if (not_full_cond_.wait_for(locker, std::chrono::seconds(timeout_in_sec)) == std::cv_status::timeout) {
  109. return TIMEOUT;
  110. }
  111. }
  112. auto ret = queue_->Push(feature_addr, feature_size, label_addr, label_size);
  113. if (ret) {
  114. return ret;
  115. }
  116. not_empty_cond_.notify_one();
  117. return SUCCESS;
  118. }
  119. BlockQueueStatus_T BlockingQueue::Front(void **feature_addr, size_t *feature_size, void **label_addr,
  120. size_t *label_size) {
  121. std::unique_lock<std::mutex> locker(mutex_);
  122. bool timeout = not_empty_cond_.wait_for(locker, std::chrono::seconds(30), [this] { return !queue_->IsEmpty(); });
  123. if (!timeout) {
  124. return TIMEOUT;
  125. }
  126. return queue_->Front(feature_addr, feature_size, label_addr, label_size);
  127. }
  128. BlockQueueStatus_T BlockingQueue::Pop() {
  129. std::unique_lock<std::mutex> locker(mutex_);
  130. not_empty_cond_.wait(locker, [this] { return !queue_->IsEmpty(); });
  131. auto ret = queue_->Pop();
  132. if (ret) {
  133. return ret;
  134. }
  135. not_full_cond_.notify_one();
  136. return SUCCESS;
  137. }
  138. bool BlockingQueue::Destroy() {
  139. if (queue_ != nullptr) {
  140. return queue_->Destroy();
  141. } else {
  142. return true;
  143. }
  144. }
  145. } // namespace device
  146. } // namespace mindspore