From: @hfarahat Reviewed-by: @robingrosman,@pandoublefeng Signed-off-by: @robingrosmanpull/14930/MERGE
| @@ -130,5 +130,30 @@ Status PullIterator::BuildAndLaunchTree(std::shared_ptr<Dataset> ds) { | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| Iterator::_Iterator::_Iterator(Iterator *lt) : lt_{lt}, cur_row_{nullptr} { | |||||
| if (lt_) { | |||||
| cur_row_ = new MSTensorMap(); | |||||
| Status rc = lt_->GetNextRow(cur_row_); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error getting next row. Message: " << rc; | |||||
| cur_row_ = nullptr; | |||||
| } | |||||
| } | |||||
| } | |||||
| Iterator::_Iterator &Iterator::_Iterator::operator++() { | |||||
| if (lt_) { | |||||
| ++ind_; | |||||
| Status rc = lt_->GetNextRow(cur_row_); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error getting next row. Message: " << rc; | |||||
| cur_row_ = nullptr; | |||||
| } | |||||
| } | |||||
| if (cur_row_ && cur_row_->size() == 0) { | |||||
| delete cur_row_; | |||||
| cur_row_ = nullptr; | |||||
| } | |||||
| return *this; | |||||
| } | |||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -34,10 +34,10 @@ PYBIND_REGISTER(Execute, 0, ([](const py::module *m) { | |||||
| auto ms_tensor = mindspore::MSTensor(std::make_shared<DETensor>(de_tensor)); | auto ms_tensor = mindspore::MSTensor(std::make_shared<DETensor>(de_tensor)); | ||||
| THROW_IF_ERROR(self(ms_tensor, &ms_tensor)); | THROW_IF_ERROR(self(ms_tensor, &ms_tensor)); | ||||
| std::shared_ptr<dataset::Tensor> de_output_tensor; | std::shared_ptr<dataset::Tensor> de_output_tensor; | ||||
| dataset::Tensor::CreateFromMemory(dataset::TensorShape(ms_tensor.Shape()), | |||||
| MSTypeToDEType(static_cast<TypeId>(ms_tensor.DataType())), | |||||
| (const uchar *)(ms_tensor.Data().get()), | |||||
| ms_tensor.DataSize(), &de_output_tensor); | |||||
| THROW_IF_ERROR(dataset::Tensor::CreateFromMemory( | |||||
| dataset::TensorShape(ms_tensor.Shape()), | |||||
| MSTypeToDEType(static_cast<TypeId>(ms_tensor.DataType())), | |||||
| (const uchar *)(ms_tensor.Data().get()), ms_tensor.DataSize(), &de_output_tensor)); | |||||
| return de_output_tensor; | return de_output_tensor; | ||||
| }) | }) | ||||
| .def("__call__", [](Execute &self, const std::vector<std::shared_ptr<Tensor>> &input_tensor_list) { | .def("__call__", [](Execute &self, const std::vector<std::shared_ptr<Tensor>> &input_tensor_list) { | ||||
| @@ -51,9 +51,9 @@ PYBIND_REGISTER(Execute, 0, ([](const py::module *m) { | |||||
| std::vector<std::shared_ptr<dataset::Tensor>> de_output_tensor_list; | std::vector<std::shared_ptr<dataset::Tensor>> de_output_tensor_list; | ||||
| for (auto &tensor : ms_output_tensor_list) { | for (auto &tensor : ms_output_tensor_list) { | ||||
| std::shared_ptr<dataset::Tensor> de_output_tensor; | std::shared_ptr<dataset::Tensor> de_output_tensor; | ||||
| dataset::Tensor::CreateFromMemory( | |||||
| THROW_IF_ERROR(dataset::Tensor::CreateFromMemory( | |||||
| dataset::TensorShape(tensor.Shape()), MSTypeToDEType(static_cast<TypeId>(tensor.DataType())), | dataset::TensorShape(tensor.Shape()), MSTypeToDEType(static_cast<TypeId>(tensor.DataType())), | ||||
| (const uchar *)(tensor.Data().get()), tensor.DataSize(), &de_output_tensor); | |||||
| (const uchar *)(tensor.Data().get()), tensor.DataSize(), &de_output_tensor)); | |||||
| de_output_tensor_list.emplace_back(std::move(de_output_tensor)); | de_output_tensor_list.emplace_back(std::move(de_output_tensor)); | ||||
| } | } | ||||
| return de_output_tensor_list; | return de_output_tensor_list; | ||||
| @@ -264,7 +264,7 @@ Status toPadInfo(py::dict value, std::map<std::string, std::pair<TensorShape, st | |||||
| CHECK_FAIL_RETURN_UNEXPECTED( | CHECK_FAIL_RETURN_UNEXPECTED( | ||||
| Tensor::CreateEmpty(TensorShape::CreateScalar(), DataType(DataType::DE_FLOAT32), &pad_val), | Tensor::CreateEmpty(TensorShape::CreateScalar(), DataType(DataType::DE_FLOAT32), &pad_val), | ||||
| "Cannot create pad_value Tensor"); | "Cannot create pad_value Tensor"); | ||||
| pad_val->SetItemAt<float>({}, pad_val_float); | |||||
| RETURN_IF_NOT_OK(pad_val->SetItemAt<float>({}, pad_val_float)); | |||||
| } | } | ||||
| (void)pad_info->insert({toString(p.first), {shape, pad_val}}); | (void)pad_info->insert({toString(p.first), {shape, pad_val}}); | ||||
| } else { // tuple is None | } else { // tuple is None | ||||
| @@ -73,8 +73,16 @@ Concatenate::Concatenate(int8_t axis, MSTensor prepend, MSTensor append) | |||||
| std::shared_ptr<TensorOperation> Concatenate::Parse() { | std::shared_ptr<TensorOperation> Concatenate::Parse() { | ||||
| std::shared_ptr<Tensor> out_prepend, out_append; | std::shared_ptr<Tensor> out_prepend, out_append; | ||||
| Tensor::CreateFromMSTensor(data_->prepend_, &out_prepend); | |||||
| Tensor::CreateFromMSTensor(data_->append_, &out_append); | |||||
| Status rc = Tensor::CreateFromMSTensor(data_->prepend_, &out_prepend); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error creating prepend constant tensor. " << rc; | |||||
| return nullptr; | |||||
| } | |||||
| rc = Tensor::CreateFromMSTensor(data_->append_, &out_append); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error creating append constant tensor. " << rc; | |||||
| return nullptr; | |||||
| } | |||||
| return std::make_shared<ConcatenateOperation>(data_->axis_, out_prepend, out_append); | return std::make_shared<ConcatenateOperation>(data_->axis_, out_prepend, out_append); | ||||
| } | } | ||||
| #endif // not ENABLE_ANDROID | #endif // not ENABLE_ANDROID | ||||
| @@ -95,7 +103,11 @@ Fill::Fill(MSTensor fill_value) : data_(std::make_shared<Data>(fill_value)) {} | |||||
| std::shared_ptr<TensorOperation> Fill::Parse() { | std::shared_ptr<TensorOperation> Fill::Parse() { | ||||
| std::shared_ptr<Tensor> out_fill_value; | std::shared_ptr<Tensor> out_fill_value; | ||||
| Tensor::CreateFromMSTensor(data_->fill_value_, &out_fill_value); | |||||
| Status rc = Tensor::CreateFromMSTensor(data_->fill_value_, &out_fill_value); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error creating fill value tensor. " << rc; | |||||
| return nullptr; | |||||
| } | |||||
| return std::make_shared<FillOperation>(out_fill_value); | return std::make_shared<FillOperation>(out_fill_value); | ||||
| } | } | ||||
| @@ -113,7 +125,12 @@ Mask::Mask(RelationalOp op, MSTensor constant, mindspore::DataType ms_type) | |||||
| std::shared_ptr<TensorOperation> Mask::Parse() { | std::shared_ptr<TensorOperation> Mask::Parse() { | ||||
| std::shared_ptr<Tensor> out_constant; | std::shared_ptr<Tensor> out_constant; | ||||
| Tensor::CreateFromMSTensor(data_->constant_, &out_constant); | |||||
| Status rc = Tensor::CreateFromMSTensor(data_->constant_, &out_constant); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error creating constant tensor. " << rc; | |||||
| return nullptr; | |||||
| } | |||||
| DataType de_type = dataset::MSTypeToDEType(static_cast<TypeId>(data_->ms_type_)); | DataType de_type = dataset::MSTypeToDEType(static_cast<TypeId>(data_->ms_type_)); | ||||
| return std::make_shared<MaskOperation>(data_->op_, out_constant, de_type); | return std::make_shared<MaskOperation>(data_->op_, out_constant, de_type); | ||||
| } | } | ||||
| @@ -143,7 +160,11 @@ PadEnd::PadEnd(const std::vector<dsize_t> &pad_shape, MSTensor pad_value) | |||||
| std::shared_ptr<TensorOperation> PadEnd::Parse() { | std::shared_ptr<TensorOperation> PadEnd::Parse() { | ||||
| std::shared_ptr<Tensor> pad_value; | std::shared_ptr<Tensor> pad_value; | ||||
| Tensor::CreateFromMSTensor(data_->pad_value_, &pad_value); | |||||
| Status rc = Tensor::CreateFromMSTensor(data_->pad_value_, &pad_value); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Error creating value constant tensor. " << rc; | |||||
| return nullptr; | |||||
| } | |||||
| return std::make_shared<PadEndOperation>(TensorShape(data_->pad_shape_), pad_value); | return std::make_shared<PadEndOperation>(TensorShape(data_->pad_shape_), pad_value); | ||||
| } | } | ||||
| #endif // not ENABLE_ANDROID | #endif // not ENABLE_ANDROID | ||||
| @@ -118,8 +118,8 @@ Status DeviceTensor::SetAttributes(uint8_t *data_ptr, const uint32_t &dataSize, | |||||
| const uint32_t &widthStride, const uint32_t &height, const uint32_t &heightStride) { | const uint32_t &widthStride, const uint32_t &height, const uint32_t &heightStride) { | ||||
| device_data_ = data_ptr; | device_data_ = data_ptr; | ||||
| CHECK_FAIL_RETURN_UNEXPECTED(device_data_ != nullptr, "Fail to get the device data."); | CHECK_FAIL_RETURN_UNEXPECTED(device_data_ != nullptr, "Fail to get the device data."); | ||||
| SetSize_(dataSize); | |||||
| SetYuvStrideShape_(width, widthStride, height, heightStride); | |||||
| RETURN_IF_NOT_OK(SetSize_(dataSize)); | |||||
| RETURN_IF_NOT_OK(SetYuvStrideShape_(width, widthStride, height, heightStride)); | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -253,7 +253,7 @@ Status Tensor::CreateFromByteList(const dataengine::BytesList &bytes_list, const | |||||
| (*out)->data_end_ = (*out)->data_ + offset_arr[i]; | (*out)->data_end_ = (*out)->data_ + offset_arr[i]; | ||||
| MS_ASSERT(num_bytes == 0); | MS_ASSERT(num_bytes == 0); | ||||
| (*out)->Reshape(shape); | |||||
| RETURN_IF_NOT_OK((*out)->Reshape(shape)); | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| #endif | #endif | ||||
| @@ -398,7 +398,7 @@ void Tensor::PrintItemAt(const std::vector<dsize_t> &index, std::ostream &out) c | |||||
| case DataType::DE_STRING: { | case DataType::DE_STRING: { | ||||
| std::string_view o{""}; | std::string_view o{""}; | ||||
| GetItemAt(&o, index); | |||||
| rc = GetItemAt(&o, index); | |||||
| out << "\"" << o << "\""; | out << "\"" << o << "\""; | ||||
| break; | break; | ||||
| } | } | ||||
| @@ -683,7 +683,7 @@ Status Tensor::GetDataAsNumpy(py::array *data) { | |||||
| } else if (type_ == DataType::DE_FLOAT64) { | } else if (type_ == DataType::DE_FLOAT64) { | ||||
| *data = py::array_t<double>(shape_.AsVector(), reinterpret_cast<double *>(data_)); | *data = py::array_t<double>(shape_.AsVector(), reinterpret_cast<double *>(data_)); | ||||
| } else if (type_ == DataType::DE_STRING) { | } else if (type_ == DataType::DE_STRING) { | ||||
| GetDataAsNumpyStrings(data); | |||||
| RETURN_IF_NOT_OK(GetDataAsNumpyStrings(data)); | |||||
| } else { | } else { | ||||
| RETURN_STATUS_UNEXPECTED("Got unexpected type when returning numpy"); | RETURN_STATUS_UNEXPECTED("Got unexpected type when returning numpy"); | ||||
| } | } | ||||
| @@ -1030,7 +1030,7 @@ Status Tensor::SliceString(std::shared_ptr<Tensor> *out, const std::vector<std:: | |||||
| for (std::vector<dsize_t> index : indices) { | for (std::vector<dsize_t> index : indices) { | ||||
| std::vector<dsize_t> cur_index = HandleNegIndices(index, dim_length); | std::vector<dsize_t> cur_index = HandleNegIndices(index, dim_length); | ||||
| dsize_t cur_flat_index; | dsize_t cur_flat_index; | ||||
| shape_.ToFlatIndex(cur_index, &cur_flat_index); | |||||
| RETURN_IF_NOT_OK(shape_.ToFlatIndex(cur_index, &cur_flat_index)); | |||||
| std::string_view sv; | std::string_view sv; | ||||
| RETURN_IF_NOT_OK(GetItemAt(&sv, {cur_index})); | RETURN_IF_NOT_OK(GetItemAt(&sv, {cur_index})); | ||||
| strings.emplace_back(sv); | strings.emplace_back(sv); | ||||
| @@ -1038,6 +1038,10 @@ Status Tensor::SliceString(std::shared_ptr<Tensor> *out, const std::vector<std:: | |||||
| return CreateFromVector(strings, shape, out); | return CreateFromVector(strings, shape, out); | ||||
| } | } | ||||
| Status Tensor::CreateFromMSTensor(const MSTensor &in, TensorPtr *out) { | Status Tensor::CreateFromMSTensor(const MSTensor &in, TensorPtr *out) { | ||||
| if (in.Data().get() == nullptr) { | |||||
| *out = nullptr; | |||||
| return Status::OK(); | |||||
| } | |||||
| return Tensor::CreateFromMemory(TensorShape(in.Shape()), MSTypeToDEType(static_cast<TypeId>(in.DataType())), | return Tensor::CreateFromMemory(TensorShape(in.Shape()), MSTypeToDEType(static_cast<TypeId>(in.DataType())), | ||||
| (const uchar *)(in.Data().get()), in.DataSize(), out); | (const uchar *)(in.Data().get()), in.DataSize(), out); | ||||
| } | } | ||||
| @@ -777,7 +777,7 @@ inline Status Tensor::CreateFromVector<std::string>(const std::vector<std::strin | |||||
| // strings will be null-terminated --> need 1 extra byte per element | // strings will be null-terminated --> need 1 extra byte per element | ||||
| dsize_t num_bytes = (kOffsetSize + 1) * (*out)->shape_.NumOfElements() + kOffsetSize + total_length; | dsize_t num_bytes = (kOffsetSize + 1) * (*out)->shape_.NumOfElements() + kOffsetSize + total_length; | ||||
| (*out)->AllocateBuffer(num_bytes); | |||||
| RETURN_IF_NOT_OK((*out)->AllocateBuffer(num_bytes)); | |||||
| auto offset_arr = reinterpret_cast<offset_t *>((*out)->data_); | auto offset_arr = reinterpret_cast<offset_t *>((*out)->data_); | ||||
| uchar *buf = (*out)->GetStringsBuffer(); | uchar *buf = (*out)->GetStringsBuffer(); | ||||
| @@ -72,7 +72,8 @@ CacheClient::~CacheClient() { | |||||
| cache_miss_keys_wp_.Set(); | cache_miss_keys_wp_.Set(); | ||||
| // Manually release the async buffer because we need the comm layer. | // Manually release the async buffer because we need the comm layer. | ||||
| if (async_buffer_stream_) { | if (async_buffer_stream_) { | ||||
| async_buffer_stream_->ReleaseBuffer(); | |||||
| Status rc = async_buffer_stream_->ReleaseBuffer(); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << rc; | |||||
| } | } | ||||
| if (client_id_ != -1) { | if (client_id_ != -1) { | ||||
| try { | try { | ||||
| @@ -66,7 +66,7 @@ Status CacheClientGreeter::DoServiceStop() { | |||||
| cq_.Shutdown(); | cq_.Shutdown(); | ||||
| // Shutdown the TaskGroup. | // Shutdown the TaskGroup. | ||||
| vg_.interrupt_all(); | vg_.interrupt_all(); | ||||
| vg_.join_all(Task::WaitFlag::kNonBlocking); | |||||
| RETURN_IF_NOT_OK(vg_.join_all(Task::WaitFlag::kNonBlocking)); | |||||
| // Drain the queue. We know how many requests we send out | // Drain the queue. We know how many requests we send out | ||||
| while (!req_.empty()) { | while (!req_.empty()) { | ||||
| bool success; | bool success; | ||||
| @@ -152,7 +152,7 @@ Status CacheServerRequest::operator()(CacheServerGreeter::AsyncService *svc, grp | |||||
| if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows || | if (type_ == BaseRequest::RequestType::kBatchFetchRows || type_ == BaseRequest::RequestType::kBatchCacheRows || | ||||
| type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock || | type_ == BaseRequest::RequestType::kStopService || type_ == BaseRequest::RequestType::kAllocateSharedBlock || | ||||
| type_ == BaseRequest::RequestType::kFreeSharedBlock) { | type_ == BaseRequest::RequestType::kFreeSharedBlock) { | ||||
| cs.ProcessRequest(this); | |||||
| RETURN_IF_NOT_OK(cs.ProcessRequest(this)); | |||||
| // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag | // WARNING. After we call ProcessRequest, the memory of 'this' is being recycled by ReturnRequestTag | ||||
| // asynchronously. Further access of 'this' is unpredictable. | // asynchronously. Further access of 'this' is unpredictable. | ||||
| } else { | } else { | ||||
| @@ -131,7 +131,7 @@ ms::Status StartServer(int argc, char **argv) { | |||||
| if (daemonize && !rc.ToString().empty()) { | if (daemonize && !rc.ToString().empty()) { | ||||
| // If we have adjusted the number of workers provided by users, use the message queue to send the warning | // If we have adjusted the number of workers provided by users, use the message queue to send the warning | ||||
| // message if this is the child daemon. | // message if this is the child daemon. | ||||
| msg.SendStatus(rc); | |||||
| (void)msg.SendStatus(rc); | |||||
| } | } | ||||
| // If all goes well, kick off the threads. Loop forever and never return unless error. | // If all goes well, kick off the threads. Loop forever and never return unless error. | ||||
| ds::CacheServer &cs = ds::CacheServer::GetInstance(); | ds::CacheServer &cs = ds::CacheServer::GetInstance(); | ||||
| @@ -130,7 +130,7 @@ Status CacheServer::DoServiceStop() { | |||||
| // Finally wake up cache_admin if it is waiting | // Finally wake up cache_admin if it is waiting | ||||
| for (int32_t qID : shutdown_qIDs_) { | for (int32_t qID : shutdown_qIDs_) { | ||||
| SharedMessage msg(qID); | SharedMessage msg(qID); | ||||
| msg.SendStatus(Status::OK()); | |||||
| RETURN_IF_NOT_OK(msg.SendStatus(Status::OK())); | |||||
| msg.RemoveResourcesOnExit(); | msg.RemoveResourcesOnExit(); | ||||
| // Let msg goes out of scope which will destroy the queue. | // Let msg goes out of scope which will destroy the queue. | ||||
| } | } | ||||
| @@ -129,8 +129,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { | |||||
| if (tracing_ != nullptr) { | if (tracing_ != nullptr) { | ||||
| cur_batch_num_++; | cur_batch_num_++; | ||||
| tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, | |||||
| ProfilingTime::GetCurMilliSecond()); | |||||
| RETURN_IF_NOT_OK(tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, | |||||
| ProfilingTime::GetCurMilliSecond())); | |||||
| } | } | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -63,7 +63,7 @@ Status BuildSentencePieceVocabOp::operator()() { | |||||
| } | } | ||||
| // add empty tensorRow for quit | // add empty tensorRow for quit | ||||
| TensorRow empty_row = {}; | TensorRow empty_row = {}; | ||||
| sentence_queue_->EmplaceBack(empty_row); | |||||
| RETURN_IF_NOT_OK(sentence_queue_->EmplaceBack(empty_row)); | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -153,7 +153,11 @@ void BuildSentencePieceVocabOp::Next(std::string *sentence) { | |||||
| } | } | ||||
| std::string_view sentence_v; | std::string_view sentence_v; | ||||
| new_row[col_id_]->GetItemAt(&sentence_v, {}); | |||||
| ret_status_ = new_row[col_id_]->GetItemAt(&sentence_v, {}); | |||||
| if (ret_status_.IsError()) { | |||||
| read_done_ = true; | |||||
| return; | |||||
| } | |||||
| std::string st{sentence_v}; | std::string st{sentence_v}; | ||||
| *sentence = st; | *sentence = st; | ||||
| @@ -203,7 +203,7 @@ Status ConcatOp::GetNumClasses(int64_t *num_classes) { | |||||
| for (const auto &child : child_) { | for (const auto &child : child_) { | ||||
| // Choose a dataset which can get valid num_classes | // Choose a dataset which can get valid num_classes | ||||
| int64_t tmp_num_classes = -1; | int64_t tmp_num_classes = -1; | ||||
| child->GetNumClasses(&tmp_num_classes); | |||||
| RETURN_IF_NOT_OK(child->GetNumClasses(&tmp_num_classes)); | |||||
| if (tmp_num_classes > max_num_classes) { | if (tmp_num_classes > max_num_classes) { | ||||
| max_num_classes = tmp_num_classes; | max_num_classes = tmp_num_classes; | ||||
| } | } | ||||
| @@ -124,7 +124,7 @@ Status MapOp::GenerateWorkerJob(const std::unique_ptr<MapWorkerJob> *worker_job) | |||||
| if (map_job == nullptr) { | if (map_job == nullptr) { | ||||
| map_job = std::make_shared<CpuMapJob>(); | map_job = std::make_shared<CpuMapJob>(); | ||||
| } | } | ||||
| map_job->AddOperation(tfuncs_[i]); | |||||
| RETURN_IF_NOT_OK(map_job->AddOperation(tfuncs_[i])); | |||||
| // Push map_job into worker_job if one of the two conditions is true: | // Push map_job into worker_job if one of the two conditions is true: | ||||
| // 1) It is the last tensor operation in tfuncs_ | // 1) It is the last tensor operation in tfuncs_ | ||||
| @@ -144,7 +144,7 @@ Status ProjectOp::ComputeColMap() { | |||||
| } | } | ||||
| Status ProjectOp::GetNextRowPullMode(TensorRow *const row) { | Status ProjectOp::GetNextRowPullMode(TensorRow *const row) { | ||||
| ComputeColMap(); | |||||
| RETURN_IF_NOT_OK(ComputeColMap()); | |||||
| TensorRow new_row; | TensorRow new_row; | ||||
| RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row)); | RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row)); | ||||
| (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row), | (void)std::transform(projected_column_indices_.begin(), projected_column_indices_.end(), std::back_inserter(*row), | ||||
| @@ -49,7 +49,7 @@ Status AlbumOp::Builder::Build(std::shared_ptr<AlbumOp> *ptr) { | |||||
| RETURN_STATUS_UNEXPECTED("Invalid file, schema_file is invalid or not set: " + builder_schema_file_); | RETURN_STATUS_UNEXPECTED("Invalid file, schema_file is invalid or not set: " + builder_schema_file_); | ||||
| } else { | } else { | ||||
| MS_LOG(INFO) << "Schema file provided: " << builder_schema_file_ << "."; | MS_LOG(INFO) << "Schema file provided: " << builder_schema_file_ << "."; | ||||
| builder_schema_->LoadSchemaFile(builder_schema_file_, builder_columns_to_load_); | |||||
| RETURN_IF_NOT_OK(builder_schema_->LoadSchemaFile(builder_schema_file_, builder_columns_to_load_)); | |||||
| } | } | ||||
| *ptr = std::make_shared<AlbumOp>(builder_num_workers_, builder_dir_, builder_op_connector_size_, builder_decode_, | *ptr = std::make_shared<AlbumOp>(builder_num_workers_, builder_dir_, builder_op_connector_size_, builder_decode_, | ||||
| builder_extensions_, std::move(builder_schema_), std::move(builder_sampler_)); | builder_extensions_, std::move(builder_schema_), std::move(builder_sampler_)); | ||||
| @@ -484,7 +484,7 @@ Status AlbumOp::ComputeColMap() { | |||||
| } | } | ||||
| Status AlbumOp::GetNextRowPullMode(TensorRow *const row) { | Status AlbumOp::GetNextRowPullMode(TensorRow *const row) { | ||||
| if (image_rows_.empty()) PrescanEntry(); | |||||
| if (image_rows_.empty()) RETURN_IF_NOT_OK(PrescanEntry()); | |||||
| if (sample_ids_ == nullptr) { | if (sample_ids_ == nullptr) { | ||||
| RETURN_IF_NOT_OK(this->InitSampler()); | RETURN_IF_NOT_OK(this->InitSampler()); | ||||
| TensorRow sample_row; | TensorRow sample_row; | ||||
| @@ -289,9 +289,9 @@ Status CelebAOp::LoadTensorRow(row_id_type row_id, TensorRow *row) { | |||||
| RETURN_IF_NOT_OK(label->Zero()); | RETURN_IF_NOT_OK(label->Zero()); | ||||
| for (uint32_t index = 0; index < image_label.second.size(); index++) { | for (uint32_t index = 0; index < image_label.second.size(); index++) { | ||||
| if (image_label.second[index] == 1) { | if (image_label.second[index] == 1) { | ||||
| label->SetItemAt<uint32_t>({0, static_cast<dsize_t>(index)}, 1); | |||||
| RETURN_IF_NOT_OK(label->SetItemAt<uint32_t>({0, static_cast<dsize_t>(index)}, 1)); | |||||
| } else { | } else { | ||||
| label->SetItemAt<uint32_t>({0, static_cast<dsize_t>(index)}, 0); | |||||
| RETURN_IF_NOT_OK(label->SetItemAt<uint32_t>({0, static_cast<dsize_t>(index)}, 0)); | |||||
| } | } | ||||
| } | } | ||||
| label->Squeeze(); | label->Squeeze(); | ||||
| @@ -103,9 +103,7 @@ CsvOp::CsvParser::CsvParser(int32_t worker_id, JaggedConnector *connector, char | |||||
| total_rows_(0), | total_rows_(0), | ||||
| start_offset_(0), | start_offset_(0), | ||||
| end_offset_(std::numeric_limits<int64_t>::max()), | end_offset_(std::numeric_limits<int64_t>::max()), | ||||
| err_message_("unknown") { | |||||
| InitCsvParser(); | |||||
| } | |||||
| err_message_("unknown") {} | |||||
| void CsvOp::CsvParser::Reset() { | void CsvOp::CsvParser::Reset() { | ||||
| cur_state_ = START_OF_FILE; | cur_state_ = START_OF_FILE; | ||||
| @@ -154,15 +152,28 @@ int CsvOp::CsvParser::PutRecord(int c) { | |||||
| err_message_ = "Number of file columns does not match the default records"; | err_message_ = "Number of file columns does not match the default records"; | ||||
| return -1; | return -1; | ||||
| } | } | ||||
| Status rc; | |||||
| switch (column_default_[cur_col_]->type) { | switch (column_default_[cur_col_]->type) { | ||||
| case CsvOp::INT: | case CsvOp::INT: | ||||
| Tensor::CreateScalar(std::stoi(s), &t); | |||||
| rc = Tensor::CreateScalar(std::stoi(s), &t); | |||||
| if (rc.IsError()) { | |||||
| err_message_ = rc.ToString(); | |||||
| return -1; | |||||
| } | |||||
| break; | break; | ||||
| case CsvOp::FLOAT: | case CsvOp::FLOAT: | ||||
| Tensor::CreateScalar(std::stof(s), &t); | |||||
| rc = Tensor::CreateScalar(std::stof(s), &t); | |||||
| if (rc.IsError()) { | |||||
| err_message_ = rc.ToString(); | |||||
| return -1; | |||||
| } | |||||
| break; | break; | ||||
| default: | default: | ||||
| Tensor::CreateScalar(s, &t); | |||||
| rc = Tensor::CreateScalar(s, &t); | |||||
| if (rc.IsError()) { | |||||
| err_message_ = rc.ToString(); | |||||
| return -1; | |||||
| } | |||||
| break; | break; | ||||
| } | } | ||||
| if (cur_col_ >= cur_row_.size()) { | if (cur_col_ >= cur_row_.size()) { | ||||
| @@ -200,7 +211,11 @@ int CsvOp::CsvParser::PutRow(int c) { | |||||
| total_rows_++; | total_rows_++; | ||||
| cur_col_ = 0; | cur_col_ = 0; | ||||
| rows_connector_->Add(worker_id_, std::move(cur_row_)); | |||||
| Status s = rows_connector_->Add(worker_id_, std::move(cur_row_)); | |||||
| if (s.IsError()) { | |||||
| err_message_ = s.ToString(); | |||||
| return -1; | |||||
| } | |||||
| return 0; | return 0; | ||||
| } | } | ||||
| @@ -468,6 +483,7 @@ Status CsvOp::CsvParser::InitCsvParser() { | |||||
| Status CsvOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) { | Status CsvOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) { | ||||
| CsvParser csv_parser(worker_id, jagged_rows_connector_.get(), field_delim_, column_default_list_, file); | CsvParser csv_parser(worker_id, jagged_rows_connector_.get(), field_delim_, column_default_list_, file); | ||||
| RETURN_IF_NOT_OK(csv_parser.InitCsvParser()); | |||||
| csv_parser.SetStartOffset(start_offset); | csv_parser.SetStartOffset(start_offset); | ||||
| csv_parser.SetEndOffset(end_offset); | csv_parser.SetEndOffset(end_offset); | ||||
| std::ifstream ifs; | std::ifstream ifs; | ||||
| @@ -589,6 +605,11 @@ Status CsvOp::CalculateNumRowsPerShard() { | |||||
| int64_t CsvOp::CountTotalRows(const std::string &file) { | int64_t CsvOp::CountTotalRows(const std::string &file) { | ||||
| CsvParser csv_parser(0, jagged_rows_connector_.get(), field_delim_, column_default_list_, file); | CsvParser csv_parser(0, jagged_rows_connector_.get(), field_delim_, column_default_list_, file); | ||||
| Status rc = csv_parser.InitCsvParser(); | |||||
| if (rc.IsError()) { | |||||
| MS_LOG(ERROR) << "Failed to initialize CSV Parser. Error:" << rc; | |||||
| return 0; | |||||
| } | |||||
| std::ifstream ifs; | std::ifstream ifs; | ||||
| ifs.open(file, std::ifstream::in); | ifs.open(file, std::ifstream::in); | ||||
| if (!ifs.is_open()) { | if (!ifs.is_open()) { | ||||
| @@ -104,9 +104,9 @@ Status ManifestOp::LoadTensorRow(row_id_type row_id, TensorRow *trow) { | |||||
| [this](const std::string &label_name) { return label_index_[label_name]; }); | [this](const std::string &label_name) { return label_index_[label_name]; }); | ||||
| RETURN_IF_NOT_OK(Tensor::CreateFromVector(label_index, &label)); | RETURN_IF_NOT_OK(Tensor::CreateFromVector(label_index, &label)); | ||||
| if (label_index.size() == 1) { | if (label_index.size() == 1) { | ||||
| label->Reshape(TensorShape({})); | |||||
| RETURN_IF_NOT_OK(label->Reshape(TensorShape({}))); | |||||
| } else { | } else { | ||||
| label->Reshape(TensorShape(std::vector<dsize_t>(1, label_index.size()))); | |||||
| RETURN_IF_NOT_OK(label->Reshape(TensorShape(std::vector<dsize_t>(1, label_index.size())))); | |||||
| } | } | ||||
| RETURN_IF_NOT_OK(Tensor::CreateFromFile(data.first, &image)); | RETURN_IF_NOT_OK(Tensor::CreateFromFile(data.first, &image)); | ||||
| @@ -131,7 +131,8 @@ void RandomDataOp::GenerateSchema() { | |||||
| std::string colName = "c" + std::to_string(i); | std::string colName = "c" + std::to_string(i); | ||||
| newCol = std::make_unique<ColDescriptor>(colName, DataType(newType), TensorImpl::kFlexible, rank, newShape.get()); | newCol = std::make_unique<ColDescriptor>(colName, DataType(newType), TensorImpl::kFlexible, rank, newShape.get()); | ||||
| data_schema_->AddColumn(*newCol); | |||||
| Status rc = data_schema_->AddColumn(*newCol); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Failed to generate a schema. Message:" << rc; | |||||
| } | } | ||||
| } | } | ||||
| @@ -197,7 +198,7 @@ Status RandomDataOp::EpochSync(int32_t worker_id, bool *quitting) { | |||||
| // Sync on the guys_in counter | // Sync on the guys_in counter | ||||
| // We have to wait the last guy is out. | // We have to wait the last guy is out. | ||||
| all_out_.Wait(); | |||||
| RETURN_IF_NOT_OK(all_out_.Wait()); | |||||
| // If we are not in a repeat loop, or that was the last repeat already, then setup our exit | // If we are not in a repeat loop, or that was the last repeat already, then setup our exit | ||||
| // condition from the master loop. | // condition from the master loop. | ||||
| if (IsLastIteration()) { | if (IsLastIteration()) { | ||||
| @@ -40,7 +40,7 @@ Status PythonSamplerRT::GetNextSample(TensorRow *out) { | |||||
| try { | try { | ||||
| py::object py_ret = py_sampler_instance.attr("_get_indices")(); | py::object py_ret = py_sampler_instance.attr("_get_indices")(); | ||||
| py::array np_sample_ids = py_ret.cast<py::array>(); | py::array np_sample_ids = py_ret.cast<py::array>(); | ||||
| Tensor::CreateFromNpArray(np_sample_ids, &sample_ids); // copy numpy to tensor | |||||
| RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(np_sample_ids, &sample_ids)); // copy numpy to tensor | |||||
| if (HasChildSampler()) { | if (HasChildSampler()) { | ||||
| for (auto it = sample_ids->begin<int64_t>(); it != sample_ids->end<int64_t>(); ++it) { | for (auto it = sample_ids->begin<int64_t>(); it != sample_ids->end<int64_t>(); ++it) { | ||||
| @@ -79,7 +79,7 @@ Status GraphDataClient::Init() { | |||||
| Status GraphDataClient::Stop() { | Status GraphDataClient::Stop() { | ||||
| #if !defined(_WIN32) && !defined(_WIN64) | #if !defined(_WIN32) && !defined(_WIN64) | ||||
| if (registered_) { | if (registered_) { | ||||
| UnRegisterToServer(); | |||||
| RETURN_IF_NOT_OK(UnRegisterToServer()); | |||||
| } | } | ||||
| #endif | #endif | ||||
| return Status::OK(); | return Status::OK(); | ||||
| @@ -691,7 +691,7 @@ Status GraphDataImpl::RandomWalkBase::GetNodeProbability(const NodeIdType &node_ | |||||
| std::shared_ptr<StochasticIndex> *node_probability) { | std::shared_ptr<StochasticIndex> *node_probability) { | ||||
| // Generate alias nodes | // Generate alias nodes | ||||
| std::shared_ptr<Node> node; | std::shared_ptr<Node> node; | ||||
| graph_->GetNodeByNodeId(node_id, &node); | |||||
| RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(node_id, &node)); | |||||
| std::vector<NodeIdType> neighbors; | std::vector<NodeIdType> neighbors; | ||||
| RETURN_IF_NOT_OK(node->GetAllNeighbors(node_type, &neighbors, true)); | RETURN_IF_NOT_OK(node->GetAllNeighbors(node_type, &neighbors, true)); | ||||
| std::sort(neighbors.begin(), neighbors.end()); | std::sort(neighbors.begin(), neighbors.end()); | ||||
| @@ -706,12 +706,12 @@ Status GraphDataImpl::RandomWalkBase::GetEdgeProbability(const NodeIdType &src, | |||||
| std::shared_ptr<StochasticIndex> *edge_probability) { | std::shared_ptr<StochasticIndex> *edge_probability) { | ||||
| // Get the alias edge setup lists for a given edge. | // Get the alias edge setup lists for a given edge. | ||||
| std::shared_ptr<Node> src_node; | std::shared_ptr<Node> src_node; | ||||
| graph_->GetNodeByNodeId(src, &src_node); | |||||
| RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(src, &src_node)); | |||||
| std::vector<NodeIdType> src_neighbors; | std::vector<NodeIdType> src_neighbors; | ||||
| RETURN_IF_NOT_OK(src_node->GetAllNeighbors(meta_path_[meta_path_index], &src_neighbors, true)); | RETURN_IF_NOT_OK(src_node->GetAllNeighbors(meta_path_[meta_path_index], &src_neighbors, true)); | ||||
| std::shared_ptr<Node> dst_node; | std::shared_ptr<Node> dst_node; | ||||
| graph_->GetNodeByNodeId(dst, &dst_node); | |||||
| RETURN_IF_NOT_OK(graph_->GetNodeByNodeId(dst, &dst_node)); | |||||
| std::vector<NodeIdType> dst_neighbors; | std::vector<NodeIdType> dst_neighbors; | ||||
| RETURN_IF_NOT_OK(dst_node->GetAllNeighbors(meta_path_[meta_path_index + 1], &dst_neighbors, true)); | RETURN_IF_NOT_OK(dst_node->GetAllNeighbors(meta_path_[meta_path_index + 1], &dst_neighbors, true)); | ||||
| @@ -127,7 +127,7 @@ class CallData : public UntypedCall { | |||||
| status_ = STATE::PROCESS; | status_ = STATE::PROCESS; | ||||
| (async_service_->*enqueue_function_)(&ctx_, &request_, &responder_, cq_, cq_, this); | (async_service_->*enqueue_function_)(&ctx_, &request_, &responder_, cq_, cq_, this); | ||||
| } else if (status_ == STATE::PROCESS) { | } else if (status_ == STATE::PROCESS) { | ||||
| EnqueueRequest(service_impl_, async_service_, cq_, enqueue_function_, handle_request_function_); | |||||
| RETURN_IF_NOT_OK(EnqueueRequest(service_impl_, async_service_, cq_, enqueue_function_, handle_request_function_)); | |||||
| status_ = STATE::FINISH; | status_ = STATE::FINISH; | ||||
| grpc::Status s = (service_impl_->*handle_request_function_)(&ctx_, &request_, &response_); | grpc::Status s = (service_impl_->*handle_request_function_)(&ctx_, &request_, &response_); | ||||
| responder_.Finish(response_, s, this); | responder_.Finish(response_, s, this); | ||||
| @@ -130,7 +130,7 @@ Status GraphLoader::InitAndLoad() { | |||||
| RETURN_IF_NOT_OK(vg.CreateAsyncTask("GraphLoader", std::bind(&GraphLoader::WorkerEntry, this, wkr_id))); | RETURN_IF_NOT_OK(vg.CreateAsyncTask("GraphLoader", std::bind(&GraphLoader::WorkerEntry, this, wkr_id))); | ||||
| } | } | ||||
| // wait for threads to finish and check its return code | // wait for threads to finish and check its return code | ||||
| vg.join_all(Task::WaitFlag::kBlocking); | |||||
| RETURN_IF_NOT_OK(vg.join_all(Task::WaitFlag::kBlocking)); | |||||
| RETURN_IF_NOT_OK(vg.GetTaskErrorIfAny()); | RETURN_IF_NOT_OK(vg.GetTaskErrorIfAny()); | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -86,7 +86,7 @@ Status ConcatNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size | |||||
| int64_t child_dataset_size = 0; | int64_t child_dataset_size = 0; | ||||
| for (int idx = 0; idx < children_.size(); idx++) { | for (int idx = 0; idx < children_.size(); idx++) { | ||||
| if (children_flag_and_nums_.empty() || children_flag_and_nums_[idx].second == 0) { | if (children_flag_and_nums_.empty() || children_flag_and_nums_[idx].second == 0) { | ||||
| children_[idx]->GetDatasetSize(size_getter, false, &child_dataset_size); | |||||
| RETURN_IF_NOT_OK(children_[idx]->GetDatasetSize(size_getter, false, &child_dataset_size)); | |||||
| total_dataset_size += child_dataset_size; | total_dataset_size += child_dataset_size; | ||||
| } else { | } else { | ||||
| total_dataset_size += children_flag_and_nums_[idx].second; | total_dataset_size += children_flag_and_nums_[idx].second; | ||||
| @@ -100,8 +100,8 @@ Status ConcatNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &size | |||||
| std::shared_ptr<DistributedSamplerRT> sampler_rt = | std::shared_ptr<DistributedSamplerRT> sampler_rt = | ||||
| sampler_ ? std::dynamic_pointer_cast<DistributedSamplerRT>(sampler_rt_base) : nullptr; | sampler_ ? std::dynamic_pointer_cast<DistributedSamplerRT>(sampler_rt_base) : nullptr; | ||||
| if (sampler_rt != nullptr) { | if (sampler_rt != nullptr) { | ||||
| sampler_rt->SetNumRowsInDataset(total_dataset_size); | |||||
| sampler_rt->InitSampler(); | |||||
| RETURN_IF_NOT_OK(sampler_rt->SetNumRowsInDataset(total_dataset_size)); | |||||
| RETURN_IF_NOT_OK(sampler_rt->InitSampler()); | |||||
| // (total_size % num_shards != 0) & shard_id >= (remainder) ? CalculateNumSamples()-1 : CalculateNumSamples() | // (total_size % num_shards != 0) & shard_id >= (remainder) ? CalculateNumSamples()-1 : CalculateNumSamples() | ||||
| // example: 23 rows, 10 shards --> shard sizes = {3,3,3,2,2,2,2,2,2,2} | // example: 23 rows, 10 shards --> shard sizes = {3,3,3,2,2,2,2,2,2,2} | ||||
| @@ -100,9 +100,9 @@ Status RandomNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops | |||||
| if (!schema_file_path.empty() || !schema_json_string.empty()) { | if (!schema_file_path.empty() || !schema_json_string.empty()) { | ||||
| data_schema_ = std::make_unique<DataSchema>(); | data_schema_ = std::make_unique<DataSchema>(); | ||||
| if (!schema_file_path.empty()) { | if (!schema_file_path.empty()) { | ||||
| data_schema_->LoadSchemaFile(schema_file_path, columns_to_load); | |||||
| RETURN_IF_NOT_OK(data_schema_->LoadSchemaFile(schema_file_path, columns_to_load)); | |||||
| } else if (!schema_json_string.empty()) { | } else if (!schema_json_string.empty()) { | ||||
| data_schema_->LoadSchemaString(schema_json_string, columns_to_load); | |||||
| RETURN_IF_NOT_OK(data_schema_->LoadSchemaString(schema_json_string, columns_to_load)); | |||||
| } | } | ||||
| } | } | ||||
| @@ -153,6 +153,15 @@ Status DistributedSamplerObj::to_json(nlohmann::json *const out_json) { | |||||
| *out_json = args; | *out_json = args; | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::shared_ptr<SamplerObj> DistributedSamplerObj::SamplerCopy() { | |||||
| auto sampler = | |||||
| std::make_shared<DistributedSamplerObj>(num_shards_, shard_id_, shuffle_, num_samples_, seed_, offset_, even_dist_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // PKSampler | // PKSampler | ||||
| PKSamplerObj::PKSamplerObj(int64_t num_val, bool shuffle, int64_t num_samples) | PKSamplerObj::PKSamplerObj(int64_t num_val, bool shuffle, int64_t num_samples) | ||||
| @@ -212,6 +221,15 @@ std::shared_ptr<mindrecord::ShardOperator> PKSamplerObj::BuildForMindDataset() { | |||||
| } | } | ||||
| #endif | #endif | ||||
| std::shared_ptr<SamplerObj> PKSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<PKSamplerObj>(num_val_, shuffle_, num_samples_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // PreBuiltOperation | // PreBuiltOperation | ||||
| PreBuiltSamplerObj::PreBuiltSamplerObj(std::shared_ptr<SamplerRT> sampler) : sp_(std::move(sampler)) {} | PreBuiltSamplerObj::PreBuiltSamplerObj(std::shared_ptr<SamplerRT> sampler) : sp_(std::move(sampler)) {} | ||||
| @@ -239,15 +257,17 @@ std::shared_ptr<SamplerObj> PreBuiltSamplerObj::SamplerCopy() { | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| if (sp_minddataset_ != nullptr) { | if (sp_minddataset_ != nullptr) { | ||||
| auto sampler = std::make_shared<PreBuiltSamplerObj>(sp_minddataset_); | auto sampler = std::make_shared<PreBuiltSamplerObj>(sp_minddataset_); | ||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | } | ||||
| return sampler; | return sampler; | ||||
| } | } | ||||
| #endif | #endif | ||||
| auto sampler = std::make_shared<PreBuiltSamplerObj>(sp_); | auto sampler = std::make_shared<PreBuiltSamplerObj>(sp_); | ||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | } | ||||
| return sampler; | return sampler; | ||||
| } | } | ||||
| @@ -306,6 +326,15 @@ std::shared_ptr<mindrecord::ShardOperator> RandomSamplerObj::BuildForMindDataset | |||||
| } | } | ||||
| #endif | #endif | ||||
| std::shared_ptr<SamplerObj> RandomSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<RandomSamplerObj>(replacement_, num_samples_, reshuffle_each_epoch_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // SequentialSampler | // SequentialSampler | ||||
| SequentialSamplerObj::SequentialSamplerObj(int64_t start_index, int64_t num_samples) | SequentialSamplerObj::SequentialSamplerObj(int64_t start_index, int64_t num_samples) | ||||
| : start_index_(start_index), num_samples_(num_samples) {} | : start_index_(start_index), num_samples_(num_samples) {} | ||||
| @@ -358,7 +387,14 @@ std::shared_ptr<mindrecord::ShardOperator> SequentialSamplerObj::BuildForMindDat | |||||
| return mind_sampler; | return mind_sampler; | ||||
| } | } | ||||
| #endif | #endif | ||||
| std::shared_ptr<SamplerObj> SequentialSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<SequentialSamplerObj>(start_index_, num_samples_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // SubsetSampler | // SubsetSampler | ||||
| SubsetSamplerObj::SubsetSamplerObj(std::vector<int64_t> indices, int64_t num_samples) | SubsetSamplerObj::SubsetSamplerObj(std::vector<int64_t> indices, int64_t num_samples) | ||||
| : indices_(std::move(indices)), num_samples_(num_samples) {} | : indices_(std::move(indices)), num_samples_(num_samples) {} | ||||
| @@ -405,6 +441,14 @@ Status SubsetSamplerObj::to_json(nlohmann::json *const out_json) { | |||||
| *out_json = args; | *out_json = args; | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::shared_ptr<SamplerObj> SubsetSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<SubsetSamplerObj>(indices_, num_samples_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // SubsetRandomSampler | // SubsetRandomSampler | ||||
| SubsetRandomSamplerObj::SubsetRandomSamplerObj(std::vector<int64_t> indices, int64_t num_samples) | SubsetRandomSamplerObj::SubsetRandomSamplerObj(std::vector<int64_t> indices, int64_t num_samples) | ||||
| @@ -444,6 +488,14 @@ Status SubsetRandomSamplerObj::to_json(nlohmann::json *const out_json) { | |||||
| *out_json = args; | *out_json = args; | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| std::shared_ptr<SamplerObj> SubsetRandomSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<SubsetRandomSamplerObj>(indices_, num_samples_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| // WeightedRandomSampler | // WeightedRandomSampler | ||||
| WeightedRandomSamplerObj::WeightedRandomSamplerObj(std::vector<double> weights, int64_t num_samples, bool replacement) | WeightedRandomSamplerObj::WeightedRandomSamplerObj(std::vector<double> weights, int64_t num_samples, bool replacement) | ||||
| @@ -498,6 +550,14 @@ Status WeightedRandomSamplerObj::SamplerBuild(std::shared_ptr<SamplerRT> *sample | |||||
| sampler = s.IsOk() ? sampler : nullptr; | sampler = s.IsOk() ? sampler : nullptr; | ||||
| return s; | return s; | ||||
| } | } | ||||
| std::shared_ptr<SamplerObj> WeightedRandomSamplerObj::SamplerCopy() { | |||||
| auto sampler = std::make_shared<WeightedRandomSamplerObj>(weights_, num_samples_, replacement_); | |||||
| for (const auto &child : children_) { | |||||
| Status rc = sampler->AddChildSampler(child); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error in copying the sampler. Message: " << rc; | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| } // namespace dataset | } // namespace dataset | ||||
| } // namespace mindspore | } // namespace mindspore | ||||
| @@ -95,14 +95,7 @@ class DistributedSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<DistributedSamplerObj>(num_shards_, shard_id_, shuffle_, num_samples_, seed_, | |||||
| offset_, even_dist_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -137,13 +130,7 @@ class PKSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<PKSamplerObj>(num_val_, shuffle_, num_samples_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -198,13 +185,7 @@ class RandomSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<RandomSamplerObj>(replacement_, num_samples_, reshuffle_each_epoch_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -231,13 +212,7 @@ class SequentialSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<SequentialSamplerObj>(start_index_, num_samples_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -263,13 +238,7 @@ class SubsetSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<SubsetSamplerObj>(indices_, num_samples_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -297,13 +266,7 @@ class SubsetRandomSamplerObj : public SubsetSamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<SubsetRandomSamplerObj>(indices_, num_samples_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| #ifndef ENABLE_ANDROID | #ifndef ENABLE_ANDROID | ||||
| std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | std::shared_ptr<mindrecord::ShardOperator> BuildForMindDataset() override; | ||||
| @@ -320,13 +283,7 @@ class WeightedRandomSamplerObj : public SamplerObj { | |||||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | ||||
| std::shared_ptr<SamplerObj> SamplerCopy() override { | |||||
| auto sampler = std::make_shared<WeightedRandomSamplerObj>(weights_, num_samples_, replacement_); | |||||
| for (auto child : children_) { | |||||
| sampler->AddChildSampler(child); | |||||
| } | |||||
| return sampler; | |||||
| } | |||||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||||
| /// \brief Get the arguments of node | /// \brief Get the arguments of node | ||||
| /// \param[out] out_json JSON string of all attributes | /// \param[out] out_json JSON string of all attributes | ||||
| @@ -341,7 +341,8 @@ Status OperatorCpu::Collect(const ExecutionTree *tree) { | |||||
| for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) { | for (auto iter = op_thread.begin(); iter != op_thread.end(); iter++) { | ||||
| int32_t op_id = iter->first; | int32_t op_id = iter->first; | ||||
| for (auto thread_id : iter->second) { | for (auto thread_id : iter->second) { | ||||
| ParseCpuInfo(op_id, thread_id, &op_stat_); | |||||
| // ignore errors in the first collect | |||||
| (void)ParseCpuInfo(op_id, thread_id, &op_stat_); | |||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| @@ -23,8 +23,8 @@ | |||||
| namespace mindspore { | namespace mindspore { | ||||
| namespace dataset { | namespace dataset { | ||||
| Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, | |||||
| const int32_t value, const uint64_t time_stamp) { | |||||
| void DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, | |||||
| const int32_t value, const uint64_t time_stamp) { | |||||
| // Format: "type extra-info batch-num value" | // Format: "type extra-info batch-num value" | ||||
| // type: 0: time, 1: connector size | // type: 0: time, 1: connector size | ||||
| // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time | // extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time | ||||
| @@ -39,7 +39,6 @@ Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, | |||||
| std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + | std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " + | ||||
| std::to_string(value) + " " + std::to_string(time_stamp); | std::to_string(value) + " " + std::to_string(time_stamp); | ||||
| value_.emplace_back(data); | value_.emplace_back(data); | ||||
| return Status::OK(); | |||||
| } | } | ||||
| Status DeviceQueueTracing::SaveToFile() { | Status DeviceQueueTracing::SaveToFile() { | ||||
| @@ -33,8 +33,8 @@ class DeviceQueueTracing : public Tracing { | |||||
| // Record tracing data | // Record tracing data | ||||
| // @return Status The status code returned | // @return Status The status code returned | ||||
| Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value, | |||||
| const uint64_t time_stamp); | |||||
| void Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value, | |||||
| const uint64_t time_stamp); | |||||
| std::string Name() const override { return kDeviceQueueTracingName; }; | std::string Name() const override { return kDeviceQueueTracingName; }; | ||||
| @@ -36,7 +36,8 @@ Status PythonRuntimeContext::TerminateImpl() { | |||||
| } | } | ||||
| PythonRuntimeContext::~PythonRuntimeContext() { | PythonRuntimeContext::~PythonRuntimeContext() { | ||||
| TerminateImpl(); | |||||
| Status rc = Terminate(); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error while terminating the consumer. Message:" << rc; | |||||
| { | { | ||||
| py::gil_scoped_acquire gil_acquire; | py::gil_scoped_acquire gil_acquire; | ||||
| tree_consumer_.reset(); | tree_consumer_.reset(); | ||||
| @@ -36,7 +36,10 @@ Status NativeRuntimeContext::TerminateImpl() { | |||||
| return tree_consumer_->Terminate(); | return tree_consumer_->Terminate(); | ||||
| } | } | ||||
| NativeRuntimeContext::~NativeRuntimeContext() { TerminateImpl(); } | |||||
| NativeRuntimeContext::~NativeRuntimeContext() { | |||||
| Status rc = Terminate(); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error while terminating the consumer. Message:" << rc; | |||||
| } | |||||
| TreeConsumer *RuntimeContext::GetConsumer() { return tree_consumer_.get(); } | TreeConsumer *RuntimeContext::GetConsumer() { return tree_consumer_.get(); } | ||||
| @@ -87,12 +87,7 @@ class Iterator { | |||||
| class _Iterator { | class _Iterator { | ||||
| public: | public: | ||||
| explicit _Iterator(Iterator *lt) : lt_{lt}, cur_row_{nullptr} { | |||||
| if (lt_) { | |||||
| cur_row_ = new MSTensorMap(); | |||||
| lt_->GetNextRow(cur_row_); | |||||
| } | |||||
| } | |||||
| explicit _Iterator(Iterator *lt); | |||||
| // Destructor | // Destructor | ||||
| ~_Iterator() { | ~_Iterator() { | ||||
| @@ -101,17 +96,7 @@ class Iterator { | |||||
| } | } | ||||
| } | } | ||||
| _Iterator &operator++() { | |||||
| if (lt_) { | |||||
| ++ind_; | |||||
| lt_->GetNextRow(cur_row_); | |||||
| } | |||||
| if (cur_row_ && cur_row_->size() == 0) { | |||||
| delete cur_row_; | |||||
| cur_row_ = nullptr; | |||||
| } | |||||
| return *this; | |||||
| } // prefix ++ overload | |||||
| _Iterator &operator++(); // prefix ++ overload | |||||
| MSTensorMap &operator*() { return *cur_row_; } // dereference operator | MSTensorMap &operator*() { return *cur_row_; } // dereference operator | ||||
| MSTensorMap *operator->() { return cur_row_; } | MSTensorMap *operator->() { return cur_row_; } | ||||
| @@ -125,73 +125,73 @@ Status FillHelper(const std::shared_ptr<Tensor> input, std::shared_ptr<Tensor> * | |||||
| case DataType::DE_BOOL: { | case DataType::DE_BOOL: { | ||||
| bool value = 0; | bool value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<bool>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<bool>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_INT8: { | case DataType::DE_INT8: { | ||||
| int8_t value = 0; | int8_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<int8_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<int8_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_UINT8: { | case DataType::DE_UINT8: { | ||||
| uint8_t value = 0; | uint8_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<uint8_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<uint8_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_UINT16: { | case DataType::DE_UINT16: { | ||||
| uint16_t value = 0; | uint16_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<uint16_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<uint16_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_INT16: { | case DataType::DE_INT16: { | ||||
| int16_t value = 0; | int16_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<int16_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<int16_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_UINT32: { | case DataType::DE_UINT32: { | ||||
| uint32_t value = 0; | uint32_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<uint32_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<uint32_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_INT32: { | case DataType::DE_INT32: { | ||||
| int32_t value = 0; | int32_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<int32_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<int32_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_UINT64: { | case DataType::DE_UINT64: { | ||||
| uint64_t value = 0; | uint64_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<uint64_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<uint64_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_INT64: { | case DataType::DE_INT64: { | ||||
| int64_t value = 0; | int64_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<int64_t>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<int64_t>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_FLOAT16: { | case DataType::DE_FLOAT16: { | ||||
| int64_t value = 0; | int64_t value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<float>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<float>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_FLOAT32: { | case DataType::DE_FLOAT32: { | ||||
| float value = 0; | float value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<float>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<float>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_FLOAT64: { | case DataType::DE_FLOAT64: { | ||||
| double value = 0; | double value = 0; | ||||
| RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | RETURN_IF_NOT_OK(fill_output->GetItemAt(&value, {})); | ||||
| (*out)->Fill<double>(value); | |||||
| RETURN_IF_NOT_OK((*out)->Fill<double>(value)); | |||||
| break; | break; | ||||
| } | } | ||||
| case DataType::DE_STRING: { | case DataType::DE_STRING: { | ||||
| @@ -50,11 +50,11 @@ Status AffineOp::Compute(const std::shared_ptr<Tensor> &input, std::shared_ptr<T | |||||
| float_t translation_x = translation_[0]; | float_t translation_x = translation_[0]; | ||||
| float_t translation_y = translation_[1]; | float_t translation_y = translation_[1]; | ||||
| float_t degrees = 0.0; | float_t degrees = 0.0; | ||||
| DegreesToRadians(degrees_, °rees); | |||||
| RETURN_IF_NOT_OK(DegreesToRadians(degrees_, °rees)); | |||||
| float_t shear_x = shear_[0]; | float_t shear_x = shear_[0]; | ||||
| float_t shear_y = shear_[1]; | float_t shear_y = shear_[1]; | ||||
| DegreesToRadians(shear_x, &shear_x); | |||||
| DegreesToRadians(-1 * shear_y, &shear_y); | |||||
| RETURN_IF_NOT_OK(DegreesToRadians(shear_x, &shear_x)); | |||||
| RETURN_IF_NOT_OK(DegreesToRadians(-1 * shear_y, &shear_y)); | |||||
| // Apply Affine Transformation | // Apply Affine Transformation | ||||
| // T is translation matrix: [1, 0, tx | 0, 1, ty | 0, 0, 1] | // T is translation matrix: [1, 0, tx | 0, 1, ty | 0, 0, 1] | ||||
| @@ -185,7 +185,7 @@ Status BoundingBox::UpdateBBoxesForResize(const TensorPtr &bbox_list, size_t bbo | |||||
| bbox->SetWidth(bbox->width() * W_aspRatio); | bbox->SetWidth(bbox->width() * W_aspRatio); | ||||
| bbox->SetHeight(bbox->height() * H_aspRatio); | bbox->SetHeight(bbox->height() * H_aspRatio); | ||||
| // reset bounding box values | // reset bounding box values | ||||
| bbox->WriteToTensor(bbox_list, i); | |||||
| RETURN_IF_NOT_OK(bbox->WriteToTensor(bbox_list, i)); | |||||
| } | } | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -444,7 +444,7 @@ Status HwcToChw(std::shared_ptr<Tensor> input, std::shared_ptr<Tensor> *output) | |||||
| int width = input_cv->shape()[1]; | int width = input_cv->shape()[1]; | ||||
| std::shared_ptr<CVTensor> output_cv; | std::shared_ptr<CVTensor> output_cv; | ||||
| CVTensor::CreateEmpty(TensorShape{num_channels, height, width}, input_cv->type(), &output_cv); | |||||
| RETURN_IF_NOT_OK(CVTensor::CreateEmpty(TensorShape{num_channels, height, width}, input_cv->type(), &output_cv)); | |||||
| for (int i = 0; i < num_channels; ++i) { | for (int i = 0; i < num_channels; ++i) { | ||||
| cv::Mat mat; | cv::Mat mat; | ||||
| RETURN_IF_NOT_OK(output_cv->MatAtIndex({i}, &mat)); | RETURN_IF_NOT_OK(output_cv->MatAtIndex({i}, &mat)); | ||||
| @@ -879,7 +879,7 @@ Status AutoContrast(const std::shared_ptr<Tensor> &input, std::shared_ptr<Tensor | |||||
| std::shared_ptr<CVTensor> output_cv; | std::shared_ptr<CVTensor> output_cv; | ||||
| RETURN_IF_NOT_OK(CVTensor::CreateFromMat(result, &output_cv)); | RETURN_IF_NOT_OK(CVTensor::CreateFromMat(result, &output_cv)); | ||||
| (*output) = std::static_pointer_cast<Tensor>(output_cv); | (*output) = std::static_pointer_cast<Tensor>(output_cv); | ||||
| (*output)->Reshape(input->shape()); | |||||
| RETURN_IF_NOT_OK((*output)->Reshape(input_cv->shape())); | |||||
| } catch (const cv::Exception &e) { | } catch (const cv::Exception &e) { | ||||
| RETURN_STATUS_UNEXPECTED("AutoContrast: " + std::string(e.what())); | RETURN_STATUS_UNEXPECTED("AutoContrast: " + std::string(e.what())); | ||||
| } | } | ||||
| @@ -980,7 +980,7 @@ Status Equalize(const std::shared_ptr<Tensor> &input, std::shared_ptr<Tensor> *o | |||||
| std::shared_ptr<CVTensor> output_cv; | std::shared_ptr<CVTensor> output_cv; | ||||
| RETURN_IF_NOT_OK(CVTensor::CreateFromMat(result, &output_cv)); | RETURN_IF_NOT_OK(CVTensor::CreateFromMat(result, &output_cv)); | ||||
| (*output) = std::static_pointer_cast<Tensor>(output_cv); | (*output) = std::static_pointer_cast<Tensor>(output_cv); | ||||
| (*output)->Reshape(input->shape()); | |||||
| RETURN_IF_NOT_OK((*output)->Reshape(input_cv->shape())); | |||||
| } catch (const cv::Exception &e) { | } catch (const cv::Exception &e) { | ||||
| RETURN_STATUS_UNEXPECTED("Equalize: " + std::string(e.what())); | RETURN_STATUS_UNEXPECTED("Equalize: " + std::string(e.what())); | ||||
| } | } | ||||
| @@ -1077,7 +1077,7 @@ Status Pad(const std::shared_ptr<Tensor> &input, std::shared_ptr<Tensor> *output | |||||
| RETURN_IF_NOT_OK(CVTensor::CreateFromMat(out_image, &output_cv)); | RETURN_IF_NOT_OK(CVTensor::CreateFromMat(out_image, &output_cv)); | ||||
| // pad the dimension if shape information is only 2 dimensional, this is grayscale | // pad the dimension if shape information is only 2 dimensional, this is grayscale | ||||
| int num_channels = input_cv->shape()[2]; | int num_channels = input_cv->shape()[2]; | ||||
| if (input_cv->Rank() == 3 && num_channels == 1 && output_cv->Rank() == 2) output_cv->ExpandDim(2); | |||||
| if (input_cv->Rank() == 3 && num_channels == 1 && output_cv->Rank() == 2) RETURN_IF_NOT_OK(output_cv->ExpandDim(2)); | |||||
| *output = std::static_pointer_cast<Tensor>(output_cv); | *output = std::static_pointer_cast<Tensor>(output_cv); | ||||
| return Status::OK(); | return Status::OK(); | ||||
| } catch (const cv::Exception &e) { | } catch (const cv::Exception &e) { | ||||
| @@ -46,7 +46,7 @@ Status RandomColorOp::Compute(const std::shared_ptr<Tensor> &in, std::shared_ptr | |||||
| cv::Mat cv_out; | cv::Mat cv_out; | ||||
| cv::merge(temp, 3, cv_out); | cv::merge(temp, 3, cv_out); | ||||
| std::shared_ptr<CVTensor> cvt_out; | std::shared_ptr<CVTensor> cvt_out; | ||||
| CVTensor::CreateFromMat(cv_out, &cvt_out); | |||||
| RETURN_IF_NOT_OK(CVTensor::CreateFromMat(cv_out, &cvt_out)); | |||||
| if (abs(t - 0.0) < eps) { | if (abs(t - 0.0) < eps) { | ||||
| // return grayscale | // return grayscale | ||||
| *out = std::static_pointer_cast<Tensor>(cvt_out); | *out = std::static_pointer_cast<Tensor>(cvt_out); | ||||
| @@ -41,17 +41,17 @@ Status SlidingWindowHelper(const std::shared_ptr<Tensor> &input, std::shared_ptr | |||||
| // Slice on specified axis and concatenate on new axis | // Slice on specified axis and concatenate on new axis | ||||
| for (int32_t i = 0; i + width <= axis_end; i++) { | for (int32_t i = 0; i + width <= axis_end; i++) { | ||||
| auto slice_op = std::make_unique<SliceOp>(Slice(i, i + width, 1)); | auto slice_op = std::make_unique<SliceOp>(Slice(i, i + width, 1)); | ||||
| slice_op->Compute(input, &tmp); | |||||
| RETURN_IF_NOT_OK(slice_op->Compute(input, &tmp)); | |||||
| if (i == 0) { | if (i == 0) { | ||||
| *output = tmp; | *output = tmp; | ||||
| } else { | } else { | ||||
| TensorRow in({*output, tmp}); | TensorRow in({*output, tmp}); | ||||
| TensorRow out_row; | TensorRow out_row; | ||||
| concatenate_op->Compute(in, &out_row); | |||||
| RETURN_IF_NOT_OK(concatenate_op->Compute(in, &out_row)); | |||||
| *output = out_row[0]; | *output = out_row[0]; | ||||
| } | } | ||||
| } | } | ||||
| (*output)->Reshape(out_shape); | |||||
| RETURN_IF_NOT_OK((*output)->Reshape(out_shape)); | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| } // namespace dataset | } // namespace dataset | ||||
| @@ -137,7 +137,7 @@ Status WordpieceTokenizerOp::Compute(const TensorRow &input, TensorRow *output) | |||||
| offsets_start.push_back(0); | offsets_start.push_back(0); | ||||
| offsets_limit.push_back(0); | offsets_limit.push_back(0); | ||||
| } | } | ||||
| Tensor::CreateFromVector(out_tokens, &token_tensor); | |||||
| RETURN_IF_NOT_OK(Tensor::CreateFromVector(out_tokens, &token_tensor)); | |||||
| output->push_back(token_tensor); | output->push_back(token_tensor); | ||||
| if (with_offsets_) { | if (with_offsets_) { | ||||
| RETURN_IF_NOT_OK(Tensor::CreateFromVector(offsets_start, &offsets_start_tensor)); | RETURN_IF_NOT_OK(Tensor::CreateFromVector(offsets_start, &offsets_start_tensor)); | ||||
| @@ -47,7 +47,7 @@ Status JsonHelper::CreateAlbum(const std::string &in_dir, const std::string &out | |||||
| // create json file in output dir with the path | // create json file in output dir with the path | ||||
| std::string out_file = out_dir + "/" + std::to_string(index) + ".json"; | std::string out_file = out_dir + "/" + std::to_string(index) + ".json"; | ||||
| UpdateValue(out_file, "image", v.toString(), out_file); | |||||
| RETURN_IF_NOT_OK(UpdateValue(out_file, "image", v.toString(), out_file)); | |||||
| index++; | index++; | ||||
| } | } | ||||
| return Status::OK(); | return Status::OK(); | ||||
| @@ -82,7 +82,8 @@ void TaskManager::interrupt_all() noexcept { | |||||
| auto svc = vg->GetIntrpService(); | auto svc = vg->GetIntrpService(); | ||||
| if (svc) { | if (svc) { | ||||
| // Stop the interrupt service. No new request is accepted. | // Stop the interrupt service. No new request is accepted. | ||||
| svc->ServiceStop(); | |||||
| Status rc = svc->ServiceStop(); | |||||
| if (rc.IsError()) MS_LOG(ERROR) << "Error while stopping the service. Message: " << rc; | |||||
| svc->InterruptAll(); | svc->InterruptAll(); | ||||
| } | } | ||||
| } | } | ||||
| @@ -141,7 +142,7 @@ TaskManager::TaskManager() try : global_interrupt_(0), | |||||
| TaskManager::~TaskManager() { | TaskManager::~TaskManager() { | ||||
| if (watchdog_) { | if (watchdog_) { | ||||
| WakeUpWatchDog(); | WakeUpWatchDog(); | ||||
| watchdog_->Join(); | |||||
| (void)watchdog_->Join(); | |||||
| // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it | // watchdog_grp_ and watchdog_ pointers come from Services::GetInstance().GetServiceMemPool() which we will free it | ||||
| // on shutdown. So no need to free these pointers one by one. | // on shutdown. So no need to free these pointers one by one. | ||||
| watchdog_grp_ = nullptr; | watchdog_grp_ = nullptr; | ||||
| @@ -195,7 +195,7 @@ def test_auto_contrast_one_channel_c(plot=False): | |||||
| num_samples = images_auto_contrast_c.shape[0] | num_samples = images_auto_contrast_c.shape[0] | ||||
| mse = np.zeros(num_samples) | mse = np.zeros(num_samples) | ||||
| for i in range(num_samples): | for i in range(num_samples): | ||||
| mse[i] = diff_mse(images_auto_contrast_c[i], images_auto_contrast_py[i]) | |||||
| mse[i] = diff_mse(np.squeeze(images_auto_contrast_c[i]), images_auto_contrast_py[i]) | |||||
| logger.info("MSE= {}".format(str(np.mean(mse)))) | logger.info("MSE= {}".format(str(np.mean(mse)))) | ||||
| np.testing.assert_equal(np.mean(mse), 0.0) | np.testing.assert_equal(np.mean(mse), 0.0) | ||||