|
|
|
@@ -44,9 +44,9 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i |
|
|
|
// Destructor: the body is empty, so no explicit cleanup is performed here.
DeviceQueueOp::~DeviceQueueOp() {} |
|
|
|
|
|
|
|
#ifdef ENABLE_GPUQUE |
|
|
|
// Releases the buffer pointed to by `addr`; a null `addr` is ignored (see the nullptr guard below).
// NOTE(review): this span lists two signatures (a free function and a DeviceQueueOp member) and
// two release calls (free and pool_->Deallocate) with only one set of closing braces. It looks
// like the before/after lines of a change shown together -- confirm which pair is the live one.
void ReleaseData(void *addr) { |
|
|
|
void DeviceQueueOp::ReleaseData(void *addr) { |
|
|
|
if (addr != nullptr) { |
|
|
|
free(addr); |
|
|
|
pool_->Deallocate(addr); |
|
|
|
} |
|
|
|
} |
|
|
|
#endif |
|
|
|
@@ -87,6 +87,7 @@ Status DeviceQueueOp::operator()() { |
|
|
|
#endif |
|
|
|
} else if (device_type_ == DeviceType::GPU) { |
|
|
|
#ifdef ENABLE_GPUQUE |
|
|
|
RETURN_IF_NOT_OK(CircularPool::CreateCircularPool(&pool_)); |
|
|
|
RETURN_IF_NOT_OK(SendDataToGPU()); |
|
|
|
#endif |
|
|
|
} else if (device_type_ == DeviceType::CPU) { |
|
|
|
@@ -187,6 +188,7 @@ Status DeviceQueueOp::SendDataToGPU() { |
|
|
|
bool is_break_loop = false; |
|
|
|
bool is_open = false; |
|
|
|
uint32_t handle = INVALID_HANDLE; |
|
|
|
auto release_function = std::bind(&DeviceQueueOp::ReleaseData, this, std::placeholders::_1); |
|
|
|
|
|
|
|
std::unique_ptr<DataBuffer> current_buffer; |
|
|
|
RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); |
|
|
|
@@ -204,7 +206,7 @@ Status DeviceQueueOp::SendDataToGPU() { |
|
|
|
data_size.push_back(static_cast<size_t>(curr_row[i]->SizeInBytes())); |
|
|
|
} |
|
|
|
if (!is_open) { |
|
|
|
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, ReleaseData); |
|
|
|
handle = GpuBufferMgr::GetInstance().Open(0, channel_name_, data_size, release_function); |
|
|
|
if (handle == INVALID_HANDLE) { |
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "open failed"); |
|
|
|
} |
|
|
|
@@ -246,7 +248,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con |
|
|
|
BlockQueueStatus_T ret = GpuBufferMgr::GetInstance().Push(handle, items, WAIT_TIME); |
|
|
|
if (ret) { |
|
|
|
for (int i = 0; i < items.size(); i++) { |
|
|
|
free(items[i].data_ptr_); |
|
|
|
ReleaseData(items[i].data_ptr_); |
|
|
|
} |
|
|
|
if (ret == BlockQueueStatus_T::ERROR_INPUT) { |
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "invalid input Data, please check it."); |
|
|
|
@@ -267,7 +269,7 @@ Status DeviceQueueOp::RetryPushGPUData(const std::vector<size_t> &data_size, con |
|
|
|
Status DeviceQueueOp::MallocForGPUData(std::vector<device::DataItemGpu> *items, const TensorRow &curr_row) { |
|
|
|
int i = 0; |
|
|
|
for (auto &sub_item : *items) { |
|
|
|
sub_item.data_ptr_ = (unsigned char *)malloc(sub_item.data_len_); |
|
|
|
RETURN_IF_NOT_OK(pool_->Allocate(sub_item.data_len_, &sub_item.data_ptr_)); |
|
|
|
if (sub_item.data_ptr_ == nullptr) { |
|
|
|
return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "memory malloc failed."); |
|
|
|
} |
|
|
|
|