| @@ -34,12 +34,12 @@ namespace mindspore::dataset { | |||
| // TreeConsumer | |||
| // Creates the TreeAdapter instance held in tree_adapter_. | |||
| TreeConsumer::TreeConsumer() : tree_adapter_(std::make_unique<TreeAdapter>()) {} | |||
| Status TreeConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->BuildAndPrepare(std::move(d)); } | |||
| Status TreeConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d)); } | |||
| // Calls DoServiceStop() on the task group returned by the adapter's AllTasks(). | |||
| // NOTE(review): presumably AllTasks() yields nullptr while no tree has been built; in that case an error Status | |||
| // is returned instead of dereferencing the null pointer - confirm against TreeAdapter::AllTasks. | |||
| Status TreeConsumer::Terminate() { | |||
| auto all_tasks = tree_adapter_->AllTasks(); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(all_tasks != nullptr, "Execution tree has not been built."); | |||
| return all_tasks->DoServiceStop(); | |||
| } | |||
| // IteratorConsumer | |||
| Status IteratorConsumer::Init(std::shared_ptr<DatasetNode> d) { | |||
| return tree_adapter_->BuildAndPrepare(std::move(d), num_epochs_); | |||
| return tree_adapter_->Compile(std::move(d), num_epochs_); | |||
| } | |||
| Status IteratorConsumer::GetNextAsVector(std::vector<TensorPtr> *out) { | |||
| @@ -74,9 +74,7 @@ Status IteratorConsumer::GetNextAsMap(std::unordered_map<std::string, TensorPtr> | |||
| } | |||
| // ToDevice | |||
| Status ToDevice::Init(std::shared_ptr<DatasetNode> d) { | |||
| return tree_adapter_->BuildAndPrepare(std::move(d), num_epochs_); | |||
| } | |||
| Status ToDevice::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d), num_epochs_); } | |||
| Status ToDevice::Send() { | |||
| std::unique_ptr<DataBuffer> db; | |||
| @@ -386,7 +384,7 @@ TreeGetters::TreeGetters() : dataset_size_(-1), init_flag_(false), row_flag_(fal | |||
| } | |||
| Status TreeGetters::Init(std::shared_ptr<DatasetNode> d) { | |||
| Status s = tree_adapter_->BuildAndPrepare(std::move(d), 1); | |||
| Status s = tree_adapter_->Compile(std::move(d), 1); | |||
| if (!s.IsError()) { | |||
| init_flag_ = true; | |||
| } | |||
| @@ -464,9 +462,9 @@ Status TreeGetters::GetNumClasses(int64_t *num_classes) { | |||
| RETURN_IF_NOT_OK(root->GetNumClasses(num_classes)); | |||
| return Status::OK(); | |||
| } | |||
| Status BuildVocabConsumer::Init(std::shared_ptr<DatasetNode> d) { | |||
| return tree_adapter_->BuildAndPrepare(std::move(d), 1); | |||
| } | |||
| Status BuildVocabConsumer::Init(std::shared_ptr<DatasetNode> d) { return tree_adapter_->Compile(std::move(d), 1); } | |||
| Status BuildVocabConsumer::Start() { | |||
| // Getting one row would trigger building the vocab | |||
| TensorRow row; | |||
| @@ -6,6 +6,7 @@ add_library(engine-opt OBJECT | |||
| pre/cache_error_pass.cc | |||
| pre/cache_transform_pass.cc | |||
| pre/epoch_injection_pass.cc | |||
| pre/input_validation_pass.cc | |||
| pre/removal_pass.cc | |||
| optional/tensor_op_fusion_pass.cc | |||
| util/printer_pass.cc | |||
| @@ -0,0 +1,30 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include <vector> | |||
| #include "minddata/dataset/include/datasets.h" | |||
| #include "minddata/dataset/engine/opt/pre/input_validation_pass.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| // Validates the parameters of the visited IR node. The node is not changed here, so *modified is set to false. | |||
| Status InputValidationPass::Visit(std::shared_ptr<DatasetNode> node, bool *modified) { | |||
| *modified = false; | |||
| // The outcome of the node's own parameter check is the outcome of the visit | |||
| return node->ValidateParams(); | |||
| } | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,39 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef DATASET_ENGINE_OPT_PRE_INPUT_VALIDATION_PASS_H_ | |||
| #define DATASET_ENGINE_OPT_PRE_INPUT_VALIDATION_PASS_H_ | |||
| #include <memory> | |||
| #include <vector> | |||
| #include "minddata/dataset/engine/opt/pass.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| /// \class InputValidationPass | |||
| /// \brief This is a parse pass that validates input parameters of the IR tree. | |||
| class InputValidationPass : public NodePass { | |||
| public: | |||
| /// \brief Runs a validation pass to check the input parameters of the visited node | |||
| /// \param[in] node The node being visited | |||
| /// \param[out] modified Set to false by this pass, because validation does not change the node | |||
| /// \return Status code, the error of the node's parameter validation if it fails | |||
| Status Visit(std::shared_ptr<DatasetNode> node, bool *modified) override; | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| #endif // DATASET_ENGINE_OPT_PRE_INPUT_VALIDATION_PASS_H_ | |||
| @@ -18,24 +18,133 @@ | |||
| #include "minddata/dataset/core/client.h" | |||
| #include "minddata/dataset/include/datasets.h" | |||
| #include "minddata/dataset/engine/opt/pass.h" | |||
| #include "minddata/dataset/engine/opt/pre/input_validation_pass.h" | |||
| namespace mindspore { | |||
| namespace dataset { | |||
| Status TreeAdapter::BuildAndPrepare(std::shared_ptr<DatasetNode> root_ir, int32_t num_epoch) { | |||
| // Check whether this function has been called before. If so, return failure | |||
| CHECK_FAIL_RETURN_UNEXPECTED(tree_ == nullptr, "ExecutionTree is already built."); | |||
| // Runs the mandatory pre-pass phase on the IR tree: first the validation passes, then the pre-pass actions. | |||
| // Returns the first error reported by a pass, or OK when every pass succeeded. | |||
| Status TreeAdapter::PrePass(std::shared_ptr<DatasetNode> ir) { | |||
| // Vector of actions in validation pass | |||
| std::vector<std::unique_ptr<NodePass>> validations; | |||
| MS_LOG(INFO) << "Running pre pass loops."; | |||
| validations.push_back(std::make_unique<InputValidationPass>()); | |||
| // Apply validation actions | |||
| for (const auto &validation : validations) { | |||
| // InputValidationPass does not change the IR tree. We don't need to capture the "modified" value. | |||
| bool modified = false; | |||
| RETURN_IF_NOT_OK(validation->Run(ir, &modified)); | |||
| } | |||
| // Vector of actions in pre-pass phase | |||
| std::vector<std::unique_ptr<Pass>> actions; | |||
| // We will gradually move CacheErrorPass, EpochInjectionPass, CacheTransformPass | |||
| // from ExecutionTree::PrepareTreePreAction to here. | |||
| // Vector of flags for each action | |||
| std::vector<bool> modified(actions.size(), false); | |||
| // Apply pre-pass actions; the index is a size_t so it is not compared as a signed int against size() | |||
| for (size_t i = 0; i < actions.size(); i++) { | |||
| bool m = false; | |||
| RETURN_IF_NOT_OK(actions[i]->Run(ir, &m)); | |||
| modified[i] = m; | |||
| } | |||
| MS_LOG(INFO) << "Pre pass complete."; | |||
| return Status::OK(); | |||
| } | |||
| // Runs the optional optimization passes on the IR tree, in the order they were added to the vector. | |||
| // Returns the first error reported by a pass, or OK when every pass succeeded. | |||
| Status TreeAdapter::Optimize(std::shared_ptr<DatasetNode> ir) { | |||
| // Vector of optimizations | |||
| std::vector<std::unique_ptr<NodePass>> optimizations; | |||
| MS_LOG(INFO) << "Running optimization pass loops"; | |||
| // We will gradually move TensorOpFusionPass from ExecutionTree::Optimize to here. | |||
| // Vector of flags for each optimization | |||
| std::vector<bool> modified(optimizations.size(), false); | |||
| // Apply optimization pass actions; the index is a size_t so it is not compared as a signed int against size() | |||
| for (size_t i = 0; i < optimizations.size(); i++) { | |||
| bool m = false; | |||
| RETURN_IF_NOT_OK(optimizations[i]->Run(ir, &m)); | |||
| modified[i] = m; | |||
| } | |||
| MS_LOG(INFO) << "Optimization pass complete."; | |||
| return Status::OK(); | |||
| } | |||
| // Runs the mandatory post-pass actions on the IR tree, in the order they were added to the vector. | |||
| // Returns the first error reported by a pass, or OK when every pass succeeded. | |||
| Status TreeAdapter::PostPass(std::shared_ptr<DatasetNode> ir) { | |||
| // Vector of actions in post-pass phase | |||
| std::vector<std::unique_ptr<Pass>> actions; | |||
| MS_LOG(INFO) << "Running post pass loops."; | |||
| // We will gradually move RepeatPass from ExecutionTree::PrepareTreePostAction to here. | |||
| // Vector of flags for each action | |||
| std::vector<bool> modified(actions.size(), false); | |||
| // Apply post-pass actions; the index is a size_t so it is not compared as a signed int against size() | |||
| for (size_t i = 0; i < actions.size(); i++) { | |||
| bool m = false; | |||
| RETURN_IF_NOT_OK(actions[i]->Run(ir, &m)); | |||
| modified[i] = m; | |||
| } | |||
| MS_LOG(INFO) << "Post passes complete."; | |||
| return Status::OK(); | |||
| } | |||
| // Recursively converts the IR node ir and its children into DatasetOps registered with tree_. | |||
| // \param[in] ir IR node to build; it may build several ops, which are chained parent -> child in build order | |||
| // \param[out] op Receives the first op built from ir, to be added as a child by the caller | |||
| // \return Status code; an error if a pointer is null, nothing was built, or linking the ops fails | |||
| Status TreeAdapter::BuildExecutionTree(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op) { | |||
| // Fail with a Status rather than dereference a null IR node or a null output pointer | |||
| RETURN_UNEXPECTED_IF_NULL(ir); | |||
| RETURN_UNEXPECTED_IF_NULL(op); | |||
| // Build the DatasetOp ExecutionTree from the optimized IR tree | |||
| std::vector<std::shared_ptr<DatasetOp>> ops = ir->Build(); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node."); | |||
| (*op) = ops.front(); // return the first op to be added as child by the caller of this function | |||
| RETURN_IF_NOT_OK(tree_->AssociateNode(*op)); | |||
| for (size_t i = 1; i < ops.size(); i++) { | |||
| RETURN_IF_NOT_OK(tree_->AssociateNode(ops[i])); | |||
| RETURN_IF_NOT_OK(ops[i - 1]->AddChild(ops[i])); | |||
| } | |||
| // Build the children of IR, once they return, add the return value to *op | |||
| // Iterating by const reference avoids one shared_ptr copy per child | |||
| for (const auto &child_ir : ir->Children()) { | |||
| std::shared_ptr<DatasetOp> child_op; | |||
| RETURN_IF_NOT_OK(BuildExecutionTree(child_ir, &child_op)); | |||
| RETURN_IF_NOT_OK(ops.back()->AddChild(child_op)); // append children to the last of ops | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status TreeAdapter::Compile(std::shared_ptr<DatasetNode> root_ir, int32_t num_epochs) { | |||
| num_epochs_ = num_epochs; | |||
| optimize_ = true; // Always ON (temporary) | |||
| RETURN_UNEXPECTED_IF_NULL(root_ir); | |||
| // Pre-pass of the IR tree | |||
| RETURN_IF_NOT_OK(PrePass(root_ir)); | |||
| // Optional phase of optimization | |||
| if (optimize_) { | |||
| RETURN_IF_NOT_OK(Optimize(root_ir)); | |||
| } | |||
| // Post-pass of the IR tree | |||
| RETURN_IF_NOT_OK(PostPass(root_ir)); | |||
| // This will evolve in the long run | |||
| tree_ = std::make_unique<ExecutionTree>(); | |||
| std::shared_ptr<DatasetOp> root_op; | |||
| RETURN_IF_NOT_OK(DFSBuildTree(root_ir, &root_op)); | |||
| RETURN_IF_NOT_OK(BuildExecutionTree(root_ir, &root_op)); | |||
| RETURN_IF_NOT_OK(tree_->AssignRoot(root_op)); | |||
| // Note: We will gradually move the pre pass, optimizer pass, and post pass | |||
| // on ExecutionTree to perform on IR tree. | |||
| // Prepare the tree | |||
| RETURN_IF_NOT_OK(tree_->Prepare(num_epoch)); | |||
| RETURN_IF_NOT_OK(tree_->Prepare(num_epochs)); | |||
| // After the tree is prepared, the col_name_id_map can safely be obtained | |||
| column_name_map_ = tree_->root()->column_name_id_map(); | |||
| @@ -65,30 +174,6 @@ Status TreeAdapter::GetNext(TensorRow *row) { | |||
| return Status::OK(); | |||
| } | |||
| // Depth-first conversion of the IR node ir and its children into DatasetOps registered with tree_. | |||
| // The first op built from ir is handed back through op so that the caller can add it as a child. | |||
| Status TreeAdapter::DFSBuildTree(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op) { | |||
| // Parameters are validated before any DatasetOp is built | |||
| RETURN_IF_NOT_OK(ir->ValidateParams()); | |||
| std::vector<std::shared_ptr<DatasetOp>> built = ir->Build(); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(!built.empty(), "Unable to build node."); | |||
| // The head of the built ops is what the caller links to | |||
| *op = built.front(); | |||
| RETURN_IF_NOT_OK(tree_->AssociateNode(*op)); | |||
| // Register each remaining op and hang it under the op built just before it | |||
| for (size_t prev = 0; prev + 1 < built.size(); ++prev) { | |||
| const size_t next = prev + 1; | |||
| RETURN_IF_NOT_OK(tree_->AssociateNode(built[next])); | |||
| RETURN_IF_NOT_OK(built[prev]->AddChild(built[next])); | |||
| } | |||
| // Children of the IR node are built recursively and appended to the last op built from ir | |||
| for (const auto &child_ir : ir->Children()) { | |||
| std::shared_ptr<DatasetOp> child_root; | |||
| RETURN_IF_NOT_OK(DFSBuildTree(child_ir, &child_root)); | |||
| RETURN_IF_NOT_OK(built.back()->AddChild(child_root)); | |||
| } | |||
| return Status::OK(); | |||
| } | |||
| Status TreeAdapter::Launch() const { | |||
| CHECK_FAIL_RETURN_UNEXPECTED(tree_ != nullptr, "Tree is a nullptr."); | |||
| return tree_->Launch(); | |||
| @@ -36,10 +36,9 @@ class TreeAdapter { | |||
| ~TreeAdapter() = default; | |||
| // This will construct an ExeTree from a Dataset root and Prepare() the ExeTree | |||
| // This function is only meant to be called once and needs to be called before GetNext | |||
| // ExeTree will be launched when the first GetNext is called | |||
| Status BuildAndPrepare(std::shared_ptr<DatasetNode> root, int32_t num_epoch = -1); | |||
| // This function performs syntax checking, semantics checking, optimizes, and then builds | |||
| // the Execution tree. | |||
| Status Compile(std::shared_ptr<DatasetNode> root_ir, int32_t num_epochs = -1); | |||
| // This is the main method TreeConsumer uses to interact with TreeAdapter | |||
| // 1. GetNext will Launch() the ExeTree on its first call by iterator (tree is already prepared) | |||
| @@ -58,14 +57,34 @@ class TreeAdapter { | |||
| Status Launch() const; | |||
| // Sets the flag for the optional optimization pass. NOTE: Compile() currently assigns true to this flag itself. | |||
| void SetOptimize(bool value) { optimize_ = value; } | |||
| // Returns the current value of the flag that enables the optional optimization pass | |||
| bool OptimizationEnabled() const { return optimize_; } | |||
| // Getter function to get the total number of epochs to be run on this tree. | |||
| // @return total number of epochs, i.e. the value stored by Compile() | |||
| int32_t num_epochs() const { return num_epochs_; } | |||
| private: | |||
| // This RECURSIVE function converts IR nodes into DatasetOp in ExecutionTree. IR could build a vector of ops. In | |||
| // such case, the first node is returned. Op is added as child when the current function returns. | |||
| Status DFSBuildTree(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op); | |||
| // This function runs a mandatory pass checking the syntax and semantics of the IR tree. | |||
| Status PrePass(std::shared_ptr<DatasetNode> ir); | |||
| // This function runs an optional optimization pass on the IR tree. | |||
| Status Optimize(std::shared_ptr<DatasetNode> ir); | |||
| // This function runs a mandatory pass augmenting the IR tree before the execution. | |||
| Status PostPass(std::shared_ptr<DatasetNode> ir); | |||
| // This RECURSIVE function walks the (optimized) IR tree in DFS to build its corresponding Execution tree. | |||
| Status BuildExecutionTree(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op); | |||
| std::unique_ptr<DataBuffer> cur_db_; | |||
| std::unordered_map<std::string, int32_t> column_name_map_; | |||
| std::unique_ptr<ExecutionTree> tree_; | |||
| int32_t num_epochs_; | |||
| bool optimize_; // Flag to enable optional optimization pass | |||
| }; | |||
| } // namespace dataset | |||
| } // namespace mindspore | |||
| @@ -56,7 +56,7 @@ TEST_F(MindDataTestTreeAdapter, TestSimpleTreeAdapter) { | |||
| mindspore::dataset::TreeAdapter tree_adapter; | |||
| Status rc = tree_adapter.BuildAndPrepare(ds->IRNode(), 1); | |||
| Status rc = tree_adapter.Compile(ds->IRNode(), 1); | |||
| EXPECT_TRUE(rc.IsOk()); | |||
| @@ -91,7 +91,7 @@ TEST_F(MindDataTestTreeAdapter, TestTreeAdapterWithRepeat) { | |||
| mindspore::dataset::TreeAdapter tree_adapter; | |||
| Status rc = tree_adapter.BuildAndPrepare(ds->IRNode(), 2); | |||
| Status rc = tree_adapter.Compile(ds->IRNode(), 2); | |||
| EXPECT_TRUE(rc.IsOk()); | |||
| const std::unordered_map<std::string, int32_t> map = tree_adapter.GetColumnNameMap(); | |||
| @@ -128,7 +128,7 @@ TEST_F(MindDataTestTreeAdapter, TestProjectMapTreeAdapter) { | |||
| mindspore::dataset::TreeAdapter tree_adapter; | |||
| Status rc = tree_adapter.BuildAndPrepare(ds->IRNode(), 2); | |||
| Status rc = tree_adapter.Compile(ds->IRNode(), 2); | |||
| EXPECT_TRUE(rc.IsOk()); | |||