@@ -297,9 +297,9 @@ std::shared_ptr<Dataset> Dataset::Repeat(int32_t count) {
 }

 // Function to create a ShuffleOp
-std::shared_ptr<ShuffleDataset> Dataset::Shuffle(int32_t shuffle_size) {
+std::shared_ptr<ShuffleDataset> Dataset::Shuffle(int32_t buffer_size) {
   // Pass in reshuffle_each_epoch with true
-  auto ds = std::make_shared<ShuffleDataset>(shuffle_size, true);
+  auto ds = std::make_shared<ShuffleDataset>(buffer_size, true);

   if (!ds->ValidateParams()) {
     return nullptr;
@@ -354,7 +354,6 @@ std::shared_ptr<ZipDataset> Dataset::Zip(const std::vector<std::shared_ptr<Datas
 }

 // OTHER FUNCTIONS
 // (In alphabetical order)
-
 // Helper function to create default RandomSampler.
 std::shared_ptr<SamplerObj> CreateDefaultSampler() {
@@ -364,11 +363,11 @@ std::shared_ptr<SamplerObj> CreateDefaultSampler() {
 }

 // Helper function to compute a default shuffle size
-int64_t ComputeShuffleSize(int64_t num_files, int64_t num_devices, int64_t num_rows, int64_t total_rows) {
+Status ComputeShuffleSize(int64_t num_files, int64_t num_devices, int64_t num_rows, int64_t total_rows,
+                          int64_t *shuffle_size) {
   const int64_t average_files_multiplier = 4;
   const int64_t shuffle_max = 10000;
   int64_t avg_rows_per_file = 0;
-  int64_t shuffle_size = 0;

   // Adjust the num rows per shard if sharding was given
   if (num_devices > 0) {
@@ -387,8 +386,20 @@ int64_t ComputeShuffleSize(int64_t num_files, int64_t num_devices, int64_t num_r
   // get the average per file
   avg_rows_per_file = num_rows / num_files;

-  shuffle_size = std::max(avg_rows_per_file * average_files_multiplier, shuffle_max);
-  return shuffle_size;
+  *shuffle_size = std::max(avg_rows_per_file * average_files_multiplier, shuffle_max);
+  return Status::OK();
 }

+// Helper function to inject a shuffle operator over top of current operator being built
+Status AddShuffleOp(int64_t num_files, int64_t num_devices, int64_t num_rows, int64_t total_rows,
+                    int32_t connector_que_size, int32_t rows_per_buffer, std::shared_ptr<DatasetOp> *shuffle_op) {
+  std::shared_ptr<ShuffleOp> new_shuffle_op = nullptr;
+  int64_t shuffle_size = 0;
+  RETURN_EMPTY_IF_ERROR(ComputeShuffleSize(num_files, num_devices, num_rows, total_rows, &shuffle_size));
+  MS_LOG(INFO) << "Dataset::AddShuffleOp - num_rows: " << num_rows << ", shuffle_size: " << shuffle_size;
+  // Add the shuffle op
+  *shuffle_op = std::make_shared<ShuffleOp>(shuffle_size, GetSeed(), connector_que_size, true, rows_per_buffer);
+  return Status::OK();
+}
+
 // Helper function to validate dataset params
@@ -694,20 +705,21 @@ std::vector<std::shared_ptr<DatasetOp>> CLUEDataset::Build() {
   std::shared_ptr<ClueOp> clue_op =
     std::make_shared<ClueOp>(num_workers_, rows_per_buffer_, num_samples_, worker_connector_size_, ck_map,
                              dataset_files_, connector_que_size_, shuffle_files, num_shards_, shard_id_);
-  clue_op->Init();
+  RETURN_EMPTY_IF_ERROR(clue_op->Init());
   if (shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
-    int64_t shuffle_size = 0;
+    std::shared_ptr<DatasetOp> shuffle_op = nullptr;
     int64_t num_rows = 0;

-    // First, get the number of rows in the datset and then compute the shuffle size
+    // First, get the number of rows in the dataset
     RETURN_EMPTY_IF_ERROR(ClueOp::CountAllFileRows(dataset_files_, &num_rows));
-    shuffle_size = ComputeShuffleSize(dataset_files_.size(), num_shards_, num_rows, 0);
-    MS_LOG(INFO) << "CLUEDataset::Build - num_rows: " << num_rows << ", shuffle_size: " << shuffle_size;
-    std::shared_ptr<DatasetOp> op =
-      std::make_shared<ShuffleOp>(shuffle_size, GetSeed(), worker_connector_size_, true, rows_per_buffer_);
-    node_ops.push_back(op);
+
+    // Add the shuffle op after this op
+    RETURN_EMPTY_IF_ERROR(AddShuffleOp(dataset_files_.size(), num_shards_, num_rows, 0, connector_que_size_,
+                                       rows_per_buffer_, &shuffle_op));
+    node_ops.push_back(shuffle_op);
   }
+
   node_ops.push_back(clue_op);
   return node_ops;
 }
@@ -925,18 +937,15 @@ std::vector<std::shared_ptr<DatasetOp>> TextFileDataset::Build() {

   if (shuffle_ == ShuffleMode::kGlobal) {
     // Inject ShuffleOp
-
     std::shared_ptr<DatasetOp> shuffle_op = nullptr;
-    int64_t shuffle_size = 0;
     int64_t num_rows = 0;

-    // First, get the number of rows in the dataset and then compute the shuffle size
+    // First, get the number of rows in the dataset
     RETURN_EMPTY_IF_ERROR(TextFileOp::CountAllFileRows(dataset_files_, &num_rows));
-    shuffle_size = ComputeShuffleSize(dataset_files_.size(), num_shards_, num_rows, 0);
-    MS_LOG(INFO) << "TextFileDataset::Build - num_rows: " << num_rows << ", shuffle_size: " << shuffle_size;

     // Add the shuffle op after this op
-    shuffle_op = std::make_shared<ShuffleOp>(shuffle_size, GetSeed(), connector_que_size_, true, rows_per_buffer_);
+    RETURN_EMPTY_IF_ERROR(AddShuffleOp(dataset_files_.size(), num_shards_, num_rows, 0, connector_que_size_,
+                                       rows_per_buffer_, &shuffle_op));
     node_ops.push_back(shuffle_op);
   }