diff --git a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt
index ecacf5e4ff..28678026c6 100644
--- a/mindspore/ccsrc/minddata/dataset/CMakeLists.txt
+++ b/mindspore/ccsrc/minddata/dataset/CMakeLists.txt
@@ -264,7 +264,7 @@ if(ENABLE_GPUQUE)
 endif()
 
 if(ENABLE_TDTQUE)
-    target_link_libraries(_c_dataengine PRIVATE ${TSDCLIENT})
+    target_link_libraries(_c_dataengine PRIVATE ${ACL})
 endif()
 
 add_dependencies(_c_dataengine _c_mindrecord)
diff --git a/mindspore/ccsrc/minddata/dataset/api/datasets.cc b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
index 2c2bfd88ee..7324473cf7 100644
--- a/mindspore/ccsrc/minddata/dataset/api/datasets.cc
+++ b/mindspore/ccsrc/minddata/dataset/api/datasets.cc
@@ -131,8 +131,8 @@ std::shared_ptr<Iterator> Dataset::CreateIterator(std::vector<std::string> columns
 
 #ifndef ENABLE_ANDROID
 // Function to return a transferred Node that transfers data through a device.
-bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t num_epochs, bool send_epoch_end,
-                          int32_t total_batches, bool create_data_info_queue) {
+bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t device_id, int32_t num_epochs,
+                          bool send_epoch_end, int32_t total_batches, bool create_data_info_queue) {
   Status rc;
 
   // Build and launch tree
@@ -144,8 +144,8 @@ bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32
   }
 
   // Add TransferNode IR on top of dataset
-  auto ds = std::make_shared<TransferNode>(shared_from_this()->IRNode(), queue_name, device_type, send_epoch_end,
-                                           total_batches, create_data_info_queue);
+  auto ds = std::make_shared<TransferNode>(shared_from_this()->IRNode(), queue_name, device_type, device_id,
+                                           send_epoch_end, total_batches, create_data_info_queue);
 
   // Get ToDevice consumer
   auto consumer = std::make_unique<ToDevice>(num_epochs);
diff --git a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc
index 8a94ab4a5c..dfe7ce9bcf 100644
--- a/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc
+++ b/mindspore/ccsrc/minddata/dataset/api/python/bindings/dataset/include/datasets_bindings.cc
@@ -527,9 +527,10 @@ PYBIND_REGISTER(TransferNode, 2, ([](const py::module *m) {
                   (void)py::class_<TransferNode, DatasetNode, std::shared_ptr<TransferNode>>(*m, "TransferNode",
                                                                                              "to create a TransferNode")
                     .def(py::init([](std::shared_ptr<DatasetNode> self, std::string queue_name, std::string device_type,
-                                     bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) {
-                      auto transfer = std::make_shared<TransferNode>(self, queue_name, device_type, send_epoch_end,
-                                                                     total_batch, create_data_info_queue);
+                                     int32_t device_id, bool send_epoch_end, int32_t total_batch,
+                                     bool create_data_info_queue) {
+                      auto transfer = std::make_shared<TransferNode>(
+                        self, queue_name, device_type, device_id, send_epoch_end, total_batch, create_data_info_queue);
                       THROW_IF_ERROR(transfer->ValidateParams());
                       return transfer;
                     }));
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
index 05714d8865..80a816b700 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
@@ -62,6 +62,7 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i
 #endif
 #ifdef ENABLE_TDTQUE
   ascend_keep_waiting_ = true;
+  tdtInstancePtr = std::make_shared<TdtPlugin>(channel_name_, device_id_);
 #endif
 }
 
@@ -159,7 +160,7 @@ Status DeviceQueueOp::SendDataToAscend() {
         RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
         WaitContinueSignal();
         auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost);
-        if (status == TdtStatus::FAILED) {
+        if (status != Status::OK()) {
          if (stop_send_) {
            MS_LOG(INFO) << "stop_send received";
            return Status::OK();
@@ -190,9 +191,9 @@
     }
     if (current_buffer->eoe() && send_epoch_end_) {
       TensorRow currRow;
-      auto status =
-        tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, tdt::TDT_END_OF_SEQUENCE);
-      if (status == TdtStatus::FAILED) {
+      auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost,
+                                             ACL_TENSOR_DATA_END_OF_SEQUENCE);
+      if (status != Status::OK()) {
        if (stop_send_) {
          MS_LOG(INFO) << "stop_send received";
          return Status::OK();
@@ -209,7 +210,6 @@
     }
     RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
   }
-  tree_->SetFinished();
 
   return Status::OK();
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc
index c85f555e0d..1ba1674892 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.cc
@@ -32,20 +32,20 @@ namespace dataset {
 
 // Constructor for TransferNode
 TransferNode::TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type,
-                           bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
+                           int32_t device_id, bool send_epoch_end, int32_t total_batch, bool create_data_info_queue)
    : prefetch_size_(16),
      queue_name_(std::move(queue_name)),
      device_type_(std::move(device_type)),
      send_epoch_end_(send_epoch_end),
      total_batch_(total_batch),
      create_data_info_queue_(create_data_info_queue),
-      device_id_(0) {
+      device_id_(device_id) {
   this->AddChild(child);
 }
 
 std::shared_ptr<DatasetNode> TransferNode::Copy() {
-  auto node = std::make_shared<TransferNode>(nullptr, queue_name_, device_type_, send_epoch_end_, total_batch_,
-                                             create_data_info_queue_);
+  auto node = std::make_shared<TransferNode>(nullptr, queue_name_, device_type_, device_id_, send_epoch_end_,
+                                             total_batch_, create_data_info_queue_);
   return node;
 }
 
@@ -96,9 +96,9 @@ Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) {
     RETURN_STATUS_UNEXPECTED(err_msg);
   }
 
-  // Get device ID (shard ID) from children
-  device_id_ = 0;
-  RETURN_IF_NOT_OK(this->GetShardId(&device_id_));
+//  // Get device ID (shard ID) from children
+//  device_id_ = 0;
+//  RETURN_IF_NOT_OK(this->GetShardId(&device_id_));
 
   auto op = std::make_shared<DeviceQueueOp>(queue_name_, type, device_id_, prefetch_size_, send_epoch_end_,
                                             total_batch_, create_data_info_queue_);
diff --git a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h
index 9d5617f2a5..b136ea71bf 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/ir/datasetops/transfer_node.h
@@ -29,8 +29,8 @@ namespace dataset {
 class TransferNode : public DatasetNode {
  public:
   /// \brief Constructor
-  TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, bool send_epoch_end,
-               int32_t total_batch, bool create_data_info_queue);
+  TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, int32_t device_id,
+               bool send_epoch_end, int32_t total_batch, bool create_data_info_queue);
 
   /// \brief Destructor
   ~TransferNode() = default;
diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt
index 50165bb147..590f6db490 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt
+++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/CMakeLists.txt
@@ -1,5 +1,6 @@
-file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc")
+file(
+    GLOB_RECURSE _CURRENT_SRC_FILES
+    RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
+    "*.cc")
 set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD)
-add_library(engine-tdt OBJECT
-    tdt_plugin.cc
-    )
+add_library(engine-tdt OBJECT tdt_plugin.cc tdt_handle.cc)
\ No newline at end of file
diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc
new file mode 100644
index 0000000000..21f250073d
--- /dev/null
+++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.cc
@@ -0,0 +1,39 @@
+/**
+ * Copyright 2021 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "minddata/dataset/engine/tdt/tdt_handle.h"
+namespace mindspore {
+namespace dataset {
+
+std::vector<acltdtChannelHandle *> TdtHandle::acl_handle = std::vector<acltdtChannelHandle *>();
+
+void TdtHandle::AddHandle(acltdtChannelHandle *handle) {
+  if (handle != nullptr) {
+    acl_handle.emplace_back(handle);
+  }
+}
+
+bool TdtHandle::DestroyHandle() {
+  for (auto handle : acl_handle) {
+    if (handle != nullptr) {
+      if (acltdtDestroyChannel(handle) != ACL_SUCCESS) {
+        return false;
+      }
+    }
+  }
+  return true;
+}
+}  // namespace dataset
+}  // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h
new file mode 100644
index 0000000000..3c0cfdf839
--- /dev/null
+++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2021 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_
+#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_
+
+#include <iostream>
+#include <vector>
+#include "acl/acl_tdt.h"
+
+namespace mindspore {
+namespace dataset {
+class TdtHandle {
+ public:
+  static void AddHandle(acltdtChannelHandle *handle);
+
+  static bool DestroyHandle();
+
+ private:
+  TdtHandle() {}
+
+  static std::vector<acltdtChannelHandle *> acl_handle;
+};
+}  // namespace dataset
+}  // namespace mindspore
+#endif  // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_
diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc
index 9bfdcacee4..79ce95465c 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc
@@ -23,108 +23,138 @@
 namespace mindspore {
 namespace dataset {
-static std::shared_ptr<TdtPlugin> instance_ptr_ = nullptr;
+TdtPlugin::TdtPlugin(const std::string &channel_name, int32_t device_id) {
+  // create acl tdt handle
+  acl_handle_ = acltdtCreateChannel(device_id, channel_name.c_str());
+  if (acl_handle_ == nullptr) {
+    MS_LOG(ERROR) << "Failed to create channel for tdt queue.";
+  }
+  TdtHandle::AddHandle(acl_handle_);
+}
 
-std::shared_ptr<TdtPlugin> TdtPlugin::GetInstance() {
-  if (instance_ptr_ == nullptr) {
-    instance_ptr_ = std::shared_ptr<TdtPlugin>(new TdtPlugin);
+TdtPlugin::~TdtPlugin() {
+  if (acl_handle_ != nullptr && acltdtDestroyChannel(acl_handle_) != ACL_SUCCESS) {
+    MS_LOG(ERROR) << "Failed to destroy channel for tdt queue.";
   }
-  return instance_ptr_;
 }
 
-TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time,
-                              tdt::TdtDataType tdt_type) {
+Status TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time,
+                           acltdtTensorType tdt_type) {
   MS_LOG(DEBUG) << "TDT channel name is " << channel_name << ".";
-  std::vector<DataItem> items;
+
+  acltdtDataset *acl_dataset = nullptr;
   double start_time;
-  if (tdt_type == tdt::TDT_TENSOR) {
-    auto ret = translate(ts_row, items);
-    if (ret != SUCCESS) {
-      MS_LOG(ERROR) << "TDT converting tensor failed!";
-      return FAILED;
-    }
-  } else if (tdt_type == tdt::TDT_END_OF_SEQUENCE) {
-    DataItem data_item;
-    data_item.dataType_ = tdt::TDT_END_OF_SEQUENCE;
-    items.emplace_back(data_item);
-    MS_LOG(INFO) << "TDT data type is TDT_END_OF_SEQUENCE";
+  auto ret = translate(tdt_type, ts_row, &acl_dataset);
+  if (ret != Status::OK()) {
+    DestroyAclDataset(acl_dataset);
+    RETURN_STATUS_UNEXPECTED("TDT converting tensor failed!");
   }
+
   if (profiling) {
     start_time = ProfilingTime::GetCurMilliSecond();
   }
 #if ENABLE_D
   // Data prefetch only when PS mode enables cache.
-  if (items.size() > 0) {
-    if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, items[0].dataPtr_.get(), items[0].dataLen_)) {
-      return FAILED;
+  if (acltdtGetDatasetSize(acl_dataset) > 0) {
+    acltdtDataItem *item0 = acltdtGetDataItem(acl_dataset, 0);
+    if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, acltdtGetDataAddrFromItem(item0),
+                                                        acltdtGetDataSizeFromItem(item0))) {
+      RETURN_STATUS_UNEXPECTED("PrefetchData failed in when pre-processing sending data.");
     }
   }
 #endif
-  if (tdt::TdtHostPushData(channel_name, items) != 0) {
-    return FAILED;
+  auto status = acltdtSendTensor(acl_handle_, acl_dataset, -1);
+  DestroyAclDataset(acl_dataset);
+  if (status != ACL_SUCCESS) {
+    RETURN_STATUS_UNEXPECTED("Tdt Send data failed.");
   }
   if (profiling) {
     double end_time = ProfilingTime::GetCurMilliSecond();
     time = (int32_t)(end_time - start_time);
   }
-  return SUCCESS;
+  return Status::OK();
 }
 
-TdtStatus TdtPlugin::getTdtType(DataType d_type, std::string &datatype) {
+Status TdtPlugin::getTdtType(DataType d_type, aclDataType &datatype) {
   switch (d_type.value()) {
     case DataType::DE_BOOL:
-      datatype = "bool";
+      datatype = ACL_BOOL;
      break;
     case DataType::DE_INT8:
-      datatype = "int8";
+      datatype = ACL_INT8;
      break;
     case DataType::DE_UINT8:
-      datatype = "uint8";
+      datatype = ACL_UINT8;
      break;
     case DataType::DE_INT16:
-      datatype = "int16";
+      datatype = ACL_INT16;
      break;
     case DataType::DE_UINT16:
-      datatype = "uint16";
+      datatype = ACL_UINT16;
      break;
     case DataType::DE_INT32:
-      datatype = "int32";
+      datatype = ACL_INT32;
      break;
     case DataType::DE_UINT32:
-      datatype = "uint32";
+      datatype = ACL_UINT32;
      break;
     case DataType::DE_FLOAT16:
-      datatype = "float16";
+      datatype = ACL_FLOAT16;
      break;
     case DataType::DE_FLOAT32:
-      datatype = "float32";
+      datatype = ACL_FLOAT;
      break;
     case DataType::DE_FLOAT64:
-      datatype = "float64";
+      datatype = ACL_DOUBLE;
      break;
     case DataType::DE_INT64:
-      datatype = "int64";
+      datatype = ACL_INT64;
      break;
     case DataType::DE_UINT64:
-      datatype = "uint64";
+      datatype = ACL_UINT64;
      break;
     default:
-      return FAILED;
+      RETURN_STATUS_UNEXPECTED("Invalid data, got unexpected data type.");
   }
-  return SUCCESS;
+  return Status::OK();
 }
 
-TdtStatus TdtPlugin::translate(const TensorRow &ts_row, std::vector<DataItem> &items) {
-  if (ts_row.size() == 0) {
-    MS_LOG(ERROR) << "TDT the size of row is zero.";
-    return SUCCESS;
+Status TdtPlugin::translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset) {
+  auto acl_dataset = acltdtCreateDataset();
+  if (acl_dataset == nullptr) {
+    RETURN_STATUS_UNEXPECTED("Create tdt dataset failed.");
   }
-  for (auto ts : ts_row) {
-    std::string datatype;
-    TdtStatus status = getTdtType(ts->type(), datatype);
-    if (status != SUCCESS) {
-      return status;
+  auto status = AssembleTensor2AclDataset(tdt_type, ts_row, acl_dataset);
+  if (status != Status::OK()) {
+    DestroyAclDataset(acl_dataset);
+    RETURN_STATUS_UNEXPECTED("Assemble tensor row to tdt dataset failed.");
+  }
+
+  *output_acl_dataset = acl_dataset;
+  return Status::OK();
+}
+
+Status TdtPlugin::AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row,
+                                            acltdtDataset *acl_dataset) {
+  if (tdt_type != ACL_TENSOR_DATA_TENSOR || ts_row.size() == 0) {
+    acltdtDataItem *acl_data = acltdtCreateDataItem(tdt_type, nullptr, 0, ACL_BOOL, nullptr, 0);
+    if (acl_data == nullptr) {
+      RETURN_STATUS_UNEXPECTED("Create data item failed when send data with type:" + std::to_string(tdt_type));
     }
+    if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) {
+      if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) {
+        MS_LOG(ERROR) << "Destroy data item failed when send data with type: " << tdt_type;
+      }
+      RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data.");
+    }
+    return Status::OK();
+  }
+
+  for (auto ts : ts_row) {
+    aclDataType datatype;
+    acltdtDataItem *acl_data = nullptr;
+    RETURN_IF_NOT_OK(getTdtType(ts->type(), datatype));
+
     TensorShape tsShape = ts->shape();
     std::string dataShapes = "[";
     for (auto dim : tsShape.AsVector()) {
@@ -132,18 +162,46 @@ TdtStatus TdtPlugin::translate(const TensorRow &ts_row, std::vector<DataItem> &items) {
     }
     dataShapes.pop_back();
     (void)dataShapes.append("]");
-    DataItem data_item;
-    data_item.dataType_ = tdt::TDT_TENSOR;
-    data_item.tensorShape_ = dataShapes;
-    data_item.tensorType_ = datatype;
-    data_item.dataLen_ = ts->SizeInBytes();
-    data_item.dataPtr_ =
+
+    std::shared_ptr<void> dataPtr =
       std::shared_ptr<void>(reinterpret_cast<uchar *>(&(*ts->begin<uint8_t>())), [](const void *elem) {});
-    items.emplace_back(data_item);
+    size_t dataLen = ts->SizeInBytes();
+    const dsize_t dims = tsShape.Rank();
+    std::vector<int64_t> dataShape;
+    for (auto i = 0; i < dims; i++) {
+      dataShape.emplace_back(tsShape[i]);
+    }
+    acl_data = acltdtCreateDataItem(ACL_TENSOR_DATA_TENSOR, (tsShape.empty() ? nullptr : &dataShape[0]), dims, datatype,
+                                    dataPtr.get(), dataLen);
+    if (acl_data == nullptr) {
+      RETURN_STATUS_UNEXPECTED("Create data item failed when send data.");
+    }
+    if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) {
+      if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) {
+        MS_LOG(ERROR) << "Destroy data item failed when send data with type ACL_TENSOR_DATA_TENSOR.";
+      }
+      RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data.");
+    }
+
     MS_LOG(DEBUG) << "TDT data type is TDT_TENSOR, tensor type is " << datatype << ", tensor shape is " << dataShapes
                   << ", data length is " << ts->Size() << ".";
   }
-  return SUCCESS;
+
+  return Status::OK();
+}
+
+Status TdtPlugin::DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item) {
+  if (include_data_item) {
+    for (size_t i = 0; i < acltdtGetDatasetSize(acl_dataset); i++) {
+      if (acltdtDestroyDataItem(acltdtGetDataItem(acl_dataset, i)) != ACL_SUCCESS) {
+        RETURN_STATUS_UNEXPECTED("Destroy data item failed when send data.");
+      }
+    }
+  }
+  if (acltdtDestroyDataset(acl_dataset) != ACL_SUCCESS) {
+    RETURN_STATUS_UNEXPECTED("Destroy tdt dataset failed when send data.");
+  }
+  return Status::OK();
+}
 }  // namespace dataset
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h
index 1275918c9f..0d2202d51b 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.h
@@ -22,33 +22,40 @@
 #include
 #include
 #include
-#include "tdt/tdt_host_interface.h"
+#include "acl/acl_tdt.h"
+#include "minddata/dataset/engine/tdt/tdt_handle.h"
 #include "minddata/dataset/core/data_type.h"
 #include "minddata/dataset/core/tensor.h"
 #include "minddata/dataset/core/tensor_row.h"
+#include "minddata/dataset/util/status.h"
 
 namespace mindspore {
 namespace dataset {
-enum TdtStatus { SUCCESS, FAILED };
-
-using tdt::DataItem;
 class TdtPlugin {
  public:
   static std::shared_ptr<TdtPlugin> GetInstance();
 
-  TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time,
-                     tdt::TdtDataType tdt_type = tdt::TDT_TENSOR);
+  Status hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time,
+                  acltdtTensorType tdt_type = ACL_TENSOR_DATA_TENSOR);
+
+  TdtPlugin(const std::string &channel_name, int32_t device_id);
+
+  ~TdtPlugin();
 
  private:
-  TdtPlugin() {}
+  Status DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item = true);
 
-  TdtStatus getTdtType(DataType d_type, std::string &datatype);
+  Status AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset *acl_dataset);
 
-  TdtStatus translate(const TensorRow &ts_row, std::vector<DataItem> &items);
+  Status getTdtType(DataType d_type, aclDataType &datatype);
+
+  Status translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset);
 
   void *tdt_handle_ = nullptr;
+
+  acltdtChannelHandle *acl_handle_;
 };
 }  // namespace dataset
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/include/datasets.h b/mindspore/ccsrc/minddata/dataset/include/datasets.h
index da139f6a4f..55e94d2243 100644
--- a/mindspore/ccsrc/minddata/dataset/include/datasets.h
+++ b/mindspore/ccsrc/minddata/dataset/include/datasets.h
@@ -152,14 +152,16 @@ class Dataset : public std::enable_shared_from_this<Dataset> {
   ///     of data transmission per time is 256M.
   /// \param[in] queue_name Channel name (default="", create new unique name).
   /// \param[in] device_type Type of device (default="", get from MSContext).
+  /// \param[in] device_id id of device (default=0, get from MSContext).
   /// \param[in] num_epochs Number of epochs (default=-1, infinite epochs).
   /// \param[in] send_epoch_end Whether to send end of sequence to device or not (default=true).
   /// \param[in] total_batches Number of batches to be sent to the device (default=0, all data).
   /// \param[in] create_data_info_queue Whether to create queue which stores types and shapes
   ///     of data or not(default=false).
   /// \return Returns true if no error encountered else false.
-  bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t num_epochs = -1,
-                   bool send_epoch_end = true, int32_t total_batches = 0, bool create_data_info_queue = false);
+  bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t device_id = 0,
+                   int32_t num_epochs = -1, bool send_epoch_end = true, int32_t total_batches = 0,
+                   bool create_data_info_queue = false);
 
   /// \brief Function to create a Saver to save the dynamic data processed by the dataset pipeline
   /// \note Usage restrictions:
diff --git a/mindspore/ccsrc/minddata/dataset/util/task.cc b/mindspore/ccsrc/minddata/dataset/util/task.cc
index 6d2c0bcaa0..f7db73b658 100644
--- a/mindspore/ccsrc/minddata/dataset/util/task.cc
+++ b/mindspore/ccsrc/minddata/dataset/util/task.cc
@@ -21,8 +21,9 @@
 #include "minddata/dataset/util/services.h"
 #endif
 #ifdef ENABLE_TDTQUE
-#include "tdt/tdt_host_interface.h"
+#include "acl/acl_tdt.h"
 #include "tdt/status.h"
+#include "minddata/dataset/engine/tdt/tdt_handle.h"
 #endif
 
 namespace mindspore {
@@ -161,11 +162,10 @@ Status Task::Join(WaitFlag blocking) {
         if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) {
           MS_LOG(WARNING) << "Wait " << wait_times << " seconds, "
                           << "the task: " << my_name_ << " will be destroyed by TdtHostDestory.";
-          int32_t destory_status = tdt::TdtHostDestroy();
-          if (destory_status != TDT_OK_CODE) {
-            MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << ".";
+          if (!TdtHandle::DestroyHandle()) {
+            MS_LOG(WARNING) << "Destroy tdt channel failed.";
           } else {
-            MS_LOG(INFO) << "Destroy tsd success.";
+            MS_LOG(INFO) << "Destroy tdt channel success.";
           }
           // just wait 30 seconds
diff --git a/mindspore/ccsrc/runtime/device/CMakeLists.txt b/mindspore/ccsrc/runtime/device/CMakeLists.txt
index a246638176..3709a6188f 100644
--- a/mindspore/ccsrc/runtime/device/CMakeLists.txt
+++ b/mindspore/ccsrc/runtime/device/CMakeLists.txt
@@ -1,5 +1,6 @@
 file(GLOB_RECURSE DEVICE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR}
     "common/*.cc"
-    "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc" "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc"
+    "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc"
+    "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc"
 )
 
 if(ENABLE_GPU)
@@ -9,7 +10,8 @@
 else()
 endif()
 
 if(ENABLE_D)
-    file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc")
+    file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc"
+        "../../minddata/dataset/engine/tdt/tdt_handle.cc")
 endif()
 
 if(ENABLE_CPU)
diff --git a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc
index 3e7516d686..01c9d7c773 100644
--- a/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc
+++ b/mindspore/ccsrc/runtime/device/ascend/ascend_kernel_runtime.cc
@@ -60,8 +60,8 @@
 #include "runtime/device/ascend/profiling/profiling_callback_register.h"
 #include "backend/kernel_compiler/hccl/hccl_context.h"
 #ifdef ENABLE_TDTQUE
-#include "tdt/tdt_host_interface.h"
-#include "tdt/status.h"
+#include "minddata/dataset/engine/tdt/tdt_handle.h"
+using mindspore::dataset::TdtHandle;
 #endif
 
 using ge::model_runner::ModelRunner;
@@ -695,11 +695,10 @@ bool AscendKernelRuntime::RunTask(const session::KernelGraph *graph) {
 #ifdef ENABLE_TDTQUE
     // Run task error, we should call TdtHostDestroy to release tdt to avoid DeviceQueueOp hostPush hung
     // case1: cpu usage 100% cause thread/process exit, but some tdt thread remain in backend
-    int32_t destory_status = tdt::TdtHostDestroy();
-    if (destory_status != TDT_OK_CODE) {
-      MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << ".";
+    if (!TdtHandle::DestroyHandle()) {
+      MS_LOG(WARNING) << "Destroy tdt channel failed.";
     } else {
-      MS_LOG(INFO) << "Destroy tsd success.";
+      MS_LOG(INFO) << "Destroy tdt channel success.";
     }
 #endif
     return false;
diff --git a/mindspore/ccsrc/utils/context/context_extends.cc b/mindspore/ccsrc/utils/context/context_extends.cc
index 9c987b6f48..b329aba52b 100644
--- a/mindspore/ccsrc/utils/context/context_extends.cc
+++ b/mindspore/ccsrc/utils/context/context_extends.cc
@@ -230,7 +230,7 @@ void GetGeOptions(const std::shared_ptr<MsContext> &ms_context_ptr, std::map
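Note (editorial, not part of the patch): a minimal usage sketch of the reworked C++ entry point, assuming an Ascend build with ENABLE_TDTQUE and an already constructed pipeline handle `ds`; the wrapper function name is a placeholder. The key change in this patch is that `device_id` is now passed explicitly through Dataset::DeviceQueue, TransferNode, and DeviceQueueOp down to acltdtCreateChannel, instead of being derived from the shard ID inside TransferNode::Build.

#include <memory>
#include "minddata/dataset/include/datasets.h"

// Illustrative sketch only; the parameter list mirrors the new declaration in datasets.h.
bool SendPipelineToAscend(std::shared_ptr<mindspore::dataset::Dataset> ds) {
  // An empty queue_name asks the framework to create a unique channel name;
  // device_type/device_id select the Ascend card that the acltdt channel binds to.
  return ds->DeviceQueue(/*queue_name=*/"", /*device_type=*/"Ascend", /*device_id=*/0,
                         /*num_epochs=*/-1, /*send_epoch_end=*/true,
                         /*total_batches=*/0, /*create_data_info_queue=*/false);
}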