| @@ -123,6 +123,7 @@ if(ENABLE_CACHE) | |||
| add_dependencies(cpp-API engine-cache-client) | |||
| add_dependencies(engine-ir-cache engine-cache-client) | |||
| add_dependencies(engine-ir-datasetops engine-cache-client) | |||
| add_dependencies(engine-ir-datasetops-source engine-cache-client) | |||
| add_dependencies(engine-opt engine-cache-client) | |||
| add_dependencies(engine-datasetops engine-cache-client) | |||
| add_dependencies(engine-perf engine-cache-client) | |||
| @@ -1001,32 +1001,33 @@ ManifestDataset::ManifestDataset(const std::vector<char> &dataset_file, const st | |||
| MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file, | |||
| const std::vector<std::vector<char>> &columns_list, | |||
| const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded) { | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler ? sampler->Parse() : nullptr; | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| sample = *padded_sample; | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, | |||
| sample, num_padded); | |||
| sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file, | |||
| const std::vector<std::vector<char>> &columns_list, const Sampler *sampler, | |||
| const nlohmann::json *padded_sample, int64_t num_padded) { | |||
| const nlohmann::json *padded_sample, int64_t num_padded, | |||
| const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler ? sampler->Parse() : nullptr; | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| sample = *padded_sample; | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, | |||
| sample, num_padded); | |||
| sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file, | |||
| const std::vector<std::vector<char>> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded) { | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler.get().Parse(); | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| @@ -1034,13 +1035,13 @@ MindDataDataset::MindDataDataset(const std::vector<char> &dataset_file, | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(CharToString(dataset_file), VectorCharToString(columns_list), sampler_obj, | |||
| sample, num_padded); | |||
| sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, | |||
| const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded) { | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler ? sampler->Parse() : nullptr; | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| @@ -1048,12 +1049,13 @@ MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_f | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list), | |||
| sampler_obj, sample, num_padded); | |||
| sampler_obj, sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, const Sampler *sampler, | |||
| const nlohmann::json *padded_sample, int64_t num_padded) { | |||
| const nlohmann::json *padded_sample, int64_t num_padded, | |||
| const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler ? sampler->Parse() : nullptr; | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| @@ -1061,20 +1063,20 @@ MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_f | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list), | |||
| sampler_obj, sample, num_padded); | |||
| sampler_obj, sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| MindDataDataset::MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded) { | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache) { | |||
| auto sampler_obj = sampler.get().Parse(); | |||
| nlohmann::json sample = nullptr; | |||
| if (padded_sample) { | |||
| sample = *padded_sample; | |||
| } | |||
| auto ds = std::make_shared<MindDataNode>(VectorCharToString(dataset_files), VectorCharToString(columns_list), | |||
| sampler_obj, sample, num_padded); | |||
| sampler_obj, sample, num_padded, cache); | |||
| ir_node_ = std::static_pointer_cast<DatasetNode>(ds); | |||
| } | |||
| #endif | |||
| @@ -177,9 +177,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) { | |||
| nlohmann::json padded_sample_json; | |||
| std::map<std::string, std::string> sample_bytes; | |||
| THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); | |||
| auto minddata = | |||
| std::make_shared<MindDataNode>(dataset_file, toStringVector(columns_list), | |||
| toSamplerObj(sampler, true), padded_sample_json, num_padded); | |||
| auto minddata = std::make_shared<MindDataNode>(dataset_file, toStringVector(columns_list), | |||
| toSamplerObj(sampler, true), padded_sample_json, | |||
| num_padded, nullptr); | |||
| minddata->SetSampleBytes(&sample_bytes); | |||
| THROW_IF_ERROR(minddata->ValidateParams()); | |||
| return minddata; | |||
| @@ -189,9 +189,9 @@ PYBIND_REGISTER(MindDataNode, 2, ([](const py::module *m) { | |||
| nlohmann::json padded_sample_json; | |||
| std::map<std::string, std::string> sample_bytes; | |||
| THROW_IF_ERROR(ToJson(padded_sample, &padded_sample_json, &sample_bytes)); | |||
| auto minddata = | |||
| std::make_shared<MindDataNode>(toStringVector(dataset_file), toStringVector(columns_list), | |||
| toSamplerObj(sampler, true), padded_sample_json, num_padded); | |||
| auto minddata = std::make_shared<MindDataNode>( | |||
| toStringVector(dataset_file), toStringVector(columns_list), toSamplerObj(sampler, true), | |||
| padded_sample_json, num_padded, nullptr); | |||
| minddata->SetSampleBytes(&sample_bytes); | |||
| THROW_IF_ERROR(minddata->ValidateParams()); | |||
| return minddata; | |||
| @@ -40,7 +40,7 @@ using mindrecord::ShardOperator; | |||
| using mindrecord::ShardReader; | |||
| // Builder constructor. Creates the builder object. | |||
| MindRecordOp::Builder::Builder() : build_dataset_file_({}) { | |||
| MindRecordOp::Builder::Builder() : build_dataset_file_({}), builder_sampler_(nullptr) { | |||
| // Some arguments to the MindRecordOp constructor have a default argument that is taken | |||
| // from the client config. | |||
| // The user may choose to change these values for the construction of the MindRecordOp by | |||
| @@ -69,11 +69,14 @@ Status MindRecordOp::Builder::Build(std::shared_ptr<MindRecordOp> *ptr) { | |||
| } | |||
| std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>(); | |||
| if (builder_sampler_ == nullptr) { | |||
| builder_sampler_ = std::make_shared<MindRecordSamplerRT>(shard_reader.get()); | |||
| } | |||
| new_mind_record_op = | |||
| std::make_shared<MindRecordOp>(build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, | |||
| build_op_connector_queue_size_, build_columns_to_load_, build_operators_, | |||
| build_num_padded_, sample_json, build_sample_bytes_, std::move(shard_reader)); | |||
| new_mind_record_op = std::make_shared<MindRecordOp>( | |||
| build_num_mind_record_workers_, build_dataset_file_, build_load_dataset_, build_op_connector_queue_size_, | |||
| build_columns_to_load_, build_operators_, build_num_padded_, sample_json, build_sample_bytes_, | |||
| std::move(shard_reader), builder_sampler_); | |||
| RETURN_IF_NOT_OK(new_mind_record_op->Init()); | |||
| *ptr = std::move(new_mind_record_op); | |||
| @@ -115,9 +118,8 @@ MindRecordOp::MindRecordOp(int32_t num_mind_record_workers, std::vector<std::str | |||
| int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load, | |||
| const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded, | |||
| const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes, | |||
| std::unique_ptr<ShardReader> shard_reader) | |||
| : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, | |||
| std::make_shared<MindRecordSamplerRT>(shard_reader.get())), | |||
| std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler) | |||
| : MappableLeafOp(num_mind_record_workers, op_connector_queue_size, std::move(sampler)), | |||
| dataset_file_(dataset_file), | |||
| load_dataset_(load_dataset), | |||
| columns_to_load_(columns_to_load), | |||
| @@ -275,6 +277,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in | |||
| RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, {}, mindrecord::json(), task_type)); | |||
| std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]); | |||
| fetched_row->setPath(file_path); | |||
| fetched_row->setId(row_id); | |||
| } | |||
| if (tupled_buffer.empty()) return Status::OK(); | |||
| if (task_type == mindrecord::TaskType::kCommonTask) { | |||
| @@ -284,6 +287,7 @@ Status MindRecordOp::GetRowFromReader(TensorRow *fetched_row, int64_t row_id, in | |||
| RETURN_IF_NOT_OK(LoadTensorRow(fetched_row, columns_blob, columns_json, task_type)); | |||
| std::vector<std::string> file_path(fetched_row->size(), dataset_file_[0]); | |||
| fetched_row->setPath(file_path); | |||
| fetched_row->setId(row_id); | |||
| } | |||
| } | |||
| @@ -25,6 +25,7 @@ | |||
| #include <tuple> | |||
| #include <unordered_map> | |||
| #include <unordered_set> | |||
| #include <utility> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/data_schema.h" | |||
| @@ -107,6 +108,14 @@ class MindRecordOp : public MappableLeafOp { | |||
| return *this; | |||
| } | |||
| // Setter method | |||
| // @param std::shared_ptr<Sampler> sampler | |||
| // @return Builder setter method returns reference to the builder. | |||
| Builder &SetSampler(std::shared_ptr<SamplerRT> sampler) { | |||
| builder_sampler_ = std::move(sampler); | |||
| return *this; | |||
| } | |||
| Status SanityCheck() const; | |||
| static int32_t num_mind_record_workers() { return kDefaultMindRecordWorkers; } | |||
| @@ -128,6 +137,7 @@ class MindRecordOp : public MappableLeafOp { | |||
| int64_t build_num_padded_; | |||
| py::handle build_sample_; | |||
| std::map<std::string, std::string> build_sample_bytes_; | |||
| std::shared_ptr<SamplerRT> builder_sampler_; | |||
| }; | |||
| // Constructor of the MindRecordOp. | |||
| @@ -137,11 +147,12 @@ class MindRecordOp : public MappableLeafOp { | |||
| // @param op_connector_queue_size - The output connector queue size | |||
| // @param columns_to_load - The list of columns to use (column name) | |||
| // @param operators - ShardOperators for Shuffle, Category, Sample | |||
| // @param sampler - sampler tells MindRecordOp what to read | |||
| MindRecordOp(int32_t num_mind_record_workers, std::vector<std::string> dataset_file, bool load_dataset, | |||
| int32_t op_connector_queue_size, const std::vector<std::string> &columns_to_load, | |||
| const std::vector<std::shared_ptr<ShardOperator>> &operators, int64_t num_padded_, | |||
| const mindrecord::json &sample_json, const std::map<std::string, std::string> &sample_bytes_, | |||
| std::unique_ptr<ShardReader> shard_reader); | |||
| std::unique_ptr<ShardReader> shard_reader, std::shared_ptr<SamplerRT> sampler); | |||
| // Destructor | |||
| ~MindRecordOp() override; | |||
| @@ -62,6 +62,8 @@ Status MindRecordSamplerRT::InitSampler() { | |||
| Status MindRecordSamplerRT::ResetSampler() { | |||
| // drive the shard reader reshuffle tasks to redo the sampling for another epoch | |||
| // Note that when cache is attached, this function is driven by cache lookup op rather than mindrecord op. | |||
| // Therefore, the reshuffle of tasks might happen in the middle of mindrecord's epoch | |||
| next_id_ = 0; | |||
| shard_reader_->ShuffleTask(); | |||
| return Status::OK(); | |||
| @@ -76,6 +76,10 @@ class CacheLookupNode : public DatasetNode, public SamplerObj { | |||
| /// \return Status of the node visit | |||
| Status AcceptAfter(IRNodePass *const p, bool *const modified) override; | |||
| /// \brief Sampler getter | |||
| /// \return SamplerObj of the current node | |||
| std::shared_ptr<SamplerObj> Sampler() { return sampler_; } | |||
| private: | |||
| std::shared_ptr<SamplerObj> sampler_; | |||
| std::shared_ptr<DatasetOp> lookup_op_; | |||
| @@ -237,7 +237,7 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> { | |||
| /// \return True if this node is not a data source node | |||
| const bool IsNotADataSource() const { return (mappable_ == kNotADataSource); } | |||
| /// \brief Check if this node is a descendant of an operator with cache. Currently used in leaf nodes | |||
| /// \brief Check if this node is a descendant of an operator with cache. | |||
| /// \return True if a cache-enabled operator is an ancestor of this node | |||
| const bool IsDescendantOfCache() const { return descendant_of_cache_; } | |||
| @@ -247,7 +247,7 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> { | |||
| return node != nullptr && node->parent_ == nullptr && node->Children().empty(); | |||
| } | |||
| /// \brief Mark to indicate this node is a descendant of an operator with cache. Currently used in leaf nodes | |||
| /// \brief Mark to indicate this node is a descendant of an operator with cache. | |||
| void HasCacheAbove() { descendant_of_cache_ = true; } | |||
| /// \brief Getter of the number of workers | |||
| @@ -335,7 +335,8 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> { | |||
| enum DataSource { kNotADataSource = 0, kNonMappableSource = 1, kMappableSource = 2 }; | |||
| enum DataSource mappable_; | |||
| bool nary_op_; // an indicator of whether the current node supports multiple children, true for concat/zip node | |||
| bool descendant_of_cache_; | |||
| bool descendant_of_cache_; // an indicator of whether the current node is a descendant of cache. | |||
| // Initially set to false, will set to true by the optimizer when conditions are met. | |||
| }; | |||
| // MappableSourceNode represents the leaf nodes that can be randomly accessed with indexes. | |||
| @@ -38,8 +38,7 @@ MapNode::MapNode(std::shared_ptr<DatasetNode> child, std::vector<std::shared_ptr | |||
| output_columns_(output_columns), | |||
| project_columns_(project_columns), | |||
| DatasetNode(std::move(cache)), | |||
| callbacks_(callbacks), | |||
| under_a_cache_(false) { | |||
| callbacks_(callbacks) { | |||
| this->AddChild(child); | |||
| } | |||
| @@ -68,9 +67,9 @@ Status MapNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) { | |||
| // This is temporary code. | |||
| // Because the randomness of its tensor operations is not known in TensorOperation form until we convert them | |||
| // to TensorOp, we need to check the randomness here. | |||
| // When TensorOperation captures the randomness behaviour, remove this code and the member "under_a_cache_" | |||
| // When TensorOperation captures the randomness behaviour, remove this code | |||
| // and the temporary code in CacheValidation pre pass in IR optimizer. | |||
| if (under_a_cache_) { | |||
| if (IsDescendantOfCache()) { | |||
| auto itr = std::find_if(tensor_ops.begin(), tensor_ops.end(), [](const auto &it) { return !it->Deterministic(); }); | |||
| if (itr != tensor_ops.end()) { | |||
| RETURN_STATUS_UNEXPECTED("MapNode containing random operation is not supported as a descendant of cache."); | |||
| @@ -79,9 +79,6 @@ class MapNode : public DatasetNode { | |||
| /// \brief setter to set all tensor operations | |||
| void setOperations(const std::vector<std::shared_ptr<TensorOperation>> &operations); | |||
| /// \brief indicate this Map will be cached | |||
| void Cached() { under_a_cache_ = true; } | |||
| /// \brief Getter functions | |||
| /// \brief Getter of tensor operations | |||
| /// \return Vector of operations the Map node will process | |||
| @@ -102,7 +99,6 @@ class MapNode : public DatasetNode { | |||
| std::vector<std::string> output_columns_; | |||
| std::vector<std::string> project_columns_; | |||
| std::vector<std::shared_ptr<DSCallback>> callbacks_; | |||
| bool under_a_cache_; | |||
| }; | |||
| } // namespace dataset | |||
| @@ -181,7 +181,11 @@ Status CLUENode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) | |||
| RETURN_IF_NOT_OK(clue_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { | |||
| // If a global shuffle is used for Clue, it will inject a shuffle op over the Clue. | |||
| // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. | |||
| // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset Clue's shuffle | |||
| // option to false. | |||
| if (shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| std::shared_ptr<DatasetOp> shuffle_op = nullptr; | |||
| int64_t num_rows = 0; | |||
| @@ -119,7 +119,11 @@ Status CSVNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) { | |||
| RETURN_IF_NOT_OK(csv_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { | |||
| // If a global shuffle is used for CSV, it will inject a shuffle op over the CSV. | |||
| // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. | |||
| // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset CSV's shuffle | |||
| // option to false. | |||
| if (shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| std::shared_ptr<DatasetOp> shuffle_op = nullptr; | |||
| int64_t num_rows = 0; | |||
| @@ -20,10 +20,13 @@ | |||
| #include <memory> | |||
| #include <stack> | |||
| #include <string> | |||
| #include <utility> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/datasetops/source/mindrecord_op.h" | |||
| #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h" | |||
| #include "minddata/dataset/engine/ir/datasetops/cache_lookup_node.h" | |||
| #include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h" | |||
| #include "minddata/dataset/engine/opt/pass.h" | |||
| #include "minddata/dataset/util/status.h" | |||
| @@ -31,36 +34,40 @@ namespace mindspore { | |||
| namespace dataset { | |||
| MindDataNode::MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list, | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded) | |||
| : MappableSourceNode(), | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded, | |||
| std::shared_ptr<DatasetCache> cache = nullptr) | |||
| : MappableSourceNode(std::move(cache)), | |||
| dataset_file_(std::string()), | |||
| dataset_files_(dataset_files), | |||
| search_for_pattern_(false), | |||
| columns_list_(columns_list), | |||
| sampler_(sampler), | |||
| input_sampler_(sampler), | |||
| sampler_(std::make_shared<MindRecordSamplerObj>()), | |||
| padded_sample_(padded_sample), | |||
| sample_bytes_({}), | |||
| num_padded_(num_padded) {} | |||
| MindDataNode::MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list, | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded) | |||
| : MappableSourceNode(), | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded, | |||
| std::shared_ptr<DatasetCache> cache = nullptr) | |||
| : MappableSourceNode(std::move(cache)), | |||
| dataset_file_(dataset_file), | |||
| dataset_files_({}), | |||
| search_for_pattern_(true), | |||
| columns_list_(columns_list), | |||
| sampler_(sampler), | |||
| input_sampler_(sampler), | |||
| sampler_(std::make_shared<MindRecordSamplerObj>()), | |||
| padded_sample_(padded_sample), | |||
| sample_bytes_({}), | |||
| num_padded_(num_padded) {} | |||
| std::shared_ptr<DatasetNode> MindDataNode::Copy() { | |||
| std::shared_ptr<MindDataNode> node; | |||
| std::shared_ptr<SamplerObj> sampler = (sampler_ == nullptr) ? nullptr : sampler_->SamplerCopy(); | |||
| std::shared_ptr<SamplerObj> sampler = (input_sampler_ == nullptr) ? nullptr : input_sampler_->SamplerCopy(); | |||
| if (dataset_files_.empty()) { | |||
| node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_); | |||
| node = std::make_shared<MindDataNode>(dataset_file_, columns_list_, sampler, padded_sample_, num_padded_, cache_); | |||
| } else { | |||
| node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_); | |||
| node = std::make_shared<MindDataNode>(dataset_files_, columns_list_, sampler, padded_sample_, num_padded_, cache_); | |||
| } | |||
| node->SetSampleBytes(&sample_bytes_); | |||
| return node; | |||
| @@ -82,7 +89,7 @@ Status MindDataNode::ValidateParams() { | |||
| search_for_pattern_ ? std::vector<std::string>{dataset_file_} : dataset_files_; | |||
| RETURN_IF_NOT_OK(ValidateDatasetFilesParam("MindDataNode", dataset_file_vec)); | |||
| RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", sampler_)); | |||
| RETURN_IF_NOT_OK(ValidateDatasetSampler("MindDataNode", input_sampler_)); | |||
| if (!columns_list_.empty()) { | |||
| RETURN_IF_NOT_OK(ValidateDatasetColumnParam("MindDataNode", "columns_list", columns_list_)); | |||
| @@ -153,22 +160,46 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerO | |||
| void MindDataNode::SetSampleBytes(std::map<std::string, std::string> *sample_bytes) { sample_bytes_ = *sample_bytes; } | |||
| Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_ops) { | |||
| RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators_, num_padded_)); | |||
| RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators_, num_padded_)); | |||
| std::shared_ptr<SamplerRT> sampler_rt = nullptr; | |||
| // Build the sampler IR into a runtime sampler. | |||
| // This will also create a shard reader object, saved in this node's sampler_. | |||
| RETURN_IF_NOT_OK(sampler_->SamplerBuild(&sampler_rt)); | |||
| // Now we need to acquire the newly created shard reader from this node's sampler_. | |||
| // There are two cases: | |||
| // 1. If this node is cached, now after cache transform pass, its sampler_ has already been replaced by cache lookup | |||
| // node, and we should find the shard reader from cache lookup node's sampler_. | |||
| // 2. If this node is not cached, just acquire the shard reader from this node's sampler_. | |||
| std::unique_ptr<ShardReader> shard_reader; | |||
| if (IsDescendantOfCache()) { | |||
| auto cache_lookup_sampler = std::dynamic_pointer_cast<CacheLookupNode>(sampler_); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(cache_lookup_sampler != nullptr, | |||
| "Internal error. MindDataNode is cached, its sampler should be cache lookup node"); | |||
| auto mr_sampler = std::dynamic_pointer_cast<MindRecordSamplerObj>(cache_lookup_sampler->Sampler()); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr, | |||
| "Internal error. CacheLookupNode's sampler should be a MindRecordSamplerObj object"); | |||
| RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader)); | |||
| } else { | |||
| auto mr_sampler = std::dynamic_pointer_cast<MindRecordSamplerObj>(sampler_); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(mr_sampler != nullptr, | |||
| "Internal error. MindDataNode's sampler should be a MindRecordSamplerObj object"); | |||
| RETURN_IF_NOT_OK(mr_sampler->GetShardReader(&shard_reader)); | |||
| } | |||
| std::shared_ptr<MindRecordOp> mindrecord_op; | |||
| std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>(); | |||
| // If pass a string to MindData(), it will be treated as a pattern to search for matched files, | |||
| // else if pass a vector to MindData(), it will be treated as specified files to be read | |||
| if (search_for_pattern_) { | |||
| std::vector<std::string> dataset_file_vec_ = {dataset_file_}; | |||
| mindrecord_op = std::make_shared<MindRecordOp>(num_workers_, dataset_file_vec_, search_for_pattern_, | |||
| connector_que_size_, columns_list_, operators_, num_padded_, | |||
| padded_sample_, sample_bytes_, std::move(shard_reader)); | |||
| mindrecord_op = std::make_shared<MindRecordOp>( | |||
| num_workers_, dataset_file_vec_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, | |||
| padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); | |||
| } else { | |||
| mindrecord_op = std::make_shared<MindRecordOp>(num_workers_, dataset_files_, search_for_pattern_, | |||
| connector_que_size_, columns_list_, operators_, num_padded_, | |||
| padded_sample_, sample_bytes_, std::move(shard_reader)); | |||
| mindrecord_op = std::make_shared<MindRecordOp>( | |||
| num_workers_, dataset_files_, search_for_pattern_, connector_que_size_, columns_list_, operators_, num_padded_, | |||
| padded_sample_, sample_bytes_, std::move(shard_reader), std::move(sampler_rt)); | |||
| } | |||
| RETURN_IF_NOT_OK(mindrecord_op->Init()); | |||
| @@ -181,7 +212,7 @@ Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o | |||
| // Get the shard id of node | |||
| Status MindDataNode::GetShardId(int32_t *shard_id) { | |||
| *shard_id = sampler_->ShardId(); | |||
| *shard_id = input_sampler_->ShardId(); | |||
| return Status::OK(); | |||
| } | |||
| @@ -195,7 +226,7 @@ Status MindDataNode::GetDatasetSize(const std::shared_ptr<DatasetSizeGetter> &si | |||
| } | |||
| int64_t num_rows = -1; | |||
| std::vector<std::shared_ptr<ShardOperator>> operators; | |||
| RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators, num_padded_)); | |||
| RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(input_sampler_, &operators, num_padded_)); | |||
| if (search_for_pattern_) { | |||
| dataset_files_ = {dataset_file_}; | |||
| @@ -32,11 +32,13 @@ class MindDataNode : public MappableSourceNode { | |||
| public: | |||
| /// \brief Constructor | |||
| MindDataNode(const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list, | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded); | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded, | |||
| std::shared_ptr<DatasetCache> cache); | |||
| /// \brief Constructor | |||
| MindDataNode(const std::string &dataset_file, const std::vector<std::string> &columns_list, | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded); | |||
| const std::shared_ptr<SamplerObj> &sampler, nlohmann::json padded_sample, int64_t num_padded, | |||
| std::shared_ptr<DatasetCache> cache); | |||
| /// \brief Destructor | |||
| ~MindDataNode() = default; | |||
| @@ -109,7 +111,9 @@ class MindDataNode : public MappableSourceNode { | |||
| std::vector<std::string> dataset_files_; // search_for_pattern_ will be false in this mode | |||
| bool search_for_pattern_; | |||
| std::vector<std::string> columns_list_; | |||
| std::shared_ptr<SamplerObj> sampler_; | |||
| std::shared_ptr<SamplerObj> input_sampler_; // The sampler from users input, will be used to create a set of shard | |||
| // operators. | |||
| std::shared_ptr<SamplerObj> sampler_; // An auto-created sampler, IR of runtime MindRecordSamplerRT sampler | |||
| nlohmann::json padded_sample_; | |||
| std::map<std::string, std::string> sample_bytes_; // enable in python | |||
| int64_t num_padded_; | |||
| @@ -11,6 +11,7 @@ set(DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES | |||
| subset_random_sampler_ir.cc | |||
| subset_sampler_ir.cc | |||
| weighted_random_sampler_ir.cc | |||
| mindrecord_sampler_ir.cc | |||
| ) | |||
| add_library(engine-ir-datasetops-source-samplers OBJECT ${DATASET_ENGINE_IR_DATASETOPS_SOURCE_SAMPLERS_SRC_FILES}) | |||
| @@ -0,0 +1,56 @@ | |||
| /** | |||
| * Copyright 2021 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "minddata/dataset/engine/ir/datasetops/source/samplers/mindrecord_sampler_ir.h" | |||
| #include <memory> | |||
| #include <utility> | |||
| #ifndef ENABLE_ANDROID | |||
| #include "minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.h" | |||
| #include "minddata/mindrecord/include/shard_reader.h" | |||
| #endif | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| #ifndef ENABLE_ANDROID | |||
| // This function not only creates a runtime sampler object, but also creates a ShardReader, | |||
| // which will also be needed to build a runtime MindRecordOp | |||
| // (cannot add another output parameter because it has to override base class's function) | |||
| Status MindRecordSamplerObj::SamplerBuild(std::shared_ptr<SamplerRT> *sampler) { | |||
| shard_reader_ = std::make_unique<mindrecord::ShardReader>(); | |||
| *sampler = std::make_shared<MindRecordSamplerRT>(shard_reader_.get()); | |||
| return Status::OK(); | |||
| } | |||
| std::shared_ptr<SamplerObj> MindRecordSamplerObj::SamplerCopy() { | |||
| auto sampler = std::make_shared<MindRecordSamplerObj>(); | |||
| return sampler; | |||
| } | |||
| // Function to acquire the unique pointer of the newly created ShardReader object | |||
| // Note this function can only be called after SamplerBuild is finished, and can only be called once. Otherwise this | |||
| // function will return error status. | |||
| Status MindRecordSamplerObj::GetShardReader(std::unique_ptr<mindrecord::ShardReader> *shard_reader) { | |||
| CHECK_FAIL_RETURN_UNEXPECTED(shard_reader_ != nullptr, "Internal error. Attempt to get an empty shard reader."); | |||
| *shard_reader = std::move(shard_reader_); | |||
| return Status::OK(); | |||
| } | |||
| #endif | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,67 @@ | |||
| /** | |||
| * Copyright 2021 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_MINDRECORD_SAMPLER_IR_H | |||
| #define MINDSPORE_MINDRECORD_SAMPLER_IR_H | |||
| #include <memory> | |||
| #include "minddata/dataset/engine/ir/datasetops/source/samplers/samplers_ir.h" | |||
| #include "include/api/status.h" | |||
| #ifndef ENABLE_ANDROID | |||
| #include "minddata/mindrecord/include/shard_reader.h" | |||
| #endif | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| #ifndef ENABLE_ANDROID | |||
| class MindRecordSamplerObj : public SamplerObj { | |||
| public: | |||
| /// \brief Constructor | |||
| MindRecordSamplerObj() : shard_reader_(nullptr) {} | |||
| /// \brief Destructor | |||
| ~MindRecordSamplerObj() = default; | |||
| /// \brief Convert a MindRecordSamplerObj into a runtime MindRecordSamplerRT object | |||
| /// Note that this function not only creates a runtime sampler object, but also creates a ShardReader, | |||
| /// which will also be needed to build a runtime MindRecordOp | |||
| /// \param[out] sampler Shared pointer to the newly created runtime sampler | |||
| /// \return The Status code of the function. It returns OK status if sampler is created successfully. | |||
| Status SamplerBuild(std::shared_ptr<SamplerRT> *sampler) override; | |||
| /// \brief Function to copy a MindRecordSamplerObj | |||
| /// \return Shared pointer to the newly created SamplerObj | |||
| std::shared_ptr<SamplerObj> SamplerCopy() override; | |||
| /// \brief Function for parameter check. This class requires no input parameter. | |||
| /// \return The Status code of the function. This function always return OK status. | |||
| Status ValidateParams() override { return Status::OK(); } | |||
| /// \brief Function to acquire the unique pointer of the newly created ShardReader object | |||
| /// Note that this function can only be called after SamplerBuild is called, and can only be called once | |||
| /// \param shard_reader Unique pointer to the newly created ShardReader object | |||
| /// \return The Status code of the function. It returns OK status if acquired a non-empty ShardReader object. | |||
| Status GetShardReader(std::unique_ptr<mindrecord::ShardReader> *shard_reader); | |||
| private: | |||
| std::unique_ptr<mindrecord::ShardReader> shard_reader_; | |||
| }; | |||
| #endif | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_MINDRECORD_SAMPLER_IR_H | |||
| @@ -87,7 +87,11 @@ Status TextFileNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o | |||
| sorted_dataset_files, connector_que_size_, shuffle_files, num_shards_, shard_id_); | |||
| RETURN_IF_NOT_OK(text_file_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { | |||
| // If a global shuffle is used for TextFile, it will inject a shuffle op over the TextFile. | |||
| // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. | |||
| // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TextFile's shuffle | |||
| // option to false. | |||
| if (shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| std::shared_ptr<DatasetOp> shuffle_op = nullptr; | |||
| int64_t num_rows = 0; | |||
| @@ -129,7 +129,11 @@ Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *const node_o | |||
| RETURN_IF_NOT_OK(tf_reader_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal && !IsDescendantOfCache()) { | |||
| // If a global shuffle is used for TFRecord, it will inject a shuffle op over the TFRecord. | |||
| // But, if there is a cache in the tree, we do not need the global shuffle and the shuffle op should not be built. | |||
| // This is achieved in the cache transform pass where we call MakeSimpleProducer to reset TFRecord's shuffle | |||
| // option to false. | |||
| if (shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| std::shared_ptr<DatasetOp> shuffle_op = nullptr; | |||
| @@ -94,6 +94,9 @@ Status CacheTransformPass::CachePass::Visit(std::shared_ptr<NonMappableSourceNod | |||
| } | |||
| #endif | |||
| // Almost the same as NonMappableSourceNode's Visit, except that this one is not guarded by the compiler | |||
| // directive #ifndef ENABLE_ANDROID, and there is no need to call MakeSimpleProducer() because | |||
| // RandomOp doesn't support sampling or sharding. | |||
| Status CacheTransformPass::CachePass::Visit(std::shared_ptr<RandomNode> node, bool *const modified) { | |||
| if (node->IsCached()) { | |||
| MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree."; | |||
| @@ -137,11 +140,25 @@ Status CacheTransformPass::CachePass::Visit(std::shared_ptr<MappableSourceNode> | |||
| } | |||
| #ifndef ENABLE_ANDROID | |||
| // Perform leaf node cache transform identification | |||
| // Almost the same as MappableSourceNode's Visit, except that in this one we also mark this node's descendant_of_cache_ | |||
| // field as true. Later when building, MindDataNode will take different actions based on this information. | |||
| Status CacheTransformPass::CachePass::Visit(std::shared_ptr<MindDataNode> node, bool *const modified) { | |||
| if (node->IsCached() || is_caching_) { | |||
| return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__, | |||
| "There is currently no support for MindRecordOp under cache."); | |||
| if (node->IsCached()) { | |||
| MS_LOG(INFO) << "Cache transform pass: CacheOp found, identified descendant tree."; | |||
| is_caching_ = true; | |||
| } | |||
| // Cache might also be injected into a non-leaf node higher up in the tree, so is_caching_ might also be set to true | |||
| // by the other Visit() that takes a DatasetNode argument. | |||
| if (is_caching_) { | |||
| node->HasCacheAbove(); | |||
| MS_LOG(DEBUG) << "Cache transform pass: Mappable leaf in a cache descendant tree detected"; | |||
| // If a leaf has already been assigned, then we have more than one leaf inside this cache descendant tree. | |||
| if (leaf_node_) { | |||
| return Status(StatusCode::kMDNotImplementedYet, __LINE__, __FILE__, | |||
| "There is currently no support for multiple leaf nodes under cache."); | |||
| } | |||
| // If we are a leaf in the caching path, then save this leaf | |||
| leaf_node_ = node; | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| @@ -121,7 +121,7 @@ Status CacheValidationPass::Visit(std::shared_ptr<MapNode> node, bool *const mod | |||
| // to TensorOp, we need to check the randomness in MapNode::Build(). | |||
| // By setting this MapNode is under a cache, we will check the randomness of its tensor operations without the need | |||
| // to walk the IR tree again. | |||
| node->Cached(); | |||
| node->HasCacheAbove(); | |||
| auto tfuncs = node->TensorOperations(); | |||
| for (size_t i = 0; i < tfuncs.size(); i++) { | |||
| @@ -1247,22 +1247,25 @@ class MindDataDataset : public Dataset { | |||
| public: | |||
| explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list, | |||
| const std::shared_ptr<Sampler> &sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded); | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache); | |||
| explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list, | |||
| const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded); | |||
| const Sampler *sampler, const nlohmann::json *padded_sample, int64_t num_padded, | |||
| const std::shared_ptr<DatasetCache> &cache); | |||
| explicit MindDataDataset(const std::vector<char> &dataset_file, const std::vector<std::vector<char>> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded); | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache); | |||
| explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, const std::shared_ptr<Sampler> &sampler, | |||
| const nlohmann::json *padded_sample, int64_t num_padded); | |||
| const nlohmann::json *padded_sample, int64_t num_padded, | |||
| const std::shared_ptr<DatasetCache> &cache); | |||
| explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, const Sampler *sampler, | |||
| const nlohmann::json *padded_sample, int64_t num_padded); | |||
| const nlohmann::json *padded_sample, int64_t num_padded, | |||
| const std::shared_ptr<DatasetCache> &cache); | |||
| explicit MindDataDataset(const std::vector<std::vector<char>> &dataset_files, | |||
| const std::vector<std::vector<char>> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, const nlohmann::json *padded_sample, | |||
| int64_t num_padded); | |||
| int64_t num_padded, const std::shared_ptr<DatasetCache> &cache); | |||
| ~MindDataDataset() = default; | |||
| }; | |||
| @@ -1276,13 +1279,14 @@ class MindDataDataset : public Dataset { | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData( | |||
| const std::string &dataset_file, const std::vector<std::string> &columns_list = {}, | |||
| const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr, | |||
| int64_t num_padded = 0) { | |||
| int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| /// \brief Function to create a MindDataDataset | |||
| @@ -1293,12 +1297,14 @@ inline std::shared_ptr<MindDataDataset> MindData( | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file, | |||
| const std::vector<std::string> &columns_list, const Sampler *sampler, | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, | |||
| const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| /// \brief Function to create a MindDataDataset | |||
| @@ -1309,13 +1315,15 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file, | |||
| const std::vector<std::string> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, | |||
| const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(StringToChar(dataset_file), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| /// \brief Function to create a MindDataDataset | |||
| @@ -1327,13 +1335,14 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::string &dataset_file | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData( | |||
| const std::vector<std::string> &dataset_files, const std::vector<std::string> &columns_list = {}, | |||
| const std::shared_ptr<Sampler> &sampler = std::make_shared<RandomSampler>(), nlohmann::json *padded_sample = nullptr, | |||
| int64_t num_padded = 0) { | |||
| int64_t num_padded = 0, const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| /// \brief Function to create a MindDataDataset | |||
| @@ -1343,12 +1352,14 @@ inline std::shared_ptr<MindDataDataset> MindData( | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> &dataset_files, | |||
| const std::vector<std::string> &columns_list, const Sampler *sampler, | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, | |||
| const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| /// \brief Function to create a MindDataDataset | |||
| @@ -1358,13 +1369,15 @@ inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> | |||
| /// supported sampler list: SubsetRandomSampler, PkSampler, RandomSampler, SequentialSampler, DistributedSampler. | |||
| /// \param[in] padded_sample Samples will be appended to dataset, where keys are the same as column_list. | |||
| /// \param[in] num_padded Number of padding samples. Dataset size plus num_padded should be divisible by num_shards. | |||
| /// \param[in] cache Tensor cache to use. (default=nullptr which means no cache is used). | |||
| /// \return Shared pointer to the current MindDataDataset | |||
| inline std::shared_ptr<MindDataDataset> MindData(const std::vector<std::string> &dataset_files, | |||
| const std::vector<std::string> &columns_list, | |||
| const std::reference_wrapper<Sampler> sampler, | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0) { | |||
| nlohmann::json *padded_sample = nullptr, int64_t num_padded = 0, | |||
| const std::shared_ptr<DatasetCache> &cache = nullptr) { | |||
| return std::make_shared<MindDataDataset>(VectorStringToChar(dataset_files), VectorStringToChar(columns_list), sampler, | |||
| padded_sample, num_padded); | |||
| padded_sample, num_padded, cache); | |||
| } | |||
| class MnistDataset : public Dataset { | |||
| @@ -63,10 +63,12 @@ void ShardTaskList::TaskListSwap(ShardTaskList &orig_tasks, ShardTaskList &new_t | |||
| // When swapping, if the orig_tasks contains fields that need to be preserved after the swap, then swapping with a | |||
| // new_tasks that does not have those fields will result in clobbering/losing the data after the swap. | |||
| // The task_list_ should not be lost/clobbered. | |||
| new_tasks.task_list_ = std::move(orig_tasks.task_list_); | |||
| // This function can be called in the middle of mindrecord's epoch, when orig_tasks.task_list_ is still being | |||
| // used by mindrecord op's worker threads. So don't touch its task_list_ since this field should be preserved anyway. | |||
| // Now, it's safe to drive the swap. | |||
| std::swap(orig_tasks, new_tasks); | |||
| std::swap(orig_tasks.categories, new_tasks.categories); | |||
| std::swap(orig_tasks.permutation_, new_tasks.permutation_); | |||
| std::swap(orig_tasks.sample_ids_, new_tasks.sample_ids_); | |||
| } | |||
| void ShardTaskList::PopBack() { task_list_.pop_back(); } | |||
| @@ -3052,6 +3052,8 @@ class MindDataset(MappableDataset): | |||
| plus num_padded should be divisible by num_shards. | |||
| num_samples (int, optional): The number of samples to be included in the dataset | |||
| (default=None, all samples). | |||
| cache (DatasetCache, optional): Use tensor caching service to speed up dataset processing. | |||
| (default=None, which means no cache is used). | |||
| Raises: | |||
| ValueError: If num_shards is specified but shard_id is None. | |||
| @@ -3068,9 +3070,9 @@ class MindDataset(MappableDataset): | |||
| @check_minddataset | |||
| def __init__(self, dataset_file, columns_list=None, num_parallel_workers=None, shuffle=None, num_shards=None, | |||
| shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None): | |||
| shard_id=None, sampler=None, padded_sample=None, num_padded=None, num_samples=None, cache=None): | |||
| super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples, | |||
| shuffle=shuffle, num_shards=num_shards, shard_id=shard_id) | |||
| shuffle=shuffle, num_shards=num_shards, shard_id=shard_id, cache=cache) | |||
| if isinstance(dataset_file, list): | |||
| self.load_dataset = False | |||
| else: | |||
| @@ -32,7 +32,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiSamplerNull) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true, "127.0.0.1", 50053, 1, 1); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false, "127.0.0.1", 50053, 1, 1); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -52,7 +52,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCApiNestedCache) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -80,7 +80,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheImageFolderCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -121,7 +121,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCocoCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a Coco Dataset, this folder_path has 6 images in it | |||
| @@ -164,7 +164,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMnistCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a Mnist Dataset | |||
| @@ -205,7 +205,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCelebaCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a CelebA Dataset, this folder_path has 4 records in it | |||
| @@ -247,7 +247,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheManifestCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a Manifest Dataset, this file_path has 2 records in it | |||
| @@ -288,7 +288,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar10CApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a Cifar10 Dataset | |||
| @@ -329,7 +329,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCifar100CApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a Cifar100 Dataset | |||
| @@ -370,7 +370,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheVocCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a VOC Dataset, this folder_path has 9 records in it | |||
| @@ -412,7 +412,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| std::string folder_path = datasets_root_path_ + "/testAlbum/images"; | |||
| @@ -449,12 +449,50 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheAlbumCApi) { | |||
| iter->Stop(); | |||
| } | |||
| TEST_F(MindDataTestCacheOp, DISABLED_TestCacheMindRecordCApi) { | |||
| session_id_type env_session; | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a MindData Dataset | |||
| // Pass one mindrecord shard file to parse dataset info, and search for other mindrecord files with same dataset info, | |||
| // thus all records in imagenet.mindrecord0 ~ imagenet.mindrecord3 will be read | |||
| std::string file_path = datasets_root_path_ + "/../mindrecord/testMindDataSet/testImageNetData/imagenet.mindrecord0"; | |||
| // Create a MindRecord Dataset, 20 records in it | |||
| std::shared_ptr<Dataset> ds = MindData(file_path, {}, std::make_shared<RandomSampler>(), nullptr, 0, some_cache); | |||
| EXPECT_NE(ds, nullptr); | |||
| // Create an iterator over the result of the above dataset | |||
| // This will trigger the creation of the Execution Tree and launch it. | |||
| std::shared_ptr<Iterator> iter = ds->CreateIterator(); | |||
| EXPECT_NE(iter, nullptr); | |||
| // Iterate the dataset and get each row | |||
| std::unordered_map<std::string, mindspore::MSTensor> row; | |||
| ASSERT_OK(iter->GetNextRow(&row)); | |||
| uint64_t i = 0; | |||
| while (row.size() != 0) { | |||
| i++; | |||
| ASSERT_OK(iter->GetNextRow(&row)); | |||
| } | |||
| EXPECT_EQ(i, 20); | |||
| // Manually terminate the pipeline | |||
| iter->Stop(); | |||
| } | |||
| TEST_F(MindDataTestCacheOp, DISABLED_TestCacheRandomDataCApi) { | |||
| session_id_type env_session; | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a RandomDataset | |||
| @@ -496,7 +534,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi1) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a TFRecord Dataset, this file_path has 3 records in it | |||
| @@ -539,7 +577,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi2) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a TFRecord Dataset, this file_path has 3 records in it | |||
| @@ -590,7 +628,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTFRecordCApi3) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a TFRecord Dataset, this file_path has 3 records in it | |||
| @@ -637,7 +675,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheTextfileCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a TextFile Dataset, this file_path has 3 records in it | |||
| @@ -680,7 +718,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheCsvCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a CSV Dataset, this file_path has 3 records in it | |||
| @@ -724,7 +762,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCacheClueCApi) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create a CLUE Dataset, this file_path has 3 records in it | |||
| @@ -769,7 +807,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare1) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -821,7 +859,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShare2) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -874,7 +912,7 @@ TEST_F(MindDataTestCacheOp, DISABLED_TestCApiCacheShareFailure1) { | |||
| Status s = GetSessionFromEnv(&env_session); | |||
| EXPECT_EQ(s, Status::OK()); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, true); | |||
| std::shared_ptr<DatasetCache> some_cache = CreateDatasetCache(env_session, 0, false); | |||
| EXPECT_NE(some_cache, nullptr); | |||
| // Create an ImageFolder Dataset, this folder_path only has 2 images in it | |||
| @@ -29,6 +29,11 @@ UT_TEST_DIR="${BUILD_PATH}/mindspore/tests/ut/cpp" | |||
| DateStamp=$(date +%Y%m%d_%H%M%S); | |||
| CPP_TEST_LOG_OUTPUT="/tmp/ut_tests_cache_${DateStamp}.log" | |||
| ## prepare data for dataset & mindrecord | |||
| cp -fr ${PROJECT_PATH}/tests/ut/data ${UT_TEST_DIR} | |||
| ## prepare album dataset, uses absolute path so has to be generated | |||
| python ${UT_TEST_DIR}/data/dataset/testAlbum/gen_json.py | |||
| # start cache server with a spilling path to be used for all tests | |||
| cmd="${CACHE_ADMIN} --start -s /tmp" | |||
| CacheAdminCmd "${cmd}" 0 | |||
| @@ -121,6 +121,9 @@ HandleRcExit $? 0 0 | |||
| PytestCmd "test_cache_map.py" "test_cache_map_voc" 1 | |||
| HandleRcExit $? 0 0 | |||
| PytestCmd "test_cache_map.py" "test_cache_map_mindrecord" 1 | |||
| HandleRcExit $? 0 0 | |||
| PytestCmd "test_cache_map.py" "test_cache_map_python_sampler" 1 | |||
| HandleRcExit $? 0 0 | |||
| @@ -413,47 +413,6 @@ def test_cache_map_failure5(): | |||
| logger.info('test_cache_failure5 Ended.\n') | |||
| @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") | |||
| def test_cache_map_failure6(): | |||
| """ | |||
| Test no-cache-supporting MindRecord leaf with Map under cache (failure) | |||
| repeat | |||
| | | |||
| Cache | |||
| | | |||
| Map(resize) | |||
| | | |||
| MindRecord | |||
| """ | |||
| logger.info("Test cache failure 6") | |||
| if "SESSION_ID" in os.environ: | |||
| session_id = int(os.environ['SESSION_ID']) | |||
| else: | |||
| raise RuntimeError("Testcase requires SESSION_ID environment variable") | |||
| some_cache = ds.DatasetCache(session_id=session_id, size=0) | |||
| columns_list = ["id", "file_name", "label_name", "img_data", "label_data"] | |||
| num_readers = 1 | |||
| # The dataset has 5 records | |||
| data = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, num_readers) | |||
| resize_op = c_vision.Resize((224, 224)) | |||
| data = data.map(input_columns=["img_data"], operations=resize_op, cache=some_cache) | |||
| data = data.repeat(4) | |||
| with pytest.raises(RuntimeError) as e: | |||
| num_iter = 0 | |||
| for _ in data.create_dict_iterator(): | |||
| num_iter += 1 | |||
| assert "There is currently no support for MindRecordOp under cache" in str(e.value) | |||
| assert num_iter == 0 | |||
| logger.info('test_cache_failure6 Ended.\n') | |||
| @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") | |||
| def test_cache_map_failure7(): | |||
| """ | |||
| @@ -1997,6 +1956,79 @@ class ReverseSampler(ds.Sampler): | |||
| yield i | |||
| @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") | |||
| def test_cache_map_mindrecord1(): | |||
| """ | |||
| Test mappable mindrecord leaf with cache op right over the leaf | |||
| cache | |||
| | | |||
| MindRecord | |||
| """ | |||
| logger.info("Test cache map mindrecord1") | |||
| if "SESSION_ID" in os.environ: | |||
| session_id = int(os.environ['SESSION_ID']) | |||
| else: | |||
| session_id = 1 | |||
| some_cache = ds.DatasetCache(session_id=session_id, size=0) | |||
| # This dataset has 5 records | |||
| columns_list = ["id", "file_name", "label_name", "img_data", "label_data"] | |||
| ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list, cache=some_cache) | |||
| num_epoch = 4 | |||
| iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) | |||
| epoch_count = 0 | |||
| for _ in range(num_epoch): | |||
| assert sum([1 for _ in iter1]) == 5 | |||
| epoch_count += 1 | |||
| assert epoch_count == num_epoch | |||
| logger.info("test_cache_map_mindrecord1 Ended.\n") | |||
| @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") | |||
| def test_cache_map_mindrecord2(): | |||
| """ | |||
| Test mappable mindrecord leaf with the cache op later in the tree above the map(decode) | |||
| cache | |||
| | | |||
| Map(decode) | |||
| | | |||
| MindRecord | |||
| """ | |||
| logger.info("Test cache map mindrecord2") | |||
| if "SESSION_ID" in os.environ: | |||
| session_id = int(os.environ['SESSION_ID']) | |||
| else: | |||
| session_id = 1 | |||
| some_cache = ds.DatasetCache(session_id=session_id, size=0) | |||
| # This dataset has 5 records | |||
| columns_list = ["id", "file_name", "label_name", "img_data", "label_data"] | |||
| ds1 = ds.MindDataset(MIND_RECORD_DATA_DIR, columns_list) | |||
| decode_op = c_vision.Decode() | |||
| ds1 = ds1.map(input_columns=["img_data"], operations=decode_op, cache=some_cache) | |||
| num_epoch = 4 | |||
| iter1 = ds1.create_dict_iterator(num_epochs=num_epoch, output_numpy=True) | |||
| epoch_count = 0 | |||
| for _ in range(num_epoch): | |||
| assert sum([1 for _ in iter1]) == 5 | |||
| epoch_count += 1 | |||
| assert epoch_count == num_epoch | |||
| logger.info("test_cache_map_mindrecord2 Ended.\n") | |||
| @pytest.mark.skipif(os.environ.get('RUN_CACHE_TEST') != 'TRUE', reason="Require to bring up cache server") | |||
| def test_cache_map_python_sampler1(): | |||
| """ | |||
| @@ -2169,7 +2201,6 @@ if __name__ == '__main__': | |||
| test_cache_map_failure3() | |||
| test_cache_map_failure4() | |||
| test_cache_map_failure5() | |||
| test_cache_map_failure6() | |||
| test_cache_map_failure7() | |||
| test_cache_map_failure8() | |||
| test_cache_map_failure9() | |||
| @@ -2210,6 +2241,8 @@ if __name__ == '__main__': | |||
| test_cache_map_cifar4() | |||
| test_cache_map_voc1() | |||
| test_cache_map_voc2() | |||
| test_cache_map_mindrecord1() | |||
| test_cache_map_mindrecord2() | |||
| test_cache_map_python_sampler1() | |||
| test_cache_map_python_sampler2() | |||
| test_cache_map_nested_repeat() | |||