Merge pull request !4877 from MahdiRahmaniHanzaki/bucket_batch_by_length_c
| @@ -37,6 +37,7 @@ | |||
| #endif | |||
| // Dataset operator headers (in alphabetical order) | |||
| #include "minddata/dataset/engine/datasetops/batch_op.h" | |||
| #include "minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h" | |||
| #include "minddata/dataset/engine/datasetops/build_vocab_op.h" | |||
| #include "minddata/dataset/engine/datasetops/concat_op.h" | |||
| #include "minddata/dataset/engine/datasetops/map_op/map_op.h" | |||
| @@ -280,6 +281,25 @@ std::shared_ptr<BatchDataset> Dataset::Batch(int32_t batch_size, bool drop_remai | |||
| return ds; | |||
| } | |||
| // Function to create a BucketBatchByLength dataset | |||
| std::shared_ptr<BucketBatchByLengthDataset> Dataset::BucketBatchByLength( | |||
| const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries, | |||
| const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow), | |||
| const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info, bool pad_to_bucket_boundary, | |||
| bool drop_remainder) { | |||
| auto ds = std::make_shared<BucketBatchByLengthDataset>(column_names, bucket_boundaries, bucket_batch_sizes, | |||
| element_length_function, pad_info, pad_to_bucket_boundary, | |||
| drop_remainder); | |||
| if (!ds->ValidateParams()) { | |||
| return nullptr; | |||
| } | |||
| ds->children.push_back(shared_from_this()); | |||
| return ds; | |||
| } | |||
| #ifndef ENABLE_ANDROID | |||
| // Function to create a Vocab from dataset | |||
| std::shared_ptr<Vocab> Dataset::BuildVocab(const std::vector<std::string> &columns, | |||
| @@ -1590,6 +1610,79 @@ bool BatchDataset::ValidateParams() { | |||
| return true; | |||
| } | |||
| BucketBatchByLengthDataset::BucketBatchByLengthDataset( | |||
| const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries, | |||
| const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow), | |||
| const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info, bool pad_to_bucket_boundary, | |||
| bool drop_remainder) | |||
| : column_names_(column_names), | |||
| bucket_boundaries_(bucket_boundaries), | |||
| bucket_batch_sizes_(bucket_batch_sizes), | |||
| element_length_function_(element_length_function), | |||
| pad_info_(pad_info), | |||
| pad_to_bucket_boundary_(pad_to_bucket_boundary), | |||
| drop_remainder_(drop_remainder) {} | |||
| std::vector<std::shared_ptr<DatasetOp>> BucketBatchByLengthDataset::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| std::shared_ptr<TensorOp> c_func; | |||
| if (element_length_function_ != nullptr) { | |||
| c_func = std::make_shared<CFuncOp>(element_length_function_); | |||
| } else { | |||
| c_func = nullptr; | |||
| } | |||
| node_ops.push_back(std::make_shared<BucketBatchByLengthOp>(column_names_, bucket_boundaries_, bucket_batch_sizes_, | |||
| c_func, pad_info_, pad_to_bucket_boundary_, | |||
| drop_remainder_, connector_que_size_)); | |||
| return node_ops; | |||
| } | |||
| bool BucketBatchByLengthDataset::ValidateParams() { | |||
| if (element_length_function_ == nullptr && column_names_.size() != 1) { | |||
| MS_LOG(ERROR) << "BucketBatchByLength: If element_length_function is not specified, exactly one column name " | |||
| "should be passed."; | |||
| return false; | |||
| } | |||
| // Check bucket_boundaries: must be positive and strictly increasing | |||
| if (bucket_boundaries_.empty()) { | |||
| MS_LOG(ERROR) << "BucketBatchByLength: bucket_boundaries cannot be empty."; | |||
| return false; | |||
| } | |||
| for (size_t i = 0; i < bucket_boundaries_.size(); i++) { | |||
| if (bucket_boundaries_[i] <= 0) { | |||
| MS_LOG(ERROR) | |||
| << "BucketBatchByLength: bucket_boundaries must only contain positive numbers. However, the element at index: " | |||
| << i << " was: " << bucket_boundaries_[i]; | |||
| return false; | |||
| } | |||
| if (i > 0 && bucket_boundaries_[i - 1] >= bucket_boundaries_[i]) { | |||
| MS_LOG(ERROR) | |||
| << "BucketBatchByLength: bucket_boundaries must be strictly increasing. However, the elements at index: " | |||
| << i - 1 << " and " << i << " were: " << bucket_boundaries_[i - 1] << " and " << bucket_boundaries_[i] | |||
| << " respectively."; | |||
| return false; | |||
| } | |||
| } | |||
| // Check bucket_batch_sizes: must be positive | |||
| if (bucket_batch_sizes_.empty()) { | |||
| MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes must be non-empty"; | |||
| return false; | |||
| } | |||
| if (bucket_batch_sizes_.size() != bucket_boundaries_.size() + 1) { | |||
| MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes's size must equal the size of bucket_boundaries + 1"; | |||
| return false; | |||
| } | |||
| if (std::any_of(bucket_batch_sizes_.begin(), bucket_batch_sizes_.end(), [](int i) { return i <= 0; })) { | |||
| MS_LOG(ERROR) << "BucketBatchByLength: bucket_batch_sizes must only contain positive numbers."; | |||
| return false; | |||
| } | |||
| return true; | |||
| } | |||
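For reference, a sketch of how these checks play out at the call site, assuming a previously constructed dataset `ds` (the values mirror the C++ pipeline tests further down):

    // Passes: positive, strictly increasing boundaries and bucket_boundaries.size() + 1 = 4 positive batch sizes.
    auto ok = ds->BucketBatchByLength({"image"}, {1, 2, 3}, {4, 5, 6, 7});
    // Returns nullptr: boundaries are not strictly increasing.
    auto bad_boundaries = ds->BucketBatchByLength({"image"}, {2, 2}, {1, 2, 3});
    // Returns nullptr: only 2 batch sizes are given for 2 boundaries; 3 are required.
    auto bad_sizes = ds->BucketBatchByLength({"image"}, {1, 2}, {1, 2});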
| #ifndef ENABLE_ANDROID | |||
| BuildVocabDataset::BuildVocabDataset(std::shared_ptr<Vocab> vocab, const std::vector<std::string> &columns, | |||
| const std::pair<int64_t, int64_t> &freq_range, int64_t top_k, | |||
| @@ -952,7 +952,9 @@ Status DEPipeline::ParseBucketBatchByLengthOp(const py::dict &args, std::shared_ | |||
| (void)builder->SetBucketBatchSizes(ToIntVector(value)); | |||
| } | |||
| if (key == "element_length_function") { | |||
| (void)builder->SetElementLengthFunction(value.cast<py::function>()); | |||
| std::shared_ptr<TensorOp> py_func; | |||
| py_func = std::make_shared<PyFuncOp>(value.cast<py::function>(), DataType::DE_INT32); | |||
| (void)builder->SetElementLengthFunction(py_func); | |||
| } | |||
| if (key == "pad_info") { | |||
| PadInfo pad_info; | |||
| @@ -85,7 +85,7 @@ Status BucketBatchByLengthOp::Builder::Build(std::shared_ptr<BucketBatchByLength | |||
| BucketBatchByLengthOp::BucketBatchByLengthOp(std::vector<std::string> length_dependent_columns, | |||
| std::vector<int32_t> bucket_boundaries, | |||
| std::vector<int32_t> bucket_batch_sizes, | |||
| py::function element_length_function, PadInfo pad_info, | |||
| std::shared_ptr<TensorOp> element_length_function, PadInfo pad_info, | |||
| bool pad_to_bucket_boundary, bool drop_remainder, | |||
| int32_t op_connector_size) | |||
| : PipelineOp(op_connector_size), | |||
| @@ -155,34 +155,15 @@ Status BucketBatchByLengthOp::ObtainElementLength(int32_t *out_element_length, T | |||
| // call pyfunc here if given pyfunc, otherwise return 0th dimension of shape of | |||
| // the single column specified in length_dependent_columns_ | |||
| if (element_length_function_) { | |||
| py::gil_scoped_acquire gil_acquire; | |||
| if (Py_IsInitialized() == 0) { | |||
| return Status(StatusCode::kPythonInterpreterFailure, "Python Interpreter is finalized"); | |||
| } | |||
| try { | |||
| size_t number_of_arguments = length_dependent_columns_.size(); | |||
| py::tuple input_arguments(number_of_arguments); | |||
| for (size_t i = 0; i < number_of_arguments; i++) { | |||
| py::array argument_value; | |||
| int32_t column_index = column_name_id_map_[length_dependent_columns_[i]]; | |||
| RETURN_IF_NOT_OK(element[column_index]->GetDataAsNumpy(&argument_value)); | |||
| input_arguments[i] = argument_value; | |||
| } | |||
| py::object length = element_length_function_(*input_arguments); | |||
| *out_element_length = length.cast<int32_t>(); | |||
| if (*out_element_length < 0) { | |||
| return Status(StatusCode::kPyFuncException, "Element length function should return a non negative integer."); | |||
| } | |||
| } catch (const py::error_already_set &e) { | |||
| return Status(StatusCode::kPyFuncException, e.what()); | |||
| } catch (const py::cast_error &e) { | |||
| return Status(StatusCode::kPyFuncException, "Count not cast output of element length function to int32_t."); | |||
| TensorRow output; | |||
| RETURN_IF_NOT_OK(element_length_function_->Compute(element, &output)); | |||
| RETURN_IF_NOT_OK(output.at(0)->GetItemAt(out_element_length, {0})); | |||
| if (*out_element_length < 0) { | |||
| RETURN_STATUS_UNEXPECTED("BucketBatchByLength: element_length_function returned negative integer"); | |||
| } | |||
| } else { | |||
| *out_element_length = element[0]->shape()[0]; | |||
| } | |||
| return Status::OK(); | |||
| } | |||
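A hedged illustration of the fallback path above: with no element_length_function and a single length-dependent column, a row's length is simply that column's first dimension (the tensor shape here is made up for the example):

    // A fake single-column row whose first dimension is 7.
    std::shared_ptr<Tensor> text;
    Tensor::CreateEmpty(TensorShape({7, 300}), DataType(DataType::DE_FLOAT32), &text);
    TensorRow element;
    element.push_back(text);
    // ObtainElementLength would report 7 for this row and bucket it accordingly.
    int32_t element_length = static_cast<int32_t>(element[0]->shape()[0]);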
| @@ -27,6 +27,7 @@ | |||
| #include "minddata/dataset/engine/dataset_iterator.h" | |||
| #include "minddata/dataset/engine/datasetops/batch_op.h" | |||
| #include "minddata/dataset/engine/datasetops/pipeline_op.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #include "minddata/dataset/util/status.h" | |||
| namespace mindspore { | |||
| @@ -57,7 +58,7 @@ class BucketBatchByLengthOp : public PipelineOp { | |||
| return *this; | |||
| } | |||
| Builder &SetElementLengthFunction(py::function element_length_function) { | |||
| Builder &SetElementLengthFunction(std::shared_ptr<TensorOp> element_length_function) { | |||
| builder_element_length_function_ = element_length_function; | |||
| return *this; | |||
| } | |||
| @@ -90,7 +91,7 @@ class BucketBatchByLengthOp : public PipelineOp { | |||
| std::vector<std::string> builder_length_dependent_columns_; | |||
| std::vector<int32_t> builder_bucket_boundaries_; | |||
| std::vector<int32_t> builder_bucket_batch_sizes_; | |||
| py::function builder_element_length_function_; | |||
| std::shared_ptr<TensorOp> builder_element_length_function_; | |||
| PadInfo builder_pad_info_; | |||
| bool builder_pad_to_bucket_boundary_; | |||
| bool builder_drop_remainder_; | |||
| @@ -98,8 +99,8 @@ class BucketBatchByLengthOp : public PipelineOp { | |||
| }; | |||
| BucketBatchByLengthOp(std::vector<std::string> length_dependent_columns, std::vector<int32_t> bucket_boundaries, | |||
| std::vector<int32_t> bucket_batch_sizes, py::function element_length_function, PadInfo pad_info, | |||
| bool pad_to_bucket_boundary, bool drop_remainder, int32_t op_connector_size); | |||
| std::vector<int32_t> bucket_batch_sizes, std::shared_ptr<TensorOp> element_length_function, | |||
| PadInfo pad_info, bool pad_to_bucket_boundary, bool drop_remainder, int32_t op_connector_size); | |||
| // Destructor | |||
| ~BucketBatchByLengthOp() = default; | |||
| @@ -137,7 +138,7 @@ class BucketBatchByLengthOp : public PipelineOp { | |||
| std::vector<std::string> length_dependent_columns_; | |||
| std::vector<int32_t> bucket_boundaries_; | |||
| std::vector<int32_t> bucket_batch_sizes_; | |||
| py::function element_length_function_; | |||
| std::shared_ptr<TensorOp> element_length_function_; | |||
| PadInfo pad_info_; | |||
| bool pad_to_bucket_boundary_; | |||
| bool drop_remainder_; | |||
| @@ -30,6 +30,8 @@ | |||
| #include "minddata/dataset/include/iterator.h" | |||
| #include "minddata/dataset/include/samplers.h" | |||
| #include "minddata/dataset/include/type_id.h" | |||
| #include "minddata/dataset/kernels/c_func_op.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #ifndef ENABLE_ANDROID | |||
| #include "minddata/dataset/text/vocab.h" | |||
| #endif | |||
| @@ -72,6 +74,7 @@ class VOCDataset; | |||
| #endif | |||
| // Dataset Op classes (in alphabetical order) | |||
| class BatchDataset; | |||
| class BucketBatchByLengthDataset; | |||
| #ifndef ENABLE_ANDROID | |||
| class BuildVocabDataset; | |||
| #endif | |||
| @@ -370,6 +373,35 @@ class Dataset : public std::enable_shared_from_this<Dataset> { | |||
| /// \return Shared pointer to the current BatchDataset | |||
| std::shared_ptr<BatchDataset> Batch(int32_t batch_size, bool drop_remainder = false); | |||
| /// \brief Function to create a BucketBatchByLengthDataset | |||
| /// \notes Bucket elements according to their lengths, then pad and batch each bucket when it is full | |||
| /// \param[in] column_names Columns passed to element_length_function | |||
| /// \param[in] bucket_boundaries A list consisting of the upper boundaries of the buckets. | |||
| /// Must be strictly increasing. If there are n boundaries, n+1 buckets are created: One bucket for | |||
| /// [0, bucket_boundaries[0]), one bucket for [bucket_boundaries[i], bucket_boundaries[i+1]) for each | |||
| /// 0 <= i < n-1, and one bucket for [bucket_boundaries[n-1], inf). | |||
| /// \param[in] bucket_batch_sizes A list consisting of the batch sizes for each bucket. | |||
| /// Must contain bucket_boundaries.size() + 1 elements, one batch size per bucket. | |||
| /// \param[in] element_length_function A function pointer that takes in TensorRow and outputs a TensorRow. The output | |||
| /// must contain a single tensor containing a single int32_t. If no value is provided, then size of column_names | |||
| /// must be 1, and the size of the first dimension of that column will be taken as the length (default=nullptr) | |||
| /// \param[in] pad_info Represents how to batch each column. The key corresponds to the column name, the value must | |||
| /// be a tuple of 2 elements. The first element corresponds to the shape to pad to, and the second element | |||
| /// corresponds to the value to pad with. If a column is not specified, then that column will be padded to the | |||
| /// longest in the current batch, and 0 will be used as the padding value. Any unspecified dimensions will be | |||
| /// padded to the longest in the current batch, unless pad_to_bucket_boundary is true. If no padding is wanted, | |||
| /// set pad_info to None (default=empty dictionary). | |||
| /// \param[in] pad_to_bucket_boundary If true, will pad each unspecified dimension in pad_info to the bucket_boundary | |||
| /// minus 1. If there are any elements that fall into the last bucket, an error will occur (default=false). | |||
| /// \param[in] drop_remainder If true, will drop the last batch for each bucket if it is not a full batch | |||
| /// (default=false). | |||
| /// \return Shared pointer to the current BucketBatchByLengthDataset | |||
| std::shared_ptr<BucketBatchByLengthDataset> BucketBatchByLength( | |||
| const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries, | |||
| const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow) = nullptr, | |||
| const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info = {}, | |||
| bool pad_to_bucket_boundary = false, bool drop_remainder = false); | |||
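A minimal usage sketch of the non-default form, assuming a source dataset `ds` and a C length function such as the BucketBatchTestFunction defined in the pipeline tests below:

    std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_info;
    ds = ds->BucketBatchByLength({"image"}, {1, 2}, {1, 2, 3},
                                 &BucketBatchTestFunction, pad_info, true, true);
    if (ds == nullptr) {
      // One of the parameters failed ValidateParams().
    }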
| #ifndef ENABLE_ANDROID | |||
| /// \brief Function to create a Vocab from source dataset | |||
| /// \notes Build a vocab from a dataset. This would collect all the unique words in a dataset and return a vocab | |||
| @@ -953,6 +985,36 @@ class BatchDataset : public Dataset { | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_map_; | |||
| }; | |||
| class BucketBatchByLengthDataset : public Dataset { | |||
| public: | |||
| /// \brief Constructor | |||
| BucketBatchByLengthDataset( | |||
| const std::vector<std::string> &column_names, const std::vector<int32_t> &bucket_boundaries, | |||
| const std::vector<int32_t> &bucket_batch_sizes, TensorRow (*element_length_function)(TensorRow) = nullptr, | |||
| const std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> &pad_info = {}, | |||
| bool pad_to_bucket_boundary = false, bool drop_remainder = false); | |||
| /// \brief Destructor | |||
| ~BucketBatchByLengthDataset() = default; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \brief Parameters validation | |||
| /// \return bool true if all the params are valid | |||
| bool ValidateParams() override; | |||
| private: | |||
| std::vector<std::string> column_names_; | |||
| std::vector<int32_t> bucket_boundaries_; | |||
| std::vector<int32_t> bucket_batch_sizes_; | |||
| TensorRow (*element_length_function_)(TensorRow); | |||
| std::map<std::string, std::pair<TensorShape, std::shared_ptr<Tensor>>> pad_info_; | |||
| bool pad_to_bucket_boundary_; | |||
| bool drop_remainder_; | |||
| }; | |||
| #ifndef ENABLE_ANDROID | |||
| class BuildVocabDataset : public Dataset { | |||
| public: | |||
| @@ -7,6 +7,7 @@ if (ENABLE_PYTHON) | |||
| data/compose_op.cc | |||
| data/random_apply_op.cc | |||
| data/random_choice_op.cc | |||
| c_func_op.cc | |||
| py_func_op.cc | |||
| tensor_op.cc) | |||
| target_include_directories(kernels PRIVATE ${pybind11_INCLUDE_DIRS}) | |||
| @@ -0,0 +1,37 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/kernels/c_func_op.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| #include "minddata/dataset/util/status.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| Status CFuncOp::Compute(const TensorRow &input, TensorRow *output) { | |||
| IO_CHECK_VECTOR(input, output); | |||
| try { | |||
| *output = (*c_func_ptr_)(input); | |||
| } catch (const std::exception &e) { | |||
| RETURN_STATUS_UNEXPECTED(std::string("Unexpected error in CFuncOp: ") + e.what()); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,50 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_ | |||
| #define MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include <utility> | |||
| #include <string> | |||
| #include "minddata/dataset/core/tensor.h" | |||
| #include "minddata/dataset/kernels/tensor_op.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class CFuncOp : public TensorOp { | |||
| public: | |||
| explicit CFuncOp(TensorRow (*func)(TensorRow)) : c_func_ptr_(func) {} | |||
| ~CFuncOp() override = default; | |||
| uint32_t NumInput() override { return 0; } | |||
| uint32_t NumOutput() override { return 0; } | |||
| // Calls c_func_ptr and returns the result | |||
| Status Compute(const TensorRow &input, TensorRow *output) override; | |||
| std::string Name() const override { return kCFuncOp; } | |||
| private: | |||
| TensorRow (*c_func_ptr_)(TensorRow); | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_CCSRC_MINDDATA_DATASET_KERNELS_C_FUNC_OP_H_ | |||
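A sketch of a function with the signature CFuncOp wraps, suitable as an element_length_function (its output row must hold a single int32 tensor of shape {1}); the name is illustrative and mirrors the BucketBatchTestFunction used in the pipeline tests below:

    TensorRow ElementLengthFromFirstDim(TensorRow input) {
      TensorRow output;
      std::shared_ptr<Tensor> length;
      Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &length);
      length->SetItemAt({0}, static_cast<int32_t>(input[0]->shape()[0]));
      output.push_back(length);
      return output;
    }
    // Wrap it the same way BucketBatchByLengthDataset::Build() does:
    std::shared_ptr<TensorOp> op = std::make_shared<CFuncOp>(&ElementLengthFromFirstDim);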
| @@ -37,35 +37,44 @@ Status PyFuncOp::Compute(const TensorRow &input, TensorRow *output) { | |||
| try { | |||
| // Transform input tensor vector into numpy array vector | |||
| py::tuple input_args(input.size()); | |||
| for (size_t i = 0; i < input.size(); i++) { | |||
| py::array new_data; | |||
| RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data)); | |||
| // possible memcpy here | |||
| input_args[i] = new_data; | |||
| py::object ret_py_obj; | |||
| if (input.size() > 0) { | |||
| for (size_t i = 0; i < input.size(); i++) { | |||
| py::array new_data; | |||
| RETURN_IF_NOT_OK(input.at(i)->GetDataAsNumpy(&new_data)); | |||
| // possible memcpy here | |||
| input_args[i] = new_data; | |||
| } | |||
| // Invoke python function | |||
| ret_py_obj = this->py_func_ptr_(*input_args); | |||
| } else { | |||
| ret_py_obj = this->py_func_ptr_(); | |||
| } | |||
| // Invoke python function | |||
| py::object ret_py_obj = this->py_func_ptr_(*input_args); | |||
| // Process the return value | |||
| if (py::isinstance<py::array>(ret_py_obj)) { | |||
| // In case of a n-1 mapping, the return value will be a numpy array | |||
| std::shared_ptr<Tensor> out; | |||
| RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_obj.cast<py::array>(), &out)); | |||
| output->push_back(out); | |||
| } else if (py::isinstance<py::tuple>(ret_py_obj)) { | |||
| // In case of a n-m mapping, the return value will be a tuple of numpy arrays | |||
| py::tuple ret_py_tuple = ret_py_obj.cast<py::tuple>(); | |||
| // Iterate over two containers simultaneously for memory copy | |||
| for (size_t i = 0; i < ret_py_tuple.size(); i++) { | |||
| py::object ret_py_ele = ret_py_tuple[i]; | |||
| if (!py::isinstance<py::array>(ret_py_ele)) { | |||
| goto ShapeMisMatch; | |||
| if (output_type_ != DataType::DE_UNKNOWN) { | |||
| RETURN_IF_NOT_OK(CastOutput(ret_py_obj, output)); | |||
| } else { | |||
| if (py::isinstance<py::tuple>(ret_py_obj)) { | |||
| // In case of a n-m mapping, the return value will be a tuple of numpy arrays | |||
| py::tuple ret_py_tuple = ret_py_obj.cast<py::tuple>(); | |||
| // Iterate over two containers simultaneously for memory copy | |||
| for (size_t i = 0; i < ret_py_tuple.size(); i++) { | |||
| py::object ret_py_ele = ret_py_tuple[i]; | |||
| if (!py::isinstance<py::array>(ret_py_ele)) { | |||
| goto ShapeMisMatch; | |||
| } | |||
| std::shared_ptr<Tensor> out; | |||
| RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &out)); | |||
| output->push_back(out); | |||
| } | |||
| } else if (py::isinstance<py::array>(ret_py_obj)) { | |||
| // In case of a n-1 mapping, the return value will be a numpy array | |||
| std::shared_ptr<Tensor> out; | |||
| RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_ele.cast<py::array>(), &out)); | |||
| RETURN_IF_NOT_OK(Tensor::CreateFromNpArray(ret_py_obj.cast<py::array>(), &out)); | |||
| output->push_back(out); | |||
| } else { | |||
| goto ShapeMisMatch; | |||
| } | |||
| } else { | |||
| goto ShapeMisMatch; | |||
| } | |||
| } catch (const py::error_already_set &e) { | |||
| ret = Status(StatusCode::kPyFuncException, e.what()); | |||
| @@ -79,5 +88,24 @@ ShapeMisMatch: | |||
| ret = Status(StatusCode::kShapeMisMatch, "PyFunc should return a numpy array or a numpy array tuple"); | |||
| goto ComputeReturn; | |||
| } | |||
| Status PyFuncOp::CastOutput(const py::object &ret_py_obj, TensorRow *output) { | |||
| try { | |||
| std::shared_ptr<Tensor> out; | |||
| switch (output_type_) { | |||
| case DataType::DE_INT32: | |||
| RETURN_IF_NOT_OK(Tensor::CreateEmpty(TensorShape({1}), DataType(DataType::DE_INT32), &out)); | |||
| RETURN_IF_NOT_OK(out->SetItemAt({0}, ret_py_obj.cast<int32_t>())); | |||
| break; | |||
| default: | |||
| RETURN_STATUS_UNEXPECTED("No cast for the specified DataType was found."); | |||
| } | |||
| output->push_back(out); | |||
| } catch (const std::exception &e) { | |||
| return Status(StatusCode::kUnexpectedError, e.what()); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -27,9 +27,11 @@ | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| class __attribute__((visibility("hidden"))) PyFuncOp : public TensorOp { | |||
| class PyFuncOp : public TensorOp { | |||
| public: | |||
| explicit PyFuncOp(py::function func) : py_func_ptr_(std::move(func)) {} | |||
| explicit PyFuncOp(py::function func) : py_func_ptr_(std::move(func)) { output_type_ = DataType::DE_UNKNOWN; } | |||
| explicit PyFuncOp(py::function func, DataType::Type output_type) | |||
| : py_func_ptr_(std::move(func)), output_type_(output_type) {} | |||
| ~PyFuncOp() override = default; | |||
| @@ -39,10 +41,18 @@ class __attribute__((visibility("hidden"))) PyFuncOp : public TensorOp { | |||
| // Compute function for n-n mapping. | |||
| Status Compute(const TensorRow &input, TensorRow *output) override; | |||
| /// \brief Function to convert a primitive type py::object to a TensorRow | |||
| /// \notes Changes the py::object to a tensor with corresponding C++ DataType based on output_type_ and adds it to a | |||
| /// TensorRow. This function is used inside Compute. | |||
| /// \param[in] ret_py_obj The python object we want to cast | |||
| /// \param[out] output The TensorRow output | |||
| /// \return Status | |||
| Status CastOutput(const py::object &ret_py_obj, TensorRow *output); | |||
| std::string Name() const override { return kPyFuncOp; } | |||
| private: | |||
| py::function py_func_ptr_; | |||
| DataType::Type output_type_; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -127,6 +127,7 @@ constexpr char kToFloat16Op[] = "ToFloat16Op"; | |||
| constexpr char kTypeCastOp[] = "TypeCastOp"; | |||
| // other | |||
| constexpr char kCFuncOp[] = "CFuncOp"; | |||
| constexpr char kPyFuncOp[] = "PyFuncOp"; | |||
| constexpr char kNoOp[] = "NoOp"; | |||
| @@ -14,6 +14,7 @@ | |||
| * limitations under the License. | |||
| */ | |||
| #include "common/common.h" | |||
| #include "minddata/dataset/core/tensor_row.h" | |||
| #include "minddata/dataset/include/datasets.h" | |||
| #include "minddata/dataset/include/transforms.h" | |||
| @@ -24,6 +25,16 @@ class MindDataTestPipeline : public UT::DatasetOpTesting { | |||
| protected: | |||
| }; | |||
| mindspore::dataset::TensorRow BucketBatchTestFunction(mindspore::dataset::TensorRow input) { | |||
| mindspore::dataset::TensorRow output; | |||
| std::shared_ptr<Tensor> out; | |||
| Tensor::CreateEmpty(mindspore::dataset::TensorShape({1}), | |||
| mindspore::dataset::DataType(mindspore::dataset::DataType::Type::DE_INT32), &out); | |||
| out->SetItemAt({0}, 2); | |||
| output.push_back(out); | |||
| return output; | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBatchAndRepeat) { | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBatchAndRepeat."; | |||
| @@ -47,7 +58,7 @@ TEST_F(MindDataTestPipeline, TestBatchAndRepeat) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -65,6 +76,183 @@ TEST_F(MindDataTestPipeline, TestBatchAndRepeat) { | |||
| iter->Stop(); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthSuccess1) { | |||
| // Calling with default values | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthSuccess1."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {1, 2, 3}, {4, 5, 6, 7}); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create an iterator over the result of the above dataset | |||
| // This will trigger the creation of the Execution Tree and launch it. | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| while (row.size() != 0) { | |||
| i++; | |||
| auto image = row["image"]; | |||
| MS_LOG(INFO) << "Tensor image shape: " << image->shape(); | |||
| iter->GetNextRow(&row); | |||
| } | |||
| // Expect 2 batches | |||
| EXPECT_EQ(i, 2); | |||
| // Manually terminate the pipeline | |||
| iter->Stop(); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthSuccess2) { | |||
| // Calling with non-default values | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthSuccess2."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| std::map<std::string, std::pair<mindspore::dataset::TensorShape, std::shared_ptr<Tensor>>> pad_info; | |||
| ds = ds->BucketBatchByLength({"image"}, {1, 2}, {1, 2, 3}, | |||
| &BucketBatchTestFunction, pad_info, true, true); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create an iterator over the result of the above dataset | |||
| // This will trigger the creation of the Execution Tree and launch it. | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| while (row.size() != 0) { | |||
| i++; | |||
| auto image = row["image"]; | |||
| MS_LOG(INFO) << "Tensor image shape: " << image->shape(); | |||
| iter->GetNextRow(&row); | |||
| } | |||
| // 5 batches of size 2 | |||
| EXPECT_EQ(i, 5); | |||
| // Manually terminate the pipeline | |||
| iter->Stop(); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail1) { | |||
| // Empty bucket_boundaries | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail1."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {}, {1}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail2) { | |||
| // Empty bucket_batch_sizes | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail2."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {1}, {}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail3) { | |||
| // Negative boundaries | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail3."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {-1, 1}, {1, 2, 3}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail4) { | |||
| // Boundaries not strictly increasing | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail4."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {2, 2}, {1, 2, 3}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail5) { | |||
| // Incorrect size of bucket_batch_size | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail5."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {1, 2}, {1, 2}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail6) { | |||
| // Negative bucket_batch_size | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail6."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image"}, {1, 2}, {1, -2, 3}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestBucketBatchByLengthFail7) { | |||
| // This should fail because element_length_function is not specified and column_names has more than 1 element. | |||
| // Calling with default values for the optional parameters | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestBucketBatchByLengthFail7."; | |||
| // Create a Mnist Dataset | |||
| std::string folder_path = datasets_root_path_ + "/testMnistData/"; | |||
| std::shared_ptr<Dataset> ds = Mnist(folder_path, RandomSampler(false, 10)); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create a BucketBatchByLength operation on ds | |||
| ds = ds->BucketBatchByLength({"image", "label"}, {1, 2}, {1, 2, 3}); | |||
| EXPECT_EQ(ds, nullptr); | |||
| } | |||
| TEST_F(MindDataTestPipeline, TestConcatFail1) { | |||
| MS_LOG(INFO) << "Doing MindDataTestPipeline-TestConcatFail1."; | |||
| @@ -148,7 +336,7 @@ TEST_F(MindDataTestPipeline, TestConcatSuccess) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| @@ -200,7 +388,7 @@ TEST_F(MindDataTestPipeline, TestConcatSuccess2) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| @@ -239,7 +427,7 @@ TEST_F(MindDataTestPipeline, TestImageFolderBatchAndRepeat) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -293,7 +481,7 @@ TEST_F(MindDataTestPipeline, TestProjectMap) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -366,7 +554,7 @@ TEST_F(MindDataTestPipeline, TestProjectMapAutoInjection) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -468,7 +656,7 @@ TEST_F(MindDataTestPipeline, TestRenameSuccess) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -514,7 +702,7 @@ TEST_F(MindDataTestPipeline, TestRepeatDefault) { | |||
| std::shared_ptr <Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map <std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| @@ -557,7 +745,7 @@ TEST_F(MindDataTestPipeline, TestRepeatOne) { | |||
| std::shared_ptr <Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map <std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| uint64_t i = 0; | |||
| @@ -630,7 +818,7 @@ TEST_F(MindDataTestPipeline, TestShuffleDataset) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -666,7 +854,7 @@ TEST_F(MindDataTestPipeline, TestSkipDataset) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -718,7 +906,7 @@ TEST_F(MindDataTestPipeline, TestTakeDatasetDefault) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -776,7 +964,7 @@ TEST_F(MindDataTestPipeline, TestTakeDatasetNormal) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -830,7 +1018,7 @@ TEST_F(MindDataTestPipeline, TestTensorOpsAndMap) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -938,7 +1126,7 @@ TEST_F(MindDataTestPipeline, TestZipSuccess) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||
| @@ -992,7 +1180,7 @@ TEST_F(MindDataTestPipeline, TestZipSuccess2) { | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| // iterate over the dataset and get each row | |||
| std::unordered_map<std::string, std::shared_ptr<Tensor>> row; | |||
| iter->GetNextRow(&row); | |||