From 58d640617875ab04f6c89ef9f8b8276668a9fa0e Mon Sep 17 00:00:00 2001
From: ZPaC
Date: Thu, 17 Sep 2020 10:19:16 +0800
Subject: [PATCH] Add pointer check for PS

---
 .../frontend/parallel/ps/optimizer_info.cc    | 46 ++++++++++---
 .../parallel/ps/optimizer_info_builder.cc     | 41 +++++++++---
 .../frontend/parallel/ps/parameter_server.h   | 41 ++++++++++++
 mindspore/ccsrc/frontend/parallel/ps/worker.h | 15 +++--
 .../ccsrc/frontend/parallel/ps/worker_proxy.h | 66 +++++++++++++++++--
 mindspore/context.py                          |  9 ++-
 6 files changed, 186 insertions(+), 32 deletions(-)

diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc
index 84c31e6607..1b408b7788 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc
+++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info.cc
@@ -62,7 +62,13 @@ void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const s
   size_t size = lens[ps_send_index] * sizeof(T);
   size_t offset = std::accumulate(lens.begin(), lens.begin() + ps_send_index, 0, std::plus<int>());
   AddressPtr optim_input = inputs_[origin_index];
-  int ret = memcpy_s(optim_input->addr, optim_input->size, reinterpret_cast<T *>(data) + offset, size);
+  MS_EXCEPTION_IF_NULL(optim_input);
+
+  void *dst_data = optim_input->addr;
+  T *src_data = reinterpret_cast<T *>(data) + offset;
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  int ret = memcpy_s(optim_input->addr, optim_input->size, src_data, size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     return;
@@ -71,6 +77,7 @@ void OptimizerInfo::UpdateOptimInputValue(const std::string &optim_type, const s
 }
 
 void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
+  MS_EXCEPTION_IF_NULL(gradient()->addr);
   float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
   size_t size = gradient()->size / sizeof(float);
   size_t grad_index = this->grad_index();
@@ -96,11 +103,19 @@ void DenseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &, size_
   }
 }
 
-void DenseOptimInfo::Reset() { memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size); }
+void DenseOptimInfo::Reset() {
+  MS_EXCEPTION_IF_NULL(gradient()->addr);
+  int ret = memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size);
+  if (ret != 0) {
+    MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
+    return;
+  }
+}
 
 void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   // Append grad data to the end
   float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
+  MS_EXCEPTION_IF_NULL(accum_grad_data);
 
   size_t grad_index = this->grad_index();
   size_t grad_offset = 0;
@@ -108,10 +123,16 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
     grad_offset += lengths[i];
   }
   float *incr_grad_data = values.data() + grad_offset;
+  MS_EXCEPTION_IF_NULL(incr_grad_data);
+
   size_t incr_grad_size = lengths[grad_index] * sizeof(float);
   size_t dst_size = incr_grad_size;
   size_t src_size = incr_grad_size;
-  auto ret = memcpy_s(accum_grad_data + grads_offset_, dst_size, incr_grad_data, src_size);
+  void *dst_data = accum_grad_data + grads_offset_;
+  void *src_data = incr_grad_data;
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     return;
@@ -121,6 +142,7 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
 
   // Append indice data to the end
   int *accum_indices_data = reinterpret_cast<int *>(indices()->addr);
+  MS_EXCEPTION_IF_NULL(accum_indices_data);
   size_t indices_index = this->indices_index();
   size_t indice_offset = 0;
@@ -128,12 +150,16 @@ void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
     indice_offset += lengths[i];
   }
   int *incr_indice_data = reinterpret_cast<int *>(values.data()) + indice_offset;
+  MS_EXCEPTION_IF_NULL(incr_indice_data);
   size_t incr_indice_size = lengths[indices_index];
   size_t incr_indice_data_size = incr_indice_size * sizeof(int);
   dst_size = incr_indice_data_size;
   src_size = incr_indice_data_size;
-  auto ret2 =
-    memcpy_s(accum_indices_data + indices_offset_, incr_indice_data_size, incr_indice_data, incr_indice_data_size);
+  dst_data = accum_indices_data + indices_offset_;
+  src_data = incr_indice_data;
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  auto ret2 = memcpy_s(dst_data, dst_size, src_data, src_size);
   if (ret2 != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret2 << ")";
     return;
@@ -167,6 +193,8 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
     MS_LOG(ERROR) << "Invalid first dim size";
   }
 
+  MS_EXCEPTION_IF_NULL(gradient()->addr);
+  MS_EXCEPTION_IF_NULL(indices()->addr);
   float *grad_data = reinterpret_cast<float *>(gradient()->addr);
   int *indices_data = reinterpret_cast<int *>(indices()->addr);
 
@@ -189,12 +217,14 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
                            &unique_sparse_grad);
 
   int reduced_grad_size = unique_sparse_grad.indices_size_ * segment_size * sizeof(float);
+  MS_EXCEPTION_IF_NULL(unique_sparse_grad.value_);
   auto ret = memcpy_s(gradient()->addr, gradient()->size, unique_sparse_grad.value_, reduced_grad_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     return;
   }
   int reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int);
+  MS_EXCEPTION_IF_NULL(unique_sparse_grad.indices_);
   ret = memcpy_s(indices()->addr, indices()->size, unique_sparse_grad.indices_, reduced_indice_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
@@ -210,10 +240,8 @@ void SparseOptimInfo::ComputeMean(const std::vector<std::vector<size_t>> &shapes
 }
 
 void SparseOptimInfo::Reset() {
-  auto &gradient = this->gradient();
-  gradient->size = 0;
-  auto &indices = this->indices();
-  indices->size = 0;
+  gradient()->size = 0;
+  indices()->size = 0;
   grads_offset_ = 0;
   indices_offset_ = 0;
 }
diff --git a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc
index 01eeecf5e0..8acac1261c 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc
+++ b/mindspore/ccsrc/frontend/parallel/ps/optimizer_info_builder.cc
@@ -27,7 +27,10 @@ using mindspore::kernel::ps::SparseApplyFtrlPSKernel;
 OptimizerInfo *OptimizerInfoBuilder::Build(const std::shared_ptr<PServerKernel> &pserver_kernel,
                                            const WeightPtr &weight, const Keys &keys, const Values &values,
                                            const Lengths &lens, const InputsShapePtr &inputs_shape, size_t worker_num) {
+  MS_EXCEPTION_IF_NULL(pserver_kernel);
+  MS_EXCEPTION_IF_NULL(inputs_shape);
   OptimizerInfo *optim_info = BuildInputs(weight, keys, values, lens, inputs_shape, worker_num, pserver_kernel);
+  MS_EXCEPTION_IF_NULL(optim_info);
   std::vector<size_t> ws_sizes = pserver_kernel->workspace_sizes();
   BuildWorkspaces(optim_info, ws_sizes, worker_num);
   BuildOutputs(optim_info, worker_num);
@@ -39,7 +42,9 @@ void OptimizerInfoBuilder::BuildWorkspaces(OptimizerInfo *info, const std::vecto
   for (size_t i = 0; i < ws_sizes.size(); i++) {
     size_t size = ws_sizes[i];
     AddressPtr workspace = std::make_shared<kernel::Address>();
+    MS_EXCEPTION_IF_NULL(workspace);
     workspace->addr = new float[size];
+    MS_EXCEPTION_IF_NULL(workspace->addr);
     workspace->size = size;
     info->AddWorkspace(workspace);
   }
@@ -49,13 +54,11 @@ template <typename T>
 AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type, const std::string &input_name,
                                                  void *ps_data, const Lengths &ps_lens,
                                                  const InputsShapePtr &inputs_shape) {
+  MS_EXCEPTION_IF_NULL(ps_data);
   // Take note of that the data type maybe inconsistent in ps_data.
   MS_LOG(INFO) << "Get input address pointer for optimizer:" << optim_type << ", input name:" << input_name;
   AddressPtr addr_ptr = std::make_shared<kernel::Address>();
   MS_EXCEPTION_IF_NULL(addr_ptr);
 
-  size_t addr_data_size = 0;
-  size_t addr_data_offset = 0;
-  size_t ps_index = INDEX_NOT_SEND;
   if (kOptimToOriginIdx.count(optim_type) == 0 || kOptimToPSSendIdx.count(optim_type) == 0) {
     MS_LOG(EXCEPTION) << "Optimizer type " << optim_type << " in not supported.";
@@ -65,11 +68,12 @@ AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type,
   if (ps_send_index_map.count(input_name) == 0 || origin_input_map.count(input_name) == 0) {
     MS_LOG(EXCEPTION) << "Optimizer " << optim_type << " has no input for " << input_name;
   }
-  ps_index = ps_send_index_map.at(input_name);
+  size_t ps_index = ps_send_index_map.at(input_name);
   if (ps_index == INDEX_NOT_SEND) {
     MS_LOG(EXCEPTION) << "Input " << input_name << " is not supposed to be sent to PS.";
   }
 
+  size_t addr_data_size, addr_data_offset;
   if (inputs_shape != nullptr) {
     // addr_data_size should be calculated by inputs_shape if it's passed.
     size_t origin_index = origin_input_map.at(input_name);
@@ -86,7 +90,14 @@ AddressPtr OptimizerInfoBuilder::GenInputAddrPtr(const std::string &optim_type,
   T *buffer = new T[addr_data_size];
   addr_ptr->size = ps_lens[ps_index] * sizeof(T);
   addr_ptr->addr = buffer;
-  int ret = memcpy_s(addr_ptr->addr, addr_ptr->size, reinterpret_cast<T *>(ps_data) + addr_data_offset, addr_ptr->size);
+
+  size_t dst_size = addr_ptr->size;
+  size_t src_size = addr_ptr->size;
+  void *dst_data = addr_ptr->addr;
+  void *src_data = reinterpret_cast<T *>(ps_data) + addr_data_offset;
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  int ret = memcpy_s(dst_data, dst_size, src_data, src_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     delete[] buffer;
@@ -99,11 +110,14 @@ OptimizerInfo *MomentumOptimInfoBuilder::BuildInputs(const WeightPtr &weight, co
                                                      const Lengths &lens, const InputsShapePtr &inputs_shape,
                                                      size_t worker_num, const std::shared_ptr<PServerKernel> &) {
   AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(weight_addr);
   weight_addr->addr = weight->data();
   weight_addr->size = weight->size() * sizeof(float);
 
   AddressPtr accumulate = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(accumulate);
   accumulate->addr = new float[weight->size()];
+  MS_EXCEPTION_IF_NULL(accumulate->addr);
   accumulate->size = weight->size() * sizeof(float);
   int ret = memset_s(accumulate->addr, accumulate->size, 0x00, accumulate->size);
   if (ret != 0) {
@@ -122,25 +136,30 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
                                                        const Lengths &lens, const InputsShapePtr &inputs_shape,
                                                        size_t worker_num, const std::shared_ptr<PServerKernel> &) {
   AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(weight_addr);
   weight_addr->addr = weight->data();
   weight_addr->size = weight->size() * sizeof(float);
 
   AddressPtr m = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(m);
   m->addr = new float[weight->size()];
+  MS_EXCEPTION_IF_NULL(m->addr);
   m->size = weight->size() * sizeof(float);
   int ret = memset_s(m->addr, m->size, 0x00, m->size);
   if (ret != 0) {
-    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+    MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
     delete[] reinterpret_cast<float *>(m->addr);
     return nullptr;
   }
 
   AddressPtr v = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(v);
   v->addr = new float[weight->size()];
+  MS_EXCEPTION_IF_NULL(v->addr);
   v->size = weight->size() * sizeof(float);
   ret = memset_s(v->addr, v->size, 0x00, v->size);
   if (ret != 0) {
-    MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
+    MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
     delete[] reinterpret_cast<float *>(v->addr);
     delete[] reinterpret_cast<float *>(m->addr);
     return nullptr;
   }
@@ -154,7 +173,6 @@ OptimizerInfo *SparseAdamOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
   AddressPtr epsilon = GenInputAddrPtr<float>(kSparseAdam, "eps", values.data(), lens);
   AddressPtr grad = GenInputAddrPtr<float>(kSparseAdam, "grad", values.data(), lens, inputs_shape);
   AddressPtr indices = GenInputAddrPtr<float>(kSparseAdam, "indices", values.data(), lens, inputs_shape);
-
   return new SparseAdamOptimInfo(weight_addr, m, v, beta1_power, beta2_power, learning_rate, beta1, beta2, epsilon,
                                  grad, indices);
 }
@@ -163,12 +181,16 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
                                                        const Lengths &lens, const InputsShapePtr &inputs_shape,
                                                        size_t worker_num,
                                                        const std::shared_ptr<PServerKernel> &pserver_kernel) {
+  MS_EXCEPTION_IF_NULL(inputs_shape);
   AddressPtr weight_addr = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(weight_addr);
   weight_addr->addr = weight->data();
   weight_addr->size = weight->size() * sizeof(float);
 
   AddressPtr accum = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(accum);
   accum->addr = new float[weight->size()];
+  MS_EXCEPTION_IF_NULL(accum->addr);
   accum->size = weight->size() * sizeof(float);
   for (size_t i = 0; i < weight->size(); i++) {
     float *tmp = reinterpret_cast<float *>(accum->addr);
@@ -176,7 +198,9 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
   }
 
   AddressPtr linear = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(linear);
   linear->addr = new float[weight->size()];
+  MS_EXCEPTION_IF_NULL(linear->addr);
   int ret = memset_s(linear->addr, weight->size() * sizeof(float), 0x00, weight->size() * sizeof(float));
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memset_s error, errorno(" << ret << ")";
@@ -187,7 +211,6 @@ OptimizerInfo *SparseFtrlOptimInfoBuilder::BuildInputs(const WeightPtr &weight,
   AddressPtr grad = GenInputAddrPtr<float>(kSparseFtrl, "grad", values.data(), lens, inputs_shape);
   AddressPtr indices = GenInputAddrPtr<float>(kSparseFtrl, "indices", values.data(), lens, inputs_shape);
-
   return new SparseFtrlOptimInfo(weight_addr, accum, linear, grad, indices);
 }
 }  // namespace ps
 }  // namespace parallel
diff --git a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
index 616ab093a5..0ab2622206 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/parameter_server.h
@@ -172,6 +172,7 @@ class FuncGraph;
 template <typename T>
 void ParameterServer<T>::ServerHandler::operator()(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
                                                    ::ps::KVServer<T> *server) {
+  MS_EXCEPTION_IF_NULL(server);
   ::ps::KVPairs<T> res;
   if (handlers_.count(req_meta.cmd) > 0) {
     auto &handler_ptr = handlers_[req_meta.cmd];
@@ -199,12 +200,14 @@ void ParameterServer<T>::ServerHandler::Init() {
 template <typename T>
 void ParameterServer<T>::ServerHandler::HandlePushReq(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
                                                       ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   ps_->AccumGrad(req_data.keys, req_data.vals, req_data.lens);
 }
 
 template <typename T>
 void ParameterServer<T>::ServerHandler::HandlePullReq(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
                                                       ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   res->keys = req_data.keys;
   ::ps::Key key = req_data.keys[0];
   res->vals = *(ps_->weight(key));
@@ -214,6 +217,7 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleInitWeights(const ::ps::KVMeta &req_meta,
                                                           const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
   std::unique_lock<std::mutex> lock(ps_->mutex());
+  MS_EXCEPTION_IF_NULL(res);
   size_t key_num = req_data.keys.size();
   T *data_ptr = req_data.vals.data();
   size_t pos = 0;
@@ -223,10 +227,12 @@ void ParameterServer<T>::ServerHandler::HandleInitWeights(const ::ps::KVMeta &re
     if (!ps_->HasWeight(key)) {
       WeightPtr weight_ptr = std::make_shared<::ps::SArray<T>>();
+      MS_EXCEPTION_IF_NULL(weight_ptr);
       weight_ptr->CopyFrom(data_ptr + pos, data_len);
       ps_->InitWeight(key, weight_ptr);
 
       GradPtr grad_ptr = std::make_shared<::ps::SArray<T>>(data_len, 0);
+      MS_EXCEPTION_IF_NULL(grad_ptr);
       ps_->InitGrad(key, grad_ptr);
     }
     pos += data_len;
   }
@@ -238,6 +244,7 @@ void ParameterServer<T>::ServerHandler::HandleInitWeightToOptimId(const ::ps::KV
                                                                   const ::ps::KVPairs<T> &req_data,
                                                                   ::ps::KVPairs<T> *res) {
   std::unique_lock<std::mutex> lock(ps_->mutex());
+  MS_EXCEPTION_IF_NULL(res);
   size_t key_num = req_data.keys.size();
   for (size_t i = 0; i < key_num; i++) {
     Key key = req_data.keys[i];
@@ -255,6 +262,7 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleInitInputsShape(const ::ps::KVMeta &req_meta,
                                                               const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
   std::unique_lock<std::mutex> lock(ps_->mutex());
+  MS_EXCEPTION_IF_NULL(res);
   const Key &key = req_data.keys[0];
   if (init_optim_info_[key]) {
     return;
@@ -268,13 +276,18 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleInitEmbeddings(const ::ps::KVMeta &req_meta,
                                                              const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
   std::unique_lock<std::mutex> lock(ps_->mutex());
+  MS_EXCEPTION_IF_NULL(res);
   const Key &key = req_data.keys[0];
   MS_LOG(INFO) << "Initializing embedding table for key:" << key;
   std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> shapes =
     std::make_shared<std::vector<std::shared_ptr<std::vector<size_t>>>>();
+  MS_EXCEPTION_IF_NULL(shapes);
   std::shared_ptr<std::vector<size_t>> input_shape = std::make_shared<std::vector<size_t>>();
+  MS_EXCEPTION_IF_NULL(input_shape);
   std::shared_ptr<std::vector<size_t>> indices_shape = std::make_shared<std::vector<size_t>>();
+  MS_EXCEPTION_IF_NULL(indices_shape);
   std::shared_ptr<std::vector<size_t>> output_shape = std::make_shared<std::vector<size_t>>();
+  MS_EXCEPTION_IF_NULL(output_shape);
   shapes->push_back(input_shape);
   shapes->push_back(indices_shape);
   shapes->push_back(output_shape);
@@ -297,6 +310,7 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleCheckReadyForPush(const ::ps::KVMeta &req_meta,
                                                                 const ::ps::KVPairs<T> &req_data,
                                                                 ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   const Key &key = req_data.keys[0];
   bool ready = ps_->ReadyForPush(key);
   res->keys.push_back(key);
@@ -307,6 +321,7 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleCheckReadyForPull(const ::ps::KVMeta &req_meta,
                                                                 const ::ps::KVPairs<T> &req_data,
                                                                 ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   const Key &key = req_data.keys[0];
   bool ready = ps_->ReadyForPull(key);
   res->keys.push_back(key);
@@ -316,6 +331,7 @@ void ParameterServer<T>::ServerHandler::HandleCheckReadyForPull(const ::ps::KVMe
 template <typename T>
 void ParameterServer<T>::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta &req_meta,
                                                               const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   const Key &key = req_data.keys[0];
   for (size_t i = 1; i < req_data.keys.size(); i++) {
     res->keys.push_back(req_data.keys[i]);
@@ -326,6 +342,7 @@ template <typename T>
 void ParameterServer<T>::ServerHandler::HandleFinalize(const ::ps::KVMeta &req_meta,
                                                        const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res) {
+  MS_EXCEPTION_IF_NULL(res);
   ps_->Finalize();
 }
 
 template <typename T>
@@ -371,7 +388,9 @@ void ParameterServer<T>::InitWeightKeyToOptims(const Key &key, const int &optim_
 template <typename T>
 void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &values, const Lengths &lengths) {
   InputsShapePtr inputs_shape = std::make_shared<InputsShape>();
+  MS_EXCEPTION_IF_NULL(inputs_shape);
   InputsShapePtr original_inputs_shape = std::make_shared<InputsShape>();
+  MS_EXCEPTION_IF_NULL(original_inputs_shape);
   int val_idx = 0;
   const Key &key = keys[0];
   MS_LOG(INFO) << "Initializing optimizer inputs shape for key:" << key;
@@ -381,7 +400,9 @@ void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &va
   }
   for (size_t i = 0; i < keys.size(); i++) {
     auto shape = std::make_shared<std::vector<size_t>>();
+    MS_EXCEPTION_IF_NULL(shape);
     auto original_shape = std::make_shared<std::vector<size_t>>();
+    MS_EXCEPTION_IF_NULL(original_shape);
     inputs_shape->push_back(shape);
     original_inputs_shape->push_back(original_shape);
 
@@ -425,6 +446,7 @@ template <typename T>
 const CNodePtr ParameterServer<T>::GetCNode(const std::string &name) const {
   std::list<CNodePtr> cnodes = func_graph_->GetOrderedCnodes();
   for (CNodePtr cnode : cnodes) {
+    MS_EXCEPTION_IF_NULL(cnode);
     std::string fullname = cnode->fullname_with_scope();
     if (fullname.find(name) != std::string::npos && fullname.find("Push") != std::string::npos) {
       return cnode;
@@ -435,6 +457,7 @@ const CNodePtr ParameterServer<T>::GetCNode(const std::string &name) const {
 
 template <typename T>
 void ParameterServer<T>::InitWeight(const Key &key, const WeightPtr &weight) {
+  MS_EXCEPTION_IF_NULL(weight);
   if ((weights_.count(key) == 0) || (is_embedding_[key] && weights_.count(key) != 0)) {
     MS_LOG(INFO) << "Initializing weight for key " << key << ", server rank " << rank_id_;
     weights_[key] = weight;
@@ -445,6 +468,7 @@ void ParameterServer<T>::InitWeight(const Key &key, const WeightPtr &weight) {
 
 template <typename T>
 void ParameterServer<T>::InitGrad(const Key &key, const GradPtr &grad) {
+  MS_EXCEPTION_IF_NULL(grad);
   if (grads_.count(key) == 0) {
     grads_[key] = grad;
     grads_accum_counter_[key] = 0;
@@ -454,6 +478,7 @@ void ParameterServer<T>::InitGrad(const Key &key, const GradPtr &grad) {
 template <typename T>
 void ParameterServer<T>::InitEmbeddingTable(
   const Key &key, const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
+  MS_EXCEPTION_IF_NULL(shapes);
   if (weights_.count(key) == 0) {
     std::shared_ptr<PServerKernel> lookup =
       std::make_shared<EmbeddingLookUpPSKernel>(rank_id_, pserver_num_, worker_num_);
@@ -464,6 +489,7 @@ void ParameterServer<T>::InitEmbeddingTable(
     const std::vector<size_t> &input_shapes = lookup->input_sizes();
     size_t total_dims = std::accumulate(input_shapes.begin(), input_shapes.end(), 1, std::multiplies<size_t>());
     WeightPtr embedding = std::make_shared<Weight>(total_dims, 0);
+    MS_EXCEPTION_IF_NULL(embedding);
     T *embedding_data = embedding->data();
     std::default_random_engine engine;
    std::normal_distribution<float> random(0, 0.01);
@@ -580,7 +606,9 @@ WeightPtr ParameterServer<T>::weight(const Key &key) {
     MS_LOG(EXCEPTION) << "Invalid weight key " << key;
   }
   WeightPtr weight_ptr = weights_[key];
+  MS_EXCEPTION_IF_NULL(weight_ptr);
   WeightPtr copy_weight_ptr = std::make_shared<::ps::SArray<T>>(weight_ptr->size(), 0);
+  MS_EXCEPTION_IF_NULL(copy_weight_ptr);
   copy_weight_ptr->CopyFrom(weight_ptr->data(), weight_ptr->size());
   tokens_[key] -= 1;
   return copy_weight_ptr;
@@ -589,6 +617,7 @@ WeightPtr ParameterServer<T>::weight(const Key &key) {
 template <typename T>
 void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids, ::ps::KVPairs<T> *res) {
   std::unique_lock<std::mutex> lock(mutex_);
+  MS_EXCEPTION_IF_NULL(res);
   if (weights_.count(key) == 0) {
     MS_LOG(ERROR) << "Invalid embedding table key " << key;
     return;
@@ -598,7 +627,9 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
     return;
   }
   WeightPtr table_ptr = weights_[key];
+  MS_EXCEPTION_IF_NULL(table_ptr);
   std::shared_ptr<PServerKernel> table_lookup_op = embedding_lookup_ops_[key];
+  MS_EXCEPTION_IF_NULL(table_lookup_op);
 
   // Update shapes of lookup operator
   std::vector<std::vector<size_t>> shapes = {};
@@ -610,13 +641,16 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
   const std::vector<size_t> output_shapes = table_lookup_op->output_sizes();
   std::vector<kernel::AddressPtr> inputs;
   AddressPtr embedding_table = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(embedding_table);
   AddressPtr indices = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(indices);
   inputs.push_back(embedding_table);
   inputs.push_back(indices);
   embedding_table->addr = table_ptr->data();
   embedding_table->size = table_ptr->size() * sizeof(T);
 
   std::unique_ptr<int[]> tmp_ids(new int[lookup_ids.size()]);
+  MS_EXCEPTION_IF_NULL(tmp_ids);
   for (size_t i = 0; i < lookup_ids.size(); i++) {
     tmp_ids[i] = static_cast<int>(lookup_ids[i]);
   }
@@ -626,7 +660,9 @@ void ParameterServer<T>::DoEmbeddingLookup(Key key, const LookupIds &lookup_ids,
   std::vector<kernel::AddressPtr> workspaces;
   std::vector<kernel::AddressPtr> outputs;
   AddressPtr output = std::make_shared<kernel::Address>();
+  MS_EXCEPTION_IF_NULL(output);
   std::shared_ptr<::ps::SArray<T>> addr = std::make_shared<::ps::SArray<T>>(output_shapes[0] / sizeof(T), 0);
+  MS_EXCEPTION_IF_NULL(addr);
   output->addr = addr->data();
   output->size = output_shapes[0];
 
@@ -680,6 +716,7 @@ void ParameterServer<T>::GetEmbeddingTableParamPtr() {
   auto cnodes = func_graph_->GetOrderedCnodes();
   Key count = 0;
   for (auto cnode : cnodes) {
+    MS_EXCEPTION_IF_NULL(cnode);
     std::string cnode_name = AnfAlgo::GetCNodeName(cnode);
     if (cnode_name == kEmbeddingLookupOpName) {
       auto embedding_table = AnfAlgo::GetInputNode(cnode, 0);
@@ -703,6 +740,7 @@ void ParameterServer<T>::SyncEmbeddingTables() {
     std::vector<int64_t> new_tensor_shape(input_shapes.begin(), input_shapes.end());
     tensor::TensorPtr new_tensor = std::make_shared<tensor::Tensor>(kNumberTypeFloat32, new_tensor_shape);
+    MS_EXCEPTION_IF_NULL(new_tensor);
     float *new_tensor_data_ptr = reinterpret_cast<float *>(new_tensor->data_c());
     size_t new_tensor_size = static_cast<size_t>(new_tensor->data().nbytes());
     size_t embedding_table_size = weights_[key]->size() * sizeof(float);
 
@@ -710,6 +748,8 @@ void ParameterServer<T>::SyncEmbeddingTables() {
       MS_LOG(EXCEPTION) << "Shape of embedding table can't match. New tensor size:" << new_tensor_size
                         << ", embedding_table size:" << embedding_table_size;
     }
+    MS_EXCEPTION_IF_NULL(new_tensor_data_ptr);
+    MS_EXCEPTION_IF_NULL(weights_[key]->data());
     int ret = memcpy_s(new_tensor_data_ptr, new_tensor_size, weights_[key]->data(), embedding_table_size);
     if (ret != 0) {
       MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
@@ -724,6 +764,7 @@ void ParameterServer<T>::SyncEmbeddingTables() {
 
 template <typename T>
 void ParameterServer<T>::Run(const FuncGraphPtr &func_graph) {
+  MS_EXCEPTION_IF_NULL(func_graph);
   MS_LOG(INFO) << "PServer starts connecting to scheduler and workers...";
   ::ps::Start(0);
   MS_LOG(INFO) << "PServer connected successfully.";
diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker.h b/mindspore/ccsrc/frontend/parallel/ps/worker.h
index 1adc8b3e15..d3de384335 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/worker.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/worker.h
@@ -53,7 +53,7 @@ class Worker {
   void SetOptimInputShapes(size_t key, const ShapeVector &shape);
   void AddEmbeddingTable(const ::ps::Key &key, const size_t &row_count);
   void InitPSEmbeddingTable(const std::vector<size_t> &keys, std::vector<size_t> shapes, const ShapeVector &sizes);
-  void InitPSParamAndOptim(const std::string &param_name, tensor::TensorPtr tensor);
+  void InitPSParamAndOptim(const std::string &param_name, const tensor::TensorPtr &tensor);
   void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
                            const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd);
   void Finalize();
@@ -132,10 +132,13 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
   size_t dst_size = 0;
   size_t src_size = 0;
   for (size_t i = 0; i < sizes.size(); i++) {
+    void *dst_data = total_buffer.data() + offset / sizeof(T);
+    void *src_data = reinterpret_cast<void *>(addrs[i]);
+    MS_EXCEPTION_IF_NULL(dst_data);
+    MS_EXCEPTION_IF_NULL(src_data);
     dst_size = sizes[i] * sizeof(T);
     src_size = sizes[i] * sizeof(T);
-    auto ret =
-      memcpy_s(total_buffer.data() + offset / sizeof(T), dst_size, reinterpret_cast<void *>(addrs[i]), src_size);
+    auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
     if (ret != 0) {
       MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
       return;
@@ -159,6 +162,7 @@ void Worker<T>::Push(const std::vector<size_t> &keys, std::vector<uintptr_t> add
 
 template <typename T>
 void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
+  MS_EXCEPTION_IF_NULL(dev_addr);
   ::ps::SArray<T> variables(size / sizeof(T), 0);
   while (!kv_worker_->IsReadyForPull(key)) {
     continue;
@@ -176,6 +180,7 @@ void Worker<T>::Pull(const size_t key, void *dev_addr, const size_t size) {
 template <typename T>
 void Worker<T>::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
                                     const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd) {
+  MS_EXCEPTION_IF_NULL(lookup_result);
   kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd);
 }
 
@@ -192,6 +197,7 @@ void Worker<T>::Finalize() {
 
 template <typename T>
 void Worker<T>::InitPSParamData(const std::vector<size_t> &keys, void *origin_addr, size_t size) {
+  MS_EXCEPTION_IF_NULL(origin_addr);
   ::ps::SArray<T> addr(reinterpret_cast<T *>(origin_addr), size / sizeof(T));
   ::ps::SArray<::ps::Key> key(keys);
   ::ps::SArray<int> lens;
@@ -316,7 +322,8 @@ void Worker<T>::InitPSEmbeddingTable(const std::vector<size_t> &keys, std::vecto
 }
 
 template <typename T>
-void Worker<T>::InitPSParamAndOptim(const std::string &param_name, tensor::TensorPtr tensor) {
+void Worker<T>::InitPSParamAndOptim(const std::string &param_name, const tensor::TensorPtr &tensor) {
+  MS_EXCEPTION_IF_NULL(tensor);
   void *param_data = tensor->data_c();
   size_t param_size = LongToSize(tensor->data().nbytes());
   ShapeVector param_shape = tensor->shape_c();
diff --git a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
index dc6bfaa233..8e6f5a2bb8 100644
--- a/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
+++ b/mindspore/ccsrc/frontend/parallel/ps/worker_proxy.h
@@ -144,6 +144,7 @@ void WorkerProxy<T>::AddEmbeddingTable(const ::ps::Key &key, const size_t &row_c
   ::ps::Range range(begin, end);
   if (embedding_table_ranges_.count(key) == 0) {
     embedding_table_ranges_[key] = std::make_shared<std::vector<::ps::Range>>();
+    MS_EXCEPTION_IF_NULL(embedding_table_ranges_[key]);
   }
   embedding_table_ranges_[key]->push_back(range);
 }
@@ -168,6 +169,7 @@ template <typename T>
 void WorkerProxy<T>::EmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
                                      const ::ps::SArray<int> &lens, ::ps::SArray<T> *outs, int cmd, const Callback &cb,
                                      int priority) {
+  MS_EXCEPTION_IF_NULL(outs);
   int ts = AddLookupCB(keys, lookup_ids, outs, cmd, cb);
   ::ps::KVPairs<T> kvs;
   kvs.keys = keys;
@@ -265,6 +267,7 @@ void WorkerProxy<T>::PushSparseData(const ::ps::SArray<::ps::Key> &keys, const :
 template <typename T>
 void WorkerProxy<T>::PullData(const ::ps::SArray<::ps::Key> &keys, ::ps::SArray<T> *vals, ::ps::SArray<int> *lens,
                               int cmd, int priority) {
+  MS_EXCEPTION_IF_NULL(vals);
   int ts = AddGeneralRspCB(keys, vals, lens, cmd, nullptr);
   ::ps::KVPairs<T> kvs;
   kvs.keys = keys;
@@ -295,6 +298,7 @@ template <typename T>
 template <typename C>
 int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
                                 C *lookup_result, int cmd, const Callback &cb) {
+  MS_EXCEPTION_IF_NULL(lookup_result);
   int ts = lookup_customer_->NewRequest(::ps::kServerGroup);
   const auto &callback = [this, ts, keys, lookup_ids, lookup_result, cb]() mutable {
     mutex_.lock();
@@ -310,17 +314,27 @@ int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps:
         T *addr = s.vals.data() + offset;
         offset += len;
         id_addr_map[key] = std::make_shared<std::pair<T *, int>>(std::make_pair(addr, len));
+        MS_EXCEPTION_IF_NULL(id_addr_map[key]);
       }
     }
 
     T *result_addr = lookup_result->data();
+    MS_EXCEPTION_IF_NULL(result_addr);
     int offset = 0;
+    size_t dst_size = 0;
+    size_t src_size = 0;
+    void *dst_data = nullptr;
+    void *src_data = nullptr;
     for (size_t i = 0; i < lookup_ids.size(); i++) {
       auto &pair = id_addr_map[static_cast<Key>(lookup_ids[i])];
       int size = pair->second * sizeof(T);
-      size_t dst_size = size;
-      size_t src_size = size;
-      auto ret = memcpy_s(result_addr + offset, dst_size, pair->first, src_size);
+      dst_size = size;
+      src_size = size;
+      dst_data = result_addr + offset;
+      src_data = pair->first;
+      MS_EXCEPTION_IF_NULL(dst_data);
+      MS_EXCEPTION_IF_NULL(src_data);
+      auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
       if (ret != 0) {
         MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
         return;
@@ -373,6 +387,7 @@ template <typename T>
 void WorkerProxy<T>::LookupIdSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
                                     std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
                                     const std::map<int, int> &attrs) {
+  MS_EXCEPTION_IF_NULL(sliced);
   int *lookup_ids = send.lens.data();
   size_t id_size = send.lens.size();
 
@@ -414,6 +429,7 @@ template <typename T>
 void WorkerProxy<T>::SparseSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
                                   std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
                                   const std::map<int, int> &attrs) {
+  MS_EXCEPTION_IF_NULL(sliced);
   // Init variables
   T *data = send.vals.data();
 
@@ -527,15 +543,29 @@ void WorkerProxy<T>::PrepareSparseGradient(const size_t begin, const size_t end,
                                            const std::vector<std::pair<int, T *>> &indice_to_grads,
                                            const int *all_indice, const size_t segment_size, T *gradient,
                                            int *indices) {
+  MS_EXCEPTION_IF_NULL(all_indice);
+  MS_EXCEPTION_IF_NULL(gradient);
+  MS_EXCEPTION_IF_NULL(indices);
   int offset = 0;
   int index = 0;
   size_t segment_data_size = segment_size * sizeof(T);
+  size_t dst_size = 0;
+  size_t src_size = 0;
+  void *dst_data = nullptr;
+  void *src_data = nullptr;
   for (auto &pair : indice_to_grads) {
     if (distinct_ids.count(pair.first) == 0) {
       continue;
     }
     indices[index++] = pair.first;
-    auto ret = memcpy_s(gradient + offset, segment_data_size, pair.second, segment_data_size);
+
+    dst_size = segment_data_size;
+    src_size = segment_data_size;
+    dst_data = gradient + offset;
+    src_data = pair.second;
+    MS_EXCEPTION_IF_NULL(dst_data);
+    MS_EXCEPTION_IF_NULL(src_data);
+    auto ret = memcpy_s(gradient + offset, dst_size, pair.second, src_size);
     if (ret != 0) {
       MS_LOG(ERROR) << "memcpy_s error, errorno(" << ret << ")";
       return;
@@ -548,15 +578,25 @@ template <typename T>
 void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const size_t grad_index,
                                       const size_t indice_index, const T *original_data, const T *grads,
                                       int *indices, ::ps::SArray<T> *reduced_data) {
+  MS_EXCEPTION_IF_NULL(original_data);
+  MS_EXCEPTION_IF_NULL(grads);
+  MS_EXCEPTION_IF_NULL(indices);
+  MS_EXCEPTION_IF_NULL(reduced_data);
   int offset = 0;
   size_t dst_size = 0;
   size_t src_size = 0;
+  void *dst_data = nullptr;
+  void *src_data = nullptr;
   for (size_t i = 0; i < lengths.size(); i++) {
     if (i != grad_index && i != indice_index) {
       int data_size = lengths[i] * sizeof(T);
       dst_size = data_size;
       src_size = data_size;
-      auto ret = memcpy_s(reduced_data->data() + offset, dst_size, original_data + offset, src_size);
+      dst_data = reduced_data->data() + offset;
+      src_data = const_cast<T *>(original_data) + offset;
+      MS_EXCEPTION_IF_NULL(dst_data);
+      MS_EXCEPTION_IF_NULL(src_data);
+      auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
       if (ret != 0) {
         MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
         return;
@@ -573,7 +613,11 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
   int data_size = lengths[grad_index] * sizeof(T);
   dst_size = data_size;
   src_size = data_size;
-  auto ret = memcpy_s(reduced_data->data() + grad_offset, dst_size, grads, src_size);
+  dst_data = reduced_data->data() + grad_offset;
+  src_data = const_cast<T *>(grads);
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  auto ret = memcpy_s(dst_data, dst_size, src_data, src_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     return;
@@ -585,7 +629,11 @@ void WorkerProxy<T>::BuildSparseValue(const ::ps::SArray<int> &lengths, const si
   T *indice_data = reduced_data->data() + indice_offset;
   dst_size = data_size;
   src_size = data_size;
-  ret = memcpy_s(indice_data, dst_size, indices, src_size);
+  dst_data = indice_data;
+  src_data = indices;
+  MS_EXCEPTION_IF_NULL(dst_data);
+  MS_EXCEPTION_IF_NULL(src_data);
+  ret = memcpy_s(dst_data, dst_size, src_data, src_size);
   if (ret != 0) {
     MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")";
     return;
@@ -596,6 +644,7 @@ template <typename T>
 void WorkerProxy<T>::BroadcastSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
                                      std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
                                      const std::map<int, int> &attr) {
+  MS_EXCEPTION_IF_NULL(sliced);
   sliced->resize(server_num_);
   for (int i = 0; i < server_num_; i++) {
     sliced->at(i).first = true;
@@ -608,6 +657,7 @@ template <typename T>
 void WorkerProxy<T>::RoundRobinSlicer(int timestamp, const ::ps::KVPairs<T> &send, const std::vector<::ps::Range> &,
                                       std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
                                       const std::map<int, int> &attr) {
+  MS_EXCEPTION_IF_NULL(sliced);
   sliced->resize(server_num_);
   auto keys = send.keys;
   auto vals = send.vals;
@@ -646,6 +696,7 @@ void WorkerProxy<T>::WorkerInitEmbeddingSlicer(int timestamp, const ::ps::KVPair
                                                const std::vector<::ps::Range> &,
                                                std::vector<std::pair<bool, ::ps::KVPairs<T>>> *sliced,
                                                const std::map<int, int> &attrs) {
+  MS_EXCEPTION_IF_NULL(sliced);
   sliced->resize(server_num_);
   auto keys = send.keys;
   auto vals = send.vals;
@@ -714,6 +765,7 @@ void WorkerProxy<T>::ProcessResponse(const ::ps::Message &msg) {
 template <typename T>
 void WorkerProxy<T>::Send(::ps::Customer *customer, int timestamp, bool push, bool pull, int cmd,
                           const ::ps::KVPairs<T> &kvs, const Slicer &slicer, std::map<int, int> attrs) {
+  MS_EXCEPTION_IF_NULL(customer);
   SlicedKVs sliced;
   slicer(timestamp, kvs, ::ps::Postoffice::Get()->GetServerKeyRanges(), &sliced, attrs);
 
diff --git a/mindspore/context.py b/mindspore/context.py
index 39b3fae3ac..d6784c9e39 100644
--- a/mindspore/context.py
+++ b/mindspore/context.py
@@ -639,14 +639,17 @@ def set_ps_context(**kwargs):
     Note:
         Some other environment variables should also be set for parameter server training mode.
         These environment variables are listed below:
+
+        .. code-block::
+
             MS_SERVER_NUM  # Server number
             MS_WORKER_NUM  # Worker number
             MS_SCHED_HOST  # Scheduler IP address
             MS_SCHED_PORT  # Scheduler port
             MS_ROLE        # The role of this process:
-                           MS_SCHED represents the scheduler,
-                           MS_WORKER represents the worker,
-                           MS_PSERVER represents the Server
+                           # MS_SCHED represents the scheduler,
+                           # MS_WORKER represents the worker,
+                           # MS_PSERVER represents the Server
 
     Args: