diff --git a/mindspore/ccsrc/ps/core/node.cc b/mindspore/ccsrc/ps/core/node.cc
index ee42d79824..31666eec9d 100644
--- a/mindspore/ccsrc/ps/core/node.cc
+++ b/mindspore/ccsrc/ps/core/node.cc
@@ -21,12 +21,7 @@ namespace ps {
 namespace core {
 std::string Node::node_id() const { return node_info_.node_id_; }
 
-uint32_t Node::rank_id() const {
-  if (!is_ready_.load()) {
-    MS_LOG(EXCEPTION) << "The cluster is not ready yet to get rank id!";
-  }
-  return node_info_.rank_id_;
-}
+uint32_t Node::rank_id() const { return node_info_.rank_id_; }
 
 NodeRole Node::role() const { return node_info_.node_role_; }
 
diff --git a/mindspore/ccsrc/ps/parameter_server.cc b/mindspore/ccsrc/ps/parameter_server.cc
index 0cd510b53a..35e315e788 100644
--- a/mindspore/ccsrc/ps/parameter_server.cc
+++ b/mindspore/ccsrc/ps/parameter_server.cc
@@ -33,8 +33,7 @@ void ParameterServer::Run(const FuncGraphPtr &func_graph) {
   }
   Init(func_graph);
   server_node_->Start();
-  rank_id_ = server_node_->rank_id();
-  PSContext::instance()->SetPSRankId(rank_id_);
+  PSContext::instance()->SetPSRankId(server_node_->rank_id());
   thread_->join();
   SyncEmbeddingTables();
   MS_LOG(INFO) << "PServer finished updating models, starts finalizing...";
@@ -118,22 +117,22 @@ void ParameterServer::InitOptimInputsShape(const Keys &keys, const Values &value
       MS_EXCEPTION_IF_NULL(cnode);
       if (optim_name == kSparseAdam) {
         std::shared_ptr<PServerKernel> optimizer =
-          std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(rank_id_, pserver_num_, worker_num_);
+          std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
         optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
         optimizers_[key] = optimizer;
       } else if (optim_name == kSparseLazyAdam) {
         std::shared_ptr<PServerKernel> optimizer =
-          std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(rank_id_, pserver_num_, worker_num_);
+          std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
         optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
         optimizers_[key] = optimizer;
       } else if (optim_name == kApplyMomentum) {
         std::shared_ptr<PServerKernel> optimizer =
-          std::make_shared<kernel::ps::ApplyMomentumPSKernel>(rank_id_, pserver_num_, worker_num_);
+          std::make_shared<kernel::ps::ApplyMomentumPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
         optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
         optimizers_[key] = optimizer;
       } else if (optim_name == kSparseFtrl) {
         std::shared_ptr<PServerKernel> optimizer =
-          std::make_shared<kernel::ps::SparseApplyFtrlPSKernel>(rank_id_, pserver_num_, worker_num_);
+          std::make_shared<kernel::ps::SparseApplyFtrlPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
         optimizer->InitKernel(cnode, optim_inputs_shape_[key]);
         optimizers_[key] = optimizer;
       }
@@ -144,7 +143,7 @@ void ParameterServer::InitOptimInputsShape(const Keys &keys, const Values &value
 void ParameterServer::InitWeight(const Key &key, const WeightPtr &weight) {
   MS_EXCEPTION_IF_NULL(weight);
   if ((weights_.count(key) == 0) || (is_embedding_[key] && weights_.count(key) != 0)) {
-    MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << rank_id_;
+    MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << server_node_->rank_id();
     weights_[key] = weight;
     tokens_[key] = 0;
     is_embedding_[key] = false;
@@ -165,7 +164,7 @@ void ParameterServer::InitEmbeddingTable(
   MS_EXCEPTION_IF_NULL(shapes);
   if (weights_.count(key) == 0) {
     std::shared_ptr<PServerKernel> lookup =
-      std::make_shared<kernel::ps::EmbeddingLookUpPSKernel>(rank_id_, pserver_num_, worker_num_);
+      std::make_shared<kernel::ps::EmbeddingLookUpPSKernel>(server_node_->rank_id(), pserver_num_, worker_num_);
     lookup->InitKernel(shapes);
     embedding_lookup_ops_[key] = lookup;
 
@@ -244,7 +243,7 @@ void ParameterServer::UpdateWeights() {
                        [](std::shared_ptr<std::vector<size_t>> input_shapes) -> std::vector<size_t> { return *input_shapes; });
       }
       optimizer->ReInit(shapes);
-      optim_info->ComputeMean(shapes, worker_num_, pserver_num_, rank_id_);
+      optim_info->ComputeMean(shapes, worker_num_, pserver_num_, server_node_->rank_id());
       optimizer->Execute(inputs, workspaces, outputs);
       optim_info->Reset();
     }
@@ -296,7 +295,6 @@ WeightPtr ParameterServer::weight(const Key &key) {
     MS_LOG(EXCEPTION) << "Invalid weight key " << key;
   }
   WeightPtr weight_ptr = weights_[key];
-  MS_LOG(DEBUG) << "The weight ptr size is:" << weight_ptr->size();
   MS_EXCEPTION_IF_NULL(weight_ptr);
   WeightPtr copy_weight_ptr = std::make_shared<std::vector<float>>(weight_ptr->size(), 0);
   MS_EXCEPTION_IF_NULL(copy_weight_ptr);
diff --git a/mindspore/ccsrc/ps/parameter_server.h b/mindspore/ccsrc/ps/parameter_server.h
index 4f312c4b3c..594236e423 100644
--- a/mindspore/ccsrc/ps/parameter_server.h
+++ b/mindspore/ccsrc/ps/parameter_server.h
@@ -77,7 +77,6 @@ class ParameterServer {
   ParameterServer()
       : pserver_num_(0),
         worker_num_(0),
-        rank_id_(0),
         grad_accum_count_(0),
         handler_(nullptr),
         func_graph_(nullptr),
@@ -144,7 +143,6 @@ class ParameterServer {
 
   size_t pserver_num_;
   size_t worker_num_;
-  size_t rank_id_;
   size_t grad_accum_count_;
   std::unique_ptr<ServerHandler> handler_;
   FuncGraphPtr func_graph_;
diff --git a/mindspore/ccsrc/ps/worker.cc b/mindspore/ccsrc/ps/worker.cc
index 5b370f9501..47c97a2772 100644
--- a/mindspore/ccsrc/ps/worker.cc
+++ b/mindspore/ccsrc/ps/worker.cc
@@ -306,6 +306,7 @@ void Worker::DoPSEmbeddingLookup(const Key &key, const std::vector<int> &lookup_
   int64_t single_id_len = SizeToLong(lookup_result->size() / lookup_ids.size());
   std::unordered_map<Key, std::shared_ptr<std::pair<float *, int64_t>>> id_addr_map;
   std::shared_ptr<std::vector<float>> values = std::make_shared<std::vector<float>>();
+  std::shared_ptr<std::vector<Key>> keys = std::make_shared<std::vector<Key>>();
   int64_t value_offset = 0;
   for (size_t i = 0; i < resp.size(); ++i) {
     KVMessage message;
@@ -315,12 +316,17 @@ void Worker::DoPSEmbeddingLookup(const Key &key, const std::vector<int> &lookup_
     }
     for (auto k = 0; k < message.keys_size(); k++) {
       const Key &key = message.keys(k);
-      float *addr = values->data() + value_offset;
-      value_offset += single_id_len;
-      id_addr_map[key] = std::make_shared<std::pair<float *, int64_t>>(std::make_pair(addr, single_id_len));
+      keys->push_back(key);
     }
   }
 
+  for (size_t i = 0; i < keys->size(); i++) {
+    const Key &key = keys->at(i);
+    float *addr = values->data() + value_offset;
+    value_offset += single_id_len;
+    id_addr_map[key] = std::make_shared<std::pair<float *, int64_t>>(std::make_pair(addr, single_id_len));
+  }
+
   float *result_addr = lookup_result->data();
   MS_EXCEPTION_IF_NULL(result_addr);
   int64_t offset = 0;
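
Note on the worker.cc change: the removed code stored `values->data() + value_offset` into `id_addr_map` while still inside the loop over `resp`, where later iterations keep appending incoming values to `values`; any reallocation of the vector during those pushes invalidates the addresses recorded earlier. That is presumably why the patch collects all keys first and computes the addresses in a second pass, once `values` has reached its final size. A minimal standalone sketch of that two-pass pattern (the message layout, data, and all names below are invented stand-ins, not MindSpore APIs):

    // Sketch of the two-pass pattern adopted in DoPSEmbeddingLookup.
    // Invented stand-ins throughout; compile with -std=c++17.
    #include <cstdint>
    #include <iostream>
    #include <unordered_map>
    #include <utility>
    #include <vector>

    using Key = uint64_t;

    int main() {
      // Stand-in for the parsed per-server responses: each carries keys and values.
      struct Message {
        std::vector<Key> keys;
        std::vector<float> values;
      };
      std::vector<Message> resp = {{{0}, {1.0f, 2.0f}}, {{1}, {3.0f, 4.0f}}};
      const int64_t single_id_len = 2;  // floats per embedding row

      std::vector<float> values;
      std::vector<Key> keys;

      // Pass 1: accumulate keys and values only. Appending may reallocate the
      // backing storage of `values`, so no pointers into it are taken yet.
      for (const auto &msg : resp) {
        values.insert(values.end(), msg.values.begin(), msg.values.end());
        keys.insert(keys.end(), msg.keys.begin(), msg.keys.end());
      }

      // Pass 2: `values` has its final size, so values.data() is stable and the
      // per-key addresses recorded here remain valid.
      std::unordered_map<Key, std::pair<float *, int64_t>> id_addr_map;
      int64_t value_offset = 0;
      for (const Key k : keys) {
        id_addr_map[k] = {values.data() + value_offset, single_id_len};
        value_offset += single_id_len;
      }

      for (const auto &[k, addr] : id_addr_map) {
        std::cout << "key " << k << " -> " << addr.first[0] << ", " << addr.first[1] << "\n";
      }
    }

The parameter_server.cc/.h hunks are mechanical by comparison: the cached `rank_id_` member is removed and each call site reads `server_node_->rank_id()` directly. That in turn explains why `Node::rank_id()` drops its `is_ready_` guard: in `ParameterServer::Run` the rank id is only queried after `server_node_->Start()` has returned, so the accessor no longer needs to reject early calls.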