Browse Source

!26176 [minddata] sync code clean to opensource branch

Merge pull request !26176 from xiefangqi/md_sync_enterp_bugfix_to_opensource
tags/v1.6.0
i-robot Gitee 4 years ago
parent
commit
c7fd469c73
39 changed files with 127 additions and 32 deletions
  1. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc
  2. +4
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc
  3. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc
  4. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc
  5. +5
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc
  6. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc
  7. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc
  8. +31
    -26
      mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.cc
  9. +3
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.h
  10. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc
  11. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc
  12. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc
  13. +8
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc
  14. +0
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h
  15. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc
  16. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc
  17. +8
    -1
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc
  18. +4
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h
  19. +3
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc
  20. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc
  21. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc
  22. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc
  23. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc
  24. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc
  25. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc
  26. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc
  27. +6
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc
  28. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc
  29. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.cc
  30. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc
  31. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc
  32. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc
  33. +4
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc
  34. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc
  35. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc
  36. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc
  37. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/gnn/edge.h
  38. +11
    -0
      mindspore/ccsrc/minddata/dataset/engine/gnn/graph_data_client.cc
  39. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/gnn/graph_data_impl.cc

+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/consumers/python_tree_consumer.cc View File

@@ -60,6 +60,7 @@ PythonSaveToDisk::PythonSaveToDisk(const std::string &datasetPath, int32_t numFi
: SaveToDisk(datasetPath, numFiles, datasetType) {}

// Fetch the next row for a Python-side tree consumer.
// Releases the Python GIL for the duration of the (potentially blocking)
// base-class GetRow call so other Python threads can run meanwhile; the
// pybind11 gil_scoped_release RAII object re-acquires the GIL on scope exit.
// @param r - output TensorRow; must be non-null.
// @return Status error code from the underlying TreeGetters::GetRow.
Status PythonTreeGetters::GetRow(TensorRow *const r) {
// Validate the output pointer before doing any work.
RETURN_UNEXPECTED_IF_NULL(r);
py::gil_scoped_release gil_release;
return TreeGetters::GetRow(r);
}


+ 4
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/build_sentence_piece_vocab_op.cc View File

@@ -130,6 +130,10 @@ std::unordered_map<std::string, std::string> BuildSentencePieceVocabOp::BuildPar
bool BuildSentencePieceVocabOp::Done() { return read_done_; }

void BuildSentencePieceVocabOp::Next(std::string *sentence) {
if (sentence == nullptr) {
MS_LOG(ERROR) << "BuildSentencePieceVocab get nullptr element, please check data.";
return;
}
TensorRow new_row;
Status s = sentence_queue_->PopFront(&new_row);



+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc View File

@@ -47,6 +47,7 @@ Status CacheLookupOp::WorkerEntry(int32_t worker_id) {
}
Status CacheLookupOp::ResetSampler() { return Status::OK(); }
Status CacheLookupOp::HandshakeRandomAccessOp(const RandomAccessOp *op) {
RETURN_UNEXPECTED_IF_NULL(op);
// We act like a sampler and as a dataset op. During handshake with leaf op,
// We must wait until the leaf op has indexed everything.
RETURN_IF_NOT_OK(sampler_->HandshakeRandomAccessOp(op));
@@ -65,6 +66,7 @@ void CacheLookupOp::SamplerPrint(std::ostream &out, bool show_all) const {
}
}
Status CacheLookupOp::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
std::vector<row_id_type> cache_miss;
RETURN_IF_NOT_OK(keys_miss_->Pop(0, &cache_miss));
// Ignore the case we have no cache miss, we can't return empty samples.


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc View File

@@ -105,6 +105,7 @@ Status ConcatOp::ComputeColMap() {

// Gets the number of classes
Status ConcatOp::GetNumClasses(int64_t *num_classes) {
RETURN_UNEXPECTED_IF_NULL(num_classes);
int64_t max_num_classes = -1;
for (const auto &child : child_) {
// Choose a dataset which can get valid num_classes
@@ -147,6 +148,7 @@ bool ConcatOp::IgnoreSample() {
}

Status ConcatOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
bool is_not_mappable_or_second_ne_zero = true;

if (!children_flag_and_nums_.empty()) {


+ 5
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc View File

@@ -251,12 +251,14 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const {
}

Status DatasetOp::GetNextRowPullMode(TensorRow *const row) {
RETURN_UNEXPECTED_IF_NULL(row);
RETURN_UNEXPECTED_IF_NULL(child_[0]);
return child_[0]->GetNextRowPullMode(row);
}

// Gets the next row from the given child
Status DatasetOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
// pop is a blocked call and will throw an interruption if the whole group shuts down.
RETURN_IF_NOT_OK(out_connector_->PopFront(row));
return Status::OK();
@@ -264,6 +266,7 @@ Status DatasetOp::GetNextRow(TensorRow *row) {

// Gets the number of classes
Status DatasetOp::GetNumClasses(int64_t *num_classes) {
RETURN_UNEXPECTED_IF_NULL(num_classes);
if (child_.size() == 1) {
return child_[0]->GetNumClasses(num_classes);
} else if (child_.size() > 1) {
@@ -280,6 +283,7 @@ Status DatasetOp::GetNumClasses(int64_t *num_classes) {
}

Status DatasetOp::GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
if (child_.size() == 1) {
return child_[0]->GetClassIndexing(output_class_indexing);
} else if (child_.size() > 1) {
@@ -355,6 +359,7 @@ Status DatasetOp::ComputeColMap() {

// Getter for the sampler, and it also removes the sampler from the op
Status DatasetOp::FetchRemoveSampler(std::shared_ptr<SamplerRT> *sampler) {
RETURN_UNEXPECTED_IF_NULL(sampler);
*sampler = sampler_; // It's okay if it sampler_ points to nullptr
sampler_.reset(); // clear our member-copy of this pointer. We no longer have this sampler
return Status::OK();


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc View File

@@ -41,6 +41,7 @@ void EpochCtrlOp::Print(std::ostream &out, bool show_all) const {
}

Status EpochCtrlOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
if (child_.empty()) {
RETURN_STATUS_UNEXPECTED("EpochCtrlOp can't be the leaf node(first operator) of pipeline.");
}


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc View File

@@ -150,6 +150,7 @@ Status FilterOp::CheckInput(const TensorRow &input) const {
}

Status FilterOp::InvokePredicateFunc(const TensorRow &input, bool *out_predicate) {
RETURN_UNEXPECTED_IF_NULL(out_predicate);
RETURN_IF_NOT_OK(CheckInput(input));

TensorRow output;


+ 31
- 26
mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.cc View File

@@ -16,6 +16,7 @@

#include <set>
#include <string>
#include <utility>
#include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h"

namespace mindspore {
@@ -32,6 +33,7 @@ CpuMapJob::~CpuMapJob() = default;

// A function to execute a cpu map job
Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
int32_t num_rows = in.size();
for (int32_t row = 0; row < num_rows; row++) {
TensorRow input_row = in[row];
@@ -40,32 +42,7 @@ Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
// Call compute function for cpu
Status rc = ops_[i]->Compute(input_row, &result_row);
if (rc.IsError()) {
std::string err_msg = "";
std::string op_name = ops_[i]->Name();
// Need to remove the suffix "Op" which length is 2
std::string abbr_op_name = op_name.substr(0, op_name.length() - 2);
err_msg += "map operation: [" + abbr_op_name + "] failed. ";
if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) {
err_msg += "The corresponding data files: " + input_row.getPath()[0];
if (input_row.getPath().size() > 1) {
std::set<std::string> path_set;
path_set.insert(input_row.getPath()[0]);
for (auto j = 1; j < input_row.getPath().size(); j++) {
if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) {
err_msg += ", " + input_row.getPath()[j];
path_set.insert(input_row.getPath()[j]);
}
}
}
err_msg += ". ";
}
std::string tensor_err_msg = rc.GetErrDescription();
if (rc.GetLineOfCode() < 0) {
err_msg += "Error description:\n";
}
err_msg += tensor_err_msg;
rc.SetErrDescription(err_msg);
RETURN_IF_NOT_OK(rc);
RETURN_IF_NOT_OK(RebuildMapErrorMsg(input_row, i, &rc));
}

// Assign result_row to to_process for the next TensorOp processing, except for the last TensorOp in the list.
@@ -78,5 +55,33 @@ Status CpuMapJob::Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) {
return Status::OK();
}

// Enrich the error description of a failed TensorOp inside a map job and store
// it back into *rc. The rebuilt message names the failing operation and, when
// the row carries path information, lists the data file(s) it came from.
// @param input_row - the row being processed when the op failed.
// @param i - index of the failing TensorOp in ops_.
// @param rc - in/out status whose description is rewritten.
//             NOTE(review): rc is dereferenced without a null check — callers
//             must guarantee it is non-null.
// @return the modified status *rc, so callers can RETURN_IF_NOT_OK it directly.
Status CpuMapJob::RebuildMapErrorMsg(const TensorRow &input_row, const size_t &i, Status *rc) {
std::string err_msg = "";
std::string op_name = ops_[i]->Name();
// Trim the trailing "Op" suffix (length 2) from the class name for display.
std::string abbr_op_name = op_name.substr(0, op_name.length() - 2);
err_msg += "map operation: [" + abbr_op_name + "] failed. ";
// Append the data file path(s) recorded on the row, if any.
if (input_row.getPath().size() > 0 && !input_row.getPath()[0].empty()) {
err_msg += "The corresponding data files: " + input_row.getPath()[0];
if (input_row.getPath().size() > 1) {
// De-duplicate paths while keeping first-seen order in the message.
std::set<std::string> path_set;
path_set.insert(input_row.getPath()[0]);
for (auto j = 1; j < input_row.getPath().size(); j++) {
if (!input_row.getPath()[j].empty() && path_set.find(input_row.getPath()[j]) == path_set.end()) {
err_msg += ", " + input_row.getPath()[j];
path_set.insert(input_row.getPath()[j]);
}
}
}
err_msg += ". ";
}
// Keep the original low-level error text after the prefix we just built.
std::string tensor_err_msg = rc->GetErrDescription();
if (rc->GetLineOfCode() < 0) {
// No source-line info available: label the appended description explicitly.
err_msg += "Error description:\n";
}
err_msg += tensor_err_msg;
rc->SetErrDescription(err_msg);
return *rc;
}
} // namespace dataset
} // namespace mindspore

+ 3
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/cpu_map_job.h View File

@@ -35,6 +35,9 @@ class CpuMapJob : public MapJob {

// A pure virtual run function to execute a cpu map job
Status Run(std::vector<TensorRow> in, std::vector<TensorRow> *out) override;

private:
Status RebuildMapErrorMsg(const TensorRow &input_row, const size_t &i, Status *rc);
};

} // namespace dataset


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc View File

@@ -33,6 +33,7 @@ RenameOp::~RenameOp() {}

// Gets a row from the child operator and projects the row.
Status RenameOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
RETURN_IF_NOT_OK(child_[0]->GetNextRow(row));
if (row->eoe()) {
UpdateRepeatAndEpochCounter();


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc View File

@@ -58,6 +58,7 @@ void RepeatOp::Print(std::ostream &out, bool show_all) const {
// This function sets the `retryIfEoe` flag when popping from the child connector. This way,
// this function will retry to pop the connector again and will get the non-EOE row if any.
Status RepeatOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
if (child_.empty()) {
RETURN_STATUS_UNEXPECTED("Pipeline init failed, RepeatOp can't be the first op in pipeline.");
}


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc View File

@@ -46,6 +46,7 @@ void SkipOp::Print(std::ostream &out, bool show_all) const {
Status SkipOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. SkipOp is an inlined operator."); }

Status SkipOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
bool eoe_received = false;
while (skip_count_ < max_skips_) {
RETURN_IF_NOT_OK(child_[0]->GetNextRow(row));


+ 8
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc View File

@@ -93,6 +93,10 @@ Status AlbumOp::PrepareData() {
// Optimization: Could take in a tensor
// This function does not return status because we want to just skip bad input, not crash
bool AlbumOp::CheckImageType(const std::string &file_name, bool *valid) {
if (valid == nullptr) {
MS_LOG(ERROR) << "Album parameter can't be nullptr.";
return false;
}
std::ifstream file_handle;
constexpr int read_num = 3;
*valid = false;
@@ -387,13 +391,13 @@ Status AlbumOp::loadColumnData(const std::string &file, int32_t index, nlohmann:
return LoadFloatArrayTensor(column_value, i, row);
}
// int value
if (!is_array &&
(data_schema_->Column(i).Type() == DataType::DE_INT64 || data_schema_->Column(i).Type() == DataType::DE_INT32)) {
bool judge_int =
(data_schema_->Column(i).Type() == DataType::DE_INT64) || (data_schema_->Column(i).Type() == DataType::DE_INT32);
if (!is_array && judge_int) {
return LoadIntTensor(column_value, i, row);
}
// int array
if (is_array &&
(data_schema_->Column(i).Type() == DataType::DE_INT64 || data_schema_->Column(i).Type() == DataType::DE_INT32)) {
if (is_array && judge_int) {
return LoadIntArrayTensor(column_value, i, row);
} else {
MS_LOG(WARNING) << "Value type for column: " << data_schema_->Column(i).Name() << " is not supported.";


+ 0
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h View File

@@ -75,7 +75,6 @@ class CifarOp : public MappableLeafOp {
// @return Status The status code returned
Status LoadTensorRow(row_id_type index, TensorRow *trow) override;

private:
// Read block data from cifar file
// @return
Status ReadCifarBlockDataAsync();


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/clue_op.cc View File

@@ -256,6 +256,7 @@ int64_t CountTotalRowsPerFile(const std::string &file) {
int64_t ClueOp::CountTotalRows(const std::string &file) { return CountTotalRowsPerFile(file); }

Status ClueOp::CountAllFileRows(const std::vector<std::string> &files, int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
std::shared_ptr<ClueOp> op;
*count = 0;
for (auto file : files) {


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.cc View File

@@ -483,6 +483,7 @@ Status CocoOp::ReadImageToTensor(const std::string &path, const ColDescriptor &c
}

Status CocoOp::CountTotalRows(int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
RETURN_IF_NOT_OK(PrepareData());
*count = static_cast<int64_t>(image_ids_.size());
return Status::OK();
@@ -501,6 +502,7 @@ Status CocoOp::ComputeColMap() {
}

Status CocoOp::GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
if ((*output_class_indexing).empty()) {
if ((task_type_ != TaskType::Detection) && (task_type_ != TaskType::Panoptic)) {
MS_LOG(ERROR) << "Invalid parameter, GetClassIndex only valid in \"Detection\" and \"Panoptic\" task.";


+ 8
- 1
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.cc View File

@@ -15,6 +15,7 @@
*/
#include "minddata/dataset/engine/datasetops/source/csv_op.h"

#include <algorithm>
#include <fstream>
#include <iomanip>
#include <stdexcept>
@@ -228,7 +229,12 @@ int CsvOp::CsvParser::CountRows(int c) {

// Initialize the CSV parser: size the scratch buffer, then build the two state
// diagrams — sdl for row counting (InitSDL) and sd for full parsing (InitSD).
// @return Status::OK() always; Status return kept for interface consistency.
Status CsvOp::CsvParser::InitCsvParser() {
str_buf_.resize(CSV_BUFFER_SIZE);
InitSDL();
InitSD();
return Status::OK();
}

void CsvOp::CsvParser::InitSDL() {
// State diagram for counting rows
sdl = {// START_OF_FILE
// |---------------------------------------|
@@ -289,7 +295,9 @@ Status CsvOp::CsvParser::InitCsvParser() {
{{State::END_OF_LINE, Message::MS_NORMAL}, {State::UNQUOTE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_QUOTE}, {State::QUOTE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}}};
}

void CsvOp::CsvParser::InitSD() {
// State diagram for CSV parser
sd = {// START_OF_FILE
// |-------------------------------------------------------------------|
@@ -441,7 +449,6 @@ Status CsvOp::CsvParser::InitCsvParser() {
}}},
{{State::END_OF_LINE, Message::MS_END_OF_LINE}, {State::END_OF_LINE, &CsvParser::NullFunc}},
{{State::END_OF_LINE, Message::MS_END_OF_FILE}, {State::END_OF_FILE, &CsvParser::EndFile}}};
return Status::OK();
}

Status CsvOp::LoadFile(const std::string &file, int64_t start_offset, int64_t end_offset, int32_t worker_id) {


+ 4
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/csv_op.h View File

@@ -126,6 +126,10 @@ class CsvOp : public NonMappableLeafOp {

int CatchException(int c);

void InitSDL();

void InitSD();

int32_t worker_id_;
JaggedConnector *rows_connector_;
const char csv_field_delim_;


+ 3
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.cc View File

@@ -246,6 +246,7 @@ Status ManifestOp::CountDatasetInfo() {
}

Status ManifestOp::CountTotalRows(int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
*count = 0;
RETURN_IF_NOT_OK(PrepareData());
*count = static_cast<int64_t>(image_labelname_.size());
@@ -266,6 +267,7 @@ Status ManifestOp::ComputeColMap() {

// Get number of classes
Status ManifestOp::GetNumClasses(int64_t *num_classes) {
RETURN_UNEXPECTED_IF_NULL(num_classes);
if (num_classes_ > 0) {
*num_classes = num_classes_;
return Status::OK();
@@ -279,6 +281,7 @@ Status ManifestOp::GetNumClasses(int64_t *num_classes) {
}

Status ManifestOp::GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
if ((*output_class_indexing).empty()) {
RETURN_IF_NOT_OK(PrepareData());
RETURN_IF_NOT_OK(CountDatasetInfo());


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc View File

@@ -301,6 +301,8 @@ Status MindRecordOp::RegisterAndLaunchThreads() {

Status MindRecordOp::CountTotalRows(const std::vector<std::string> dataset_path, bool load_dataset,
const std::shared_ptr<ShardOperator> &op, int64_t *count, int64_t num_padded) {
RETURN_UNEXPECTED_IF_NULL(op);
RETURN_UNEXPECTED_IF_NULL(count);
std::unique_ptr<ShardReader> shard_reader = std::make_unique<ShardReader>();
RETURN_IF_NOT_OK(shard_reader->CountTotalRows(dataset_path, load_dataset, op, count, num_padded));
return Status::OK();


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.cc View File

@@ -251,6 +251,7 @@ Status MnistOp::WalkAllFiles() {
}

Status MnistOp::CountTotalRows(const std::string &dir, const std::string &usage, int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
// the logic of counting the number of samples is copied from ParseMnistData() and uses CheckReader()
*count = 0;
const int64_t num_samples = 0;


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc View File

@@ -92,6 +92,7 @@ Status DistributedSamplerRT::InitSampler() {
}

Status DistributedSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (cnt_ > samples_per_tensor_) {
RETURN_STATUS_UNEXPECTED(
"Sampler index must be less than or equal to num_samples(total rows in dataset), but got:" +
@@ -212,6 +213,7 @@ void DistributedSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const
}

Status DistributedSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "DistributedSampler";


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/mind_record_sampler.cc View File

@@ -23,6 +23,7 @@ MindRecordSamplerRT::MindRecordSamplerRT(mindrecord::ShardReader *shard_reader,
: SamplerRT(0, samples_per_tensor), shard_reader_(shard_reader), sample_ids_(nullptr), next_id_(0) {}

Status MindRecordSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (next_id_ > num_samples_) {
RETURN_STATUS_UNEXPECTED(
"Sampler index must be less than or equal to num_samples(total rows in dataset), but got: " +
@@ -78,6 +79,7 @@ void MindRecordSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
}

Status MindRecordSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
args["sampler_name"] = "MindRecordSampler";
*out_json = args;


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc View File

@@ -69,6 +69,7 @@ Status PKSamplerRT::InitSampler() {
}

Status PKSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (next_id_ > num_samples_ || num_samples_ == 0) {
RETURN_STATUS_UNEXPECTED(
"Sampler index must be less than or equal to num_samples(total rows in dataset), but got: " +
@@ -133,6 +134,7 @@ void PKSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
}

Status PKSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "PKSampler";


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc View File

@@ -24,6 +24,7 @@ PythonSamplerRT::PythonSamplerRT(int64_t num_samples, py::object py_sampler_inst
py_sampler_instance(std::move(py_sampler_instance)) {}

Status PythonSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (need_to_reset_) {
(*out) = TensorRow(TensorRow::kFlagEOE);
} else {


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc View File

@@ -31,6 +31,7 @@ RandomSamplerRT::RandomSamplerRT(bool replacement, int64_t num_samples, bool res
reshuffle_each_epoch_(reshuffle_each_epoch) {}

Status RandomSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (next_id_ > num_samples_) {
RETURN_STATUS_UNEXPECTED("Sampler index must be less than or equal to num_samples(total rows in dataset), but got" +
std::to_string(next_id_) + ", num_samplers:" + std::to_string(num_samples_));
@@ -127,6 +128,7 @@ void RandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
}

Status RandomSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "RandomSampler";


+ 6
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc View File

@@ -21,6 +21,7 @@
namespace mindspore {
namespace dataset {
Status RandomAccessOp::GetNumRowsInDataset(int64_t *num) const {
RETURN_UNEXPECTED_IF_NULL(num);
// The sampler base class itself does not compute it's own num_rows_ value.
// Instead, this value is computed by the derived leaf op during it's own initialization
// after it has interacted with it's storage layers.
@@ -41,6 +42,7 @@ SamplerRT::SamplerRT(int64_t num_samples, int64_t samples_per_tensor)
is_initialized(false) {}

Status SamplerRT::HandshakeRandomAccessOp(const RandomAccessOp *op) {
RETURN_UNEXPECTED_IF_NULL(op);
std::shared_ptr<SamplerRT> child_sampler;
if (HasChildSampler()) {
child_sampler = std::dynamic_pointer_cast<SamplerRT>(child_[0]);
@@ -70,6 +72,7 @@ Status SamplerRT::HandshakeRandomAccessOp(const RandomAccessOp *op) {
}

Status SamplerRT::CreateSamplerTensor(std::shared_ptr<Tensor> *sample_ids, int64_t num_elements) {
RETURN_UNEXPECTED_IF_NULL(sample_ids);
if (col_desc_ == nullptr) {
// a ColDescriptor for Tensor that holds SampleIds
col_desc_ = std::make_unique<ColDescriptor>("sampleIds", DataType(DataType::DE_INT64), TensorImpl::kFlexible, 1);
@@ -91,6 +94,7 @@ void SamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {

#ifdef ENABLE_PYTHON
Status SamplerRT::GetAllIdsThenReset(py::array *data) {
RETURN_UNEXPECTED_IF_NULL(data);
std::shared_ptr<Tensor> sample_ids;
TensorRow sample_row;

@@ -175,6 +179,7 @@ Status SamplerRT::AddChild(std::shared_ptr<SamplerRT> child) {
bool SamplerRT::HasChildSampler() const { return !child_.empty(); }

Status SamplerRT::GetAssociatedChildId(int64_t *out_associated_id, int64_t id) {
RETURN_UNEXPECTED_IF_NULL(out_associated_id);
if (child_ids_.empty()) {
RETURN_STATUS_UNEXPECTED("[Internal ERROR] Trying to get associated child id, but there are no child ids!");
}
@@ -184,6 +189,7 @@ Status SamplerRT::GetAssociatedChildId(int64_t *out_associated_id, int64_t id) {
return Status::OK();
}
Status SamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
args["num_samples"] = num_samples_;
if (this->HasChildSampler()) {


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc View File

@@ -24,6 +24,7 @@ SequentialSamplerRT::SequentialSamplerRT(int64_t start_index, int64_t num_sample
: SamplerRT(num_samples, samples_per_tensor), current_id_(start_index), start_index_(start_index), id_count_(0) {}

Status SequentialSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (id_count_ > num_samples_) {
RETURN_STATUS_UNEXPECTED(
"Sampler index must be less than or equal to num_samples(total rows in dataset), but got:" +
@@ -133,6 +134,7 @@ void SequentialSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
}

Status SequentialSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "SequentialSampler";


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.cc View File

@@ -64,6 +64,7 @@ void SubsetRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const
}

Status SubsetRandomSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SubsetSamplerRT::to_json(&args));
args["sampler_name"] = "SubsetRandomSampler";


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc View File

@@ -57,6 +57,7 @@ Status SubsetSamplerRT::ResetSampler() {

// Get the sample ids.
Status SubsetSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
// All samples have been drawn
if (sample_id_ == num_samples_) {
(*out) = TensorRow(TensorRow::kFlagEOE);
@@ -111,6 +112,7 @@ void SubsetSamplerRT::SamplerPrint(std::ostream &out, bool show_all) const {
}

Status SubsetSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "SubsetSampler";


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc View File

@@ -113,6 +113,7 @@ Status WeightedRandomSamplerRT::ResetSampler() {

// Get the sample ids.
Status WeightedRandomSamplerRT::GetNextSample(TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (weights_.size() > static_cast<size_t>(num_rows_)) {
return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
"Invalid parameter, size of sample weights must be less than or equal to num of data, "
@@ -188,6 +189,7 @@ void WeightedRandomSamplerRT::SamplerPrint(std::ostream &out, bool show_all) con
}

Status WeightedRandomSamplerRT::to_json(nlohmann::json *out_json) {
RETURN_UNEXPECTED_IF_NULL(out_json);
nlohmann::json args;
RETURN_IF_NOT_OK(SamplerRT::to_json(&args));
args["sampler_name"] = "WeightedRandomSampler";


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/text_file_op.cc View File

@@ -214,6 +214,7 @@ Status TextFileOp::CalculateNumRowsPerShard() {
}

Status TextFileOp::CountAllFileRows(const std::vector<std::string> &files, int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
*count = 0;
for (auto file : files) {
*count += CountTotalRows(file);


+ 4
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/tf_reader_op.cc View File

@@ -624,6 +624,7 @@ Status TFReaderOp::CreateSchema(const std::string tf_file, std::vector<std::stri

Status TFReaderOp::CountTotalRows(int64_t *out_total_rows, const std::vector<std::string> &filenames, int64_t threads,
bool estimate) {
RETURN_UNEXPECTED_IF_NULL(out_total_rows);
try {
if (threads > filenames.size()) {
threads = filenames.size();
@@ -631,6 +632,9 @@ Status TFReaderOp::CountTotalRows(int64_t *out_total_rows, const std::vector<std

std::vector<std::future<int64_t>> async_results;

if (threads <= 0) {
RETURN_STATUS_UNEXPECTED("Invalid data, the threads of TFReader should be greater than zero, but got zero.");
}
int64_t chunk_size = filenames.size() / threads;
int64_t remainder = filenames.size() % threads;



+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.cc View File

@@ -298,6 +298,7 @@ Status VOCOp::ReadAnnotationToTensor(const std::string &path, TensorRow *row) {
}

Status VOCOp::CountTotalRows(int64_t *count) {
RETURN_UNEXPECTED_IF_NULL(count);
switch (task_type_) {
case TaskType::Detection:
RETURN_IF_NOT_OK(ParseImageIds());
@@ -324,6 +325,7 @@ Status VOCOp::ComputeColMap() {
}

Status VOCOp::GetClassIndexing(std::vector<std::pair<std::string, std::vector<int32_t>>> *output_class_indexing) {
RETURN_UNEXPECTED_IF_NULL(output_class_indexing);
if ((*output_class_indexing).empty()) {
if (task_type_ != TaskType::Detection) {
MS_LOG(ERROR) << "Invalid parameter, GetClassIndexing only valid in \"Detection\" task.";


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc View File

@@ -41,6 +41,7 @@ void TakeOp::Print(std::ostream &out, bool show_all) const {
Status TakeOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. SkipOp is an inlined operator."); }

Status TakeOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
bool eoe_received = false;
if (take_count_ < max_takes_) {
RETURN_IF_NOT_OK(child_[0]->GetNextRow(row));


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc View File

@@ -118,6 +118,7 @@ Status ZipOp::ComputeColMap() {
Status ZipOp::operator()() { RETURN_STATUS_UNEXPECTED("Logic error. SkipOp is an inlined operator."); }

Status ZipOp::GetNextRow(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(row);
int32_t skip_child = -1;
RETURN_IF_NOT_OK(getNextZippedRow(row, &skip_child));
if (row->eoe()) {


+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/gnn/edge.h View File

@@ -61,6 +61,7 @@ class Edge {
// Get nodes on the edge
// @param std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> *out_node - Source and destination nodes returned
Status GetNode(std::pair<std::shared_ptr<Node>, std::shared_ptr<Node>> *out_node) {
RETURN_UNEXPECTED_IF_NULL(out_node);
*out_node = std::make_pair(src_node_, dst_node_);
return Status::OK();
}


+ 11
- 0
mindspore/ccsrc/minddata/dataset/engine/gnn/graph_data_client.cc View File

@@ -86,6 +86,7 @@ Status GraphDataClient::Stop() {
}

Status GraphDataClient::GetAllNodes(NodeType node_type, std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -97,6 +98,7 @@ Status GraphDataClient::GetAllNodes(NodeType node_type, std::shared_ptr<Tensor>
}

Status GraphDataClient::GetAllEdges(EdgeType edge_type, std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -108,6 +110,7 @@ Status GraphDataClient::GetAllEdges(EdgeType edge_type, std::shared_ptr<Tensor>
}

Status GraphDataClient::GetNodesFromEdges(const std::vector<EdgeIdType> &edge_list, std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -122,6 +125,7 @@ Status GraphDataClient::GetNodesFromEdges(const std::vector<EdgeIdType> &edge_li

Status GraphDataClient::GetEdgesFromNodes(const std::vector<std::pair<NodeIdType, NodeIdType>> &node_list,
std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -141,6 +145,7 @@ Status GraphDataClient::GetEdgesFromNodes(const std::vector<std::pair<NodeIdType

Status GraphDataClient::GetAllNeighbors(const std::vector<NodeIdType> &node_list, NodeType neighbor_type,
const OutputFormat &format, std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -159,6 +164,7 @@ Status GraphDataClient::GetSampledNeighbors(const std::vector<NodeIdType> &node_
const std::vector<NodeIdType> &neighbor_nums,
const std::vector<NodeType> &neighbor_types, SamplingStrategy strategy,
std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -180,6 +186,7 @@ Status GraphDataClient::GetSampledNeighbors(const std::vector<NodeIdType> &node_

Status GraphDataClient::GetNegSampledNeighbors(const std::vector<NodeIdType> &node_list, NodeIdType samples_num,
NodeType neg_neighbor_type, std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -198,6 +205,7 @@ Status GraphDataClient::GraphDataClient::RandomWalk(const std::vector<NodeIdType
const std::vector<NodeType> &meta_path, float step_home_param,
float step_away_param, NodeIdType default_node,
std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
GnnGraphDataRequestPb request;
GnnGraphDataResponsePb response;
@@ -219,6 +227,7 @@ Status GraphDataClient::GraphDataClient::RandomWalk(const std::vector<NodeIdType

Status GraphDataClient::GetNodeFeature(const std::shared_ptr<Tensor> &nodes,
const std::vector<FeatureType> &feature_types, TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
if (!nodes || nodes->Size() == 0) {
RETURN_STATUS_UNEXPECTED("Input nodes is empty");
@@ -254,6 +263,7 @@ Status GraphDataClient::GetNodeFeature(const std::shared_ptr<Tensor> &nodes,

Status GraphDataClient::GetEdgeFeature(const std::shared_ptr<Tensor> &edges,
const std::vector<FeatureType> &feature_types, TensorRow *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
if (!edges || edges->Size() == 0) {
RETURN_STATUS_UNEXPECTED("Input edges is empty");
@@ -288,6 +298,7 @@ Status GraphDataClient::GetEdgeFeature(const std::shared_ptr<Tensor> &edges,
}

Status GraphDataClient::GraphInfo(py::dict *out) {
RETURN_UNEXPECTED_IF_NULL(out);
#if !defined(_WIN32) && !defined(_WIN64)
RETURN_IF_NOT_OK(CheckPid());
void *tag;


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/gnn/graph_data_impl.cc View File

@@ -134,6 +134,7 @@ Status GraphDataImpl::GetNodesFromEdges(const std::vector<EdgeIdType> &edge_list

Status GraphDataImpl::GetEdgesFromNodes(const std::vector<std::pair<NodeIdType, NodeIdType>> &node_list,
std::shared_ptr<Tensor> *out) {
RETURN_UNEXPECTED_IF_NULL(out);
if (node_list.empty()) {
RETURN_STATUS_UNEXPECTED("Input node list is empty.");
}
@@ -292,6 +293,7 @@ Status GraphDataImpl::NegativeSample(const std::vector<NodeIdType> &data, const
int32_t samples_num, std::vector<NodeIdType> *out_samples) {
CHECK_FAIL_RETURN_UNEXPECTED(!data.empty(), "Input data is empty.");
RETURN_UNEXPECTED_IF_NULL(start_index);
RETURN_UNEXPECTED_IF_NULL(out_samples);
size_t index = *start_index;
for (size_t i = index; i < shuffled_ids.size(); ++i) {
++index;


Loading…
Cancel
Save