Add hostpush part; modify and optimize previous code; provide aclhandle access method; modify CMakeList format; add device_id parameter into TransferNode. (tags/v1.2.0-rc1)
| @@ -264,7 +264,7 @@ if(ENABLE_GPUQUE) | |||
| endif() | |||
| if(ENABLE_TDTQUE) | |||
| target_link_libraries(_c_dataengine PRIVATE ${TSDCLIENT}) | |||
| target_link_libraries(_c_dataengine PRIVATE ${ACL}) | |||
| endif() | |||
| add_dependencies(_c_dataengine _c_mindrecord) | |||
| @@ -131,8 +131,8 @@ std::shared_ptr<Iterator> Dataset::CreateIterator(std::vector<std::string> colum | |||
| #ifndef ENABLE_ANDROID | |||
| // Function to return a transferred Node that transfers data through a device. | |||
| bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t num_epochs, bool send_epoch_end, | |||
| int32_t total_batches, bool create_data_info_queue) { | |||
| bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32_t device_id, int32_t num_epochs, | |||
| bool send_epoch_end, int32_t total_batches, bool create_data_info_queue) { | |||
| Status rc; | |||
| // Build and launch tree | |||
| @@ -144,8 +144,8 @@ bool Dataset::DeviceQueue(std::string queue_name, std::string device_type, int32 | |||
| } | |||
| // Add TransferNode IR on top of dataset | |||
| auto ds = std::make_shared<TransferNode>(shared_from_this()->IRNode(), queue_name, device_type, send_epoch_end, | |||
| total_batches, create_data_info_queue); | |||
| auto ds = std::make_shared<TransferNode>(shared_from_this()->IRNode(), queue_name, device_type, device_id, | |||
| send_epoch_end, total_batches, create_data_info_queue); | |||
| // Get ToDevice consumer | |||
| auto consumer = std::make_unique<ToDevice>(num_epochs); | |||
| @@ -527,9 +527,10 @@ PYBIND_REGISTER(TransferNode, 2, ([](const py::module *m) { | |||
| (void)py::class_<TransferNode, DatasetNode, std::shared_ptr<TransferNode>>(*m, "TransferNode", | |||
| "to create a TransferNode") | |||
| .def(py::init([](std::shared_ptr<DatasetNode> self, std::string queue_name, std::string device_type, | |||
| bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) { | |||
| auto transfer = std::make_shared<TransferNode>(self, queue_name, device_type, send_epoch_end, | |||
| total_batch, create_data_info_queue); | |||
| int32_t device_id, bool send_epoch_end, int32_t total_batch, | |||
| bool create_data_info_queue) { | |||
| auto transfer = std::make_shared<TransferNode>( | |||
| self, queue_name, device_type, device_id, send_epoch_end, total_batch, create_data_info_queue); | |||
| THROW_IF_ERROR(transfer->ValidateParams()); | |||
| return transfer; | |||
| })); | |||
| @@ -62,6 +62,7 @@ DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, i | |||
| #endif | |||
| #ifdef ENABLE_TDTQUE | |||
| ascend_keep_waiting_ = true; | |||
| tdtInstancePtr = std::make_shared<TdtPlugin>(channel_name_, device_id_); | |||
| #endif | |||
| } | |||
| @@ -159,7 +160,7 @@ Status DeviceQueueOp::SendDataToAscend() { | |||
| RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow)); | |||
| WaitContinueSignal(); | |||
| auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost); | |||
| if (status == TdtStatus::FAILED) { | |||
| if (status != Status::OK()) { | |||
| if (stop_send_) { | |||
| MS_LOG(INFO) << "stop_send received"; | |||
| return Status::OK(); | |||
| @@ -190,9 +191,9 @@ Status DeviceQueueOp::SendDataToAscend() { | |||
| } | |||
| if (current_buffer->eoe() && send_epoch_end_) { | |||
| TensorRow currRow; | |||
| auto status = | |||
| tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, tdt::TDT_END_OF_SEQUENCE); | |||
| if (status == TdtStatus::FAILED) { | |||
| auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, | |||
| ACL_TENSOR_DATA_END_OF_SEQUENCE); | |||
| if (status != Status::OK()) { | |||
| if (stop_send_) { | |||
| MS_LOG(INFO) << "stop_send received"; | |||
| return Status::OK(); | |||
| @@ -209,7 +210,6 @@ Status DeviceQueueOp::SendDataToAscend() { | |||
| } | |||
| RETURN_IF_NOT_OK(GetNextInput(¤t_buffer)); | |||
| } | |||
| tree_->SetFinished(); | |||
| return Status::OK(); | |||
| @@ -32,20 +32,20 @@ namespace dataset { | |||
| // Constructor for TransferNode | |||
| TransferNode::TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, | |||
| bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) | |||
| int32_t device_id, bool send_epoch_end, int32_t total_batch, bool create_data_info_queue) | |||
| : prefetch_size_(16), | |||
| queue_name_(std::move(queue_name)), | |||
| device_type_(std::move(device_type)), | |||
| send_epoch_end_(send_epoch_end), | |||
| total_batch_(total_batch), | |||
| create_data_info_queue_(create_data_info_queue), | |||
| device_id_(0) { | |||
| device_id_(device_id) { | |||
| this->AddChild(child); | |||
| } | |||
| std::shared_ptr<DatasetNode> TransferNode::Copy() { | |||
| auto node = std::make_shared<TransferNode>(nullptr, queue_name_, device_type_, send_epoch_end_, total_batch_, | |||
| create_data_info_queue_); | |||
| auto node = std::make_shared<TransferNode>(nullptr, queue_name_, device_type_, device_id_, send_epoch_end_, | |||
| total_batch_, create_data_info_queue_); | |||
| return node; | |||
| } | |||
| @@ -96,9 +96,9 @@ Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o | |||
| RETURN_STATUS_UNEXPECTED(err_msg); | |||
| } | |||
| // Get device ID (shard ID) from children | |||
| device_id_ = 0; | |||
| RETURN_IF_NOT_OK(this->GetShardId(&device_id_)); | |||
| // // Get device ID (shard ID) from children | |||
| // device_id_ = 0; | |||
| // RETURN_IF_NOT_OK(this->GetShardId(&device_id_)); | |||
| auto op = std::make_shared<DeviceQueueOp>(queue_name_, type, device_id_, prefetch_size_, send_epoch_end_, | |||
| total_batch_, create_data_info_queue_); | |||
| @@ -29,8 +29,8 @@ namespace dataset { | |||
| class TransferNode : public DatasetNode { | |||
| public: | |||
| /// \brief Constructor | |||
| TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, bool send_epoch_end, | |||
| int32_t total_batch, bool create_data_info_queue); | |||
| TransferNode(std::shared_ptr<DatasetNode> child, std::string queue_name, std::string device_type, int32_t device_id, | |||
| bool send_epoch_end, int32_t total_batch, bool create_data_info_queue); | |||
| /// \brief Destructor | |||
| ~TransferNode() = default; | |||
| @@ -1,5 +1,6 @@ | |||
| file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc") | |||
| file( | |||
| GLOB_RECURSE _CURRENT_SRC_FILES | |||
| RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} | |||
| "*.cc") | |||
| set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) | |||
| add_library(engine-tdt OBJECT | |||
| tdt_plugin.cc | |||
| ) | |||
| add_library(engine-tdt OBJECT tdt_plugin.cc tdt_handle.cc) | |||
| @@ -0,0 +1,39 @@ | |||
| /** | |||
| * Copyright 2021 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "minddata/dataset/engine/tdt/tdt_handle.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| std::vector<acltdtChannelHandle *> TdtHandle::acl_handle = std::vector<acltdtChannelHandle *>(); | |||
| void TdtHandle::AddHandle(acltdtChannelHandle *handle) { | |||
| if (handle != nullptr) { | |||
| acl_handle.emplace_back(handle); | |||
| } | |||
| } | |||
| bool TdtHandle::DestroyHandle() { | |||
| for (auto handle : acl_handle) { | |||
| if (handle != nullptr) { | |||
| if (acltdtDestroyChannel(handle) != ACL_SUCCESS) { | |||
| return false; | |||
| } | |||
| } | |||
| } | |||
| return true; | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,38 @@ | |||
| /** | |||
| * Copyright 2021 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ | |||
| #define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ | |||
| #include <iostream> | |||
| #include <vector> | |||
| #include "acl/acl_tdt.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class TdtHandle { | |||
| public: | |||
| static void AddHandle(acltdtChannelHandle *handle); | |||
| static bool DestroyHandle(); | |||
| private: | |||
| TdtHandle() {} | |||
| static std::vector<acltdtChannelHandle *> acl_handle; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_TDT_TDT_HANDLE_H_ | |||
| @@ -23,108 +23,138 @@ | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| static std::shared_ptr<TdtPlugin> instance_ptr_ = nullptr; | |||
| TdtPlugin::TdtPlugin(const std::string &channel_name, int32_t device_id) { | |||
| // create acl tdt handle | |||
| acl_handle_ = acltdtCreateChannel(device_id, channel_name.c_str()); | |||
| if (acl_handle_ == nullptr) { | |||
| MS_LOG(ERROR) << "Failed to create channel for tdt queue."; | |||
| } | |||
| TdtHandle::AddHandle(acl_handle_); | |||
| } | |||
| std::shared_ptr<TdtPlugin> TdtPlugin::GetInstance() { | |||
| if (instance_ptr_ == nullptr) { | |||
| instance_ptr_ = std::shared_ptr<TdtPlugin>(new TdtPlugin); | |||
| TdtPlugin::~TdtPlugin() { | |||
| if (acl_handle_ != nullptr && acltdtDestroyChannel(acl_handle_) != ACL_SUCCESS) { | |||
| MS_LOG(ERROR) << "Failed to destroy channel for tdt queue."; | |||
| } | |||
| return instance_ptr_; | |||
| } | |||
| TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time, | |||
| tdt::TdtDataType tdt_type) { | |||
| Status TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time, | |||
| acltdtTensorType tdt_type) { | |||
| MS_LOG(DEBUG) << "TDT channel name is " << channel_name << "."; | |||
| std::vector<DataItem> items; | |||
| acltdtDataset *acl_dataset = nullptr; | |||
| double start_time; | |||
| if (tdt_type == tdt::TDT_TENSOR) { | |||
| auto ret = translate(ts_row, items); | |||
| if (ret != SUCCESS) { | |||
| MS_LOG(ERROR) << "TDT converting tensor failed!"; | |||
| return FAILED; | |||
| } | |||
| } else if (tdt_type == tdt::TDT_END_OF_SEQUENCE) { | |||
| DataItem data_item; | |||
| data_item.dataType_ = tdt::TDT_END_OF_SEQUENCE; | |||
| items.emplace_back(data_item); | |||
| MS_LOG(INFO) << "TDT data type is TDT_END_OF_SEQUENCE"; | |||
| auto ret = translate(tdt_type, ts_row, &acl_dataset); | |||
| if (ret != Status::OK()) { | |||
| DestroyAclDataset(acl_dataset); | |||
| RETURN_STATUS_UNEXPECTED("TDT converting tensor failed!"); | |||
| } | |||
| if (profiling) { | |||
| start_time = ProfilingTime::GetCurMilliSecond(); | |||
| } | |||
| #if ENABLE_D | |||
| // Data prefetch only when PS mode enables cache. | |||
| if (items.size() > 0) { | |||
| if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, items[0].dataPtr_.get(), items[0].dataLen_)) { | |||
| return FAILED; | |||
| if (acltdtGetDatasetSize(acl_dataset) > 0) { | |||
| acltdtDataItem *item0 = acltdtGetDataItem(acl_dataset, 0); | |||
| if (!ps::PsDataPrefetch::GetInstance().PrefetchData(channel_name, acltdtGetDataAddrFromItem(item0), | |||
| acltdtGetDataSizeFromItem(item0))) { | |||
| RETURN_STATUS_UNEXPECTED("PrefetchData failed in when pre-processing sending data."); | |||
| } | |||
| } | |||
| #endif | |||
| if (tdt::TdtHostPushData(channel_name, items) != 0) { | |||
| return FAILED; | |||
| auto status = acltdtSendTensor(acl_handle_, acl_dataset, -1); | |||
| DestroyAclDataset(acl_dataset); | |||
| if (status != ACL_SUCCESS) { | |||
| RETURN_STATUS_UNEXPECTED("Tdt Send data failed."); | |||
| } | |||
| if (profiling) { | |||
| double end_time = ProfilingTime::GetCurMilliSecond(); | |||
| time = (int32_t)(end_time - start_time); | |||
| } | |||
| return SUCCESS; | |||
| return Status::OK(); | |||
| } | |||
| TdtStatus TdtPlugin::getTdtType(DataType d_type, std::string &datatype) { | |||
| Status TdtPlugin::getTdtType(DataType d_type, aclDataType &datatype) { | |||
| switch (d_type.value()) { | |||
| case DataType::DE_BOOL: | |||
| datatype = "bool"; | |||
| datatype = ACL_BOOL; | |||
| break; | |||
| case DataType::DE_INT8: | |||
| datatype = "int8"; | |||
| datatype = ACL_INT8; | |||
| break; | |||
| case DataType::DE_UINT8: | |||
| datatype = "uint8"; | |||
| datatype = ACL_UINT8; | |||
| break; | |||
| case DataType::DE_INT16: | |||
| datatype = "int16"; | |||
| datatype = ACL_INT16; | |||
| break; | |||
| case DataType::DE_UINT16: | |||
| datatype = "uint16"; | |||
| datatype = ACL_UINT16; | |||
| break; | |||
| case DataType::DE_INT32: | |||
| datatype = "int32"; | |||
| datatype = ACL_INT32; | |||
| break; | |||
| case DataType::DE_UINT32: | |||
| datatype = "uint32"; | |||
| datatype = ACL_UINT32; | |||
| break; | |||
| case DataType::DE_FLOAT16: | |||
| datatype = "float16"; | |||
| datatype = ACL_FLOAT16; | |||
| break; | |||
| case DataType::DE_FLOAT32: | |||
| datatype = "float32"; | |||
| datatype = ACL_FLOAT; | |||
| break; | |||
| case DataType::DE_FLOAT64: | |||
| datatype = "float64"; | |||
| datatype = ACL_DOUBLE; | |||
| break; | |||
| case DataType::DE_INT64: | |||
| datatype = "int64"; | |||
| datatype = ACL_INT64; | |||
| break; | |||
| case DataType::DE_UINT64: | |||
| datatype = "uint64"; | |||
| datatype = ACL_UINT64; | |||
| break; | |||
| default: | |||
| return FAILED; | |||
| RETURN_STATUS_UNEXPECTED("Invalid data, got unexpected data type."); | |||
| } | |||
| return SUCCESS; | |||
| return Status::OK(); | |||
| } | |||
| TdtStatus TdtPlugin::translate(const TensorRow &ts_row, std::vector<DataItem> &items) { | |||
| if (ts_row.size() == 0) { | |||
| MS_LOG(ERROR) << "TDT the size of row is zero."; | |||
| return SUCCESS; | |||
| Status TdtPlugin::translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset) { | |||
| auto acl_dataset = acltdtCreateDataset(); | |||
| if (acl_dataset == nullptr) { | |||
| RETURN_STATUS_UNEXPECTED("Create tdt dataset failed."); | |||
| } | |||
| for (auto ts : ts_row) { | |||
| std::string datatype; | |||
| TdtStatus status = getTdtType(ts->type(), datatype); | |||
| if (status != SUCCESS) { | |||
| return status; | |||
| auto status = AssembleTensor2AclDataset(tdt_type, ts_row, acl_dataset); | |||
| if (status != Status::OK()) { | |||
| DestroyAclDataset(acl_dataset); | |||
| RETURN_STATUS_UNEXPECTED("Assemble tensor row to tdt dataset failed."); | |||
| } | |||
| *output_acl_dataset = acl_dataset; | |||
| return Status::OK(); | |||
| } | |||
| Status TdtPlugin::AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row, | |||
| acltdtDataset *acl_dataset) { | |||
| if (tdt_type != ACL_TENSOR_DATA_TENSOR || ts_row.size() == 0) { | |||
| acltdtDataItem *acl_data = acltdtCreateDataItem(tdt_type, nullptr, 0, ACL_BOOL, nullptr, 0); | |||
| if (acl_data == nullptr) { | |||
| RETURN_STATUS_UNEXPECTED("Create data item failed when send data with type:" + std::to_string(tdt_type)); | |||
| } | |||
| if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) { | |||
| if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) { | |||
| MS_LOG(ERROR) << "Destroy data item failed when send data with type: " << tdt_type; | |||
| } | |||
| RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data."); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| for (auto ts : ts_row) { | |||
| aclDataType datatype; | |||
| acltdtDataItem *acl_data = nullptr; | |||
| RETURN_IF_NOT_OK(getTdtType(ts->type(), datatype)); | |||
| TensorShape tsShape = ts->shape(); | |||
| std::string dataShapes = "["; | |||
| for (auto dim : tsShape.AsVector()) { | |||
| @@ -132,18 +162,46 @@ TdtStatus TdtPlugin::translate(const TensorRow &ts_row, std::vector<DataItem> &i | |||
| } | |||
| dataShapes.pop_back(); | |||
| (void)dataShapes.append("]"); | |||
| DataItem data_item; | |||
| data_item.dataType_ = tdt::TDT_TENSOR; | |||
| data_item.tensorShape_ = dataShapes; | |||
| data_item.tensorType_ = datatype; | |||
| data_item.dataLen_ = ts->SizeInBytes(); | |||
| data_item.dataPtr_ = | |||
| std::shared_ptr<void> dataPtr = | |||
| std::shared_ptr<void>(reinterpret_cast<uchar *>(&(*ts->begin<uint8_t>())), [](const void *elem) {}); | |||
| items.emplace_back(data_item); | |||
| size_t dataLen = ts->SizeInBytes(); | |||
| const dsize_t dims = tsShape.Rank(); | |||
| std::vector<int64_t> dataShape; | |||
| for (auto i = 0; i < dims; i++) { | |||
| dataShape.emplace_back(tsShape[i]); | |||
| } | |||
| acl_data = acltdtCreateDataItem(ACL_TENSOR_DATA_TENSOR, (tsShape.empty() ? nullptr : &dataShape[0]), dims, datatype, | |||
| dataPtr.get(), dataLen); | |||
| if (acl_data == nullptr) { | |||
| RETURN_STATUS_UNEXPECTED("Create data item failed when send data."); | |||
| } | |||
| if (acltdtAddDataItem(acl_dataset, acl_data) != ACL_SUCCESS) { | |||
| if (acltdtDestroyDataItem(acl_data) != ACL_SUCCESS) { | |||
| MS_LOG(ERROR) << "Destroy data item failed when send data with type ACL_TENSOR_DATA_TENSOR."; | |||
| } | |||
| RETURN_STATUS_UNEXPECTED("Add data item to tdt dataset failed when send data."); | |||
| } | |||
| MS_LOG(DEBUG) << "TDT data type is TDT_TENSOR, tensor type is " << datatype << ", tensor shape is " << dataShapes | |||
| << ", data length is " << ts->Size() << "."; | |||
| } | |||
| return SUCCESS; | |||
| return Status::OK(); | |||
| } | |||
| Status TdtPlugin::DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item) { | |||
| if (include_data_item) { | |||
| for (size_t i = 0; i < acltdtGetDatasetSize(acl_dataset); i++) { | |||
| if (acltdtDestroyDataItem(acltdtGetDataItem(acl_dataset, i)) != ACL_SUCCESS) { | |||
| RETURN_STATUS_UNEXPECTED("Destroy data item failed when send data."); | |||
| } | |||
| } | |||
| } | |||
| if (acltdtDestroyDataset(acl_dataset) != ACL_SUCCESS) { | |||
| RETURN_STATUS_UNEXPECTED("Destroy tdt dataset failed when send data."); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -22,33 +22,40 @@ | |||
| #include <memory> | |||
| #include <string> | |||
| #include <vector> | |||
| #include "tdt/tdt_host_interface.h" | |||
| #include "acl/acl_tdt.h" | |||
| #include "minddata/dataset/engine/tdt/tdt_handle.h" | |||
| #include "minddata/dataset/core/data_type.h" | |||
| #include "minddata/dataset/core/tensor.h" | |||
| #include "minddata/dataset/core/tensor_row.h" | |||
| #include "minddata/dataset/util/status.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| enum TdtStatus { SUCCESS, FAILED }; | |||
| using tdt::DataItem; | |||
| class TdtPlugin { | |||
| public: | |||
| static std::shared_ptr<TdtPlugin> GetInstance(); | |||
| TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time, | |||
| tdt::TdtDataType tdt_type = tdt::TDT_TENSOR); | |||
| Status hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time, | |||
| acltdtTensorType tdt_type = ACL_TENSOR_DATA_TENSOR); | |||
| TdtPlugin(const std::string &channel_name, int32_t device_id); | |||
| ~TdtPlugin(); | |||
| private: | |||
| TdtPlugin() {} | |||
| Status DestroyAclDataset(acltdtDataset *acl_dataset, bool include_data_item = true); | |||
| TdtStatus getTdtType(DataType d_type, std::string &datatype); | |||
| Status AssembleTensor2AclDataset(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset *acl_dataset); | |||
| TdtStatus translate(const TensorRow &ts_row, std::vector<DataItem> &items); | |||
| Status getTdtType(DataType d_type, aclDataType &datatype); | |||
| Status translate(acltdtTensorType tdt_type, const TensorRow &ts_row, acltdtDataset **output_acl_dataset); | |||
| void *tdt_handle_ = nullptr; | |||
| acltdtChannelHandle *acl_handle_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -152,14 +152,16 @@ class Dataset : public std::enable_shared_from_this<Dataset> { | |||
| /// of data transmission per time is 256M. | |||
| /// \param[in] queue_name Channel name (default="", create new unique name). | |||
| /// \param[in] device_type Type of device (default="", get from MSContext). | |||
| /// \param[in] device_id id of device (default=0, get from MSContext). | |||
| /// \param[in] num_epochs Number of epochs (default=-1, infinite epochs). | |||
| /// \param[in] send_epoch_end Whether to send end of sequence to device or not (default=true). | |||
| /// \param[in] total_batches Number of batches to be sent to the device (default=0, all data). | |||
| /// \param[in] create_data_info_queue Whether to create queue which stores types and shapes | |||
| /// of data or not(default=false). | |||
| /// \return Returns true if no error encountered else false. | |||
| bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t num_epochs = -1, | |||
| bool send_epoch_end = true, int32_t total_batches = 0, bool create_data_info_queue = false); | |||
| bool DeviceQueue(std::string queue_name = "", std::string device_type = "", int32_t device_id = 0, | |||
| int32_t num_epochs = -1, bool send_epoch_end = true, int32_t total_batches = 0, | |||
| bool create_data_info_queue = false); | |||
| /// \brief Function to create a Saver to save the dynamic data processed by the dataset pipeline | |||
| /// \note Usage restrictions: | |||
| @@ -21,8 +21,9 @@ | |||
| #include "minddata/dataset/util/services.h" | |||
| #endif | |||
| #ifdef ENABLE_TDTQUE | |||
| #include "tdt/tdt_host_interface.h" | |||
| #include "acl/acl_tdt.h" | |||
| #include "tdt/status.h" | |||
| #include "minddata/dataset/engine/tdt/tdt_handle.h" | |||
| #endif | |||
| namespace mindspore { | |||
| @@ -161,11 +162,10 @@ Status Task::Join(WaitFlag blocking) { | |||
| if (wait_times > 5 && my_name_.find("DeviceQueueOp") != std::string::npos) { | |||
| MS_LOG(WARNING) << "Wait " << wait_times << " seconds, " | |||
| << "the task: " << my_name_ << " will be destroyed by TdtHostDestory."; | |||
| int32_t destory_status = tdt::TdtHostDestroy(); | |||
| if (destory_status != TDT_OK_CODE) { | |||
| MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << "."; | |||
| if (!TdtHandle::DestroyHandle()) { | |||
| MS_LOG(WARNING) << "Destroy tdt channel failed."; | |||
| } else { | |||
| MS_LOG(INFO) << "Destroy tsd success."; | |||
| MS_LOG(INFO) << "Destroy tdt channel success."; | |||
| } | |||
| // just wait 30 seconds | |||
| @@ -1,5 +1,6 @@ | |||
| file(GLOB_RECURSE DEVICE_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "common/*.cc" | |||
| "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc" "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc" | |||
| "kernel_info.cc" "executor/dynamic_kernel.cc" "executor/executor_callback.cc" "kernel_runtime.cc" | |||
| "memory_manager.cc" "kernel_runtime_manager.cc" "convert_tensor_utils.cc" | |||
| ) | |||
| if(ENABLE_GPU) | |||
| @@ -9,7 +10,8 @@ else() | |||
| endif() | |||
| if(ENABLE_D) | |||
| file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc") | |||
| file(GLOB_RECURSE D_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "ascend/*.cc" "kernel_adjust.cc" | |||
| "../../minddata/dataset/engine/tdt/tdt_handle.cc") | |||
| endif() | |||
| if(ENABLE_CPU) | |||
| @@ -60,8 +60,8 @@ | |||
| #include "runtime/device/ascend/profiling/profiling_callback_register.h" | |||
| #include "backend/kernel_compiler/hccl/hccl_context.h" | |||
| #ifdef ENABLE_TDTQUE | |||
| #include "tdt/tdt_host_interface.h" | |||
| #include "tdt/status.h" | |||
| #include "minddata/dataset/engine/tdt/tdt_handle.h" | |||
| using mindspore::dataset::TdtHandle; | |||
| #endif | |||
| using ge::model_runner::ModelRunner; | |||
| @@ -695,11 +695,10 @@ bool AscendKernelRuntime::RunTask(const session::KernelGraph *graph) { | |||
| #ifdef ENABLE_TDTQUE | |||
| // Run task error, we should call TdtHostDestroy to release tdt to avoid DeviceQueueOp hostPush hung | |||
| // case1: cpu usage 100% cause thread/process exit, but some tdt thread remain in backend | |||
| int32_t destory_status = tdt::TdtHostDestroy(); | |||
| if (destory_status != TDT_OK_CODE) { | |||
| MS_LOG(WARNING) << "Destroy tsd failed, status = " << destory_status << "."; | |||
| if (!TdtHandle::DestroyHandle()) { | |||
| MS_LOG(WARNING) << "Destroy tdt channel failed."; | |||
| } else { | |||
| MS_LOG(INFO) << "Destroy tsd success."; | |||
| MS_LOG(INFO) << "Destroy tdt channel success."; | |||
| } | |||
| #endif | |||
| return false; | |||
| @@ -230,7 +230,7 @@ void GetGeOptions(const std::shared_ptr<MsContext> &ms_context_ptr, std::map<std | |||
| } else { | |||
| (*ge_options)["ge.exec.precision_mode"] = "allow_fp32_to_fp16"; | |||
| } | |||
| // Disable the global variable acc, only enable it whlie adding training graph in pipeline | |||
| // Disable the global variable acc, only enable it while adding training graph in pipeline | |||
| (*ge_options)["ge.exec.variable_acc"] = "0"; | |||
| #endif | |||
| } | |||
| @@ -2876,10 +2876,11 @@ class TransferDataset(Dataset): | |||
| def parse(self, children=None): | |||
| total_batch = 0 | |||
| device_id = context.get_context("device_id") | |||
| if hasattr(self.children[0], "__total_batch__"): | |||
| total_batch = self.children[0].__total_batch__ | |||
| return cde.TransferNode(children[0], self.queue_name, self.device_type, self._send_epoch_end, total_batch, | |||
| self._create_data_info_queue) | |||
| return cde.TransferNode(children[0], self.queue_name, self.device_type, device_id, self._send_epoch_end, | |||
| total_batch, self._create_data_info_queue) | |||
| def get_args(self): | |||
| args = super().get_args() | |||
| @@ -54,15 +54,20 @@ def get_tensor(is_scalar, input_type): | |||
| if __name__ == "__main__": | |||
| net = TensorPrint() | |||
| net(get_tensor('scalar', mindspore.bool_), get_tensor('scalar', mindspore.uint8), | |||
| get_tensor('scalar', mindspore.int8), get_tensor('scalar', mindspore.uint16), | |||
| get_tensor('scalar', mindspore.int16), get_tensor('scalar', mindspore.uint32), | |||
| get_tensor('scalar', mindspore.int32), get_tensor('scalar', mindspore.uint64), | |||
| get_tensor('scalar', mindspore.int64), get_tensor('scalar', mindspore.float16), | |||
| # net(get_tensor('scalar', mindspore.bool_), get_tensor('scalar', mindspore.uint8), | |||
| # get_tensor('scalar', mindspore.int8), get_tensor('scalar', mindspore.uint16), | |||
| # get_tensor('scalar', mindspore.int16), get_tensor('scalar', mindspore.uint32), | |||
| # get_tensor('scalar', mindspore.int32), get_tensor('scalar', mindspore.uint64), | |||
| # get_tensor('scalar', mindspore.int64), get_tensor('scalar', mindspore.float16), | |||
| # get_tensor('scalar', mindspore.float32), get_tensor('scalar', mindspore.float64), | |||
| # get_tensor('array', mindspore.bool_), get_tensor('array', mindspore.uint8), | |||
| # get_tensor('array', mindspore.int8), get_tensor('array', mindspore.uint16), | |||
| # get_tensor('array', mindspore.int16), get_tensor('array', mindspore.uint32), | |||
| # get_tensor('array', mindspore.int32), get_tensor('array', mindspore.uint64), | |||
| # get_tensor('array', mindspore.int64), get_tensor('array', mindspore.float16), | |||
| # get_tensor('array', mindspore.float32), get_tensor('array', mindspore.float64)) | |||
| net(get_tensor('scalar', mindspore.bool_), | |||
| get_tensor('scalar', mindspore.float32), get_tensor('scalar', mindspore.float64), | |||
| get_tensor('array', mindspore.bool_), get_tensor('array', mindspore.uint8), | |||
| get_tensor('array', mindspore.int8), get_tensor('array', mindspore.uint16), | |||
| get_tensor('array', mindspore.int16), get_tensor('array', mindspore.uint32), | |||
| get_tensor('array', mindspore.int32), get_tensor('array', mindspore.uint64), | |||
| get_tensor('array', mindspore.int64), get_tensor('array', mindspore.float16), | |||
| get_tensor('array', mindspore.bool_), | |||
| get_tensor('array', mindspore.float32), get_tensor('array', mindspore.float64)) | |||