@@ -42,6 +42,7 @@ if (NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU)))
     list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc")
     list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_adam_ps_kernel.cc")
     list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_ftrl_ps_kernel.cc")
+    list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/sparse_apply_lazy_adam_ps_kernel.cc")
 endif()

 if (ENABLE_GPU)
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_CPU_KERNEL_H_
+#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_PS_KERNEL_H_
 #define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_ADAM_PS_KERNEL_H_

 #include <vector>
@@ -0,0 +1,104 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.h"
+#include <memory>
+#include "backend/kernel_compiler/common_utils.h"
+#include "runtime/device/cpu/cpu_device_address.h"
+#include "frontend/parallel/ps/util.h"
+
+namespace mindspore {
+namespace kernel {
+namespace ps {
+void SparseApplyLazyAdamPSKernel::InitKernel(
+  const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
+  const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
+  std::vector<size_t> &var_shape = *(shape_vec[0]);
+  std::vector<size_t> &m_shape = *(shape_vec[1]);
+  std::vector<size_t> &v_shape = *(shape_vec[2]);
+  const std::vector<size_t> &grad_shape = *(shape_vec[9]);
+  const std::vector<size_t> &indices_shape = *(shape_vec[10]);
+
+  Shard(&var_shape, 0);
+  Shard(&m_shape, 0);
+  Shard(&v_shape, 0);
+
+  if (!IsSameShape(var_shape, m_shape)) {
+    MS_LOG(EXCEPTION) << "var and m should have the same shape";
+  }
+  if (!IsSameShape(var_shape, v_shape)) {
+    MS_LOG(EXCEPTION) << "var and v should have the same shape";
+  }
+  var_first_dim_size_ = var_shape[0];
+  for (size_t i = 1; i < var_shape.size(); ++i) {
+    if (var_shape[i] != grad_shape[i]) {
+      MS_LOG(EXCEPTION) << "The shapes of var and grad must be equal in dimension " << i;
+    }
+    var_outer_dim_size_ *= var_shape[i];
+  }
+  if (indices_shape.size() != 1) {
+    MS_LOG(EXCEPTION) << "indices must be 1D";
+  }
+  indices_size_ = indices_shape[0];
+  if (grad_shape[0] != indices_size_) {
+    MS_LOG(ERROR) << "The first dimension of grad must be equal to the number of indices";
+  }
+  /*
+  if (AnfAlgo::HasNodeAttr(USE_NESTEROV, kernel_node)) {
+    use_nesterov_ = AnfAlgo::GetNodeAttr<bool>(kernel_node, "use_nesterov");
+  }
+  */
+  workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
+  workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
+  workspace_size_list_.emplace_back(indices_size_ * var_outer_dim_size_ * sizeof(float));
+  workspace_size_list_.emplace_back(indices_size_ * sizeof(int));
+  workspace_size_list_.emplace_back(var_first_dim_size_ * var_outer_dim_size_ * sizeof(float));
+}
+
+void SparseApplyLazyAdamPSKernel::ReInit(
+  const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &shapes) {
+  const std::vector<std::shared_ptr<std::vector<size_t>>> &shape_vec = *shapes;
+  const std::vector<size_t> &indices_shape = *(shape_vec[0]);
+  indices_size_ = indices_shape[0];
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
+  workspace_size_list_[1] = indices_size_ * sizeof(int);
+}
+
+void SparseApplyLazyAdamPSKernel::ReInit(const std::vector<AddressPtr> &inputs) {
+  const auto &indices_addr = inputs[10];
+  indices_size_ = indices_addr->size / sizeof(int);
+  workspace_size_list_[0] = indices_size_ * var_outer_dim_size_ * sizeof(float);
+  workspace_size_list_[1] = indices_size_ * sizeof(int);
+}
+
+bool SparseApplyLazyAdamPSKernel::Execute(const std::vector<AddressPtr> &inputs,
+                                          const std::vector<AddressPtr> &workspace,
+                                          const std::vector<AddressPtr> &outputs) {
+  ReInit(inputs);
+  int *indices = reinterpret_cast<int *>(inputs[10]->addr);
+  for (size_t i = 0; i < inputs[10]->size / sizeof(int); i++) {
+    indices[i] -= rank_id_ * var_first_dim_size_;
+  }
+  return Launch(inputs, workspace, outputs);
+}
+
+const std::vector<size_t> &SparseApplyLazyAdamPSKernel::input_sizes() const { return GetInputSizeList(); }
+const std::vector<size_t> &SparseApplyLazyAdamPSKernel::output_sizes() const { return GetOutputSizeList(); }
+const std::vector<size_t> &SparseApplyLazyAdamPSKernel::workspace_sizes() const { return GetWorkspaceSizeList(); }
+}  // namespace ps
+}  // namespace kernel
+}  // namespace mindspore
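The interesting part of `Execute` above is the index rebasing: after `Shard(&var_shape, 0)` each parameter server owns a contiguous block of rows, so the global row ids sent by workers are shifted into the local shard before the underlying CPU lazy-Adam kernel is launched. Below is a small standalone sketch of that rebase; `rank_id` and `rows_per_server` are illustrative stand-ins for `rank_id_` and `var_first_dim_size_`.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Illustration only: mirrors `indices[i] -= rank_id_ * var_first_dim_size_`
// in SparseApplyLazyAdamPSKernel::Execute. With rows split contiguously
// across servers, a global row id becomes shard-local by subtracting the
// offset of this server's first row.
int main() {
  const size_t rank_id = 1;              // this server's rank
  const size_t rows_per_server = 4;      // var_first_dim_size_ after Shard()
  std::vector<int> indices = {4, 5, 7};  // global row ids routed to server 1

  for (int &idx : indices) {
    idx -= static_cast<int>(rank_id * rows_per_server);
  }
  for (int idx : indices) {
    std::cout << idx << ' ';             // prints local rows: 0 1 3
  }
  std::cout << '\n';
  return 0;
}
```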
@@ -0,0 +1,49 @@
+/**
+ * Copyright 2020 Huawei Technologies Co., Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_
+#define MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_
+
+#include <vector>
+#include <memory>
+#include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
+#include "backend/kernel_compiler/cpu/sparse_apply_lazy_adam_cpu_kernel.h"
+
+namespace mindspore {
+namespace kernel {
+namespace ps {
+using mindspore::kernel::SparseApplyLazyAdamCPUKernel;
+
+class SparseApplyLazyAdamPSKernel : public SparseApplyLazyAdamCPUKernel, public PServerKernel {
+ public:
+  SparseApplyLazyAdamPSKernel(size_t rank_id, size_t pserver_num) : PServerKernel(rank_id, pserver_num) {}
+  ~SparseApplyLazyAdamPSKernel() override = default;
+
+  void InitKernel(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
+  void ReInit(const std::shared_ptr<std::vector<std::shared_ptr<std::vector<size_t>>>> &) override;
+  bool Execute(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
+               const std::vector<AddressPtr> &outputs) override;
+
+  const std::vector<size_t> &input_sizes() const override;
+  const std::vector<size_t> &output_sizes() const override;
+  const std::vector<size_t> &workspace_sizes() const override;
+
+ protected:
+  void ReInit(const std::vector<AddressPtr> &) override;
+};
+}  // namespace ps
+}  // namespace kernel
+}  // namespace mindspore
+#endif  // MINDSPORE_CCSRC_BACKEND_KERNEL_COMPILER_CPU_SPARSE_APPLY_LAZY_ADAM_PS_KERNEL_H_
@@ -33,7 +33,7 @@ class SparseApplyLazyAdamCPUKernel : public CPUKernel {
   bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &workspace,
               const std::vector<AddressPtr> &outputs) override;

- private:
+ protected:
   size_t indices_size_{0};
   size_t var_first_dim_size_{0};
   size_t var_outer_dim_size_{1};
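The `private:` to `protected:` switch exists so the new PS subclass can write `indices_size_`, `var_first_dim_size_`, and `var_outer_dim_size_` directly in its `InitKernel`/`ReInit` overrides. A minimal sketch of that access pattern, with simplified stand-in class names rather than the real MindSpore kernels:

```cpp
#include <cstddef>

// Simplified stand-ins for SparseApplyLazyAdamCPUKernel / SparseApplyLazyAdamPSKernel.
class CpuKernel {
 protected:  // was private: derived classes could not touch these fields
  std::size_t indices_size_{0};
  std::size_t var_first_dim_size_{0};
  std::size_t var_outer_dim_size_{1};
};

class PsKernel : public CpuKernel {
 public:
  // Compiles only because the members above are now protected.
  void ReInit(std::size_t indices) { indices_size_ = indices; }
};

int main() {
  PsKernel kernel;
  kernel.ReInit(8);
  return 0;
}
```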
@@ -63,6 +63,7 @@ constexpr int kInitWeightToOptimIdCmd = 11;
 constexpr int kInitOptimInputsShapeCmd = 12;
 constexpr int kInitEmbeddingsCmd = 20;
 constexpr int kEmbeddingLookupCmd = 30;
+constexpr int kFinalizeCmd = 40;
 constexpr size_t kInvalidKey = UINT64_MAX;
@@ -57,6 +57,16 @@ void DenseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
   }
 }

+void DenseOptimInfo::ComputeMean(size_t n) {
+  if (n > 1) {
+    float *accum_grad_data = reinterpret_cast<float *>(gradient()->addr);
+    size_t size = gradient()->size / sizeof(float);
+    for (size_t i = 0; i < size; i++) {
+      accum_grad_data[i] /= n;
+    }
+  }
+}
+
 void DenseOptimInfo::Reset() { memset_s(gradient()->addr, gradient()->size, 0x00, gradient()->size); }

 void SparseOptimInfo::Accumulate(const Values &values, const Lengths &lengths) {
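`ComputeMean` turns the gradient sum accumulated from all workers into an average before the optimizer runs; it is wired into `ParameterServer<T>::UpdateWeights` further down in this diff, just before `Execute` and the buffer `Reset`. A tiny self-contained version of the same arithmetic, with a plain `std::vector<float>` standing in for the gradient address block:

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Stand-in for DenseOptimInfo::ComputeMean: divide the accumulated
// gradient buffer by the number of workers so a sum becomes a mean.
void ComputeMean(std::vector<float> *accum_grad, size_t worker_num) {
  if (worker_num > 1) {
    for (float &g : *accum_grad) {
      g /= worker_num;
    }
  }
}

int main() {
  std::vector<float> accum = {2.0f, 4.0f, 6.0f};  // sum of gradients pushed by 2 workers
  ComputeMean(&accum, 2);
  for (float g : accum) {
    std::cout << g << ' ';                        // prints 1 2 3
  }
  std::cout << '\n';
  return 0;
}
```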
@@ -33,6 +33,7 @@ class OptimizerInfo {
   virtual void Update(const Values &values, const Lengths &lengths) {}
   virtual void UpdateWeight(const WeightPtr &weight);
   virtual void Accumulate(const Values &values, const Lengths &lengths) = 0;
+  virtual void ComputeMean(size_t n) {}
   virtual void Reset() {}
   void AddWorkspace(const AddressPtr &workspace);

@@ -58,6 +59,7 @@ class DenseOptimInfo : public OptimizerInfo {
   ~DenseOptimInfo() override = default;

   void Accumulate(const Values &values, const Lengths &lens) override;
+  void ComputeMean(size_t n) override;
   void Reset() override;
 };
@@ -41,7 +41,7 @@
 #include "backend/kernel_compiler/kernel.h"
 #include "backend/kernel_compiler/cpu/cpu_kernel_factory.h"
 #include "backend/kernel_compiler/cpu/ps/pserver_kernel.h"
-#include "backend/kernel_compiler/cpu/ps/sparse_apply_adam_ps_kernel.h"
+#include "backend/kernel_compiler/cpu/ps/sparse_apply_lazy_adam_ps_kernel.h"
 #include "backend/kernel_compiler/cpu/ps/sparse_apply_ftrl_ps_kernel.h"
 #include "backend/kernel_compiler/cpu/ps/apply_momentum_ps_kernel.h"
 #include "backend/kernel_compiler/cpu/ps/embedding_look_up_ps_kernel.h"

@@ -90,6 +90,7 @@ class ParameterServer {
     void HandleInitInputsShape(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
     void HandleInitEmbeddings(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
     void HandleEmbeddingLookup(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);
+    void HandleFinalize(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data, ::ps::KVPairs<T> *res);

     ParameterServer *ps_;
     typedef void (ServerHandler::*RequestHandler)(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,

@@ -165,6 +166,7 @@ void ParameterServer<T>::ServerHandler::Init() {
   handlers_[kInitOptimInputsShapeCmd] = &ServerHandler::HandleInitInputsShape;
   handlers_[kInitEmbeddingsCmd] = &ServerHandler::HandleInitEmbeddings;
   handlers_[kEmbeddingLookupCmd] = &ServerHandler::HandleEmbeddingLookup;
+  handlers_[kFinalizeCmd] = &ServerHandler::HandleFinalize;
 }

 template <typename T>

@@ -256,16 +258,16 @@ void ParameterServer<T>::ServerHandler::HandleEmbeddingLookup(const ::ps::KVMeta
   ps_->DoEmbeddingLookup(key, req_data.keys.segment(1, req_data.keys.size()), res);
 }

+template <typename T>
+void ParameterServer<T>::ServerHandler::HandleFinalize(const ::ps::KVMeta &req_meta, const ::ps::KVPairs<T> &req_data,
+                                                       ::ps::KVPairs<T> *res) {
+  ::ps::Finalize(0, false);
+}
+
 template <typename T>
 bool ParameterServer<T>::Init(const FuncGraphPtr &func_graph) {
-  const char *server_num = getenv(kEnvPServerNum);
-  const char *worker_num = getenv(kEnvWorkerNum);
-  if (server_num != nullptr) {
-    pserver_num_ = *server_num - '0';
-  }
-  if (worker_num != nullptr) {
-    worker_num_ = *worker_num - '0';
-  }
+  pserver_num_ = ::ps::NumServers();
+  worker_num_ = ::ps::NumWorkers();
   func_graph_ = func_graph;
   rank_id_ = ::ps::MyRank();
   handler_.reset(new ServerHandler(this));
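The dropped `getenv` parsing in `ParameterServer<T>::Init` only ever read the first character of the environment variable (`*server_num - '0'`), so any server or worker count with two or more digits was silently misread, while `::ps::NumServers()` and `::ps::NumWorkers()` come from ps-lite and return the full values. A small demonstration of the difference; the literal `"12"` stands in for whatever the environment variable held:

```cpp
#include <iostream>
#include <string>

// Demonstrates why `*server_num - '0'` was wrong: it reads one character,
// while std::stoi (or ps-lite's own parsing) reads the whole number.
int main() {
  const char *server_num = "12";            // pretend getenv(...) returned "12"
  int first_char_only = *server_num - '0';  // old behaviour: 1
  int full_value = std::stoi(server_num);   // intended value: 12
  std::cout << first_char_only << " vs " << full_value << '\n';
  return 0;
}
```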
@@ -319,7 +321,7 @@ void ParameterServer<T>::InitOptimInputsShape(const Keys &keys, const Values &va
   if (optimizers_.count(key) == 0 && optim_inputs_shape_.count(key) > 0) {
     if (optim_name == kSparseAdam) {
       std::shared_ptr<PServerKernel> optimizer =
-        std::make_shared<kernel::ps::SparseApplyAdamPSKernel>(rank_id_, pserver_num_);
+        std::make_shared<kernel::ps::SparseApplyLazyAdamPSKernel>(rank_id_, pserver_num_);
       optimizer->InitKernel(optim_inputs_shape_[key]);
       optimizers_[key] = optimizer;
     } else if (optim_name == kApplyMomentum) {

@@ -368,10 +370,11 @@ void ParameterServer<T>::InitEmbeddingTable(
   }

   WeightPtr embedding = std::make_shared<Weight>(total_dims, 0);
+  T *embedding_data = embedding->data();
   std::default_random_engine engine;
   std::normal_distribution<float> random(0, 0.01);
   for (size_t i = 0; i < total_dims; i++) {
-    (*embedding)[i] = random(engine);
+    embedding_data[i] = random(engine);
   }
   weights_[key] = embedding;

@@ -402,6 +405,7 @@ void ParameterServer<T>::UpdateWeights() {
     const std::vector<kernel::AddressPtr> &workspaces = optim_info->workspaces();
     const std::vector<kernel::AddressPtr> &outputs = optim_info->outputs();

+    optim_info->ComputeMean(worker_num_);
     optimizer->Execute(inputs, workspaces, outputs);
     optim_info->Reset();
   }
@@ -50,6 +50,7 @@ class Worker {
   void InitPSParamAndOptim(const std::string &param_name, void *param_data, size_t param_size);
   void DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
                            const ::ps::SArray<int> &lens, ::ps::SArray<T> *lookup_result, int cmd);
+  void Finalize();

  private:
   Worker() : kv_worker_(nullptr), running_(false), key_cnt_(0) {}

@@ -118,6 +119,11 @@ void Worker<T>::DoPSEmbeddingLookup(const ::ps::SArray<::ps::Key> &keys, const :
   kv_worker_->EmbeddingLookup(keys, lookup_ids, lens, lookup_result, cmd);
 }

+template <typename T>
+void Worker<T>::Finalize() {
+  kv_worker_->Finalize();
+}
+
 template <typename T>
 void Worker<T>::InitPSParamData(const std::vector<size_t> &keys, void *origin_addr, size_t size) {
   ::ps::SArray<T> addr(reinterpret_cast<T *>(origin_addr), size / sizeof(T));
@@ -58,6 +58,7 @@ class WorkerProxy : public ::ps::KVWorker<T> {
                        const ::ps::SArray<int> &lens = {}, const Callback &cb = nullptr, int priority = 0);
   void PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<T> &vals, const ::ps::SArray<int> &lens = {},
                 int cmd = 0, int priority = 0);
+  void Finalize();

  private:
   template <typename C>

@@ -146,6 +147,17 @@ void WorkerProxy<T>::PushData(const ::ps::SArray<::ps::Key> &keys, const ::ps::S
   obj_->WaitRequest(ts);
 }

+template <typename T>
+void WorkerProxy<T>::Finalize() {
+  int ts = obj_->NewRequest(::ps::kServerGroup);
+  ::ps::KVPairs<T> kvs;
+  kvs.keys.push_back(0);
+  kvs.vals.push_back(0.0f);
+  Send(obj_, ts, true, false, kFinalizeCmd, kvs, broadcast_slicer_);
+  obj_->WaitRequest(ts);
+  ::ps::Finalize(0, false);
+}
+
 template <typename T>
 template <typename C>
 int WorkerProxy<T>::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const ::ps::SArray<int> &lookup_ids,
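Taken together, the `kFinalizeCmd` constant, `Worker<T>::Finalize`, `WorkerProxy<T>::Finalize`, and `HandleFinalize` form a shutdown handshake: the worker sends one dummy key/value pair tagged with command 40 to the server group, waits for the reply, and both sides then call `::ps::Finalize(0, false)`. Below is a toy model of the dispatch side of that handshake; the handler map mimics `handlers_` in `ServerHandler::Init`, and the lambdas are placeholders for the real handlers:

```cpp
#include <functional>
#include <iostream>
#include <unordered_map>

// Toy dispatch table in the spirit of ServerHandler::Init: the command id
// carried in the request meta selects the handler, and kFinalizeCmd routes
// to the finalize path added in this PR.
constexpr int kEmbeddingLookupCmd = 30;
constexpr int kFinalizeCmd = 40;

int main() {
  std::unordered_map<int, std::function<void()>> handlers;
  handlers[kEmbeddingLookupCmd] = [] { std::cout << "handle embedding lookup\n"; };
  handlers[kFinalizeCmd] = [] { std::cout << "handle finalize: shut down ps-lite\n"; };

  int cmd = kFinalizeCmd;  // what WorkerProxy<T>::Finalize sends with its dummy KV pair
  handlers.at(cmd)();      // server-side dispatch, analogous to HandleFinalize
  return 0;
}
```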
@@ -75,7 +75,7 @@ def train(net, data, label):
     print(res)
     print("+++++++++++++++++++++++++++")
     diff = res.asnumpy()[0] - 2.3025851
-    assert np.all(diff < 1.e-7)
+    assert np.all(diff < 1.e-6)


 @pytest.mark.level0