Fix code review

tag: v1.5.0-rc1
ZPaC committed 4 years ago
commit fd956058fc
13 changed files with 97 additions and 17 deletions
  1. mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.cc (+5 -0)
  2. mindspore/ccsrc/fl/server/iteration_timer.cc (+3 -1)
  3. mindspore/ccsrc/fl/server/kernel/dense_grad_accum_kernel.h (+2 -0)
  4. mindspore/ccsrc/fl/server/kernel/fed_avg_kernel.h (+8 -0)
  5. mindspore/ccsrc/fl/server/kernel/round/push_weight_kernel.cc (+1 -1)
  6. mindspore/ccsrc/fl/server/round.cc (+2 -2)
  7. mindspore/ccsrc/fl/server/server.h (+9 -1)
  8. mindspore/ccsrc/ps/core/communicator/http_communicator.cc (+6 -2)
  9. mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc (+13 -3)
 10. mindspore/ccsrc/ps/optimizer_info.cc (+40 -7)
 11. mindspore/ccsrc/ps/optimizer_info_builder.cc (+3 -0)
 12. mindspore/ccsrc/ps/ps_cache/ps_cache_manager.cc (+4 -0)
 13. mindspore/ccsrc/ps/ps_cache/ps_data/ps_data_prefetch.cc (+1 -0)

mindspore/ccsrc/backend/kernel_compiler/cpu/ps/pserver_kernel.cc (+5 -0)

@@ -20,6 +20,11 @@ namespace mindspore {
 namespace kernel {
 namespace ps {
 void PServerKernel::Shard(std::vector<size_t> *shape, int axis) {
+  MS_EXCEPTION_IF_NULL(shape);
+  if ((*shape).size() <= IntToSize(axis)) {
+    MS_LOG(EXCEPTION) << "Shape size is invalid.";
+    return;
+  }
   (*shape)[IntToSize(axis)] =
     LongToSize(Util::LocalShard(SizeToLong((*shape)[IntToSize(axis)]), SizeToLong(rank_id_), SizeToLong(pserver_num_)));
 }
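
Note on the guard above: std::vector::operator[] performs no bounds checking, so validating axis against the shape's rank has to happen before the index is used. A minimal standalone sketch of the same pattern, with a hypothetical shard_dim in place of MindSpore's Util::LocalShard:

#include <cstddef>
#include <stdexcept>
#include <vector>

// Hypothetical analogue of PServerKernel::Shard: split one dimension of a
// shape across num_shards parameter servers, validating inputs first.
void shard_dim(std::vector<size_t> *shape, size_t axis, size_t num_shards) {
  if (shape == nullptr) throw std::invalid_argument("shape is null");
  if (axis >= shape->size()) throw std::out_of_range("axis exceeds shape rank");
  if (num_shards == 0) throw std::invalid_argument("num_shards is zero");
  // Safe now: axis is a valid index into *shape.
  (*shape)[axis] = ((*shape)[axis] + num_shards - 1) / num_shards;  // ceil-divide
}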


mindspore/ccsrc/fl/server/iteration_timer.cc (+3 -1)

@@ -40,7 +40,9 @@ void IterationTimer::Start(const std::chrono::milliseconds &duration) {
 
 void IterationTimer::Stop() {
   running_ = false;
-  monitor_thread_.join();
+  if (monitor_thread_.joinable()) {
+    monitor_thread_.join();
+  }
 }
 
 void IterationTimer::SetTimeOutCallBack(const TimeOutCb &timeout_cb) {
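
Note: std::thread::join() throws std::system_error when the thread is not joinable, e.g. when Stop() runs twice or the monitor thread was never started; guarding with joinable() makes Stop() safe to call repeatedly. A self-contained illustration:

#include <thread>

int main() {
  std::thread t([] {});
  if (t.joinable()) {  // true: the thread has not been joined or detached yet
    t.join();
  }
  if (t.joinable()) {  // false after the first join, so no second join is attempted
    t.join();          // without the guard, a second join() would throw std::system_error
  }
  return 0;
}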


mindspore/ccsrc/fl/server/kernel/dense_grad_accum_kernel.h (+2 -0)

@@ -60,6 +60,8 @@ class DenseGradAccumKernel : public AggregationKernel {
       MS_LOG(ERROR) << "The inputs number of DenseGradAccumKernel should be 2, but got " << inputs.size();
       return false;
     }
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[0], false);
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[1], false);
     MS_ERROR_IF_NULL_W_RET_VAL(inputs[0]->addr, false);
     MS_ERROR_IF_NULL_W_RET_VAL(inputs[1]->addr, false);



mindspore/ccsrc/fl/server/kernel/fed_avg_kernel.h (+8 -0)

@@ -97,6 +97,10 @@ class FedAvgKernel : public AggregationKernel {
         MS_LOG(ERROR) << "Federated average allreduce failed.";
         return;
       }
+      if (data_size_addr[0] == 0) {
+        MS_LOG(ERROR) << "After AllReduce, the data size is 0.";
+        return;
+      }
       LocalMetaStore::GetInstance().put_value(kCtxFedAvgTotalDataSize, data_size_addr[0]);
       for (size_t i = 0; i < weight_size / sizeof(T); i++) {
         weight_addr[i] /= data_size_addr[0];
@@ -115,6 +119,10 @@ class FedAvgKernel : public AggregationKernel {
       MS_LOG(ERROR) << "The inputs number of FedAvgKernel should be 4, but got " << inputs.size();
       return false;
     }
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[0], false);
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[1], false);
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[2], false);
+    MS_ERROR_IF_NULL_W_RET_VAL(inputs[3], false);
     MS_ERROR_IF_NULL_W_RET_VAL(inputs[0]->addr, false);
     MS_ERROR_IF_NULL_W_RET_VAL(inputs[1]->addr, false);
     MS_ERROR_IF_NULL_W_RET_VAL(inputs[2]->addr, false);
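
Note: the first hunk protects the division weight_addr[i] /= data_size_addr[0]. If no client reported data, the reduced total is zero and every averaged weight silently becomes inf or NaN. A hedged sketch of the same guard, with a hypothetical average_weights rather than the kernel's real interface:

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical federated-averaging step: normalize aggregated weights by the
// total data size collected from all participants.
bool average_weights(std::vector<float> *weights, size_t total_data_size) {
  if (weights == nullptr) return false;
  if (total_data_size == 0) {  // mirrors the new check after AllReduce
    std::cerr << "After AllReduce, the data size is 0.\n";
    return false;
  }
  for (float &w : *weights) {
    w /= static_cast<float>(total_data_size);
  }
  return true;
}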


mindspore/ccsrc/fl/server/kernel/round/push_weight_kernel.cc (+1 -1)

@@ -123,7 +123,7 @@ std::map<std::string, Address> PushWeightKernel::ParseFeatureMap(const schema::R
   MS_ERROR_IF_NULL_W_RET_VAL(push_weight_req, {});
   std::map<std::string, Address> upload_feature_map;
   auto fbs_feature_map = push_weight_req->feature_map();
-  MS_ERROR_IF_NULL_W_RET_VAL(push_weight_req, upload_feature_map);
+  MS_ERROR_IF_NULL_W_RET_VAL(fbs_feature_map, upload_feature_map);
   for (size_t i = 0; i < fbs_feature_map->size(); i++) {
     std::string weight_full_name = fbs_feature_map->Get(i)->weight_fullname()->str();
     float *weight_data = const_cast<float *>(fbs_feature_map->Get(i)->data()->data());


mindspore/ccsrc/fl/server/round.cc (+2 -2)

@@ -170,10 +170,10 @@ size_t Round::time_window() const { return time_window_; }
 
 void Round::OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &message) {
   MS_ERROR_IF_NULL_WO_RET_VAL(kernel_);
+  MS_ERROR_IF_NULL_WO_RET_VAL(iter_timer_);
   MS_LOG(INFO) << "Round " << name_ << " first count event is triggered.";
   // The timer starts only after the first count event is triggered by DistributedCountService.
   if (check_timeout_) {
-    MS_ERROR_IF_NULL_WO_RET_VAL(iter_timer_);
     iter_timer_->Start(std::chrono::milliseconds(time_window_));
   }
@@ -184,10 +184,10 @@ void Round::OnFirstCountEvent(const std::shared_ptr<ps::core::MessageHandler> &message) {
 
 void Round::OnLastCountEvent(const std::shared_ptr<ps::core::MessageHandler> &message) {
   MS_ERROR_IF_NULL_WO_RET_VAL(kernel_);
+  MS_ERROR_IF_NULL_WO_RET_VAL(iter_timer_);
   MS_LOG(INFO) << "Round " << name_ << " last count event is triggered.";
   // Same as the first count event, the timer must be stopped by DistributedCountService.
   if (check_timeout_) {
-    MS_ERROR_IF_NULL_WO_RET_VAL(iter_timer_);
     iter_timer_->Stop();
   }



mindspore/ccsrc/fl/server/server.h (+9 -1)

@@ -72,7 +72,15 @@ class Server {
         scheduler_ip_(""),
         scheduler_port_(0),
         server_num_(0),
-        worker_num_(0) {}
+        worker_num_(0),
+        fl_server_port_(0),
+        cipher_initial_client_cnt_(0),
+        cipher_exchange_secrets_cnt_(0),
+        cipher_share_secrets_cnt_(0),
+        cipher_get_clientlist_cnt_(0),
+        cipher_reconstruct_secrets_up_cnt_(0),
+        cipher_reconstruct_secrets_down_cnt_(0),
+        cipher_time_window_(0) {}
   ~Server() = default;
   Server(const Server &) = delete;
   Server &operator=(const Server &) = delete;
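
Note: built-in members such as uint16_t or size_t are left uninitialized by an empty default constructor, and reading them before assignment is undefined behavior; hence every new counter is added to the initializer list. In-class initializers achieve the same guarantee, as this sketch with a hypothetical ServerConfig shows:

#include <cstdint>
#include <iostream>

struct ServerConfig {
  // Equivalent alternative to a long constructor initializer list:
  // in-class initializers guarantee well-defined values at construction.
  uint16_t fl_server_port = 0;
  uint64_t cipher_time_window = 0;
};

int main() {
  ServerConfig cfg;
  std::cout << cfg.fl_server_port << " " << cfg.cipher_time_window << "\n";  // prints "0 0"
  return 0;
}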


mindspore/ccsrc/ps/core/communicator/http_communicator.cc (+6 -2)

@@ -42,9 +42,12 @@ bool HttpCommunicator::Start() {
 
 bool HttpCommunicator::Stop() {
   MS_EXCEPTION_IF_NULL(http_server_);
-  bool res = http_server_->Stop();
+  if (!http_server_->Stop()) {
+    MS_LOG(ERROR) << "Stopping http server failed.";
+    return false;
+  }
   running_ = false;
-  return res;
+  return true;
 }
 
 void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
@@ -60,6 +63,7 @@ void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
 
   std::string url = "/";
   url += msg_type;
+  MS_EXCEPTION_IF_NULL(http_server_);
   bool is_succeed = http_server_->RegisterRoute(url, &http_msg_callbacks_[msg_type]);
   if (!is_succeed) {
     MS_LOG(EXCEPTION) << "Http server register handler for url " << url << " failed.";


mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc (+13 -3)

@@ -57,7 +57,10 @@ bool TcpCommunicator::Start() {
                                 std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);
   server_node_->set_handler(tcp_msg_callback_);
 
-  server_node_->Start();
+  if (!server_node_->Start()) {
+    MS_LOG(EXCEPTION) << "Starting server node failed.";
+    return false;
+  }
   running_ = true;
   running_thread_ = std::thread([&]() {
     while (running_) {
@@ -69,8 +72,14 @@ bool TcpCommunicator::Start() {
 
 bool TcpCommunicator::Stop() {
   MS_EXCEPTION_IF_NULL(server_node_);
-  server_node_->Finish();
-  server_node_->Stop();
+  if (!server_node_->Finish()) {
+    MS_LOG(ERROR) << "Finishing server node failed.";
+    return false;
+  }
+  if (!server_node_->Stop()) {
+    MS_LOG(ERROR) << "Stopping server node failed.";
+    return false;
+  }
   running_ = false;
   return true;
 }
@@ -81,6 +90,7 @@ void TcpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) {
 }
 
 void TcpCommunicator::RegisterEventCallback(const core::ClusterEvent &event, const EventCallback &event_cb) {
+  MS_EXCEPTION_IF_NULL(server_node_);
   server_node_->RegisterEventCallback(event, event_cb);
 }
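
Note: both communicators previously discarded the boolean results of Start(), Finish(), and Stop(). Beyond checking at each call site as the commit does, C++17's [[nodiscard]] lets the compiler flag any ignored result mechanically; a sketch with a hypothetical Node class, not MindSpore's server-node API:

#include <iostream>

class Node {
 public:
  [[nodiscard]] bool Start() { return true; }  // compiler warns if the result is dropped
  [[nodiscard]] bool Stop() { return true; }
};

int main() {
  Node node;
  if (!node.Start()) {  // the pattern the commit adopts: test and bail out
    std::cerr << "Starting server node failed.\n";
    return 1;
  }
  return node.Stop() ? 0 : 1;
}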



mindspore/ccsrc/ps/optimizer_info.cc (+40 -7)

@@ -23,7 +23,10 @@
 
 namespace mindspore {
 namespace ps {
-void OptimizerInfo::AddWorkspace(const AddressPtr &workspace) { workspaces_.push_back(workspace); }
+void OptimizerInfo::AddWorkspace(const AddressPtr &workspace) {
+  MS_EXCEPTION_IF_NULL(workspace);
+  workspaces_.push_back(workspace);
+}
 
 const std::vector<AddressPtr> &OptimizerInfo::inputs() const { return inputs_; }

@@ -42,6 +45,7 @@ size_t OptimizerInfo::indices_index() { return 0; }
 template <typename T>
 void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const std::string &input_name, void *data,
                                           const Lengths &lens) {
+  MS_EXCEPTION_IF_NULL(data);
   if (kOptimToOriginIdx.count(optim_type) == 0 || kOptimToPSSendIdx.count(optim_type) == 0) {
     MS_LOG(EXCEPTION) << "Optimizer type " << optim_type << " is not supported.";
   }
@@ -96,8 +100,8 @@ void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
 
 void DenseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &, size_t n, size_t, size_t) {
   if (n > 1) {
+    MS_EXCEPTION_IF_NULL(gradient()->addr);
     float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
-    MS_EXCEPTION_IF_NULL(accum_grad_data);
     size_t size = gradient()->size / sizeof(float);
     for (size_t i = 0; i < size; i++) {
       accum_grad_data[i] /= n;
@@ -116,8 +120,8 @@ void DenseOptimInfo::Reset() {
 
 void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   // Append grad data to the end
+  MS_EXCEPTION_IF_NULL(gradient()->addr);
   float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
-  MS_EXCEPTION_IF_NULL(accum_grad_data);
 
   size_t grad_index = this->grad_index();
   size_t grad_offset = 0;
@@ -143,6 +147,7 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   gradient()->size += incr_grad_size;
 
   // Append indice data to the end
+  MS_EXCEPTION_IF_NULL(indices()->addr);
   int *accum_indices_data = reinterpret_cast<int *>(indices()->addr);
   MS_EXCEPTION_IF_NULL(accum_indices_data);

@@ -153,10 +158,10 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   }
 
   void *incr_indice_data_temp = const_cast<float *>(values.data()) + indice_offset;
-  int *incr_indice_data = reinterpret_cast<int *>(incr_indice_data_temp);
-
+  MS_EXCEPTION_IF_NULL(incr_indice_data_temp);
+  int *incr_indice_data = reinterpret_cast<int *>(incr_indice_data_temp);
+  MS_EXCEPTION_IF_NULL(incr_indice_data);
 
   size_t incr_indice_size = lengths[indices_index];
   size_t incr_indice_data_size = incr_indice_size * sizeof(int);
   dst_size = incr_indice_data_size;
@@ -176,8 +181,9 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
 
 void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes, size_t n, size_t server_num,
                                   size_t rank_id) {
+  MS_EXCEPTION_IF_NULL(gradient());
+  MS_EXCEPTION_IF_NULL(indices());
   if (n == 0 || indices()->size == 0) {
     MS_LOG(EXCEPTION) << "The size of shapes or indices is 0.";
   }
   size_t indices_size = static_cast<size_t>(indices()->size / sizeof(int));
   size_t segment_size = gradient()->size / indices()->size;

@@ -259,6 +265,11 @@ void SparseOptimInfo::Reset() {
 MomentumOptimInfo::MomentumOptimInfo(const AddressPtr &weight, const AddressPtr &accumulate,
                                      const AddressPtr &learning_rate, const AddressPtr &gradient,
                                      const AddressPtr &momentum) {
+  MS_EXCEPTION_IF_NULL(weight);
+  MS_EXCEPTION_IF_NULL(accumulate);
+  MS_EXCEPTION_IF_NULL(learning_rate);
+  MS_EXCEPTION_IF_NULL(gradient);
+  MS_EXCEPTION_IF_NULL(momentum);
   inputs_.push_back(weight);
   inputs_.push_back(accumulate);
   inputs_.push_back(learning_rate);
@@ -275,12 +286,14 @@ const size_t SparseOptimInfo::indice_size() const { return indices_offset_; }
 const AddressPtr &MomentumOptimInfo::gradient() {
   size_t origin_grad_index = kMomentumOriginIdx.at("grad");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_grad_index]);
   return inputs_[origin_grad_index];
 }
 
 const AddressPtr &MomentumOptimInfo::indices() {
   size_t origin_grad_index = kMomentumOriginIdx.at("grad");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_grad_index]);
   return inputs_[origin_grad_index];
 }

@@ -294,6 +307,17 @@ SparseAdamOptimInfo::SparseAdamOptimInfo(const AddressPtr &weight, const AddressPtr &m, const AddressPtr &v,
                                          const AddressPtr &beta1_power, const AddressPtr &beta2_power,
                                          const AddressPtr &learning_rate, const AddressPtr &beta1,
                                          const AddressPtr &beta2, const AddressPtr &epsilon, const AddressPtr &grad,
                                          const AddressPtr &indices, bool sharded) {
+  MS_EXCEPTION_IF_NULL(weight);
+  MS_EXCEPTION_IF_NULL(m);
+  MS_EXCEPTION_IF_NULL(v);
+  MS_EXCEPTION_IF_NULL(beta1_power);
+  MS_EXCEPTION_IF_NULL(beta2_power);
+  MS_EXCEPTION_IF_NULL(learning_rate);
+  MS_EXCEPTION_IF_NULL(beta1);
+  MS_EXCEPTION_IF_NULL(beta2);
+  MS_EXCEPTION_IF_NULL(epsilon);
+  MS_EXCEPTION_IF_NULL(grad);
+  MS_EXCEPTION_IF_NULL(indices);
   inputs_.push_back(weight);
   inputs_.push_back(m);
   inputs_.push_back(v);
@@ -322,12 +346,14 @@ void SparseAdamOptimInfo::Update(const Values &values, const Lengths &lens) {
 const AddressPtr &SparseAdamOptimInfo::gradient() {
   size_t origin_grad_index = kSparseAdamOriginIdx.at("grad");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_grad_index]);
   return inputs_[origin_grad_index];
 }
 
 const AddressPtr &SparseAdamOptimInfo::indices() {
   size_t origin_indices_index = kSparseAdamOriginIdx.at("indices");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_indices_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_indices_index]);
   return inputs_[origin_indices_index];
 }

@@ -345,6 +371,11 @@ size_t SparseAdamOptimInfo::indices_index() {
 
 SparseFtrlOptimInfo::SparseFtrlOptimInfo(const AddressPtr &weight, const AddressPtr &accum, const AddressPtr &linear,
                                          const AddressPtr &grad, const AddressPtr &indices, bool sharded) {
+  MS_EXCEPTION_IF_NULL(weight);
+  MS_EXCEPTION_IF_NULL(accum);
+  MS_EXCEPTION_IF_NULL(linear);
+  MS_EXCEPTION_IF_NULL(grad);
+  MS_EXCEPTION_IF_NULL(indices);
   inputs_.push_back(weight);
   inputs_.push_back(accum);
   inputs_.push_back(linear);
@@ -358,12 +389,14 @@ SparseFtrlOptimInfo::SparseFtrlOptimInfo(const AddressPtr &weight, const AddressPtr &accum, const AddressPtr &linear,
 const AddressPtr &SparseFtrlOptimInfo::gradient() {
   size_t origin_grad_index = kSparseFtrlOriginIdx.at("grad");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_grad_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_grad_index]);
   return inputs_[origin_grad_index];
 }
 
 const AddressPtr &SparseFtrlOptimInfo::indices() {
   size_t origin_indices_index = kSparseFtrlOriginIdx.at("indices");
   EXC_IF_VEC_IDX_OOB(inputs_, origin_indices_index);
+  MS_EXCEPTION_IF_NULL(inputs_[origin_indices_index]);
   return inputs_[origin_indices_index];
 }
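
Note: the MS_EXCEPTION_IF_NULL / MS_ERROR_IF_NULL_* family used throughout this diff is MindSpore's own macro set. As a rough, simplified sketch of how such a one-line check-and-report macro can be built (not MindSpore's actual definition, which also records file and line and routes through its logger):

#include <iostream>
#include <stdexcept>
#include <string>

// Simplified analogue of an exception-on-null macro; #ptr stringizes the
// argument so the error message names the offending pointer.
#define EXCEPTION_IF_NULL(ptr)                                          \
  do {                                                                  \
    if ((ptr) == nullptr) {                                             \
      throw std::runtime_error(std::string("The pointer [") + #ptr +    \
                               "] is null.");                           \
    }                                                                   \
  } while (false)

int main() {
  int value = 42;
  int *p = &value;
  EXCEPTION_IF_NULL(p);  // passes: p is non-null
  std::cout << *p << "\n";
  return 0;
}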



mindspore/ccsrc/ps/optimizer_info_builder.cc (+3 -0)

@@ -29,6 +29,7 @@ OptimizerInfo *OptimizerInfoBuilder::Build(const std::shared_ptr<PServerKernel>
                                            const Lengths &lens, const InputsShapePtr &inputs_shape, size_t worker_num,
                                            bool sharded) {
   MS_EXCEPTION_IF_NULL(pserver_kernel);
+  MS_EXCEPTION_IF_NULL(weight);
   MS_EXCEPTION_IF_NULL(inputs_shape);
   OptimizerInfo *optim_info =
     BuildInputs(weight, keys, values, lens, inputs_shape, worker_num, pserver_kernel, sharded);
@@ -40,6 +41,7 @@ OptimizerInfo *OptimizerInfoBuilder::Build(const std::shared_ptr<PServerKernel>
 }
 
 void OptimizerInfoBuilder::BuildWorkspaces(OptimizerInfo *info, const std::vector<size_t> &ws_sizes, size_t) {
+  MS_EXCEPTION_IF_NULL(info);
   for (size_t i = 0; i < ws_sizes.size(); i++) {
     size_t size = ws_sizes[i];
     AddressPtr workspace = std::make_shared<kernel::Address>();
@@ -116,6 +118,7 @@ AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type,
 OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, const Keys &, const Values &values,
                                                      const Lengths &lens, const InputsShapePtr &, size_t,
                                                      const std::shared_ptr<PServerKernel> &, bool) {
+  MS_EXCEPTION_IF_NULL(weight);
   AddressPtr weight_addr = std::make_shared<kernel::Address>();
   MS_EXCEPTION_IF_NULL(weight_addr);
   weight_addr->addr = weight->data();


mindspore/ccsrc/ps/ps_cache/ps_cache_manager.cc (+4 -0)

@@ -641,6 +641,7 @@ bool PsCacheManager::ParseHostDataHostToDevice(size_t id) {
 
 bool PsCacheManager::ParseHostDataDeviceToHost() {
   MS_ERROR_IF_NULL(embedding_device_cache_);
+  MS_ERROR_IF_NULL(embedding_host_cache_);
   int *device_to_host_ids = embedding_device_cache_->device_to_host_ids.get();
   int *device_to_host_index = embedding_host_cache_->device_to_host_index.get();
   MS_ERROR_IF_NULL(device_to_host_ids);
@@ -1058,6 +1059,7 @@ bool PsCacheManager::SyncHostEmbeddingTable() {
 
 bool PsCacheManager::SyncDeviceEmbeddingTable() {
   MS_ERROR_IF_NULL(embedding_device_cache_);
+  MS_ERROR_IF_NULL(embedding_device_cache_->cache_);
   const auto &device_hash_map = embedding_device_cache_->device_hash_map_;
   MS_ERROR_IF_NULL(device_hash_map);
   const auto &hash_id_to_index = device_hash_map->hash_id_to_index();
@@ -1110,6 +1112,8 @@ bool PsCacheManager::SyncDeviceEmbeddingTable() {
 }
 
 void PsCacheManager::DumpHashTables(bool dump_device_tables) const {
+  MS_EXCEPTION_IF_NULL(embedding_device_cache_);
+  MS_EXCEPTION_IF_NULL(embedding_device_cache_->cache_);
   for (const auto &item : hash_tables_) {
     const auto &param_name = item.first;
     size_t cache_vocab_size = item.second.cache_vocab_size;


mindspore/ccsrc/ps/ps_cache/ps_data/ps_data_prefetch.cc (+1 -0)

@@ -31,6 +31,7 @@ void PsDataPrefetch::CreateDataChannel(const std::string &channel_name, size_t step_num) {
   if (iter != ps_data_channel_map_.end()) {
     MS_LOG(WARNING) << "The ps data channel already exists, channel name:" << channel_name;
     auto channel = iter->second;
+    MS_ERROR_IF_NULL_WO_RET_VAL(channel);
     channel->set_step_num(step_num);
   } else {
     auto channel = std::make_shared<PsDataChannel>(channel_name, step_num);

