Signed-off-by: alex-yuyue <yue.yu1@huawei.com>
@@ -219,7 +219,7 @@ Status DataSchema::ColumnOrderLoad(nlohmann::json column_tree, const std::vector
   // Find the column in the json document
   auto column_info = column_tree.find(common::SafeCStr(curr_col_name));
   if (column_info == column_tree.end()) {
-    RETURN_STATUS_UNEXPECTED("Failed to find column " + curr_col_name);
+    RETURN_STATUS_UNEXPECTED("Invalid data, failed to find column name: " + curr_col_name);
   }
   // At this point, columnInfo.value() is the subtree in the json document that contains
   // all of the data for a given column. This data will formulate our schema column.

@@ -246,7 +246,7 @@ Status DataSchema::ColumnOrderLoad(nlohmann::json column_tree, const std::vector
     i++;
   }
   if (index == -1) {
-    RETURN_STATUS_UNEXPECTED("Failed to find column " + curr_col_name);
+    RETURN_STATUS_UNEXPECTED("Invalid data, failed to find column name: " + curr_col_name);
  }
   nlohmann::json column_child_tree = column_tree[index];
   RETURN_IF_NOT_OK(ColumnLoad(column_child_tree, curr_col_name));
@@ -91,27 +91,24 @@ Status BatchNode::ValidateParams() {
   return Status::OK();
 }

-std::vector<std::shared_ptr<DatasetOp>> BatchNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status BatchNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
 #ifdef ENABLE_PYTHON
   // if col_order_ isn't empty, then a project node needs to be attached after batch node. (same as map)
-  // this means project_node needs to be the parent of batch_node. this means node_ops = [project_node, batch_node]
+  // this means project_node needs to be the parent of batch_node. this means *node_ops = [project_node, batch_node]
   if (!col_order_.empty()) {
     auto project_op = std::make_shared<ProjectOp>(col_order_);
-    node_ops.push_back(project_op);
+    node_ops->push_back(project_op);
   }

-  node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
-                                               in_col_names_, out_col_names_, batch_size_func_, batch_map_func_,
-                                               pad_map_));
+  node_ops->push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
+                                                in_col_names_, out_col_names_, batch_size_func_, batch_map_func_,
+                                                pad_map_));
 #else
-  node_ops.push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
-                                               in_col_names_, pad_map_));
+  node_ops->push_back(std::make_shared<BatchOp>(batch_size_, drop_remainder_, pad_, connector_que_size_, num_workers_,
+                                                in_col_names_, pad_map_));
 #endif
-  return node_ops;
+  return Status::OK();
 }

 // Get Dataset size
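Note: every Build() in this change moves from returning the op list by value, with no way to signal failure, to returning a Status and filling a caller-supplied vector. A minimal caller-side sketch of the new contract, assuming a std::shared_ptr<DatasetNode> named `node` (the variable name is illustrative, not part of this diff):

    std::vector<std::shared_ptr<DatasetOp>> ops;
    RETURN_IF_NOT_OK(node->Build(&ops));  // errors now propagate as Status instead of an empty vector
    // ops are ordered parent-first: per the BatchNode comment above, *node_ops = [project_node, batch_node]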
@@ -57,8 +57,9 @@ class BatchNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -54,17 +54,15 @@ void BucketBatchByLengthNode::Print(std::ostream &out) const {
   out << Name() + "(columns:" + PrintColumns(column_names_) + ",...)";
 }

-std::vector<std::shared_ptr<DatasetOp>> BucketBatchByLengthNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status BucketBatchByLengthNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   bucket_boundaries_.insert(bucket_boundaries_.begin(), 0);
-  node_ops.push_back(std::make_shared<BucketBatchByLengthOp>(
+  node_ops->push_back(std::make_shared<BucketBatchByLengthOp>(
     column_names_, bucket_boundaries_, bucket_batch_sizes_, element_length_function_, pad_info_,
     pad_to_bucket_boundary_, drop_remainder_, connector_que_size_));
   if (bucket_boundaries_[0] == 0) {
     bucket_boundaries_.erase(bucket_boundaries_.begin());
   }
-  return node_ops;
+  return Status::OK();
 }

 Status BucketBatchByLengthNode::ValidateParams() {

@@ -53,8 +53,9 @@ class BucketBatchByLengthNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -54,15 +54,12 @@ void BuildSentenceVocabNode::Print(std::ostream &out) const {
 }

 // Function to build BuildSentenceVocabNode
-std::vector<std::shared_ptr<DatasetOp>> BuildSentenceVocabNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status BuildSentenceVocabNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::shared_ptr<BuildSentencePieceVocabOp> build_sentence_piece_vocab_op;
   build_sentence_piece_vocab_op = std::make_shared<BuildSentencePieceVocabOp>(
     vocab_, col_names_, vocab_size_, character_coverage_, model_type_, params_, connector_que_size_);
-  node_ops.push_back(build_sentence_piece_vocab_op);
-  return node_ops;
+  node_ops->push_back(build_sentence_piece_vocab_op);
+  return Status::OK();
 }

 Status BuildSentenceVocabNode::ValidateParams() {

@@ -51,8 +51,9 @@ class BuildSentenceVocabNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -50,15 +50,12 @@ void BuildVocabNode::Print(std::ostream &out) const {
 }

 // Function to build BuildVocabNode
-std::vector<std::shared_ptr<DatasetOp>> BuildVocabNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status BuildVocabNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::shared_ptr<BuildVocabOp> build_vocab_op;
   build_vocab_op = std::make_shared<BuildVocabOp>(vocab_, columns_, freq_range_, top_k_, special_tokens_,
                                                   special_first_, num_workers_, connector_que_size_);
-  node_ops.push_back(build_vocab_op);
-  return node_ops;
+  node_ops->push_back(build_vocab_op);
+  return Status::OK();
 }

 Status BuildVocabNode::ValidateParams() {

@@ -50,8 +50,9 @@ class BuildVocabNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -68,17 +68,15 @@ Status ConcatNode::ValidateParams() {
   return Status::OK();
 }

-std::vector<std::shared_ptr<DatasetOp>> ConcatNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status ConcatNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   if (children_flag_and_nums_.empty() || children_start_end_index_.empty()) {
-    node_ops.push_back(std::make_shared<ConcatOp>(connector_que_size_));
+    node_ops->push_back(std::make_shared<ConcatOp>(connector_que_size_));
   } else {
-    node_ops.push_back(std::make_shared<ConcatOp>(connector_que_size_, sampler_->Build(), children_flag_and_nums_,
-                                                  children_start_end_index_));
+    node_ops->push_back(std::make_shared<ConcatOp>(connector_que_size_, sampler_->Build(), children_flag_and_nums_,
+                                                   children_start_end_index_));
   }
-  return node_ops;
+  return Status::OK();
 }

 // Visitor accepting method for NodePass

@@ -51,8 +51,9 @@ class ConcatNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -60,7 +60,7 @@ Status AddShuffleOp(int64_t num_files, int64_t num_devices, int64_t num_rows, in
                     int32_t connector_que_size, int32_t rows_per_buffer, std::shared_ptr<DatasetOp> *shuffle_op) {
   std::shared_ptr<ShuffleOp> new_shuffle_op = nullptr;
   int64_t shuffle_size = 0;
-  RETURN_EMPTY_IF_ERROR(ComputeShuffleSize(num_files, num_devices, num_rows, total_rows, &shuffle_size));
+  RETURN_IF_NOT_OK(ComputeShuffleSize(num_files, num_devices, num_rows, total_rows, &shuffle_size));
   MS_LOG(INFO) << "Dataset::AddShuffleOp - num_rows: " << num_rows << ", shuffle_size: " << shuffle_size;
   // Add the shuffle op
   *shuffle_op = std::make_shared<ShuffleOp>(shuffle_size, GetSeed(), connector_que_size, true, rows_per_buffer);

@@ -35,15 +35,6 @@ class SamplerObj;
 class NodePass;
 class DatasetSizeGetter;

-#define RETURN_EMPTY_IF_ERROR(_s) \
-  do {                            \
-    Status __rc = (_s);           \
-    if (__rc.IsError()) {         \
-      MS_LOG(ERROR) << __rc;      \
-      return {};                  \
-    }                             \
-  } while (false)
-
 // Names for non-leaf IR node
 constexpr char kBatchNode[] = "Batch";
 constexpr char kBucketBatchByLengthNode[] = "BucketBatchByLength";
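The deleted RETURN_EMPTY_IF_ERROR macro logged the failure and then returned an empty vector, discarding the Status itself, so callers could not tell an error apart from a node that legitimately produced no ops. Its replacement, RETURN_IF_NOT_OK, is the dataset module's standard propagation macro; its shape is roughly the following (a sketch inferred from the surrounding usage, not part of this diff):

    #define RETURN_IF_NOT_OK(_s) \
      do {                       \
        Status __rc = (_s);      \
        if (__rc.IsError()) {    \
          return __rc;           \
        }                        \
      } while (false)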
@@ -148,7 +139,7 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   /// \brief << Stream output operator overload
   /// \notes This allows you to write the debug print info using stream operators
   /// \param out - reference to the output stream being overloaded
-  /// \param dO - reference to the DatasetOp to display
+  /// \param node - reference to the DatasetNode to display
   /// \return - the output stream must be returned
   friend std::ostream &operator<<(std::ostream &out, const DatasetNode &node) {
     node.PrintTree(out);

@@ -160,8 +151,9 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   std::shared_ptr<DatasetNode> DeepCopy();

   /// \brief Pure virtual function to convert a DatasetNode class into a runtime dataset object
-  /// \return The list of shared pointers to the newly created DatasetOps
-  virtual std::vector<std::shared_ptr<DatasetOp>> Build() = 0;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  virtual Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) = 0;

   /// \brief Pure virtual function for derived class to implement parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -225,10 +217,6 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   /// \return Status of the node visit
   virtual Status AcceptAfter(NodePass *p, bool *modified);

-  /// \brief Method to get status from Node.Build()
-  /// \notes Remove me after changing return val of Build()
-  Status BuildStatus() { return build_status; }
-
   virtual bool IsSizeDefined() { return true; }

  protected:

@@ -240,7 +228,6 @@ class DatasetNode : public std::enable_shared_from_this<DatasetNode> {
   int32_t rows_per_buffer_;
   int32_t connector_que_size_;
   int32_t worker_connector_size_;
-  Status build_status;  // remove me after changing return val of Build()
   std::string PrintColumns(const std::vector<std::string> &columns) const;
   Status AddCacheOp(std::vector<std::shared_ptr<DatasetOp>> *node_ops);
   void PrintNode(std::ostream &out, int *level) const;
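BuildStatus() and the build_status member were a stopgap: the old vector-returning Build() had no error channel, so nodes parked failures in a member for callers to poll afterwards. With Status as the return type, both pieces can be deleted. A sketch of the call-site effect (hypothetical caller `node`, condensed from the pattern removed throughout this diff):

    // before: two-step error retrieval through the node
    auto ops = node->Build();
    RETURN_IF_NOT_OK(node->BuildStatus());

    // after: one call, the error is carried in the return value
    std::vector<std::shared_ptr<DatasetOp>> ops;
    RETURN_IF_NOT_OK(node->Build(&ops));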
@@ -40,11 +40,9 @@ std::shared_ptr<DatasetNode> EpochCtrlNode::Copy() {
 void EpochCtrlNode::Print(std::ostream &out) const { out << Name() + "(epoch:" + std::to_string(num_epochs_) + ")"; }

 // Function to build the EpochCtrlOp
-std::vector<std::shared_ptr<DatasetOp>> EpochCtrlNode::Build() {
-  // A dummy vector
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<EpochCtrlOp>(num_epochs_));
-  return node_ops;
+Status EpochCtrlNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<EpochCtrlOp>(num_epochs_));
+  return Status::OK();
 }

 // Function to validate the parameters for EpochCtrlNode

@@ -47,8 +47,9 @@ class EpochCtrlNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return shared pointer to the list of newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -43,12 +43,9 @@ void FilterNode::Print(std::ostream &out) const {
   out << Name() + "(<predicate>," + "input_cols:" + PrintColumns(input_columns_) + ")";
 }

-std::vector<std::shared_ptr<DatasetOp>> FilterNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<FilterOp>(input_columns_, num_workers_, connector_que_size_, predicate_));
-  return node_ops;
+Status FilterNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<FilterOp>(input_columns_, num_workers_, connector_que_size_, predicate_));
+  return Status::OK();
 }

 Status FilterNode::ValidateParams() {

@@ -48,8 +48,9 @@ class FilterNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -52,10 +52,7 @@ void MapNode::Print(std::ostream &out) const {
       ",<project_cols>" + ",...)";
 }

-std::vector<std::shared_ptr<DatasetOp>> MapNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status MapNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::vector<std::shared_ptr<TensorOp>> tensor_ops;

   // Build tensorOp from tensorOperation vector

@@ -74,13 +71,12 @@ std::vector<std::shared_ptr<DatasetOp>> MapNode::Build() {
   if (!project_columns_.empty()) {
     auto project_op = std::make_shared<ProjectOp>(project_columns_);
-    node_ops.push_back(project_op);
+    node_ops->push_back(project_op);
   }
-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(map_op);
-  return node_ops;
+  node_ops->push_back(map_op);
+  return Status::OK();
 }

 Status MapNode::ValidateParams() {

@@ -50,8 +50,9 @@ class MapNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -51,12 +51,9 @@ Status ProjectNode::ValidateParams() {
   return Status::OK();
 }

-std::vector<std::shared_ptr<DatasetOp>> ProjectNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<ProjectOp>(columns_));
-  return node_ops;
+Status ProjectNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<ProjectOp>(columns_));
+  return Status::OK();
 }

 }  // namespace dataset

@@ -47,8 +47,9 @@ class ProjectNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -56,12 +56,9 @@ Status RenameNode::ValidateParams() {
   return Status::OK();
 }

-std::vector<std::shared_ptr<DatasetOp>> RenameNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<RenameOp>(input_columns_, output_columns_, connector_que_size_));
-  return node_ops;
+Status RenameNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<RenameOp>(input_columns_, output_columns_, connector_que_size_));
+  return Status::OK();
 }

 }  // namespace dataset

@@ -48,8 +48,9 @@ class RenameNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -37,12 +37,9 @@ std::shared_ptr<DatasetNode> RepeatNode::Copy() {
 void RepeatNode::Print(std::ostream &out) const { out << Name() + "(count:" + std::to_string(repeat_count_) + ")"; }

-std::vector<std::shared_ptr<DatasetOp>> RepeatNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<RepeatOp>(repeat_count_));
-  return node_ops;
+Status RepeatNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<RepeatOp>(repeat_count_));
+  return Status::OK();
 }

 Status RepeatNode::ValidateParams() {

@@ -49,8 +49,9 @@ class RepeatNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -39,9 +39,11 @@ std::shared_ptr<DatasetNode> RootNode::Copy() {
 void RootNode::Print(std::ostream &out) const { out << Name(); }

-std::vector<std::shared_ptr<DatasetOp>> RootNode::Build() {
+Status RootNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   // root node doesn't build a runtime Op. this function should return Status::Error when called.
-  return {};
+  std::string err_msg = "Root node doesn't build a runtime Op";
+  MS_LOG(ERROR) << err_msg;
+  RETURN_STATUS_UNEXPECTED(err_msg);
 }

 // Function to validate the parameters for RootNode
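RETURN_STATUS_UNEXPECTED builds a Status carrying the message and returns it, so a caller that mistakenly reaches RootNode::Build now gets a hard error instead of the old silently empty op list from `return {};`. Its expansion is approximately the following (reproduced from memory of the dataset Status macros; the real macro may also record file and line):

    #define RETURN_STATUS_UNEXPECTED(_e)                                     \
      do {                                                                   \
        return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, _e); \
      } while (false)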
@@ -47,8 +47,9 @@ class RootNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return shared pointer to the list of newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Getter of number of epochs
   int32_t num_epochs() { return num_epochs_; }

@@ -43,13 +43,10 @@ void ShuffleNode::Print(std::ostream &out) const {
 }

 // Function to build the ShuffleOp
-std::vector<std::shared_ptr<DatasetOp>> ShuffleNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<ShuffleOp>(shuffle_size_, shuffle_seed_, connector_que_size_, reset_every_epoch_,
-                                                 rows_per_buffer_));
-  return node_ops;
+Status ShuffleNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<ShuffleOp>(shuffle_size_, shuffle_seed_, connector_que_size_, reset_every_epoch_,
+                                                  rows_per_buffer_));
+  return Status::OK();
 }

 // Function to validate the parameters for ShuffleNode

@@ -46,7 +46,10 @@ class ShuffleNode : public DatasetNode {
   /// \return A shared pointer to the new copy
   std::shared_ptr<DatasetNode> Copy() override;

-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \brief a base class override function to create the required runtime dataset op objects for this class
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   Status ValidateParams() override;

@@ -37,12 +37,9 @@ std::shared_ptr<DatasetNode> SkipNode::Copy() {
 void SkipNode::Print(std::ostream &out) const { out << Name() + "(skip_count:" + std::to_string(skip_count_) + ")"; }

 // Function to build the SkipOp
-std::vector<std::shared_ptr<DatasetOp>> SkipNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-  node_ops.push_back(std::make_shared<SkipOp>(skip_count_, connector_que_size_));
-  return node_ops;
+Status SkipNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
+  node_ops->push_back(std::make_shared<SkipOp>(skip_count_, connector_que_size_));
+  return Status::OK();
 }

 // Function to validate the parameters for SkipNode

@@ -47,8 +47,9 @@ class SkipNode : public DatasetNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -64,22 +64,18 @@ Status AlbumNode::ValidateParams() {
 }

 // Function to build AlbumNode
-std::vector<std::shared_ptr<DatasetOp>> AlbumNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status AlbumNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   auto schema = std::make_unique<DataSchema>();
-  build_status = schema->LoadSchemaFile(schema_path_, column_names_);
-  RETURN_EMPTY_IF_ERROR(build_status);  // remove me after changing return val of Build()
+  RETURN_IF_NOT_OK(schema->LoadSchemaFile(schema_path_, column_names_));

   // Argument that is not exposed to user in the API.
   std::set<std::string> extensions = {};

-  RETURN_EMPTY_IF_ERROR(AddCacheOp(&node_ops));
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(std::make_shared<AlbumOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
-                                               decode_, extensions, std::move(schema), std::move(sampler_->Build())));
-  return node_ops;
+  node_ops->push_back(std::make_shared<AlbumOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
+                                                decode_, extensions, std::move(schema), std::move(sampler_->Build())));
+  return Status::OK();
 }

 // Get the shard id of node

@@ -48,9 +48,10 @@ class AlbumNode : public MappableSourceNode {
   /// \return A shared pointer to the new copy
   std::shared_ptr<DatasetNode> Copy() override;

-  /// \brief a base class override function to create a runtime dataset op object from this class
-  /// \return shared pointer to the newly created DatasetOp
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \brief a base class override function to create the required runtime dataset op objects for this class
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -60,25 +60,19 @@ Status CelebANode::ValidateParams() {
 }

 // Function to build CelebANode
-std::vector<std::shared_ptr<DatasetOp>> CelebANode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status CelebANode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
-  RETURN_EMPTY_IF_ERROR(
-    schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
+  RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
   // label is like this:0 1 0 0 1......
-  RETURN_EMPTY_IF_ERROR(
-    schema->AddColumn(ColDescriptor("attr", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
+  RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("attr", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));

-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(std::make_shared<CelebAOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
-                                                decode_, usage_, extensions_, std::move(schema),
-                                                std::move(sampler_->Build())));
+  node_ops->push_back(std::make_shared<CelebAOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
+                                                 decode_, usage_, extensions_, std::move(schema),
+                                                 std::move(sampler_->Build())));

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node

@@ -50,8 +50,9 @@ class CelebANode : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return shared pointer to the list of newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -53,27 +53,23 @@ Status Cifar100Node::ValidateParams() {
 }

 // Function to build CifarOp for Cifar100
-std::vector<std::shared_ptr<DatasetOp>> Cifar100Node::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status Cifar100Node::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   // Do internal Schema generation.
   auto schema = std::make_unique<DataSchema>();
-  RETURN_EMPTY_IF_ERROR(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1)));
+  RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1)));
   TensorShape scalar = TensorShape::CreateScalar();
-  RETURN_EMPTY_IF_ERROR(
+  RETURN_IF_NOT_OK(
     schema->AddColumn(ColDescriptor("coarse_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));
-  RETURN_EMPTY_IF_ERROR(
+  RETURN_IF_NOT_OK(
     schema->AddColumn(ColDescriptor("fine_label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));

-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(std::make_shared<CifarOp>(CifarOp::CifarType::kCifar100, usage_, num_workers_, rows_per_buffer_,
-                                               dataset_dir_, connector_que_size_, std::move(schema),
-                                               std::move(sampler_->Build())));
+  node_ops->push_back(std::make_shared<CifarOp>(CifarOp::CifarType::kCifar100, usage_, num_workers_, rows_per_buffer_,
+                                                dataset_dir_, connector_que_size_, std::move(schema),
+                                                std::move(sampler_->Build())));

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node

@@ -48,8 +48,9 @@ class Cifar100Node : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -53,25 +53,21 @@ Status Cifar10Node::ValidateParams() {
 }

 // Function to build CifarOp for Cifar10
-std::vector<std::shared_ptr<DatasetOp>> Cifar10Node::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status Cifar10Node::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   // Do internal Schema generation.
   auto schema = std::make_unique<DataSchema>();
-  RETURN_EMPTY_IF_ERROR(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1)));
+  RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1)));
   TensorShape scalar = TensorShape::CreateScalar();
-  RETURN_EMPTY_IF_ERROR(
+  RETURN_IF_NOT_OK(
     schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar)));

-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(std::make_shared<CifarOp>(CifarOp::CifarType::kCifar10, usage_, num_workers_, rows_per_buffer_,
-                                               dataset_dir_, connector_que_size_, std::move(schema),
-                                               std::move(sampler_->Build())));
+  node_ops->push_back(std::make_shared<CifarOp>(CifarOp::CifarType::kCifar10, usage_, num_workers_, rows_per_buffer_,
+                                                dataset_dir_, connector_que_size_, std::move(schema),
+                                                std::move(sampler_->Build())));

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node

@@ -48,8 +48,9 @@ class Cifar10Node : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -83,10 +83,7 @@ std::vector<std::string> CLUENode::split(const std::string &s, char delim) {
 }

 // Function to build CLUENode
-std::vector<std::shared_ptr<DatasetOp>> CLUENode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status CLUENode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::map<std::string, std::string> key_map;
   if (task_ == "AFQMC") {
     if (usage_ == "train") {

@@ -209,8 +206,7 @@ std::vector<std::shared_ptr<DatasetOp>> CLUENode::Build() {
     num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map, sorted_dataset_files,
     connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build()));

-  build_status = clue_op->Init();  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(clue_op->Init());

   if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp

@@ -218,20 +214,18 @@ std::vector<std::shared_ptr<DatasetOp>> CLUENode::Build() {
     int64_t num_rows = 0;

     // First, get the number of rows in the dataset
-    build_status = ClueOp::CountAllFileRows(sorted_dataset_files, &num_rows);
-    RETURN_EMPTY_IF_ERROR(build_status);  // remove me after changing return val of Build()
+    RETURN_IF_NOT_OK(ClueOp::CountAllFileRows(sorted_dataset_files, &num_rows));

     // Add the shuffle op after this op
-    build_status = AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_,
-                                rows_per_buffer_, &shuffle_op);
-    RETURN_EMPTY_IF_ERROR(build_status);  // remove me after changing return val of Build()
-    node_ops.push_back(shuffle_op);
+    RETURN_IF_NOT_OK(AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_,
+                                  rows_per_buffer_, &shuffle_op));
+    node_ops->push_back(shuffle_op);
   }
-  RETURN_EMPTY_IF_ERROR(AddCacheOp(&node_ops));
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(clue_op);
+  node_ops->push_back(clue_op);

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node
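The file-based sources in this change (CLUE here, CSV below) share this injection sequence when a global shuffle is requested and no cache is attached: count the rows across the sorted file list, let AddShuffleOp size the shuffle buffer via ComputeShuffleSize, then push the ShuffleOp before the reader op. Because node_ops is ordered parent-first, the ShuffleOp and any cache op end up above the reader in the runtime tree. Condensed sketch (names taken from the hunks above):

    int64_t num_rows = 0;
    RETURN_IF_NOT_OK(ClueOp::CountAllFileRows(sorted_dataset_files, &num_rows));
    std::shared_ptr<DatasetOp> shuffle_op = nullptr;
    RETURN_IF_NOT_OK(AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, /*total_rows=*/0,
                                  connector_que_size_, rows_per_buffer_, &shuffle_op));
    node_ops->push_back(shuffle_op);  // parent-first: shuffle sits above the reader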
@@ -50,8 +50,9 @@ class CLUENode : public NonMappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid

@@ -63,10 +63,7 @@ Status CocoNode::ValidateParams() {
 }

 // Function to build CocoNode
-std::vector<std::shared_ptr<DatasetOp>> CocoNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status CocoNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   CocoOp::TaskType task_type;
   if (task_ == "Detection") {
     task_type = CocoOp::TaskType::Detection;

@@ -79,52 +76,52 @@ std::vector<std::shared_ptr<DatasetOp>> CocoNode::Build() {
   }

   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
-  RETURN_EMPTY_IF_ERROR(
+  RETURN_IF_NOT_OK(
     schema->AddColumn(ColDescriptor(std::string("image"), DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
   switch (task_type) {
     case CocoOp::TaskType::Detection:
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
         ColDescriptor(std::string("bbox"), DataType(DataType::DE_FLOAT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
         ColDescriptor(std::string("category_id"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
         ColDescriptor(std::string("iscrowd"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
       break;
     case CocoOp::TaskType::Stuff:
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("segmentation"), DataType(DataType::DE_FLOAT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("iscrowd"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
       break;
     case CocoOp::TaskType::Keypoint:
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("keypoints"), DataType(DataType::DE_FLOAT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("num_keypoints"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
       break;
    case CocoOp::TaskType::Panoptic:
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("bbox"), DataType(DataType::DE_FLOAT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("category_id"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(schema->AddColumn(
+      RETURN_IF_NOT_OK(schema->AddColumn(
        ColDescriptor(std::string("iscrowd"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
-      RETURN_EMPTY_IF_ERROR(
+      RETURN_IF_NOT_OK(
        schema->AddColumn(ColDescriptor(std::string("area"), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1)));
       break;
     default:
-      MS_LOG(ERROR) << "CocoNode::Build : Invalid task type";
-      return {};
+      std::string err_msg = "CocoNode::Build : Invalid task type";
+      MS_LOG(ERROR) << err_msg;
+      RETURN_STATUS_UNEXPECTED(err_msg);
   }
   std::shared_ptr<CocoOp> op =
     std::make_shared<CocoOp>(task_type, dataset_dir_, annotation_file_, num_workers_, rows_per_buffer_,
                              connector_que_size_, decode_, std::move(schema), std::move(sampler_->Build()));
-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(op);
+  node_ops->push_back(op);

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node

@@ -48,8 +48,9 @@ class CocoNode : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return shared pointer to the list of newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -85,10 +85,7 @@ Status CSVNode::ValidateParams() {
 }

 // Function to build CSVNode
-std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status CSVNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles);

   // CSVOp by itself is a non-mappable dataset that does not support sampling.

@@ -120,8 +117,7 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
     rows_per_buffer_, num_samples_, worker_connector_size_, connector_que_size_, shuffle_files,
     num_shards_, shard_id_, std::move(sampler_->Build()));

-  build_status = csv_op->Init();  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(csv_op->Init());

   if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp

@@ -129,21 +125,19 @@ std::vector<std::shared_ptr<DatasetOp>> CSVNode::Build() {
     int64_t num_rows = 0;

     // First, get the number of rows in the dataset
-    build_status = CsvOp::CountAllFileRows(sorted_dataset_files, column_names_.empty(), &num_rows);
-    RETURN_EMPTY_IF_ERROR(build_status);  // remove me after changing return val of Build()
+    RETURN_IF_NOT_OK(CsvOp::CountAllFileRows(sorted_dataset_files, column_names_.empty(), &num_rows));

     // Add the shuffle op after this op
-    build_status = AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_,
-                                rows_per_buffer_, &shuffle_op);
-    RETURN_EMPTY_IF_ERROR(build_status);  // remove me after changing return val of Build()
+    RETURN_IF_NOT_OK(AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_,
+                                  rows_per_buffer_, &shuffle_op));

-    node_ops.push_back(shuffle_op);
+    node_ops->push_back(shuffle_op);
   }
-  RETURN_EMPTY_IF_ERROR(AddCacheOp(&node_ops));
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(csv_op);
+  node_ops->push_back(csv_op);

-  return node_ops;
+  return Status::OK();
 }

 // Get the shard id of node

@@ -71,8 +71,9 @@ class CSVNode : public NonMappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return shared pointer to the list of newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -45,14 +45,14 @@ void GeneratorNode::Print(std::ostream &out) const {

 GeneratorNode::GeneratorNode(py::function generator_function, const std::shared_ptr<SchemaObj> &schema)
     : generator_function_(generator_function), schema_(schema) {}

-std::vector<std::shared_ptr<DatasetOp>> GeneratorNode::Build() {
+Status GeneratorNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   std::unique_ptr<DataSchema> data_schema = std::make_unique<DataSchema>();

   if (schema_ != nullptr) {
     column_names_.clear();
     column_types_.clear();
     std::string schema_json_string = schema_->to_json();
-    RETURN_EMPTY_IF_ERROR(data_schema->LoadSchemaString(schema_json_string, {}));
+    RETURN_IF_NOT_OK(data_schema->LoadSchemaString(schema_json_string, {}));

     for (int32_t i = 0; i < data_schema->NumColumns(); i++) {
       ColDescriptor col = data_schema->column(i);

@@ -61,8 +61,6 @@ std::vector<std::shared_ptr<DatasetOp>> GeneratorNode::Build() {
     }
   }

-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
   // GeneratorOp's constructor takes in a prefetch_size, which isn't being set by user nor is it being used by
   // GeneratorOp internally. Here it is given a zero which is the default in generator builder
   std::shared_ptr<GeneratorOp> op = std::make_shared<GeneratorOp>(generator_function_, column_names_, column_types_, 0,

@@ -72,17 +70,11 @@ std::vector<std::shared_ptr<DatasetOp>> GeneratorNode::Build() {
   // needs to be called when the op is built. The caveat is that Init needs to be made public (before it is private).
   // This method can be privatized once we move Init() to Generator's functor. However, that is a bigger change which
   // best be delivered when the test cases for this api is ready.
-  Status rc = op->Init();
-  build_status = rc;  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
-  if (rc.IsOk()) {
-    node_ops.push_back(op);
-  } else {
-    MS_LOG(ERROR) << "Fail to Init GeneratorOp : " << rc.ToString();
-  }
+  RETURN_IF_NOT_OK(op->Init());
+  node_ops->push_back(op);

-  return node_ops;
+  return Status::OK();
 }

 // no validation is needed for generator op.

@@ -53,8 +53,9 @@ class GeneratorNode : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
@@ -60,26 +60,21 @@ Status ImageFolderNode::ValidateParams() {
   return Status::OK();
 }

-std::vector<std::shared_ptr<DatasetOp>> ImageFolderNode::Build() {
-  // A vector containing shared pointer to the Dataset Ops that this object will create
-  std::vector<std::shared_ptr<DatasetOp>> node_ops;
-
+Status ImageFolderNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
   // Do internal Schema generation.
   // This arg is exist in ImageFolderOp, but not externalized (in Python API).
   std::unique_ptr<DataSchema> schema = std::make_unique<DataSchema>();
   TensorShape scalar = TensorShape::CreateScalar();
-  RETURN_EMPTY_IF_ERROR(
-    schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
-  RETURN_EMPTY_IF_ERROR(
+  RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1)));
+  RETURN_IF_NOT_OK(
     schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_INT32), TensorImpl::kFlexible, 0, &scalar)));

-  build_status = AddCacheOp(&node_ops);  // remove me after changing return val of Build()
-  RETURN_EMPTY_IF_ERROR(build_status);
+  RETURN_IF_NOT_OK(AddCacheOp(node_ops));

-  node_ops.push_back(std::make_shared<ImageFolderOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
-                                                     recursive_, decode_, exts_, class_indexing_, std::move(schema),
-                                                     std::move(sampler_->Build())));
-  return node_ops;
+  node_ops->push_back(std::make_shared<ImageFolderOp>(num_workers_, rows_per_buffer_, dataset_dir_, connector_que_size_,
+                                                      recursive_, decode_, exts_, class_indexing_, std::move(schema),
+                                                      std::move(sampler_->Build())));
+  return Status::OK();
 }

 // Get the shard id of node

@@ -54,8 +54,9 @@ class ImageFolderNode : public MappableSourceNode {
   std::shared_ptr<DatasetNode> Copy() override;

   /// \brief a base class override function to create the required runtime dataset op objects for this class
-  /// \return The list of shared pointers to the newly created DatasetOps
-  std::vector<std::shared_ptr<DatasetOp>> Build() override;
+  /// \param node_ops - A vector containing shared pointer to the Dataset Ops that this object will create
+  /// \return Status Status::OK() if build successfully
+  Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override;

   /// \brief Parameters validation
   /// \return Status Status::OK() if all the parameters are valid
| @@ -81,27 +81,23 @@ Status ManifestNode::ValidateParams() { | |||
| return Status::OK(); | |||
| } | |||
| std::vector<std::shared_ptr<DatasetOp>> ManifestNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status ManifestNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| // Do internal Schema generation. | |||
| auto schema = std::make_unique<DataSchema>(); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1))); | |||
| RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1))); | |||
| TensorShape scalar = TensorShape::CreateScalar(); | |||
| RETURN_EMPTY_IF_ERROR( | |||
| RETURN_IF_NOT_OK( | |||
| schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar))); | |||
| std::shared_ptr<ManifestOp> manifest_op; | |||
| manifest_op = | |||
| std::make_shared<ManifestOp>(num_workers_, rows_per_buffer_, dataset_file_, connector_que_size_, decode_, | |||
| class_index_, std::move(schema), std::move(sampler_->Build()), usage_); | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| node_ops.push_back(manifest_op); | |||
| node_ops->push_back(manifest_op); | |||
| return node_ops; | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -49,8 +49,9 @@ class ManifestNode : public MappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -148,12 +148,8 @@ Status MindDataNode::BuildMindDatasetSamplerChain(const std::shared_ptr<SamplerO | |||
| // Helper function to set sample_bytes from py::byte type | |||
| void MindDataNode::SetSampleBytes(std::map<std::string, std::string> *sample_bytes) { sample_bytes_ = *sample_bytes; } | |||
| std::vector<std::shared_ptr<DatasetOp>> MindDataNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| build_status = BuildMindDatasetSamplerChain(sampler_, &operators_, num_padded_); | |||
| RETURN_EMPTY_IF_ERROR(build_status); // remove me after changing return val of Build() | |||
| Status MindDataNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| RETURN_IF_NOT_OK(BuildMindDatasetSamplerChain(sampler_, &operators_, num_padded_)); | |||
| std::shared_ptr<MindRecordOp> mindrecord_op; | |||
| // If a string is passed to MindData(), it will be treated as a pattern to search for matched files, | |||
| @@ -169,11 +165,10 @@ std::vector<std::shared_ptr<DatasetOp>> MindDataNode::Build() { | |||
| padded_sample_, sample_bytes_); | |||
| } | |||
| build_status = mindrecord_op->Init(); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| node_ops.push_back(mindrecord_op); | |||
| RETURN_IF_NOT_OK(mindrecord_op->Init()); | |||
| node_ops->push_back(mindrecord_op); | |||
| return node_ops; | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -53,8 +53,9 @@ class MindDataNode : public MappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -49,23 +49,19 @@ Status MnistNode::ValidateParams() { | |||
| return Status::OK(); | |||
| } | |||
| std::vector<std::shared_ptr<DatasetOp>> MnistNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status MnistNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| // Do internal Schema generation. | |||
| auto schema = std::make_unique<DataSchema>(); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1))); | |||
| RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("image", DataType(DataType::DE_UINT8), TensorImpl::kCv, 1))); | |||
| TensorShape scalar = TensorShape::CreateScalar(); | |||
| RETURN_EMPTY_IF_ERROR( | |||
| RETURN_IF_NOT_OK( | |||
| schema->AddColumn(ColDescriptor("label", DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 0, &scalar))); | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| node_ops.push_back(std::make_shared<MnistOp>(usage_, num_workers_, rows_per_buffer_, dataset_dir_, | |||
| connector_que_size_, std::move(schema), std::move(sampler_->Build()))); | |||
| node_ops->push_back(std::make_shared<MnistOp>(usage_, num_workers_, rows_per_buffer_, dataset_dir_, | |||
| connector_que_size_, std::move(schema), std::move(sampler_->Build()))); | |||
| return node_ops; | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -48,8 +48,9 @@ class MnistNode : public MappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -61,17 +61,16 @@ int32_t RandomNode::GenRandomInt(int32_t min, int32_t max) { | |||
| } | |||
| // Build for RandomNode | |||
| std::vector<std::shared_ptr<DatasetOp>> RandomNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status RandomNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| rand_gen_.seed(GetSeed()); // seed the random generator | |||
| // If the total number of rows was not given, then randomly pick a number | |||
| std::shared_ptr<SchemaObj> schema_obj; | |||
| if (!schema_path_.empty()) { | |||
| schema_obj = Schema(schema_path_); | |||
| if (schema_obj == nullptr) { | |||
| return {}; | |||
| std::string err_msg = "RandomNode::Build : Invalid schema path"; | |||
| MS_LOG(ERROR) << err_msg; | |||
| RETURN_STATUS_UNEXPECTED(err_msg); | |||
| } | |||
| } | |||
| @@ -109,12 +108,11 @@ std::vector<std::shared_ptr<DatasetOp>> RandomNode::Build() { | |||
| std::shared_ptr<RandomDataOp> op; | |||
| op = std::make_shared<RandomDataOp>(num_workers_, connector_que_size_, rows_per_buffer_, total_rows_, | |||
| std::move(data_schema_), std::move(sampler_->Build())); | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| node_ops.push_back(op); | |||
| node_ops->push_back(op); | |||
| return node_ops; | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
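This RandomNode hunk is the first place in the diff where a bare return {} becomes a real error: RETURN_STATUS_UNEXPECTED constructs and returns a Status carrying the message, so the caller now sees "RandomNode::Build: Invalid schema path" instead of an unexplained empty op list. A sketch of the usual shape of such a macro (an assumption; the real one lives in MindSpore's status header and may differ in its error code and captured context):

    // Sketch only; illustrative, not the actual MindSpore definition.
    #define RETURN_STATUS_UNEXPECTED(_e) \
      return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, (_e))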
| @@ -68,8 +68,9 @@ class RandomNode : public NonMappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -63,10 +63,7 @@ Status TextFileNode::ValidateParams() { | |||
| } | |||
| // Function to build TextFileNode | |||
| std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status TextFileNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles); | |||
| // TextFileOp by itself is a non-mappable dataset that does not support sampling. | |||
| @@ -81,15 +78,13 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() { | |||
| // Do internal Schema generation. | |||
| auto schema = std::make_unique<DataSchema>(); | |||
| RETURN_EMPTY_IF_ERROR( | |||
| schema->AddColumn(ColDescriptor("text", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1))); | |||
| RETURN_IF_NOT_OK(schema->AddColumn(ColDescriptor("text", DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1))); | |||
| // Create and initialize TextFileOp | |||
| std::shared_ptr<TextFileOp> text_file_op = std::make_shared<TextFileOp>( | |||
| num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, std::move(schema), sorted_dataset_files, | |||
| connector_que_size_, shuffle_files, num_shards_, shard_id_, std::move(sampler_->Build())); | |||
| build_status = text_file_op->Init(); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(text_file_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| @@ -97,22 +92,19 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileNode::Build() { | |||
| int64_t num_rows = 0; | |||
| // First, get the number of rows in the dataset | |||
| build_status = TextFileOp::CountAllFileRows(sorted_dataset_files, &num_rows); | |||
| RETURN_EMPTY_IF_ERROR(build_status); // remove me after changing return val of Build() | |||
| RETURN_IF_NOT_OK(TextFileOp::CountAllFileRows(sorted_dataset_files, &num_rows)); | |||
| // Add the shuffle op after this op | |||
| build_status = AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_, | |||
| rows_per_buffer_, &shuffle_op); | |||
| RETURN_EMPTY_IF_ERROR(build_status); // remove me after changing return val of Build() | |||
| node_ops.push_back(shuffle_op); | |||
| RETURN_IF_NOT_OK(AddShuffleOp(sorted_dataset_files.size(), num_shards_, num_rows, 0, connector_que_size_, | |||
| rows_per_buffer_, &shuffle_op)); | |||
| node_ops->push_back(shuffle_op); | |||
| } | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| // Add TextFileOp | |||
| node_ops.push_back(text_file_op); | |||
| node_ops->push_back(text_file_op); | |||
| return node_ops; | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -50,8 +50,9 @@ class TextFileNode : public NonMappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -104,10 +104,7 @@ Status TFRecordNode::ValidateParams() { | |||
| } | |||
| // Function to build TFRecordNode | |||
| std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status TFRecordNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| // Sort the dataset files in lexicographical order | |||
| std::vector<std::string> sorted_dir_files = dataset_files_; | |||
| std::sort(sorted_dir_files.begin(), sorted_dir_files.end()); | |||
| @@ -115,10 +112,10 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() { | |||
| // Create Schema Object | |||
| std::unique_ptr<DataSchema> data_schema = std::make_unique<DataSchema>(); | |||
| if (!schema_path_.empty()) { | |||
| RETURN_EMPTY_IF_ERROR(data_schema->LoadSchemaFile(schema_path_, columns_list_)); | |||
| RETURN_IF_NOT_OK(data_schema->LoadSchemaFile(schema_path_, columns_list_)); | |||
| } else if (schema_obj_ != nullptr) { | |||
| std::string schema_json_string = schema_obj_->to_json(); | |||
| RETURN_EMPTY_IF_ERROR(data_schema->LoadSchemaString(schema_json_string, columns_list_)); | |||
| RETURN_IF_NOT_OK(data_schema->LoadSchemaString(schema_json_string, columns_list_)); | |||
| } | |||
| bool shuffle_files = (shuffle_ == ShuffleMode::kGlobal || shuffle_ == ShuffleMode::kFiles); | |||
| @@ -135,8 +132,7 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() { | |||
| std::move(data_schema), connector_que_size_, columns_list_, shuffle_files, num_shards_, | |||
| shard_id_, shard_equal_rows_, std::move(sampler_->Build())); | |||
| build_status = tf_reader_op->Init(); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(tf_reader_op->Init()); | |||
| if (cache_ == nullptr && shuffle_ == ShuffleMode::kGlobal) { | |||
| // Inject ShuffleOp | |||
| @@ -145,21 +141,18 @@ std::vector<std::shared_ptr<DatasetOp>> TFRecordNode::Build() { | |||
| int64_t num_rows = 0; | |||
| // First, get the number of rows in the dataset | |||
| build_status = TFReaderOp::CountTotalRows(&num_rows, sorted_dir_files); | |||
| RETURN_EMPTY_IF_ERROR(build_status); // remove me after changing return val of Build() | |||
| RETURN_IF_NOT_OK(TFReaderOp::CountTotalRows(&num_rows, sorted_dir_files)); | |||
| // Add the shuffle op after this op | |||
| build_status = AddShuffleOp(sorted_dir_files.size(), num_shards_, num_rows, 0, connector_que_size_, | |||
| rows_per_buffer_, &shuffle_op); | |||
| RETURN_EMPTY_IF_ERROR(build_status); // remove me after changing return val of Build() | |||
| node_ops.push_back(shuffle_op); | |||
| RETURN_IF_NOT_OK(AddShuffleOp(sorted_dir_files.size(), num_shards_, num_rows, 0, connector_que_size_, | |||
| rows_per_buffer_, &shuffle_op)); | |||
| node_ops->push_back(shuffle_op); | |||
| } | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| // Add TFReaderOp | |||
| node_ops.push_back(tf_reader_op); | |||
| return node_ops; | |||
| node_ops->push_back(tf_reader_op); | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -77,8 +77,9 @@ class TFRecordNode : public NonMappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -84,41 +84,37 @@ Status VOCNode::ValidateParams() { | |||
| } | |||
| // Function to build VOCNode | |||
| std::vector<std::shared_ptr<DatasetOp>> VOCNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status VOCNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| auto schema = std::make_unique<DataSchema>(); | |||
| VOCOp::TaskType task_type_; | |||
| if (task_ == "Segmentation") { | |||
| task_type_ = VOCOp::TaskType::Segmentation; | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnImage), DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1))); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnTarget), DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1))); | |||
| } else if (task_ == "Detection") { | |||
| task_type_ = VOCOp::TaskType::Detection; | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnImage), DataType(DataType::DE_UINT8), TensorImpl::kFlexible, 1))); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnBbox), DataType(DataType::DE_FLOAT32), TensorImpl::kFlexible, 1))); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnLabel), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1))); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnDifficult), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1))); | |||
| RETURN_EMPTY_IF_ERROR(schema->AddColumn( | |||
| RETURN_IF_NOT_OK(schema->AddColumn( | |||
| ColDescriptor(std::string(kColumnTruncate), DataType(DataType::DE_UINT32), TensorImpl::kFlexible, 1))); | |||
| } | |||
| std::shared_ptr<VOCOp> voc_op; | |||
| voc_op = std::make_shared<VOCOp>(task_type_, usage_, dataset_dir_, class_index_, num_workers_, rows_per_buffer_, | |||
| connector_que_size_, decode_, std::move(schema), std::move(sampler_->Build())); | |||
| build_status = AddCacheOp(&node_ops); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(AddCacheOp(node_ops)); | |||
| node_ops.push_back(voc_op); | |||
| return node_ops; | |||
| node_ops->push_back(voc_op); | |||
| return Status::OK(); | |||
| } | |||
| // Get the shard id of node | |||
| @@ -50,8 +50,9 @@ class VOCNode : public MappableSourceNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return shared pointer to the list of newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -42,16 +42,13 @@ void SyncWaitNode::Print(std::ostream &out) const { | |||
| } | |||
| // Function to build the BarrierOp | |||
| std::vector<std::shared_ptr<DatasetOp>> SyncWaitNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| Status SyncWaitNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| // Right now barrier should only take num_rows_per_buffer = 1 | |||
| // The reason is that having it otherwise can lead to blocking issues | |||
| // See barrier_op.h for more details | |||
| int32_t rows_per_buffer = 1; | |||
| node_ops.push_back(std::make_shared<BarrierOp>(rows_per_buffer, connector_que_size_, condition_name_, callback_)); | |||
| return node_ops; | |||
| node_ops->push_back(std::make_shared<BarrierOp>(rows_per_buffer, connector_que_size_, condition_name_, callback_)); | |||
| return Status::OK(); | |||
| } | |||
| // Function to validate the parameters for SyncWaitNode | |||
| @@ -49,8 +49,9 @@ class SyncWaitNode : public DatasetNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -38,12 +38,9 @@ std::shared_ptr<DatasetNode> TakeNode::Copy() { | |||
| void TakeNode::Print(std::ostream &out) const { out << Name() + "(num_rows:" + std::to_string(take_count_) + ")"; } | |||
| // Function to build the TakeOp | |||
| std::vector<std::shared_ptr<DatasetOp>> TakeNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| node_ops.push_back(std::make_shared<TakeOp>(take_count_, connector_que_size_)); | |||
| return node_ops; | |||
| Status TakeNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| node_ops->push_back(std::make_shared<TakeOp>(take_count_, connector_que_size_)); | |||
| return Status::OK(); | |||
| } | |||
| // Function to validate the parameters for TakeNode | |||
| @@ -47,8 +47,9 @@ class TakeNode : public DatasetNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return shared pointer to the list of newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -66,7 +66,7 @@ Status TransferNode::ValidateParams() { | |||
| } | |||
| // Function to build TransferNode | |||
| std::vector<std::shared_ptr<DatasetOp>> TransferNode::Build() { | |||
| Status TransferNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| if (queue_name_.empty()) { | |||
| // Get a uuid for queue name | |||
| queue_name_ = Services::GetUniqueID(); | |||
| @@ -90,21 +90,18 @@ std::vector<std::shared_ptr<DatasetOp>> TransferNode::Build() { | |||
| } else if (device_type_ == kAscendDevice) { | |||
| type = DeviceQueueOp::DeviceType::Ascend; | |||
| } else { | |||
| MS_LOG(ERROR) << "Unknown device target."; | |||
| return {}; | |||
| std::string err_msg = "Unknown device target."; | |||
| MS_LOG(ERROR) << err_msg; | |||
| RETURN_STATUS_UNEXPECTED(err_msg); | |||
| } | |||
| // Get device ID (shard ID) from children | |||
| device_id_ = 0; | |||
| build_status = this->GetShardId(&device_id_); // remove me after changing return val of Build() | |||
| RETURN_EMPTY_IF_ERROR(build_status); | |||
| RETURN_IF_NOT_OK(this->GetShardId(&device_id_)); | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| node_ops.push_back(std::make_shared<DeviceQueueOp>(queue_name_, type, device_id_, prefetch_size_, send_epoch_end_, | |||
| total_batch_, create_data_info_queue_)); | |||
| return node_ops; | |||
| node_ops->push_back(std::make_shared<DeviceQueueOp>(queue_name_, type, device_id_, prefetch_size_, send_epoch_end_, | |||
| total_batch_, create_data_info_queue_)); | |||
| return Status::OK(); | |||
| } | |||
| // Visitor accepting method for NodePass | |||
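Taken together, the contract of the new signature is: append every op this IR node materializes to *node_ops and return Status::OK() only if nothing failed. A hypothetical minimal node, just to show the shape; MyNode, MyOp, and their members are illustrative and not part of this change:

    // Hypothetical example; MyNode/MyOp do not exist in the MindSpore tree.
    Status MyNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) {
      // Helpers that can fail are wrapped so their Status aborts the build immediately.
      RETURN_IF_NOT_OK(AddCacheOp(node_ops));  // optional cache layer, mirroring the source nodes
      node_ops->push_back(std::make_shared<MyOp>(num_workers_, connector_que_size_));
      return Status::OK();
    }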
| @@ -48,8 +48,9 @@ class TransferNode : public DatasetNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return shared pointer to the list of newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -55,12 +55,9 @@ Status ZipNode::ValidateParams() { | |||
| return Status::OK(); | |||
| } | |||
| std::vector<std::shared_ptr<DatasetOp>> ZipNode::Build() { | |||
| // A vector containing shared pointer to the Dataset Ops that this object will create | |||
| std::vector<std::shared_ptr<DatasetOp>> node_ops; | |||
| node_ops.push_back(std::make_shared<ZipOp>(rows_per_buffer_, connector_que_size_)); | |||
| return node_ops; | |||
| Status ZipNode::Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) { | |||
| node_ops->push_back(std::make_shared<ZipOp>(rows_per_buffer_, connector_que_size_)); | |||
| return Status::OK(); | |||
| } | |||
| // Get Dataset size | |||
| @@ -47,8 +47,9 @@ class ZipNode : public DatasetNode { | |||
| std::shared_ptr<DatasetNode> Copy() override; | |||
| /// \brief a base class override function to create the required runtime dataset op objects for this class | |||
| /// \return The list of shared pointers to the newly created DatasetOps | |||
| std::vector<std::shared_ptr<DatasetOp>> Build() override; | |||
| /// \param node_ops - A vector containing shared pointers to the Dataset Ops that this object will create | |||
| /// \return Status Status::OK() if built successfully | |||
| Status Build(std::vector<std::shared_ptr<DatasetOp>> *node_ops) override; | |||
| /// \brief Parameters validation | |||
| /// \return Status Status::OK() if all the parameters are valid | |||
| @@ -97,8 +97,8 @@ Status TreeAdapter::PostPass(std::shared_ptr<DatasetNode> ir) { | |||
| Status TreeAdapter::BuildExecutionTree(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op) { | |||
| // Build the DatasetOp ExecutionTree from the optimized IR tree | |||
| std::vector<std::shared_ptr<DatasetOp>> ops = ir->Build(); | |||
| RETURN_IF_NOT_OK(ir->BuildStatus()); // remove me after changing return val of Build() | |||
| std::vector<std::shared_ptr<DatasetOp>> ops; | |||
| RETURN_IF_NOT_OK(ir->Build(&ops)); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty(), "Unable to build node."); | |||
| @@ -35,7 +35,8 @@ class MindDataTestOptimizationPass : public UT::DatasetOpTesting { | |||
| // this recursive function helps build an ExecutionTree from an IR node; it is copied from TreeAdapter | |||
| Status DFSBuild(std::shared_ptr<DatasetNode> ir, std::shared_ptr<DatasetOp> *op, ExecutionTree *tree) { | |||
| std::vector<std::shared_ptr<DatasetOp>> ops = ir->Build(); | |||
| std::vector<std::shared_ptr<DatasetOp>> ops; | |||
| RETURN_IF_NOT_OK(ir->Build(&ops)); | |||
| CHECK_FAIL_RETURN_UNEXPECTED(!ops.empty() && tree != nullptr && op != nullptr, "Fail To Build Tree."); | |||
| (*op) = ops.front(); | |||
| RETURN_IF_NOT_OK(tree->AssociateNode(*op)); | |||
| @@ -312,6 +312,15 @@ def test_tf_wrong_schema(): | |||
| assert exception_occurred, "test_tf_wrong_schema failed." | |||
| def test_tfrecord_invalid_columns(): | |||
| logger.info("test_tfrecord_columns_list") | |||
| invalid_columns_list = ["not_exist"] | |||
| data = ds.TFRecordDataset(FILES, columns_list=invalid_columns_list) | |||
| with pytest.raises(RuntimeError) as info: | |||
| _ = data.create_dict_iterator(num_epochs=1, output_numpy=True).__next__() | |||
| assert "Invalid data, failed to find column name: not_exist" in str(info.value) | |||
| if __name__ == '__main__': | |||
| test_tfrecord_shape() | |||
| test_tfrecord_read_all_dataset() | |||
| @@ -331,3 +340,4 @@ if __name__ == '__main__': | |||
| test_tfrecord_schema_columns_list() | |||
| test_tfrecord_invalid_files() | |||
| test_tf_wrong_schema() | |||
| test_tfrecord_invalid_columns() | |||