
Encapsulate address ptr generation in PS.

tags/v1.0.0
ZPaC committed 5 years ago
parent commit bb0a5a30cd
11 changed files with 282 additions and 212 deletions
  1. +10 -3   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc
  2. +3  -0   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pull_kernel.h
  3. +5  -1   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h
  4. +47 -0   mindspore/ccsrc/frontend/parallel/ps/common.h
  5. +98 -79  mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc
  6. +4  -0   mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h
  7. +74 -116 mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc
  8. +11 -1   mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h
  9. +6  -5   mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
  10. +9 -3   mindspore/ccsrc/frontend/parallel/ps/worker.h
  11. +15 -4  mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h

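Taken together, the diffs below replace hand-maintained offset arithmetic in the parameter-server (PS) code with table-driven address generation: each optimizer declares where every input sits in the payload sent to the PS, and offsets are derived from the per-input lengths. The following standalone sketch illustrates that pattern; kIndexNotSend and SendOffset are simplified names invented for this example, not MindSpore API.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <map>
#include <numeric>
#include <string>
#include <vector>

constexpr size_t kIndexNotSend = SIZE_MAX;  // input kept on the server, never sent

// Position of each Momentum input inside the values buffer a worker sends.
const std::map<std::string, size_t> kMomentumSendIdx = {
    {"weight", kIndexNotSend}, {"accum", kIndexNotSend}, {"lr", 0}, {"grad", 1}, {"momentum", 2}};

// Byte offset of input_name inside a float payload laid out according to lens.
size_t SendOffset(const std::map<std::string, size_t> &idx_map, const std::string &input_name,
                  const std::vector<int> &lens) {
  size_t idx = idx_map.at(input_name);
  return std::accumulate(lens.begin(), lens.begin() + idx, 0) * sizeof(float);
}

int main() {
  std::vector<int> lens = {1, 4096, 1};  // element counts of lr, grad, momentum
  std::cout << SendOffset(kMomentumSendIdx, "grad", lens) << std::endl;  // prints 4
}

The per-optimizer maps added to common.h below, and the GenInputAddrPtr/UpdateOptimInputValue helpers built on them, are the production counterparts of this idea.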
+10 -3   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/embedding_look_up_proxy_kernel.cc

@@ -50,6 +50,12 @@ void EmbeddingLookUpProxyKernel::InitKernel(const CNodePtr &kernel_node) {
bool EmbeddingLookUpProxyKernel::Launch(const std::vector<kernel::AddressPtr> &inputs,
const std::vector<kernel::AddressPtr> & /*workspace*/,
const std::vector<kernel::AddressPtr> &outputs) {
if (inputs.size() != 2) {
MS_LOG(EXCEPTION) << "Inputs size is " << inputs.size() << ", but EmbeddingLookUpProxyKernel needs 2.";
}
if (outputs.size() != 1) {
MS_LOG(EXCEPTION) << "Outputs size is " << outputs.size() << ", but EmbeddingLookUpProxyKernel needs 1.";
}
auto indices_addr = reinterpret_cast<int *>(inputs[1]->addr);
auto output_addr = reinterpret_cast<float *>(outputs[0]->addr);
size_t input_size = inputs[1]->size;
@@ -59,17 +65,18 @@ bool EmbeddingLookUpProxyKernel::Launch(const std::vector<kernel::AddressPtr> &i
::ps::SArray<int> lookup_ids(size, 0);
::ps::SArray<int> lengths{size};
::ps::SArray<float> lookup_result(output_size / sizeof(float), 0);

- auto ret = memcpy_s(lookup_ids.data(), input_size, indices_addr, input_size);
+ auto ret = memcpy_s(lookup_ids.data(), lookup_ids.size() * sizeof(int), indices_addr, input_size);
if (ret != EOK) {
MS_LOG(EXCEPTION) << "Lookup id memcpy failed.";
return false;
}
parallel::ps::worker.DoPSEmbeddingLookup({key_}, lookup_ids, lengths, &lookup_result,
parallel::ps::kEmbeddingLookupCmd);

- auto ret2 = memcpy_s(output_addr, output_size, lookup_result.data(), output_size);
+ auto ret2 = memcpy_s(output_addr, outputs[0]->size, lookup_result.data(), output_size);
if (ret2 != EOK) {
MS_LOG(EXCEPTION) << "Lookup result memcpy failed.";
return false;
}
return true;
}

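The two rewritten memcpy_s calls above typify a fix repeated throughout this commit: the second argument of memcpy_s is the destination's capacity, not the number of bytes to copy, so passing the source size (as the old lines did) makes the bounds check a no-op. Below is a self-contained sketch of that check, assuming the securec-style signature memcpy_s(dest, destMax, src, count) used throughout this codebase; the helper name and return codes are illustrative placeholders.

#include <cstddef>
#include <cstring>

constexpr int kSketchEok = 0;     // stands in for securec's EOK
constexpr int kSketchRange = 34;  // stands in for a securec error code

// Mirrors the contract of memcpy_s(dest, destMax, src, count).
int MemcpySSketch(void *dest, size_t dest_max, const void *src, size_t count) {
  if (dest == nullptr || src == nullptr || count > dest_max) {
    return kSketchRange;  // refuse to overflow the destination
  }
  std::memcpy(dest, src, count);
  return kSketchEok;
}

Passing the real capacity, e.g. lookup_ids.size() * sizeof(int) above, lets an oversized input_size fail fast instead of overrunning the buffer.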

+3 -0   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pull_kernel.h

@@ -33,6 +33,9 @@ class PullKernel : public CPUKernel {
~PullKernel() override = default;

bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, const std::vector<AddressPtr> &) {
if (inputs.size() != 2) {
MS_LOG(EXCEPTION) << "Inputs size is " << inputs.size() << ", but PullKernel needs 2.";
}
bool init_in_server = parallel::ps::worker.GetParamInitInServer(param_name_);
// If init_in_server, forward kernel should run in server too.
if (!init_in_server) {


+5 -1   mindspore/ccsrc/backend/kernel_compiler/cpu/ps/push_kernel.h

@@ -34,6 +34,9 @@ class PushKernel : public CPUKernel {

bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &,
const std::vector<AddressPtr> &outputs) {
if (outputs.size() != 1) {
MS_LOG(EXCEPTION) << "Outputs size is " << outputs.size() << ", but PushKernel needs 1.";
}
std::vector<size_t> keys;
std::vector<uintptr_t> addrs;
std::vector<int> sizes;
@@ -43,9 +46,10 @@ class PushKernel : public CPUKernel {
sizes.push_back(SizeToInt(input->size) / sizeof(T));
}
parallel::ps::Worker<T>::GetInstance().Push(keys, addrs, sizes);
- auto ret = memcpy_s(outputs[0]->addr, sizeof(size_t), &key_, sizeof(size_t));
+ auto ret = memcpy_s(outputs[0]->addr, outputs[0]->size, &key_, sizeof(size_t));
if (ret != EOK) {
MS_LOG(EXCEPTION) << "Lookup id memcpy failed.";
return false;
}
return true;
}


+47 -0  mindspore/ccsrc/frontend/parallel/ps/common.h

@@ -20,6 +20,8 @@
#include <iostream>
#include <vector>
#include <memory>
#include <map>
#include <string>
#include "ps/ps.h"

namespace mindspore {
@@ -83,6 +85,51 @@ using WeightPtr = std::shared_ptr<Weight>;
using GradPtr = std::shared_ptr<Grad>;
using InputsShape = std::vector<std::shared_ptr<std::vector<size_t>>>;
using InputsShapePtr = std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>>;

constexpr size_t INDEX_NOT_SEND = UINT_MAX;
using OptimOriginIdx = std::map<std::string, size_t>;
using OptimPSSendIdx = std::map<std::string, size_t>;

const OptimOriginIdx kMomentumOriginIdx = {{"weight", 0}, {"accum", 1}, {"lr", 2}, {"grad", 3}, {"momentum", 4}};
const OptimPSSendIdx kMomentumPSSendIdx = {
{"weight", INDEX_NOT_SEND}, {"accum", INDEX_NOT_SEND}, {"lr", 0}, {"grad", 1}, {"momentum", 2}};

const OptimOriginIdx kSparseAdamOriginIdx = {{"weight", 0}, {"m", 1}, {"v", 2}, {"beta1_power", 3},
{"beta2_power", 4}, {"lr", 5}, {"beta1", 6}, {"beta2", 7},
{"eps", 8}, {"grad", 9}, {"indices", 10}};
const OptimPSSendIdx kSparseAdamPSSendIdx = {{"weight", INDEX_NOT_SEND},
{"m", INDEX_NOT_SEND},
{"v", INDEX_NOT_SEND},
{"beta1_power", 0},
{"beta2_power", 1},
{"lr", 2},
{"beta1", 3},
{"beta2", 4},
{"eps", 5},
{"grad", 6},
{"indices", 7}};

const OptimOriginIdx kSparseFtrlOriginIdx = {{"weight", 0}, {"accum", 1}, {"linear", 2}, {"grad", 3}, {"indices", 4}};
const OptimPSSendIdx kSparseFtrlPSSendIdx = {
{"weight", INDEX_NOT_SEND}, {"accum", INDEX_NOT_SEND}, {"linear", INDEX_NOT_SEND}, {"grad", 0}, {"indices", 1}};

const std::map<std::string, OptimOriginIdx> kOptimToOriginIdx = {{kApplyMomentum, kMomentumOriginIdx},
{kSparseAdam, kSparseAdamOriginIdx},
{kSparseLazyAdam, kSparseAdamOriginIdx},
{kSparseFtrl, kSparseFtrlOriginIdx}};
const std::map<std::string, OptimOriginIdx> kOptimToPSSendIdx = {{kApplyMomentum, kMomentumPSSendIdx},
{kSparseAdam, kSparseAdamPSSendIdx},
{kSparseLazyAdam, kSparseAdamPSSendIdx},
{kSparseFtrl, kSparseFtrlPSSendIdx}};

#define EXC_IF_VEC_IDX_OOB(vec, idx) \
{ \
size_t vec_size = vec.size(); \
if (idx >= vec_size) { \
MS_LOG(EXCEPTION) << "Vector " << #vec << " size is " << vec_size << ". So index " << idx \
<< " is out of bound."; \
} \
}
} // namespace ps
} // namespace parallel
} // namespace mindspore

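The two tables above are the core of the change: origin indices address an optimizer kernel's full input list, while PS-send indices address the trimmed payload a worker actually transmits (server-resident inputs are marked INDEX_NOT_SEND). A small runnable sketch, not code from this commit, of how the two index spaces line up for Momentum:

#include <cstddef>
#include <initializer_list>
#include <iostream>
#include <map>
#include <string>

int main() {
  // Mirrors kMomentumOriginIdx / kMomentumPSSendIdx above; weight and accum are never sent.
  const std::map<std::string, size_t> origin = {
      {"weight", 0}, {"accum", 1}, {"lr", 2}, {"grad", 3}, {"momentum", 4}};
  const std::map<std::string, size_t> send = {{"lr", 0}, {"grad", 1}, {"momentum", 2}};
  for (const std::string name : {"lr", "grad", "momentum"}) {
    std::cout << name << ": inputs_[" << origin.at(name) << "] <- payload slot " << send.at(name) << std::endl;
  }
}

UpdateOptimInputValue and GenInputAddrPtr below use exactly this translation: the send index locates a value inside the received lens/values payload, and the origin index selects which optimizer input it updates.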

+98 -79  mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc

@@ -17,6 +17,8 @@
#include "frontend/parallel/ps/optimizer_info.h"
#include <map>
#include <memory>
#include <string>
#include <functional>
#include "frontend/parallel/ps/util.h"

namespace mindspore {
@@ -38,6 +40,36 @@ size_t OptimizerInfo::grad_index() { return 0; }

size_t OptimizerInfo::indices_index() { return 0; }

template <typename T>
void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const std::string &input_name, void *data,
const Lengths &lens) {
if (kOptimToOriginIdx.count(optim_type) == 0 || kOptimToPSSendIdx.count(optim_type) == 0) {
MS_LOG(EXCEPTION) << "Optimizer type " << optim_type << " in not supported.";
}
const OptimOriginIdx &origin_input_map = kOptimToOriginIdx.at(optim_type);
const OptimPSSendIdx &ps_send_index_map = kOptimToPSSendIdx.at(optim_type);
if (ps_send_index_map.count(input_name) == 0 || origin_input_map.count(input_name) == 0) {
MS_LOG(EXCEPTION) << "Optimizer " << optim_type << " has no input for " << input_name;
}

size_t origin_index = origin_input_map.at(input_name);
size_t ps_send_index = ps_send_index_map.at(input_name);
if (ps_send_index >= lens.size() || origin_index >= inputs_.size()) {
MS_LOG(EXCEPTION) << "Index is out of bound for optimizer " << optim_type << ", origin_index:" << origin_index
<< ", ps_send_index:" << ps_send_index;
}
EXC_IF_VEC_IDX_OOB(lens, ps_send_index);
size_t size = lens[ps_send_index] * sizeof(T);
size_t offset = std::accumulate(lens.begin(), lens.begin() + ps_send_index, 0, std::plus<int>());
AddressPtr optim_input = inputs_[origin_index];
int ret = memcpy_s(optim_input->addr, optim_input->size, reinterpret_cast<T *>(data) + offset, size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
return;
}

void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
size_t size = gradient()->size / sizeof(float);
@@ -77,8 +109,9 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
}
float *incr_grad_data = values.data() + grad_offset;
size_t incr_grad_size = lengths[grad_index] * sizeof(float);

- auto ret = memcpy_s(accum_grad_data + grads_offset_, incr_grad_size, incr_grad_data, incr_grad_size);
+ size_t dst_size = incr_grad_size;
+ size_t src_size = incr_grad_size;
+ auto ret = memcpy_s(accum_grad_data + grads_offset_, dst_size, incr_grad_data, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@@ -97,6 +130,8 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
int *incr_indice_data = reinterpret_cast<int *>(values.data()) + indice_offset;
size_t incr_indice_size = lengths[indices_index];
size_t incr_indice_data_size = incr_indice_size * sizeof(int);
+ dst_size = incr_indice_data_size;
+ src_size = incr_indice_data_size;
auto ret2 =
memcpy_s(accum_indices_data + indices_offset_, incr_indice_data_size, incr_indice_data, incr_indice_data_size);
if (ret2 != 0) {
@@ -109,6 +144,8 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {

void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
size_t rank_id) {
+ MS_EXCEPTION_IF_NULL(gradient());
+ MS_EXCEPTION_IF_NULL(indices());
size_t indices_size = static_cast<size_t>(indices()->size / sizeof(int));
int segment_size = gradient()->size / indices()->size;

@@ -116,7 +153,6 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
std::vector<int> new_indices(indices_size);
mindspore::kernel::SparseGradient<int> unique_sparse_grad({new_grad.data(), new_indices.data(), indices_size});

- // const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
if (shapes.size() < 2 || shapes[1].empty()) {
MS_LOG(EXCEPTION) << "No input shape found";
}
@@ -153,13 +189,13 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
&unique_sparse_grad);

int reduced_grad_size = unique_sparse_grad.indices_size_ * segment_size * sizeof(float);
- auto ret = memcpy_s(gradient()->addr, reduced_grad_size, unique_sparse_grad.value_, reduced_grad_size);
+ auto ret = memcpy_s(gradient()->addr, gradient()->size, unique_sparse_grad.value_, reduced_grad_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
int reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int);
- ret = memcpy_s(indices()->addr, reduced_indice_size, unique_sparse_grad.indices_, reduced_indice_size);
+ ret = memcpy_s(indices()->addr, indices()->size, unique_sparse_grad.indices_, reduced_indice_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@@ -193,22 +229,27 @@ MomentumOptimInfo::MomentumOptimInfo(const AddressPtr &weight, const AddressPtr
}

void MomentumOptimInfo::Update(const Values &values, const Lengths &lens) {
- const size_t lr_offset = 0;
- float *lr = values.data() + lr_offset;
- auto ret = memcpy_s(inputs_[2]->addr, sizeof(float), lr, sizeof(float));
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }
+ UpdateOptimInputValue<float>(kApplyMomentum, "lr", values.data(), lens);
}

const size_t SparseOptimInfo::indice_size() const { return indices_offset_; }

- const AddressPtr &MomentumOptimInfo::gradient() { return inputs_[3]; }
+ const AddressPtr &MomentumOptimInfo::gradient() {
+ size_t origin_grad_index = kMomentumOriginIdx.at("grad");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+ return inputs_[origin_grad_index];
+ }

- const AddressPtr &MomentumOptimInfo::indices() { return inputs_[3]; }
+ const AddressPtr &MomentumOptimInfo::indices() {
+ size_t origin_grad_index = kMomentumOriginIdx.at("grad");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+ return inputs_[origin_grad_index];
+ }

- size_t MomentumOptimInfo::grad_index() { return 1; }
+ size_t MomentumOptimInfo::grad_index() {
+ size_t ps_grad_index = kMomentumPSSendIdx.at("grad");
+ return ps_grad_index;
+ }

SparseAdamOptimInfo::SparseAdamOptimInfo(const AddressPtr &weight, const AddressPtr &m, const AddressPtr &v,
const AddressPtr &beta1_power, const AddressPtr &beta2_power,
@@ -231,73 +272,37 @@ SparseAdamOptimInfo::SparseAdamOptimInfo(const AddressPtr &weight, const Address
}

void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
- float *data_ptr = values.data();
- int offset = 0;

- AddressPtr &beta1_power = inputs_[3];
- int size = lens[0];
- int bytes = sizeof(float);
- auto ret = memcpy_s(beta1_power->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }

- offset += size;
- AddressPtr &beta2_power = inputs_[4];
- size = lens[1];
- ret = memcpy_s(beta2_power->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }

- offset += size;
- AddressPtr &lr = inputs_[5];
- size = lens[2];
- ret = memcpy_s(lr->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }

- offset += size;
- AddressPtr &beta1 = inputs_[6];
- size = lens[3];
- ret = memcpy_s(beta1->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }

- offset += size;
- AddressPtr &beta2 = inputs_[7];
- size = lens[4];
- ret = memcpy_s(beta2->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }

- offset += size;
- AddressPtr &epsilon = inputs_[8];
- size = lens[5];
- ret = memcpy_s(epsilon->addr, size * bytes, data_ptr + offset, size * bytes);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- return;
- }
+ UpdateOptimInputValue<float>(kSparseAdam, "beta1_power", values.data(), lens);
+ UpdateOptimInputValue<float>(kSparseAdam, "beta2_power", values.data(), lens);
+ UpdateOptimInputValue<float>(kSparseAdam, "lr", values.data(), lens);
+ UpdateOptimInputValue<float>(kSparseAdam, "beta1", values.data(), lens);
+ UpdateOptimInputValue<float>(kSparseAdam, "beta2", values.data(), lens);
+ UpdateOptimInputValue<float>(kSparseAdam, "eps", values.data(), lens);
}

- const AddressPtr &SparseAdamOptimInfo::gradient() { return inputs_[9]; }
+ const AddressPtr &SparseAdamOptimInfo::gradient() {
+ size_t origin_grad_index = kSparseAdamOriginIdx.at("grad");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+ return inputs_[origin_grad_index];
+ }

- const AddressPtr &SparseAdamOptimInfo::indices() { return inputs_[10]; }
+ const AddressPtr &SparseAdamOptimInfo::indices() {
+ size_t origin_indices_index = kSparseAdamOriginIdx.at("indices");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_indices_index);
+ return inputs_[origin_indices_index];
+ }

bool SparseAdamOptimInfo::IsSparse() const { return true; }

- size_t SparseAdamOptimInfo::grad_index() { return 6; }
+ size_t SparseAdamOptimInfo::grad_index() {
+ size_t ps_grad_index = kSparseAdamPSSendIdx.at("grad");
+ return ps_grad_index;
+ }

- size_t SparseAdamOptimInfo::indices_index() { return 7; }
+ size_t SparseAdamOptimInfo::indices_index() {
+ size_t ps_indices_index = kSparseAdamPSSendIdx.at("indices");
+ return ps_indices_index;
+ }

SparseFtrlOptimInfo::SparseFtrlOptimInfo(const AddressPtr &weight, const AddressPtr &accum, const AddressPtr &linear,
const AddressPtr &grad, const AddressPtr &indices) {
@@ -310,15 +315,29 @@ SparseFtrlOptimInfo::SparseFtrlOptimInfo(const AddressPtr &weight, const Address
indices_offset_ = indices->size / sizeof(int);
}

- const AddressPtr &SparseFtrlOptimInfo::gradient() { return inputs_[3]; }
+ const AddressPtr &SparseFtrlOptimInfo::gradient() {
+ size_t origin_grad_index = kSparseFtrlOriginIdx.at("grad");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+ return inputs_[origin_grad_index];
+ }

- const AddressPtr &SparseFtrlOptimInfo::indices() { return inputs_[4]; }
+ const AddressPtr &SparseFtrlOptimInfo::indices() {
+ size_t origin_indices_index = kSparseFtrlOriginIdx.at("indices");
+ EXC_IF_VEC_IDX_OOB(inputs_, origin_indices_index);
+ return inputs_[origin_indices_index];
+ }

bool SparseFtrlOptimInfo::IsSparse() const { return true; }

- size_t SparseFtrlOptimInfo::grad_index() { return 0; }
+ size_t SparseFtrlOptimInfo::grad_index() {
+ size_t ps_grad_index = kSparseFtrlPSSendIdx.at("grad");
+ return ps_grad_index;
+ }

- size_t SparseFtrlOptimInfo::indices_index() { return 1; }
+ size_t SparseFtrlOptimInfo::indices_index() {
+ size_t ps_indices_index = kSparseFtrlPSSendIdx.at("indices");
+ return ps_indices_index;
+ }
} // namespace ps
} // namespace parallel
} // namespace mindspore

+4 -0   mindspore/ccsrc/frontend/parallel/ps/optimizer_info.h

@@ -18,6 +18,7 @@
#define MINDSPORE_CCSRC_FRONTEND_PARALLEL_PS_OPTIMIZER_INFO_H_

#include <vector>
#include <string>
#include "backend/kernel_compiler/kernel.h"
#include "frontend/parallel/ps/common.h"

@@ -49,6 +50,9 @@ class OptimizerInfo {
virtual size_t indices_index();

protected:
template <typename T>
void UpdateOptimInputValue(const std::string &optim_type, const std::string &input_name, void *data,
const Lengths &lens);
std::vector<AddressPtr> inputs_;
std::vector<AddressPtr> workspaces_;
std::vector<AddressPtr> outputs_;


+74 -116  mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc

@@ -45,34 +45,76 @@ void OptimizerInfoBuilder::BuildWorkspaces(OptimizerInfo *info, const std::vecto
}
}

template <typename T>
AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type, const std::string &input_name,
void *ps_data, const Lengths &ps_lens,
const InputsShapePtr &inputs_shape) {
// Note that the data types in ps_data may be inconsistent.
MS_LOG(INFO) << "Get input address pointer for optimizer:" << optim_type << ", input name:" << input_name;
AddressPtr addr_ptr = std::make_shared<kernel::Address>();
MS_EXCEPTION_IF_NULL(addr_ptr);
size_t addr_data_size = 0;
size_t addr_data_offset = 0;
size_t ps_index = INDEX_NOT_SEND;

if (kOptimToOriginIdx.count(optim_type) == 0 || kOptimToPSSendIdx.count(optim_type) == 0) {
MS_LOG(EXCEPTION) << "Optimizer type " << optim_type << " in not supported.";
}
const OptimOriginIdx &origin_input_map = kOptimToOriginIdx.at(optim_type);
const OptimPSSendIdx &ps_send_index_map = kOptimToPSSendIdx.at(optim_type);
if (ps_send_index_map.count(input_name) == 0 || origin_input_map.count(input_name) == 0) {
MS_LOG(EXCEPTION) << "Optimizer " << optim_type << " has no input for " << input_name;
}
ps_index = ps_send_index_map.at(input_name);
if (ps_index == INDEX_NOT_SEND) {
MS_LOG(EXCEPTION) << "Input " << input_name << " is not supposed to be sent to PS.";
}

if (inputs_shape != nullptr) {
// addr_data_size should be calculated by inputs_shape if it's passed.
size_t origin_index = origin_input_map.at(input_name);
EXC_IF_VEC_IDX_OOB((*inputs_shape), origin_index);
auto shape = *((*inputs_shape)[origin_index]);
addr_data_size = std::accumulate(shape.begin(), shape.end(), worker_num_, std::multiplies<size_t>());
} else {
EXC_IF_VEC_IDX_OOB(ps_lens, ps_index);
addr_data_size = ps_lens[ps_index];
}
addr_data_offset = std::accumulate(ps_lens.begin(), ps_lens.begin() + ps_index, 0, std::plus<int>());

// ps_lens[ps_index], not addr_data_size, is the size of the real data to copy.
T *buffer = new T[addr_data_size];
addr_ptr->size = ps_lens[ps_index] * sizeof(T);
addr_ptr->addr = buffer;
int ret = memcpy_s(addr_ptr->addr, addr_ptr->size, reinterpret_cast<T *>(ps_data) + addr_data_offset, addr_ptr->size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
delete[] buffer;
return nullptr;
}
return addr_ptr;
}

OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values,
const Lengths &lens, const InputsShapePtr &inputs_shape,
size_t worker_num, const std::shared_ptr<PServerKernel> &) {
AddressPtr weight_addr = std::make_shared<kernel::Address>();
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);
- float *data_ptr = values.data();
- float *copy_data_ptr = new float[values.size()];
- auto ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float));
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] copy_data_ptr;
- return nullptr;
- }

AddressPtr accumulate = std::make_shared<kernel::Address>();
accumulate->addr = new float[weight->size()];
accumulate->size = weight->size() * sizeof(float);
- memset_s(accumulate->addr, accumulate->size, 0x00, accumulate->size);
- AddressPtr learning_rate = std::make_shared<kernel::Address>();
- learning_rate->addr = copy_data_ptr;
- learning_rate->size = lens[0] * sizeof(float);
- AddressPtr gradient = std::make_shared<kernel::Address>();
- gradient->addr = reinterpret_cast<float *>(learning_rate->addr) + lens[0];
- gradient->size = lens[1] * sizeof(float);
- AddressPtr momentum = std::make_shared<kernel::Address>();
- momentum->addr = reinterpret_cast<float *>(gradient->addr) + lens[1];
- momentum->size = lens[2] * sizeof(float);
+ int ret = memset_s(accumulate->addr, accumulate->size, 0x00, accumulate->size);
+ if (ret != 0) {
+ MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
+ delete[] reinterpret_cast<float *>(accumulate->addr);
+ return nullptr;
+ }

+ AddressPtr learning_rate = GenInputAddrPtr<float>(kApplyMomentum, "lr", values.data(), lens);
+ AddressPtr gradient = GenInputAddrPtr<float>(kApplyMomentum, "grad", values.data(), lens);
+ AddressPtr momentum = GenInputAddrPtr<float>(kApplyMomentum, "momentum", values.data(), lens);
return new MomentumOptimInfo(weight_addr, accumulate, learning_rate, gradient, momentum);
}

@@ -82,6 +124,7 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
AddressPtr weight_addr = std::make_shared<kernel::Address>();
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);

AddressPtr m = std::make_shared<kernel::Address>();
m->addr = new float[weight->size()];
m->size = weight->size() * sizeof(float);
@@ -91,6 +134,7 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
delete[] reinterpret_cast<float *>(m->addr);
return nullptr;
}

AddressPtr v = std::make_shared<kernel::Address>();
v->addr = new float[weight->size()];
v->size = weight->size() * sizeof(float);
@@ -102,75 +146,14 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
return nullptr;
}

- float *data_ptr = values.data();
- float *copy_data_ptr = new float[values.size()];
- ret = memcpy_s(copy_data_ptr, values.size() * sizeof(float), data_ptr, values.size() * sizeof(float));
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] copy_data_ptr;
- delete[] reinterpret_cast<float *>(v->addr);
- delete[] reinterpret_cast<float *>(m->addr);
- return nullptr;
- }

- AddressPtr beta1_power = std::make_shared<kernel::Address>();
- beta1_power->addr = copy_data_ptr;
- beta1_power->size = lens[0] * sizeof(float);
- AddressPtr beta2_power = std::make_shared<kernel::Address>();
- beta2_power->addr = reinterpret_cast<float *>(beta1_power->addr) + lens[0];
- beta2_power->size = lens[1] * sizeof(float);

- AddressPtr learning_rate = std::make_shared<kernel::Address>();
- learning_rate->addr = reinterpret_cast<float *>(beta2_power->addr) + lens[1];
- learning_rate->size = lens[2] * sizeof(float);

- AddressPtr beta1 = std::make_shared<kernel::Address>();
- beta1->addr = reinterpret_cast<float *>(learning_rate->addr) + lens[2];
- beta1->size = lens[3] * sizeof(float);

- AddressPtr beta2 = std::make_shared<kernel::Address>();
- beta2->addr = reinterpret_cast<float *>(beta1->addr) + lens[3];
- beta2->size = lens[4] * sizeof(float);

- AddressPtr epsilon = std::make_shared<kernel::Address>();
- epsilon->addr = reinterpret_cast<float *>(beta2->addr) + lens[4];
- epsilon->size = lens[5] * sizeof(float);

- const std::shared_ptr<std::vector<size_t>> &grad_shape = (*inputs_shape)[9];
- size_t total_grad_size =
- std::accumulate((*grad_shape).begin(), (*grad_shape).end(), sizeof(float), std::multiplies<size_t>());
- AddressPtr grad = std::make_shared<kernel::Address>();
- grad->addr = new float[total_grad_size * worker_num];
- ret = memcpy_s(grad->addr, lens[6] * sizeof(float), reinterpret_cast<float *>(epsilon->addr) + lens[5],
- lens[6] * sizeof(float));
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] reinterpret_cast<float *>(grad->addr);
- delete[] copy_data_ptr;
- delete[] reinterpret_cast<float *>(v->addr);
- delete[] reinterpret_cast<float *>(m->addr);
- return nullptr;
- }
- grad->size = lens[6] * sizeof(float);

- const std::shared_ptr<std::vector<size_t>> &indices_shape = (*inputs_shape)[10];
- size_t total_indice_size =
- std::accumulate((*indices_shape).begin(), (*indices_shape).end(), sizeof(int), std::multiplies<size_t>());
- AddressPtr indices = std::make_shared<kernel::Address>();
- indices->addr = new int[total_indice_size * worker_num];
- size_t indices_data_size = lens[7] * sizeof(int);
- int *indices_data = reinterpret_cast<int *>(epsilon->addr) + lens[5] + lens[6];
- ret = memcpy_s(indices->addr, indices_data_size, indices_data, indices_data_size);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] reinterpret_cast<int *>(indices->addr);
- delete[] reinterpret_cast<float *>(grad->addr);
- delete[] copy_data_ptr;
- delete[] reinterpret_cast<float *>(v->addr);
- delete[] reinterpret_cast<float *>(m->addr);
- return nullptr;
- }
- indices->size = indices_data_size;
+ AddressPtr beta1_power = GenInputAddrPtr<float>(kSparseAdam, "beta1_power", values.data(), lens);
+ AddressPtr beta2_power = GenInputAddrPtr<float>(kSparseAdam, "beta2_power", values.data(), lens);
+ AddressPtr learning_rate = GenInputAddrPtr<float>(kSparseAdam, "lr", values.data(), lens);
+ AddressPtr beta1 = GenInputAddrPtr<float>(kSparseAdam, "beta1", values.data(), lens);
+ AddressPtr beta2 = GenInputAddrPtr<float>(kSparseAdam, "beta2", values.data(), lens);
+ AddressPtr epsilon = GenInputAddrPtr<float>(kSparseAdam, "eps", values.data(), lens);
+ AddressPtr grad = GenInputAddrPtr<float>(kSparseAdam, "grad", values.data(), lens, inputs_shape);
+ AddressPtr indices = GenInputAddrPtr<float>(kSparseAdam, "indices", values.data(), lens, inputs_shape);

return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon,
grad, indices);
@@ -183,6 +166,7 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
AddressPtr weight_addr = std::make_shared<kernel::Address>();
weight_addr->addr = weight->data();
weight_addr->size = weight->size() * sizeof(float);

AddressPtr accum = std::make_shared<kernel::Address>();
accum->addr = new float[weight->size()];
accum->size = weight->size() * sizeof(float);
@@ -190,6 +174,7 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
float *tmp = reinterpret_cast<float *>(accum->addr);
tmp[i] = std::dynamic_pointer_cast<SparseApplyFtrlPSKernel>(pserver_kernel)->init_accum();
}

AddressPtr linear = std::make_shared<kernel::Address>();
linear->addr = new float[weight->size()];
int ret = memset_s(linear->addr, weight->size() * sizeof(float), 0x00, weight->size() * sizeof(float));
@@ -200,35 +185,8 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
}
linear->size = weight->size() * sizeof(float);

- const std::shared_ptr<std::vector<size_t>> &grad_shape = (*inputs_shape)[3];
- size_t total_grad_size = std::accumulate((*grad_shape).begin(), (*grad_shape).end(), 1, std::multiplies<size_t>());
- AddressPtr grad = std::make_shared<kernel::Address>();
- grad->addr = new float[total_grad_size * worker_num];
- ret = memcpy_s(grad->addr, lens[0] * sizeof(float), values.data(), lens[0] * sizeof(float));
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] reinterpret_cast<float *>(grad->addr);
- delete[] reinterpret_cast<float *>(linear->addr);
- return nullptr;
- }
- grad->size = lens[0] * sizeof(float);

- const std::shared_ptr<std::vector<size_t>> &indices_shape = (*inputs_shape)[4];
- size_t total_indice_size =
- std::accumulate((*indices_shape).begin(), (*indices_shape).end(), 1, std::multiplies<size_t>());
- AddressPtr indices = std::make_shared<kernel::Address>();
- indices->addr = new int[total_indice_size * worker_num];
- size_t indices_data_size = lens[1] * sizeof(int);
- int *indices_data = reinterpret_cast<int *>(values.data()) + lens[0];
- ret = memcpy_s(indices->addr, indices_data_size, indices_data, indices_data_size);
- if (ret != 0) {
- MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
- delete[] reinterpret_cast<int *>(indices->addr);
- delete[] reinterpret_cast<float *>(grad->addr);
- delete[] reinterpret_cast<float *>(linear->addr);
- return nullptr;
- }
- indices->size = indices_data_size;
+ AddressPtr grad = GenInputAddrPtr<float>(kSparseFtrl, "grad", values.data(), lens, inputs_shape);
+ AddressPtr indices = GenInputAddrPtr<float>(kSparseFtrl, "indices", values.data(), lens, inputs_shape);

return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices);
}


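One subtlety in GenInputAddrPtr above: when inputs_shape is provided (the sparse grad and indices inputs), the buffer is sized to hold every worker's contribution, using worker_num_ as the initial value of the shape product, while only this push's ps_lens[ps_index] elements are copied in; the remainder is filled as other workers' gradients are accumulated. A minimal sketch of that sizing rule follows; BufferElems is a hypothetical helper, not part of the commit.

#include <cstddef>
#include <functional>
#include <numeric>
#include <vector>

// worker_num * product(shape): std::accumulate with a non-unit initial value.
size_t BufferElems(const std::vector<size_t> &shape, size_t worker_num) {
  return std::accumulate(shape.begin(), shape.end(), worker_num, std::multiplies<size_t>());
}

For a per-worker grad shape of {16, 80} and 8 workers, BufferElems returns 10240 elements, while the memcpy_s above still copies only ps_lens[ps_index] * sizeof(T) bytes of real data.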
+11 -1  mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.h

@@ -19,6 +19,7 @@

#include <vector>
#include <memory>
#include <string>
#include "backend/kernel_compiler/kernel.h"
#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
#include "frontend/parallel/ps/optimizer_info.h"
@@ -30,7 +31,7 @@ using mindspore::kernel::KernelMod;
using mindspore::kernel::ps::PServerKernel;
class OptimizerInfoBuilder {
public:
- OptimizerInfoBuilder() = default;
+ explicit OptimizerInfoBuilder(size_t worker_num) : worker_num_(worker_num) {}
virtual ~OptimizerInfoBuilder() = default;

OptimizerInfo *Build(const std::shared_ptr<PServerKernel> &pserver_kernel, const WeightPtr &weight, const Keys &keys,
@@ -43,10 +44,17 @@ class OptimizerInfoBuilder {

virtual void BuildWorkspaces(OptimizerInfo *info, const std::vector<size_t> &ws_sizes, size_t worker_num);
virtual void BuildOutputs(OptimizerInfo *info, size_t worker_num) {}

protected:
template <typename T>
AddressPtr GenInputAddrPtr(const std::string &optim_type, const std::string &input_name, void *ps_data,
const Lengths &lens, const InputsShapePtr &inputs_shape = nullptr);
size_t worker_num_;
};

class MomentumOptimInfoBuilder : public OptimizerInfoBuilder {
public:
explicit MomentumOptimInfoBuilder(size_t worker_num) : OptimizerInfoBuilder(worker_num) {}
OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
const InputsShapePtr &inputs_shape, size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) override;
@@ -54,6 +62,7 @@ class MomentumOptimInfoBuilder : public OptimizerInfoBuilder {

class SparseAdamOptimInfoBuilder : public OptimizerInfoBuilder {
public:
explicit SparseAdamOptimInfoBuilder(size_t worker_num) : OptimizerInfoBuilder(worker_num) {}
OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
const InputsShapePtr &inputs_shape, size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) override;
@@ -61,6 +70,7 @@ class SparseAdamOptimInfoBuilder : public OptimizerInfoBuilder {

class SparseFtrlOptimInfoBuilder : public OptimizerInfoBuilder {
public:
explicit SparseFtrlOptimInfoBuilder(size_t worker_num) : OptimizerInfoBuilder(worker_num) {}
OptimizerInfo *BuildInputs(const WeightPtr &weight, const Keys &keys, const Values &values, const Lengths &lens,
const InputsShapePtr &inputs_shape, size_t worker_num,
const std::shared_ptr<PServerKernel> &pserver_kernel) override;


+6 -5   mindspore/ccsrc/frontend/parallel/ps/parameter_server.h

@@ -347,9 +347,11 @@ bool ParameterServer<T>::Init(const FuncGraphPtr &func_graph) {
template <typename T>
void ParameterServer<T>::InitOptimInfoBuilders() {
- std::shared_ptr<OptimizerInfoBuilder> momentum_info_builder = std::make_shared<MomentumOptimInfoBuilder>();
- std::shared_ptr<OptimizerInfoBuilder> sparse_adam_info_builder = std::make_shared<SparseAdamOptimInfoBuilder>();
- std::shared_ptr<OptimizerInfoBuilder> sparse_ftrl_info_builder = std::make_shared<SparseFtrlOptimInfoBuilder>();
+ std::shared_ptr<OptimizerInfoBuilder> momentum_info_builder = std::make_shared<MomentumOptimInfoBuilder>(worker_num_);
+ std::shared_ptr<OptimizerInfoBuilder> sparse_adam_info_builder =
+ std::make_shared<SparseAdamOptimInfoBuilder>(worker_num_);
+ std::shared_ptr<OptimizerInfoBuilder> sparse_ftrl_info_builder =
+ std::make_shared<SparseFtrlOptimInfoBuilder>(worker_num_);
optim_info_builders_[kApplyMomentum] = momentum_info_builder;
optim_info_builders_[kSparseAdam] = sparse_adam_info_builder;
optim_info_builders_[kSparseFtrl] = sparse_ftrl_info_builder;
@@ -383,8 +385,7 @@ void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &va
inputs_shape->push_back(shape);
original_inputs_shape->push_back(original_shape);
- int len = lengths[i];
- for (int j = 0; j < len; j++) {
+ for (int j = 0; j < lengths[i]; j++) {
shape->push_back(values[val_idx]);
original_shape->push_back(values[val_idx++]);
}


+9 -3   mindspore/ccsrc/frontend/parallel/ps/worker.h

@@ -129,9 +129,13 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
size_t total_size = std::accumulate(sizes.begin(), sizes.end(), 0, std::plus<int>());
::ps::SArray<T> total_buffer(total_size, 0);
size_t offset = 0;
+ size_t dst_size = 0;
+ size_t src_size = 0;
for (size_t i = 0; i < sizes.size(); i++) {
- auto ret = memcpy_s(total_buffer.data() + offset / sizeof(T), sizes[i] * sizeof(T),
- reinterpret_cast<void *>(addrs[i]), sizes[i] * sizeof(T));
+ dst_size = sizes[i] * sizeof(T);
+ src_size = sizes[i] * sizeof(T);
+ auto ret =
+ memcpy_s(total_buffer.data() + offset / sizeof(T), dst_size, reinterpret_cast<void *>(addrs[i]), src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@@ -160,7 +164,9 @@ void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
continue;
}
kv_worker_->PullData({key}, &variables);
- auto ret = memcpy_s(dev_addr, size, variables.data(), size);
+ size_t dst_size = size;
+ size_t src_size = size;
+ auto ret = memcpy_s(dev_addr, dst_size, variables.data(), src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;


+15 -4  mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h

@@ -318,7 +318,9 @@ int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps:
for (size_t i = 0; i < lookup_ids.size(); i++) {
auto &pair = id_addr_map[static_cast<Key>(lookup_ids[i])];
int size = pair->second * sizeof(T);
- auto ret = memcpy_s(result_addr + offset, size, pair->first, size);
+ size_t dst_size = size;
+ size_t src_size = size;
+ auto ret = memcpy_s(result_addr + offset, dst_size, pair->first, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@@ -547,12 +549,17 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
const size_t indice_index, const T *original_data, const T *grads, int *indices,
::ps::SArray<T> *reduced_data) {
int offset = 0;
+ size_t dst_size = 0;
+ size_t src_size = 0;
for (size_t i = 0; i < lengths.size(); i++) {
if (i != grad_index && i != indice_index) {
int data_size = lengths[i] * sizeof(T);
- auto ret = memcpy_s(reduced_data->data() + offset, data_size, original_data + offset, data_size);
+ dst_size = data_size;
+ src_size = data_size;
+ auto ret = memcpy_s(reduced_data->data() + offset, dst_size, original_data + offset, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
}
}
offset += lengths[i];
@@ -564,7 +571,9 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
grad_offset += lengths[i];
}
int data_size = lengths[grad_index] * sizeof(T);
- auto ret = memcpy_s(reduced_data->data() + grad_offset, data_size, grads, data_size);
+ dst_size = data_size;
+ src_size = data_size;
+ auto ret = memcpy_s(reduced_data->data() + grad_offset, dst_size, grads, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;
@@ -574,7 +583,9 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
int indice_offset = grad_offset + lengths[grad_index];
data_size = lengths[indice_index] * sizeof(T);
T *indice_data = reduced_data->data() + indice_offset;
- ret = memcpy_s(indice_data, data_size, indices, data_size);
+ dst_size = data_size;
+ src_size = data_size;
+ ret = memcpy_s(indice_data, dst_size, indices, src_size);
if (ret != 0) {
MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
return;

