| @@ -25,7 +25,9 @@ if (ENABLE_CPU) | |||
| file(GLOB_RECURSE CPU_SRC_LIST RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} | |||
| "cpu/*.cc" | |||
| ) | |||
| list(REMOVE_ITEM CPU_SRC_LIST "cpu/ps/push_kernel.cc" "cpu/ps/pull_kernel.cc") | |||
| if (NOT ENABLE_MPI) | |||
| list(REMOVE_ITEM CPU_SRC_LIST "cpu/allgather_cpu_kernel.cc") | |||
| list(REMOVE_ITEM CPU_SRC_LIST "cpu/reduce_scatter_cpu_kernel.cc") | |||
| @@ -0,0 +1,25 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "kernel/cpu/ps/pull_kernel.h" | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| MS_REG_CPU_KERNEL_T( | |||
| Pull, KernelAttr().AddInputAttr(kNumberTypeUInt64).AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| PullKernel, float); | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,85 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ | |||
| #define MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ | |||
| #include <vector> | |||
| #include <string> | |||
| #include "parallel/ps/worker.h" | |||
| #include "parallel/ps/util.h" | |||
| #include "kernel/cpu/cpu_kernel.h" | |||
| #include "kernel/cpu/cpu_kernel_factory.h" | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| template <typename T> | |||
| class PullKernel : public CPUKernel { | |||
| public: | |||
| PullKernel() : keys_size_(sizeof(size_t)), var_size_(sizeof(size_t)) {} | |||
| ~PullKernel() override = default; | |||
| bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, const std::vector<AddressPtr> &) { | |||
| // If the paramter is embedding table, don't Pull from PServer. | |||
| if (param_name_.find("embedding") == std::string::npos && param_name_.find("wide_w") == std::string::npos) { | |||
| parallel::ps::Worker<T>::GetInstance().Pull(key_, inputs[1]->addr, inputs[1]->size); | |||
| } | |||
| return true; | |||
| } | |||
| void Init(const CNodePtr &kernel_node) { | |||
| size_t input_num = AnfAlgo::GetInputTensorNum(kernel_node); | |||
| if (input_num != 2) { | |||
| MS_LOG(ERROR) << "Input number is " << input_num << ", but pull needs 2 inputs."; | |||
| return; | |||
| } | |||
| auto key_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 0); | |||
| for (size_t i = 0; i < key_shape.size(); i++) { | |||
| keys_size_ *= key_shape[i]; | |||
| } | |||
| auto var_shape = AnfAlgo::GetPrevNodeOutputInferShape(kernel_node, 1); | |||
| for (size_t i = 0; i < var_shape.size(); i++) { | |||
| var_size_ *= var_shape[i]; | |||
| } | |||
| auto param_node = AnfAlgo::GetInputNode(kernel_node, 1); | |||
| MS_EXCEPTION_IF_NULL(param_node); | |||
| param_name_ = param_node->fullname_with_scope(); | |||
| if (mindspore::parallel::ps::Util::IsRoleOfWorker()) { | |||
| key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey); | |||
| } | |||
| InitSizeLists(); | |||
| return; | |||
| } | |||
| void InitKernel(const CNodePtr &kernel_node) { return; } | |||
| protected: | |||
| void InitSizeLists() { | |||
| input_size_list_.push_back(keys_size_); | |||
| input_size_list_.push_back(var_size_); | |||
| output_size_list_.push_back(0); | |||
| } | |||
| private: | |||
| size_t key_; | |||
| size_t keys_size_; | |||
| size_t var_size_; | |||
| std::string param_name_; | |||
| }; | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_CCSRC_KERNEL_PS_PULL_KERNEL_H_ | |||
| @@ -0,0 +1,38 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #include "kernel/cpu/ps/push_kernel.h" | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| MS_REG_CPU_KERNEL_T(Push, | |||
| KernelAttr() | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeFloat32) | |||
| .AddInputAttr(kNumberTypeInt32) | |||
| .AddOutputAttr(kNumberTypeUInt64), | |||
| PushKernel, float); | |||
| MS_REG_CPU_KERNEL_T( | |||
| Push, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeUInt64), | |||
| PushKernel, float); | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| @@ -0,0 +1,80 @@ | |||
| /** | |||
| * Copyright 2020 Huawei Technologies Co., Ltd | |||
| * | |||
| * Licensed under the Apache License, Version 2.0 (the "License"); | |||
| * you may not use this file except in compliance with the License. | |||
| * You may obtain a copy of the License at | |||
| * | |||
| * http://www.apache.org/licenses/LICENSE-2.0 | |||
| * | |||
| * Unless required by applicable law or agreed to in writing, software | |||
| * distributed under the License is distributed on an "AS IS" BASIS, | |||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |||
| * See the License for the specific language governing permissions and | |||
| * limitations under the License. | |||
| */ | |||
| #ifndef MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_ | |||
| #define MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_ | |||
| #include <vector> | |||
| #include <algorithm> | |||
| #include "parallel/ps/worker.h" | |||
| #include "parallel/ps/util.h" | |||
| #include "kernel/cpu/cpu_kernel.h" | |||
| #include "kernel/cpu/cpu_kernel_factory.h" | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| template <typename T> | |||
| class PushKernel : public CPUKernel { | |||
| public: | |||
| PushKernel() : key_(UINT64_MAX) {} | |||
| ~PushKernel() override = default; | |||
| bool Launch(const std::vector<AddressPtr> &inputs, const std::vector<AddressPtr> &, | |||
| const std::vector<AddressPtr> &outputs) { | |||
| std::vector<size_t> keys; | |||
| std::vector<uintptr_t> addrs; | |||
| std::vector<int> sizes; | |||
| for (auto input : inputs) { | |||
| keys.push_back(key_); | |||
| addrs.push_back(reinterpret_cast<uintptr_t>(input->addr)); | |||
| sizes.push_back(SizeToInt(input->size) / sizeof(T)); | |||
| } | |||
| parallel::ps::Worker<T>::GetInstance().Push(keys, addrs, sizes); | |||
| memcpy(outputs[0]->addr, &key_, sizeof(size_t)); | |||
| return true; | |||
| } | |||
| void Init(const CNodePtr &kernel_node) { | |||
| key_ = AnfAlgo::GetNodeAttr<size_t>(kernel_node, kAttrPsKey); | |||
| auto optim_input_shapes = AnfAlgo::GetNodeAttr<std::vector<std::vector<int>>>(kernel_node, "optim_input_shapes"); | |||
| std::vector<int> only_shape_indices = AnfAlgo::GetNodeAttr<std::vector<int>>(kernel_node, "only_shape_indices"); | |||
| MS_LOG(INFO) << "Key " << key_ << " optimizer input shapes are:" << optim_input_shapes; | |||
| MS_LOG(INFO) << "Only init shape indices are " << only_shape_indices; | |||
| for (size_t i = 0; i < optim_input_shapes.size(); i++) { | |||
| auto shape = optim_input_shapes[i]; | |||
| mindspore::parallel::ps::Worker<float>::GetInstance().SetOptimInputShapes(key_, shape); | |||
| if (std::count(only_shape_indices.begin(), only_shape_indices.end(), i) == 0) { | |||
| size_t size = sizeof(T); | |||
| for (size_t j = 0; j < shape.size(); j++) { | |||
| size *= shape[j]; | |||
| } | |||
| input_size_list_.push_back(size); | |||
| } | |||
| } | |||
| output_size_list_.push_back(sizeof(size_t)); | |||
| return; | |||
| } | |||
| void InitKernel(const CNodePtr &kernel_node) { return; } | |||
| private: | |||
| size_t key_; | |||
| }; | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| #endif // MINDSPORE_CCSRC_KERNEL_PS_PUSH_KERNEL_H_ | |||