@@ -20,6 +20,7 @@
#include <iostream>
#include <utility>
#include <fstream>
#include <algorithm>
#include <thread>
#include "nlohmann/json.hpp"
#include "backend/session/anf_runtime_algorithm.h"
@@ -499,235 +500,329 @@ int Sign(float x) {
  return 0;
}

-void DeduplicateIndexedSlices(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim,
-                              size_t outer_dim) {
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_);
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_);
-  MS_EXCEPTION_IF_NULL(unique_grad);
-  MS_EXCEPTION_IF_NULL(unique_grad->value_);
-  MS_EXCEPTION_IF_NULL(unique_grad->indices_);
-  std::unordered_map<int, size_t> index_map;
-  size_t unique_indices_size = 0;
-  for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) {
-    int index = origin_sparse_grad.indices_[i];
-    if (index < 0 || IntToSize(index) >= first_dim) {
-      continue;
-    }
-    auto iter = index_map.find(index);
-    if (iter == index_map.end()) {
-      index_map[index] = unique_indices_size;
-      unique_grad->indices_[unique_indices_size] = index;
-      size_t start_index = unique_indices_size * outer_dim;
-      size_t end_index = start_index + outer_dim;
-      for (size_t j = start_index, k = i * outer_dim; j < end_index; ++j, ++k) {
-        unique_grad->value_[j] = origin_sparse_grad.value_[k];
-      }
-      unique_indices_size++;
-    } else {
-      size_t first_index = iter->second;
-      size_t start_index = first_index * outer_dim;
-      size_t end_index = start_index + outer_dim;
-      for (size_t j = start_index, k = i * outer_dim; j < end_index; ++j, ++k) {
-        unique_grad->value_[j] += origin_sparse_grad.value_[k];
-      }
-    }
-  }
-  unique_grad->indices_size_ = unique_indices_size;
-}
+namespace {
+struct BucketSparseGradient {
+  float *value_;
+  int *indices_;
+  int *global_indices_;
+  size_t indices_size_;
+};
+
+struct MultiThreadReduceSparseGradientParam {
+  SparseGradient *input_grad_{nullptr};
+  SparseGradient *workspace_grad_{nullptr};
+  SparseGradient *output_grad_{nullptr};
+  size_t max_index_{0};
+  size_t value_stride_{0};
+  size_t thread_num_{0};
+  bool use_sort_reduce_{false};
+};
+
+void CalculateEachBucketSize(const std::shared_ptr<SparseGradient> &sparse_grad, size_t max_index,
+                             std::vector<size_t> *each_bucket_size) {
+  MS_LOG(DEBUG) << "Start";
+  MS_EXCEPTION_IF_NULL(sparse_grad);
+  MS_EXCEPTION_IF_NULL(sparse_grad->indices_);
+  MS_EXCEPTION_IF_NULL(each_bucket_size);
+  size_t bucket_num = each_bucket_size->size();
+  for (size_t i = 0; i < sparse_grad->indices_size_; ++i) {
+    int index = sparse_grad->indices_[i];
+    if (index >= 0 && IntToSize(index) < max_index) {
+      auto bucket_id = index % bucket_num;
+      each_bucket_size->at(bucket_id)++;
+    }
+  }
+  MS_LOG(DEBUG) << "End";
+}
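The bucket rule above is the heart of the new design: every valid index is assigned to bucket `index % bucket_num`, so duplicate indices always land in the same bucket, and each bucket can later be reduced by a single thread with no synchronization. A minimal standalone sketch of the same counting rule, using plain arrays instead of SparseGradient (the helper name is ours, not part of the patch):

#include <cstddef>
#include <vector>

// Hypothetical helper mirroring CalculateEachBucketSize: count how many of the
// valid indices fall into each of bucket_num buckets chosen by modulo.
std::vector<size_t> CountBucketSizes(const int *indices, size_t indices_size, size_t max_index, size_t bucket_num) {
  std::vector<size_t> bucket_size(bucket_num, 0);
  for (size_t i = 0; i < indices_size; ++i) {
    int index = indices[i];
    if (index >= 0 && static_cast<size_t>(index) < max_index) {
      // Equal indices map to the same bucket, so per-bucket reduction is race-free.
      bucket_size[index % bucket_num]++;
    }
  }
  return bucket_size;
}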

-struct WorkerParamsForReduceSparseGradient {
-  size_t slice_start_{0};
-  size_t slice_end_{0};
-  size_t max_length_{0};
-  size_t outer_dim_{0};
-  std::vector<std::pair<int, size_t>> *sorted_indices_{nullptr};
-  std::vector<size_t> *slice_positions_{nullptr};
-  float *src_value_{nullptr};
-  SparseGradient *unique_grad_{nullptr};
-};
-
-void WorkerForReduceSparseGradient(WorkerParamsForReduceSparseGradient param) {
-  MS_EXCEPTION_IF_NULL(param.sorted_indices_);
-  MS_EXCEPTION_IF_NULL(param.slice_positions_);
-  MS_EXCEPTION_IF_NULL(param.src_value_);
-  MS_EXCEPTION_IF_NULL(param.unique_grad_);
-  auto outer_dim = param.outer_dim_;
-  auto &sorted_indices = *(param.sorted_indices_);
-  auto &slice_positions = *(param.slice_positions_);
-  auto unique_grad = param.unique_grad_;
-  for (size_t slice_id = param.slice_start_; slice_id < param.slice_end_; ++slice_id) {
-    size_t cur_pos = slice_positions[slice_id];
-    int index = sorted_indices[cur_pos].first;
-    unique_grad->indices_[slice_id] = index;
-    size_t start_index = slice_id * outer_dim;
-    auto ret_code = memcpy_s(unique_grad->value_ + start_index, (param.max_length_ - start_index) * sizeof(float),
-                             param.src_value_ + sorted_indices[cur_pos].second, outer_dim * sizeof(float));
-    if (ret_code != EOK) {
-      MS_LOG(EXCEPTION) << "Failed to copy data!";
-    }
-    cur_pos++;
-    size_t end_pos;
-    if (slice_id + 1 < slice_positions.size()) {
-      end_pos = slice_positions[slice_id + 1];
-    } else {
-      end_pos = sorted_indices.size();
-    }
-    while (cur_pos < end_pos) {
-      for (size_t i = 0; i < outer_dim; ++i) {
-        unique_grad->value_[start_index + i] += param.src_value_[sorted_indices[cur_pos].second + i];
-      }
-      cur_pos++;
-    }
-  }
-}
+void SplitAndCalculateSegmentBucketSize(const MultiThreadReduceSparseGradientParam &param,
+                                        std::vector<std::shared_ptr<SparseGradient>> *segments_ptr,
+                                        std::vector<std::shared_ptr<std::vector<size_t>>> *segment_bucket_sizes_ptr) {
+  MS_EXCEPTION_IF_NULL(param.input_grad_);
+  MS_EXCEPTION_IF_NULL(segment_bucket_sizes_ptr);
+  MS_EXCEPTION_IF_NULL(segments_ptr);
+  auto &segments = *segments_ptr;
+  auto &segment_bucket_sizes = *segment_bucket_sizes_ptr;
+  auto input_grad = param.input_grad_;
+  if (param.thread_num_ < 1) {
+    MS_EXCEPTION(ArgumentError) << "Input param thread num must be > 0!";
+  }
+  size_t thread_indices_size = input_grad->indices_size_ / param.thread_num_;
+  size_t left_indices_size = input_grad->indices_size_ % param.thread_num_;
+  std::vector<std::thread> threads;
+  threads.reserve(param.thread_num_);
+  segments.reserve(param.thread_num_);
+  size_t current_indices_offset = 0;
+  for (size_t i = 0; i < param.thread_num_; ++i) {
+    segment_bucket_sizes.emplace_back(std::make_shared<std::vector<size_t>>(param.thread_num_, 0));
+    size_t indices_size = thread_indices_size;
+    if (i < left_indices_size) {
+      indices_size += 1;
+    }
+    segments.emplace_back(std::make_shared<SparseGradient>());
+    segments[i]->value_ = input_grad->value_ + current_indices_offset * param.value_stride_;
+    segments[i]->indices_ = input_grad->indices_ + current_indices_offset;
+    segments[i]->indices_size_ = indices_size;
+    threads.emplace_back(
+      std::thread(CalculateEachBucketSize, segments[i], param.max_index_, segment_bucket_sizes[i].get()));
+    current_indices_offset += indices_size;
+  }
+  for (size_t i = 0; i < param.thread_num_; ++i) {
+    threads[i].join();
+  }
+}
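SplitAndCalculateSegmentBucketSize divides indices_size_ as evenly as possible: each thread gets floor(n / thread_num) indices, and the first n % thread_num threads take one extra. A small illustrative helper (hypothetical, not in the patch) that reproduces the rule:

#include <cstddef>
#include <vector>

// Segment-size rule used above: sizes differ by at most one, and the
// remainder is spread over the leading segments.
std::vector<size_t> SplitSizes(size_t total, size_t thread_num) {
  std::vector<size_t> sizes(thread_num, total / thread_num);
  for (size_t i = 0; i < total % thread_num; ++i) {
    sizes[i] += 1;
  }
  return sizes;  // e.g. total = 10, thread_num = 4 -> {3, 3, 2, 2}
}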

-void RunMultiThreadReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad,
-                                        size_t outer_dim, std::vector<std::pair<int, size_t>> *sorted_indices,
-                                        std::vector<size_t> *slice_positions) {
-  MS_LOG(DEBUG) << "Start";
-  size_t thread_num = 24;
-  if (slice_positions->size() < thread_num) {
-    thread_num = slice_positions->size();
-  }
-  size_t stride = (slice_positions->size() + thread_num - 1) / thread_num;
-  thread_num = (slice_positions->size() + stride - 1) / stride;
-  std::vector<std::thread> threads;
-  size_t max_length = sorted_indices->size() * outer_dim;
-  for (size_t i = 0; i < thread_num; ++i) {
-    size_t slice_start = i * stride;
-    size_t slice_end = 0;
-    if (i == thread_num - 1) {
-      slice_end = slice_positions->size();
-    } else {
-      slice_end = slice_start + stride;
-    }
-    WorkerParamsForReduceSparseGradient params{
-      slice_start, slice_end, max_length, outer_dim, sorted_indices, slice_positions, origin_sparse_grad.value_,
-      unique_grad};
-    threads.emplace_back(std::thread(WorkerForReduceSparseGradient, params));
-  }
-  for (size_t i = 0; i < thread_num; ++i) {
-    threads[i].join();
-  }
-  MS_LOG(DEBUG) << "End";
-}
+void CopySegmentIndicesToBucket(const MultiThreadReduceSparseGradientParam &param,
+                                const std::shared_ptr<SparseGradient> &segment, size_t bucket_offset,
+                                const std::vector<std::shared_ptr<BucketSparseGradient>> &buckets) {
+  MS_LOG(DEBUG) << "Start";
+  MS_EXCEPTION_IF_NULL(segment);
+  MS_EXCEPTION_IF_NULL(segment->indices_);
+  std::vector<size_t> bucket_data_num(param.thread_num_, 0);
+  for (size_t i = 0; i < segment->indices_size_; ++i) {
+    int index = segment->indices_[i];
+    if (index >= 0 && IntToSize(index) < param.max_index_) {
+      auto bucket_id = index % param.thread_num_;
+      auto bucket_index = bucket_data_num[bucket_id];
+      buckets[bucket_id]->indices_[bucket_index] = index;
+      buckets[bucket_id]->global_indices_[bucket_index] = bucket_offset + i;
+      bucket_data_num[bucket_id]++;
+    }
+  }
+  MS_LOG(DEBUG) << "End";
+}
+
+void GatherSegmentIndicesToOutputBucket(const MultiThreadReduceSparseGradientParam &param,
+                                        const std::vector<std::shared_ptr<SparseGradient>> &segments,
+                                        const std::vector<std::shared_ptr<std::vector<size_t>>> &segment_bucket_sizes,
+                                        std::vector<std::shared_ptr<BucketSparseGradient>> *buckets_ptr) {
+  MS_EXCEPTION_IF_NULL(param.output_grad_);
+  MS_EXCEPTION_IF_NULL(param.output_grad_->value_);
+  MS_EXCEPTION_IF_NULL(param.output_grad_->indices_);
+  MS_EXCEPTION_IF_NULL(buckets_ptr);
+  auto &buckets = *buckets_ptr;
+  size_t thread_num = param.thread_num_;
+  if (thread_num != segment_bucket_sizes.size()) {
+    MS_EXCEPTION(ArgumentError) << "Input param thread num is not equal to segment size!";
+  }
+  std::vector<size_t> bucket_data_size(thread_num, 0);
+  for (size_t i = 0; i < thread_num; ++i) {
+    for (size_t j = 0; j < thread_num; ++j) {
+      bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
+    }
+  }
+  size_t current_indices_offset = 0;
+  for (size_t i = 0; i < thread_num; ++i) {
+    buckets.emplace_back(std::make_shared<BucketSparseGradient>());
+    buckets[i]->value_ = param.output_grad_->value_ + current_indices_offset * param.value_stride_;
+    buckets[i]->indices_ = param.output_grad_->indices_ + current_indices_offset;
+    buckets[i]->global_indices_ = param.workspace_grad_->indices_ + current_indices_offset;
+    buckets[i]->indices_size_ = bucket_data_size[i];
+    current_indices_offset += bucket_data_size[i];
+  }
+  std::vector<size_t> tmp_bucket_data_size(thread_num, 0);
+  std::vector<std::vector<std::shared_ptr<BucketSparseGradient>>> each_thread_buckets;
+  for (size_t i = 0; i < thread_num; ++i) {
+    std::vector<std::shared_ptr<BucketSparseGradient>> thread_buckets;
+    for (size_t j = 0; j < thread_num; ++j) {
+      thread_buckets.emplace_back(std::make_shared<BucketSparseGradient>());
+      thread_buckets[j]->indices_ = buckets[j]->indices_ + tmp_bucket_data_size[j];
+      thread_buckets[j]->global_indices_ = buckets[j]->global_indices_ + tmp_bucket_data_size[j];
+      thread_buckets[j]->value_ = buckets[j]->value_ + tmp_bucket_data_size[j] * param.value_stride_;
+      thread_buckets[j]->indices_size_ = segment_bucket_sizes[i]->at(j);
+      tmp_bucket_data_size[j] += segment_bucket_sizes[i]->at(j);
+    }
+    each_thread_buckets.emplace_back(thread_buckets);
+  }
+  std::vector<std::thread> threads;
+  threads.reserve(thread_num);
+  current_indices_offset = 0;
+  for (size_t i = 0; i < thread_num; ++i) {
+    threads.emplace_back(
+      std::thread(CopySegmentIndicesToBucket, param, segments[i], current_indices_offset, each_thread_buckets[i]));
+    current_indices_offset += segments[i]->indices_size_;
+  }
+  for (size_t i = 0; i < thread_num; ++i) {
+    threads[i].join();
+  }
+  MS_LOG(DEBUG) << "End";
+}
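The offset bookkeeping in GatherSegmentIndicesToOutputBucket is an exclusive prefix sum: thread i's slice of bucket j starts where the contributions of threads 0..i-1 to bucket j end, which is what makes the concurrent CopySegmentIndicesToBucket writes disjoint. A compact sketch of that layout computation (helper name assumed):

#include <cstddef>
#include <vector>

// offsets[i][j] = where thread i starts writing into bucket j, i.e. the
// exclusive prefix sum over sizes[0..i-1][j]. Disjoint slices, no locking.
std::vector<std::vector<size_t>> BucketWriteOffsets(const std::vector<std::vector<size_t>> &sizes) {
  size_t thread_num = sizes.size();
  std::vector<std::vector<size_t>> offsets(thread_num, std::vector<size_t>(thread_num, 0));
  std::vector<size_t> running(thread_num, 0);
  for (size_t i = 0; i < thread_num; ++i) {
    for (size_t j = 0; j < thread_num; ++j) {
      offsets[i][j] = running[j];
      running[j] += sizes[i][j];
    }
  }
  return offsets;
}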

-void ReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *unique_grad, size_t first_dim,
-                          size_t outer_dim, bool use_multi_threads) {
-  MS_LOG(DEBUG) << "Start";
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_);
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_);
-  MS_EXCEPTION_IF_NULL(unique_grad);
-  MS_EXCEPTION_IF_NULL(unique_grad->value_);
-  MS_EXCEPTION_IF_NULL(unique_grad->indices_);
-  std::vector<std::pair<int, size_t>> sorted_indices;
-  sorted_indices.reserve(origin_sparse_grad.indices_size_);
-  for (size_t i = 0; i < origin_sparse_grad.indices_size_; ++i) {
-    int index = origin_sparse_grad.indices_[i];
-    if (index >= 0 && IntToSize(index) < first_dim) {
-      sorted_indices.emplace_back(std::pair<int, size_t>(index, i * outer_dim));
-    }
-  }
-  std::sort(
-    sorted_indices.begin(), sorted_indices.end(),
-    [](const std::pair<int, size_t> &left, const std::pair<int, size_t> &right) { return left.first < right.first; });
-  int last_index = 0;
-  std::vector<size_t> slice_positions;
-  slice_positions.reserve(sorted_indices.size());
-  for (size_t i = 0; i < sorted_indices.size(); ++i) {
-    if (i == 0 || last_index != sorted_indices[i].first) {
-      slice_positions.emplace_back(i);
-    }
-    last_index = sorted_indices[i].first;
-  }
-  if (use_multi_threads) {
-    RunMultiThreadReduceSparseGradient(origin_sparse_grad, unique_grad, outer_dim, &sorted_indices, &slice_positions);
-  } else {
-    size_t max_length = sorted_indices.size() * outer_dim;
-    WorkerParamsForReduceSparseGradient params{0,
-                                               slice_positions.size(),
-                                               max_length,
-                                               outer_dim,
-                                               &sorted_indices,
-                                               &slice_positions,
-                                               origin_sparse_grad.value_,
-                                               unique_grad};
-    WorkerForReduceSparseGradient(params);
-  }
-  unique_grad->indices_size_ = slice_positions.size();
-  MS_LOG(DEBUG) << "End";
-}
+void SortAndReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam &param,
+                                       const std::shared_ptr<BucketSparseGradient> &bucket,
+                                       const std::shared_ptr<SparseGradient> &reduced_bucket) {
+  MS_LOG(DEBUG) << "Start";
+  MS_EXCEPTION_IF_NULL(bucket);
+  MS_EXCEPTION_IF_NULL(bucket->value_);
+  MS_EXCEPTION_IF_NULL(bucket->indices_);
+  MS_EXCEPTION_IF_NULL(reduced_bucket);
+  MS_EXCEPTION_IF_NULL(reduced_bucket->value_);
+  MS_EXCEPTION_IF_NULL(reduced_bucket->indices_);
+  std::vector<std::pair<int, int>> sorted_indices;
+  sorted_indices.reserve(bucket->indices_size_);
+  for (size_t i = 0; i < bucket->indices_size_; ++i) {
+    int index = bucket->indices_[i];
+    int global_index = bucket->global_indices_[i];
+    sorted_indices.emplace_back(std::pair<int, int>(index, global_index));
+  }
+  std::sort(sorted_indices.begin(), sorted_indices.end());
+
+  float *global_value = param.input_grad_->value_;
+  size_t unique_indices_size = 0;
+  size_t max_length = reduced_bucket->indices_size_ * param.value_stride_;
+  int last_index{0};
+  size_t value_offset{0};
+  for (size_t i = 0; i < sorted_indices.size(); ++i) {
+    int index = sorted_indices[i].first;
+    int global_index = sorted_indices[i].second;
+    int global_value_offset = global_index * param.value_stride_;
+    if (i == 0 || index != last_index) {
+      if (i != 0) {
+        unique_indices_size++;
+      }
+      reduced_bucket->indices_[unique_indices_size] = index;
+      value_offset = unique_indices_size * param.value_stride_;
+      auto ret_code = memcpy_s(reduced_bucket->value_ + value_offset, (max_length - value_offset) * sizeof(float),
+                               global_value + global_value_offset, param.value_stride_ * sizeof(float));
+      if (ret_code != EOK) {
+        MS_LOG(EXCEPTION) << "Failed to copy data!";
+      }
+    } else {
+      for (size_t j = 0; j < param.value_stride_; ++j) {
+        reduced_bucket->value_[value_offset + j] += global_value[global_value_offset + j];
+      }
+    }
+    last_index = index;
+  }
+  if (!sorted_indices.empty()) {
+    unique_indices_size++;
+  }
+  reduced_bucket->indices_size_ = unique_indices_size;
+  MS_LOG(DEBUG) << "End";
+}
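Stripped of the raw pointers and memcpy_s bounds handling, SortAndReduceBucketSparseGradient is a sort-then-scan reduction: sort (index, position) pairs, then sum each run of equal indices into one output slot. A self-contained sketch with scalar values (value_stride_ == 1) for brevity:

#include <algorithm>
#include <utility>
#include <vector>

// Sort by index, then collapse each run of equal indices into one entry.
std::vector<std::pair<int, float>> SortReduce(std::vector<std::pair<int, float>> grads) {
  std::sort(grads.begin(), grads.end(),
            [](const std::pair<int, float> &l, const std::pair<int, float> &r) { return l.first < r.first; });
  std::vector<std::pair<int, float>> reduced;
  for (const auto &g : grads) {
    if (reduced.empty() || reduced.back().first != g.first) {
      reduced.emplace_back(g);  // first occurrence opens a new output slot
    } else {
      reduced.back().second += g.second;  // duplicates accumulate into the open slot
    }
  }
  return reduced;
}

Note the post-loop correction in the patch code above: because unique_indices_size is only incremented on a transition to a new index, the final count has to be bumped once for a non-empty bucket, otherwise the last unique slice would be dropped by the merge stage.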

-void ReduceMultiSparseGradient(const std::vector<std::shared_ptr<SparseGradient>> &unique_slice_grads,
-                               SparseGradient *tmp_grad, SparseGradient *unique_grad, size_t first_dim,
-                               size_t outer_dim) {
-  MS_LOG(DEBUG) << "Start";
-  if (unique_slice_grads.empty()) {
-    return;
-  }
-  size_t index_data_size = outer_dim * sizeof(float);
-  size_t unique_indices_size = 0;
-  for (size_t i = 0; i < unique_slice_grads.size(); ++i) {
-    auto &slice_grad = unique_slice_grads[i];
-    auto ret_code = memcpy_s(tmp_grad->value_ + unique_indices_size * outer_dim,
-                             (tmp_grad->indices_size_ - unique_indices_size) * index_data_size, slice_grad->value_,
-                             slice_grad->indices_size_ * index_data_size);
-    if (ret_code != EOK) {
-      MS_LOG(EXCEPTION) << "Failed to copy data!";
-    }
-    ret_code =
-      memcpy_s(tmp_grad->indices_ + unique_indices_size, (tmp_grad->indices_size_ - unique_indices_size) * sizeof(int),
-               slice_grad->indices_, slice_grad->indices_size_ * sizeof(int));
-    if (ret_code != EOK) {
-      MS_LOG(EXCEPTION) << "Failed to copy data!";
-    }
-    unique_indices_size += slice_grad->indices_size_;
-  }
-  tmp_grad->indices_size_ = unique_indices_size;
-  ReduceSparseGradient(*tmp_grad, unique_grad, first_dim, outer_dim);
-  MS_LOG(DEBUG) << "End";
-}
+void ReduceBucketSparseGradient(const MultiThreadReduceSparseGradientParam &param,
+                                const std::shared_ptr<BucketSparseGradient> &bucket,
+                                const std::shared_ptr<SparseGradient> &reduced_bucket) {
+  MS_LOG(DEBUG) << "Start";
+  MS_EXCEPTION_IF_NULL(bucket);
+  MS_EXCEPTION_IF_NULL(bucket->value_);
+  MS_EXCEPTION_IF_NULL(bucket->indices_);
+  MS_EXCEPTION_IF_NULL(reduced_bucket);
+  MS_EXCEPTION_IF_NULL(reduced_bucket->value_);
+  MS_EXCEPTION_IF_NULL(reduced_bucket->indices_);
+
+  float *global_value = param.input_grad_->value_;
+  std::unordered_map<int, size_t> index_map;
+  size_t unique_indices_size = 0;
+  size_t max_length = reduced_bucket->indices_size_ * param.value_stride_;
+  for (size_t i = 0; i < bucket->indices_size_; ++i) {
+    int index = bucket->indices_[i];
+    int global_index = bucket->global_indices_[i];
+    auto iter = index_map.find(index);
+    if (iter == index_map.end()) {
+      reduced_bucket->indices_[unique_indices_size] = index;
+      size_t start_index = unique_indices_size * param.value_stride_;
+      index_map[index] = start_index;
+      auto ret_code = memcpy_s(reduced_bucket->value_ + start_index, (max_length - start_index) * sizeof(float),
+                               global_value + global_index * param.value_stride_, param.value_stride_ * sizeof(float));
+      if (ret_code != EOK) {
+        MS_LOG(EXCEPTION) << "Failed to copy data!";
+      }
+      unique_indices_size++;
+    } else {
+      size_t start_index = iter->second;
+      size_t end_index = start_index + param.value_stride_;
+      for (size_t j = start_index, k = global_index * param.value_stride_; j < end_index; ++j, ++k) {
+        reduced_bucket->value_[j] += global_value[k];
+      }
+    }
+  }
+  reduced_bucket->indices_size_ = unique_indices_size;
+  MS_LOG(DEBUG) << "End";
+}
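ReduceBucketSparseGradient is the hash-based alternative: the first occurrence of an index claims the next output slot and later occurrences accumulate into it, trading the sort for unordered_map lookups and producing output in first-appearance order rather than index order. The same idea in a minimal form (again scalar values, helper name ours):

#include <cstddef>
#include <unordered_map>
#include <utility>
#include <vector>

std::vector<std::pair<int, float>> HashReduce(const std::vector<std::pair<int, float>> &grads) {
  std::unordered_map<int, size_t> slot_of_index;  // index -> position in reduced output
  std::vector<std::pair<int, float>> reduced;
  for (const auto &g : grads) {
    auto it = slot_of_index.find(g.first);
    if (it == slot_of_index.end()) {
      slot_of_index[g.first] = reduced.size();  // first occurrence claims a slot
      reduced.emplace_back(g);
    } else {
      reduced[it->second].second += g.second;  // duplicates accumulate in place
    }
  }
  return reduced;
}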

-void TwoLevelReduceSparseGradient(const SparseGradient &origin_sparse_grad, SparseGradient *tmp_grad,
-                                  SparseGradient *unique_grad, size_t first_dim, size_t outer_dim) {
-  MS_LOG(DEBUG) << "Start";
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.value_);
-  MS_EXCEPTION_IF_NULL(origin_sparse_grad.indices_);
-  MS_EXCEPTION_IF_NULL(unique_grad);
-  MS_EXCEPTION_IF_NULL(unique_grad->value_);
-  MS_EXCEPTION_IF_NULL(unique_grad->indices_);
-  MS_EXCEPTION_IF_NULL(tmp_grad);
-  MS_EXCEPTION_IF_NULL(tmp_grad->value_);
-  MS_EXCEPTION_IF_NULL(tmp_grad->indices_);
-  size_t thread_num = 24;
-  if (origin_sparse_grad.indices_size_ < thread_num) {
-    thread_num = origin_sparse_grad.indices_size_;
-  }
-  size_t thread_indices_size = origin_sparse_grad.indices_size_ / thread_num;
-  size_t left_indices_size = origin_sparse_grad.indices_size_ % thread_num;
-  std::vector<std::thread> threads;
-  threads.reserve(thread_num);
-  std::vector<std::shared_ptr<SparseGradient>> unique_slice_grads;
-  for (size_t i = 0; i < thread_num; ++i) {
-    size_t indices_size = thread_indices_size;
-    if (i == thread_num - 1) {
-      indices_size = thread_indices_size + left_indices_size;
-    }
-    size_t value_offset = i * thread_indices_size * outer_dim;
-    size_t indices_offset = i * thread_indices_size;
-    auto slice_grad = SparseGradient(
-      {origin_sparse_grad.value_ + value_offset, origin_sparse_grad.indices_ + indices_offset, indices_size});
-    unique_slice_grads.emplace_back(std::make_shared<SparseGradient>());
-    unique_slice_grads[i]->value_ = unique_grad->value_ + value_offset;
-    unique_slice_grads[i]->indices_ = unique_grad->indices_ + indices_offset;
-    unique_slice_grads[i]->indices_size_ = indices_size;
-    threads.emplace_back(
-      std::thread(ReduceSparseGradient, slice_grad, unique_slice_grads[i].get(), first_dim, outer_dim, false));
-  }
-  for (size_t i = 0; i < thread_num; ++i) {
-    threads[i].join();
-  }
-  ReduceMultiSparseGradient(unique_slice_grads, tmp_grad, unique_grad, first_dim, outer_dim);
-  MS_LOG(DEBUG) << "End";
-}
+void ReduceBucketSparseGradientToWorkspace(const MultiThreadReduceSparseGradientParam &param,
+                                           const std::vector<std::shared_ptr<BucketSparseGradient>> &buckets,
+                                           std::vector<std::shared_ptr<SparseGradient>> *reduced_buckets_ptr) {
+  MS_EXCEPTION_IF_NULL(param.workspace_grad_);
+  MS_EXCEPTION_IF_NULL(param.workspace_grad_->value_);
+  MS_EXCEPTION_IF_NULL(param.workspace_grad_->indices_);
+  MS_EXCEPTION_IF_NULL(reduced_buckets_ptr);
+  auto &reduced_buckets = *reduced_buckets_ptr;
+  size_t thread_num = buckets.size();
+  std::vector<std::thread> threads;
+  threads.reserve(thread_num);
+
+  size_t current_indices_offset = 0;
+  for (size_t i = 0; i < thread_num; ++i) {
+    reduced_buckets.emplace_back(std::make_shared<SparseGradient>());
+    reduced_buckets[i]->value_ = param.workspace_grad_->value_ + current_indices_offset * param.value_stride_;
+    reduced_buckets[i]->indices_ = param.workspace_grad_->indices_ + current_indices_offset;
+    reduced_buckets[i]->indices_size_ = buckets[i]->indices_size_;
+    if (param.use_sort_reduce_) {
+      threads.emplace_back(std::thread(SortAndReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i]));
+    } else {
+      threads.emplace_back(std::thread(ReduceBucketSparseGradient, param, buckets[i], reduced_buckets[i]));
+    }
+    current_indices_offset += buckets[i]->indices_size_;
+  }
+  for (size_t i = 0; i < thread_num; ++i) {
+    threads[i].join();
+  }
+}
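Every stage above uses the same fan-out/join shape: one std::thread per bucket or segment, each writing only to a slice it owns, followed by a single join barrier. A generic sketch of that pattern (illustrative, not part of the patch):

#include <cstddef>
#include <thread>
#include <vector>

// Run work(0), work(1), ..., work(bucket_num - 1) concurrently, then join.
// Safe only because each worker touches a disjoint output slice.
template <typename Work>
void RunPerBucket(size_t bucket_num, Work work) {
  std::vector<std::thread> threads;
  threads.reserve(bucket_num);
  for (size_t i = 0; i < bucket_num; ++i) {
    threads.emplace_back(work, i);  // worker i exclusively owns bucket i
  }
  for (auto &t : threads) {
    t.join();  // single barrier before the next pipeline stage
  }
}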

+void MergeReduceSparseGradient(const MultiThreadReduceSparseGradientParam &param,
+                               const std::vector<std::shared_ptr<SparseGradient>> &reduced_buckets) {
+  MS_EXCEPTION_IF_NULL(param.output_grad_);
+  auto output_grad = param.output_grad_;
+  MS_EXCEPTION_IF_NULL(output_grad->value_);
+  MS_EXCEPTION_IF_NULL(output_grad->indices_);
+  size_t stride_data_size = param.value_stride_ * sizeof(float);
+  size_t unique_indices_size = 0;
+  for (size_t i = 0; i < reduced_buckets.size(); ++i) {
+    auto &bucket = reduced_buckets[i];
+    MS_EXCEPTION_IF_NULL(bucket);
+    if (bucket->indices_size_ == 0) {
+      continue;
+    }
+    auto ret_code = memcpy_s(output_grad->value_ + unique_indices_size * param.value_stride_,
+                             (output_grad->indices_size_ - unique_indices_size) * stride_data_size, bucket->value_,
+                             bucket->indices_size_ * stride_data_size);
+    if (ret_code != EOK) {
+      MS_LOG(EXCEPTION) << "Failed to copy data!";
+    }
+    ret_code = memcpy_s(output_grad->indices_ + unique_indices_size,
+                        (output_grad->indices_size_ - unique_indices_size) * sizeof(int), bucket->indices_,
+                        bucket->indices_size_ * sizeof(int));
+    if (ret_code != EOK) {
+      MS_LOG(EXCEPTION) << "Failed to copy data!";
+    }
+    unique_indices_size += bucket->indices_size_;
+  }
+  output_grad->indices_size_ = unique_indices_size;
+}
+}  // namespace
+
+void BucketReduceSparseGradient(const ReduceSparseGradientParam &param) {
+  MS_LOG(DEBUG) << "Start";
+  MS_EXCEPTION_IF_NULL(param.input_grad_);
+  size_t thread_num = 23;
+  if (param.input_grad_->indices_size_ < thread_num) {
+    thread_num = param.input_grad_->indices_size_;
+  }
+  MultiThreadReduceSparseGradientParam multi_thread_param({param.input_grad_, param.workspace_grad_,
+                                                           param.output_grad_, param.max_index_, param.value_stride_,
+                                                           thread_num, param.use_sort_reduce_});
+  std::vector<std::shared_ptr<SparseGradient>> segments;
+  std::vector<std::shared_ptr<std::vector<size_t>>> segment_bucket_sizes;
+  SplitAndCalculateSegmentBucketSize(multi_thread_param, &segments, &segment_bucket_sizes);
+
+  std::vector<std::shared_ptr<BucketSparseGradient>> buckets;
+  GatherSegmentIndicesToOutputBucket(multi_thread_param, segments, segment_bucket_sizes, &buckets);
+
+  std::vector<std::shared_ptr<SparseGradient>> reduced_buckets;
+  ReduceBucketSparseGradientToWorkspace(multi_thread_param, buckets, &reduced_buckets);
+
+  MergeReduceSparseGradient(multi_thread_param, reduced_buckets);
+  MS_LOG(DEBUG) << "End";
+}
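Putting it together, BucketReduceSparseGradient runs a four-stage pipeline: split the indices into per-thread segments and count bucket sizes, scatter indices into per-bucket slices, reduce each bucket in parallel into the workspace, and merge the reduced buckets into the output. A hedged usage sketch follows; the buffer-sizing convention (worst case of no duplicate indices) and default-constructibility of ReduceSparseGradientParam are read off the code above rather than a documented API, and all buffer names are placeholders:

// Hypothetical call site, relying on SparseGradient and
// ReduceSparseGradientParam as declared in this file's header.
void ReduceGradients(float *values, int *indices, size_t indices_size,
                     float *ws_values, int *ws_indices,
                     float *out_values, int *out_indices,
                     size_t first_dim, size_t outer_dim) {
  // All three buffers are caller-allocated for indices_size entries,
  // the worst case in which every index is unique.
  SparseGradient input{values, indices, indices_size};
  SparseGradient workspace{ws_values, ws_indices, indices_size};
  SparseGradient output{out_values, out_indices, indices_size};
  ReduceSparseGradientParam param;
  param.input_grad_ = &input;
  param.workspace_grad_ = &workspace;
  param.output_grad_ = &output;
  param.max_index_ = first_dim;     // number of rows in the dense parameter
  param.value_stride_ = outer_dim;  // elements per row
  param.use_sort_reduce_ = false;   // pick the hash-based per-bucket reduce
  BucketReduceSparseGradient(param);
  // output.indices_size_ now holds the number of unique row indices.
}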