Merge pull request !21350 from fangzehua/change_thread_pooltags/v1.5.0-rc1
| @@ -83,7 +83,7 @@ void AdamCPUKernel::LaunchAdamNnacl(const std::vector<kernel::AddressPtr> &input | |||
| MS_LOG(EXCEPTION) << "AdamFp32 failed."; | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, lens, &parallel_search_info_); | |||
| ParallelLaunchAutoSearch(task, lens, this, &parallel_search_info_); | |||
| } | |||
| void AdamCPUKernel::InitKernel(const CNodePtr &kernel_node) { | |||
| @@ -19,6 +19,7 @@ | |||
| #include "runtime/device/cpu/cpu_device_address.h" | |||
| #include "nnacl/fp32/power_fp32.h" | |||
| #include "nnacl/fp32/sub_fp32.h" | |||
| #include "nnacl/fp32/mul_fp32.h" | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| @@ -54,7 +55,7 @@ void ArithmeticCPUKernel<T>::Sub(const T *input1, const T *input2, T *out) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| ElementSub(input1 + start, input2 + start, out + start, end - start); | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_, MAX_SUB_SERIAL_SIZE); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| if (op_para.in_elements_num0_ == 1 || op_para.in_elements_num1_ == 1) { | |||
| @@ -65,7 +66,7 @@ void ArithmeticCPUKernel<T>::Sub(const T *input1, const T *input2, T *out) { | |||
| ElementOptSub(input1 + start, input2, out + start, end - start, &op_para); | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_, MAX_SUB_SERIAL_SIZE); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| } | |||
| @@ -84,6 +85,26 @@ void ArithmeticCPUKernel<T>::Sub(const T *input1, const T *input2, T *out) { | |||
| template <typename T> | |||
| void ArithmeticCPUKernel<T>::Mul(const T *input1, const T *input2, T *out) { | |||
| if constexpr (std::is_same_v<T, float>) { | |||
| if (input_shape1_ == input_shape2_) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| ElementMul(input1 + start, input2 + start, out + start, end - start); | |||
| }; | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| if (op_para.in_elements_num0_ == 1 || op_para.in_elements_num1_ == 1) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| if (op_para.in_elements_num0_ == 1) { | |||
| ElementOptMul(input1, input2 + start, out + start, end - start, &op_para); | |||
| } else { | |||
| ElementOptMul(input1 + start, input2, out + start, end - start, &op_para); | |||
| } | |||
| }; | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| } | |||
| BroadcastIterator base_iter(input_shape1_, input_shape2_, output_shape_); | |||
| auto task = [&input1, &input2, &out, &base_iter](size_t start, size_t end) { | |||
| auto iter = base_iter; | |||
| @@ -128,21 +149,21 @@ void ArithmeticCPUKernel<T>::RealDiv(const T *input1, const T *input2, T *out) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| ElementRealDiv<T>(input1 + start, input2 + start, out + start, end - start, 1, 1); | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_, MAX_DIV_SERIAL_SIZE); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| if (op_para.in_elements_num0_ == 1) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| ElementRealDiv<T>(input1, input2 + start, out + start, end - start, 0, 1); | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_, MAX_DIV_SERIAL_SIZE); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| if (op_para.in_elements_num1_ == 1) { | |||
| auto task = [&](size_t start, size_t end) { | |||
| ElementRealDiv<T>(input1 + start, input2, out + start, end - start, 1, 0); | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_, MAX_DIV_SERIAL_SIZE); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| @@ -339,7 +360,7 @@ void ArithmeticCPUKernel<T>::SquaredDifference(const T *input1, const T *input2, | |||
| iter.GenNextPos(); | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, output_size_); | |||
| ParallelLaunchAutoSearch(task, output_size_, this, &parallel_search_info_); | |||
| } | |||
| template <typename T> | |||
| @@ -77,6 +77,8 @@ MS_REG_CPU_KERNEL_T(RealDiv, KernelAttr(), ArithmeticCPUKernel, int64_t); | |||
| MS_REG_CPU_KERNEL_T(Div, KernelAttr(), ArithmeticCPUKernel, int32_t); | |||
| MS_REG_CPU_KERNEL_T(Div, KernelAttr(), ArithmeticCPUKernel, float); | |||
| MS_REG_CPU_KERNEL_T(Div, KernelAttr(), ArithmeticCPUKernel, int64_t); | |||
| MS_REG_CPU_KERNEL_T(Mul, KernelAttr(), ArithmeticCPUKernel, float); | |||
| MS_REG_CPU_KERNEL_T(Mul, KernelAttr(), ArithmeticCPUKernel, int32_t); | |||
| MS_REG_CPU_KERNEL_T( | |||
| FloorDiv, KernelAttr().AddInputAttr(kNumberTypeInt64).AddInputAttr(kNumberTypeInt64).AddOutputAttr(kNumberTypeInt64), | |||
| ArithmeticCPUKernel, int64_t); | |||
| @@ -20,6 +20,7 @@ | |||
#include <cmath>
#include <map>
#include "backend/kernel_compiler/cpu/arithmetic_self_cpu_kernel.h"
#include "runtime/device/cpu/cpu_device_address.h"
#include "nnacl/fp32/exp_fp32.h"
| namespace mindspore { | |||
| namespace kernel { | |||
| @@ -31,7 +32,15 @@ void Square(const T *in, T *out, size_t size) { | |||
| out[i] = in[i] * in[i]; | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, size, MAX_SQUARE_SERIAL_SIZE); | |||
| ParallelLaunch(task, size, MAX_SQUARE_SERIAL_SIZE); | |||
| } | |||
| template <typename T> | |||
| void Exp(const T *in, T *out, size_t size) { | |||
| if constexpr (std::is_same_v<T, float>) { | |||
| auto task = [&in, &out](size_t start, size_t end) { ExpFp32(in + start, out + start, end - start); }; | |||
| ParallelLaunch(task, size, MAX_EXP_SERIAL_SIZE); | |||
| } | |||
| } | |||
| template <typename T> | |||
| @@ -57,7 +66,7 @@ void Neg(const T *in, T *out, size_t size) { | |||
| out[i] = -in[i]; | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelFor(task, size, MAX_NEG_SERIAL_SIZE); | |||
| ParallelLaunch(task, size, MAX_NEG_SERIAL_SIZE); | |||
| } | |||
| template <typename T> | |||
| @@ -262,6 +271,7 @@ void Identity(const T *in, T *out, size_t size) { | |||
| static const std::map<std::string, OperateType> kArithmeticOpTypeMap = {{prim::kPrimNeg->name(), NEG}, | |||
| {prim::kPrimSquare->name(), SQUARE}, | |||
| {prim::kPrimOnesLike->name(), ONESLIKE}, | |||
| {prim::kPrimExp->name(), EXP}, | |||
| {prim::kPrimZerosLike->name(), ZEROSLIKE}, | |||
| {prim::kPrimLogicalNot->name(), LOGICALNOT}, | |||
| {prim::kPrimSign->name(), SIGN}, | |||
| @@ -324,17 +334,29 @@ void ArithmeticSelfCPUKernel::LaunchKernel(const std::vector<AddressPtr> &inputs | |||
| T *output = reinterpret_cast<T *>(outputs[0]->addr); | |||
| size_t lens = outputs[0]->size > 0 ? static_cast<size_t>(outputs[0]->size / sizeof(T)) : 1; | |||
| static const std::map<OperateType, std::function<void(const T *in, T *out, size_t size)>> kArithmeticOpFuncMap = { | |||
| {SQUARE, Square<T>}, {SIGN, Sign<T>}, | |||
| {NEG, Neg<T>}, {LOGICALNOT, LogicalNot<T>}, | |||
| {ONESLIKE, OnesLike<T>}, {ZEROSLIKE, ZerosLike<T>}, | |||
| {FLOOR, Floor<T>}, {RECIPROCAL, Reciprocal<T>}, | |||
| {GELU, Gelu<T>}, {SIN, Sin<T>}, | |||
| {COS, Cos<T>}, {TAN, Tan<T>}, | |||
| {ASIN, Asin<T>}, {ACOS, ACos<T>}, | |||
| {ATAN, Atan<T>}, {SINH, Sinh<T>}, | |||
| {COSH, Cosh<T>}, {ASINH, Asinh<T>}, | |||
| {ACOSH, Acosh<T>}, {ATANH, Atanh<T>}, | |||
| {RINT, Rint<T>}, {ROUND, Round<T>}}; | |||
| {SQUARE, Square<T>}, | |||
| {SIGN, Sign<T>}, | |||
| {NEG, Neg<T>}, | |||
| {LOGICALNOT, LogicalNot<T>}, | |||
| {ONESLIKE, OnesLike<T>}, | |||
| {ZEROSLIKE, ZerosLike<T>}, | |||
| {FLOOR, Floor<T>}, | |||
| {RECIPROCAL, Reciprocal<T>}, | |||
| {GELU, Gelu<T>}, | |||
| {SIN, Sin<T>}, | |||
| {COS, Cos<T>}, | |||
| {TAN, Tan<T>}, | |||
| {ASIN, Asin<T>}, | |||
| {ACOS, ACos<T>}, | |||
| {ATAN, Atan<T>}, | |||
| {SINH, Sinh<T>}, | |||
| {COSH, Cosh<T>}, | |||
| {ASINH, Asinh<T>}, | |||
| {ACOSH, Acosh<T>}, | |||
| {ATANH, Atanh<T>}, | |||
| {RINT, Rint<T>}, | |||
| {ROUND, Round<T>}, | |||
| {EXP, Exp<T>}}; | |||
| if (kArithmeticOpFuncMap.find(operate_type_) != kArithmeticOpFuncMap.end()) { | |||
| kArithmeticOpFuncMap.at(operate_type_)(input, output, lens); | |||
| } else { | |||
| @@ -20,8 +20,9 @@ | |||
| #include "backend/kernel_compiler/cpu/cpu_kernel.h" | |||
| #include "backend/kernel_compiler/cpu/cpu_kernel_factory.h" | |||
| const float MAX_NEG_SERIAL_SIZE = 20000; | |||
| const float MAX_SQUARE_SERIAL_SIZE = 20000; | |||
| const float MAX_NEG_SERIAL_SIZE = 5000; | |||
| const float MAX_SQUARE_SERIAL_SIZE = 5000; | |||
| const float MAX_EXP_SERIAL_SIZE = 15000; | |||
| namespace mindspore { | |||
| namespace kernel { | |||
| @@ -58,6 +59,10 @@ class IdentityCPUKernel : public ArithmeticSelfCPUKernel { | |||
| MS_REG_CPU_KERNEL(Square, KernelAttr().AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32), | |||
| ArithmeticSelfCPUKernel); | |||
| MS_REG_CPU_KERNEL(Square, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| ArithmeticSelfCPUKernel); | |||
| MS_REG_CPU_KERNEL(Exp, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| ArithmeticSelfCPUKernel); | |||
| MS_REG_CPU_KERNEL(Neg, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| ArithmeticSelfCPUKernel); | |||
| MS_REG_CPU_KERNEL(Neg, KernelAttr().AddInputAttr(kNumberTypeInt32).AddOutputAttr(kNumberTypeInt32), | |||
| @@ -90,7 +90,7 @@ bool BiasAddCPUKernel::Launch(const std::vector<AddressPtr> &inputs, const std:: | |||
| ElementAdd(src_addr + n_offset, bias_addr, output_addr + n_offset, input_shape_[1]); | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, input_shape_[0], ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, input_shape_[0], this, &parallel_search_info_); | |||
| } | |||
| return true; | |||
| } | |||
| @@ -55,7 +55,7 @@ bool BiasAddGradCPUKernel::Launch(const std::vector<AddressPtr> &inputs, const s | |||
| auto task = [&](size_t start, size_t end) { | |||
| ReduceSumDim2Axis0(end - start, input_shape_[1], input_shape_[0], input_addr + start, output_addr + start); | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, input_shape_[1], ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, input_shape_[1], this, ¶llel_search_info_); | |||
| } | |||
| return true; | |||
| } | |||
| @@ -74,7 +74,7 @@ bool ConcatCPUKernel<T>::Launch(const std::vector<kernel::AddressPtr> &inputs, c | |||
| } | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, before_axis, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, before_axis, this, &parallel_search_info_); | |||
| return true; | |||
| } | |||
| @@ -138,6 +138,77 @@ void CPUKernelUtils::ParallelForAutoSearch(const CTask &task, size_t count, Para | |||
| } | |||
| } | |||
| ActorThreadPool *GetActorMgrInnerThreadPool() { | |||
| auto actor_manager = ActorMgr::GetActorMgrRef(); | |||
| auto thread_pool = actor_manager->GetActorThreadPool(); | |||
| // Init thread_pool if env is windows or ascend, in case that it won't be init in graph_scheduler. | |||
| if (thread_pool == nullptr) { | |||
| const size_t kMaxThreadNum = 23; | |||
| size_t max_thread_num = std::thread::hardware_concurrency() - 1; | |||
| if (max_thread_num < 1) { | |||
| max_thread_num = 1; | |||
| } | |||
| max_thread_num = max_thread_num < kMaxThreadNum ? max_thread_num : kMaxThreadNum; | |||
| actor_manager->Initialize(true, 0, max_thread_num); | |||
| thread_pool = actor_manager->GetActorThreadPool(); | |||
| MS_EXCEPTION_IF_NULL(thread_pool); | |||
| } | |||
| return thread_pool; | |||
| } | |||
| // Use threadpool of mindrt | |||
| void ParallelLaunch(const CTask &task, size_t count, float block_size, Content content) { | |||
| auto thread_pool = GetActorMgrInnerThreadPool(); | |||
| size_t kernel_thread_num = thread_pool->GetKernelThreadNum(); | |||
| if (kernel_thread_num == 0) { | |||
| MS_LOG(EXCEPTION) << "Actor inner pool has been init, but kernel thread is 0!"; | |||
| } | |||
| size_t thread_num = count < block_size * kernel_thread_num ? std::ceil(count / block_size) : kernel_thread_num; | |||
| size_t once_compute_size = (count + thread_num - 1) / thread_num; | |||
| size_t task_num = count / once_compute_size; | |||
| if (count % once_compute_size != 0) { | |||
| task_num += 1; | |||
| } | |||
| auto func = [&](void *, int task_id, float, float) { | |||
| size_t start = task_id * once_compute_size; | |||
| size_t end = (start + once_compute_size) > count ? count : (start + once_compute_size); | |||
| task(start, end); | |||
| return common::SUCCESS; | |||
| }; | |||
| thread_pool->ParallelLaunch(func, content, task_num); | |||
| } | |||
| void ParallelLaunchAutoSearch(const CTask &task, size_t count, Content content, | |||
| ParallelSearchInfo *parallel_search_info) { | |||
| const size_t MAX_POW = 6; | |||
| const size_t AVG_COUNT = 5; | |||
| size_t current_pow = parallel_search_info->search_count / AVG_COUNT; | |||
| if (current_pow < MAX_POW) { | |||
| if (parallel_search_info->search_count % AVG_COUNT == 0) { | |||
| parallel_search_info->tmp_sum_cost_time = 0; | |||
| } | |||
| float block_size = static_cast<float>(count) / std::pow(2.0f, current_pow); | |||
| double start_time = GetTime(); | |||
| ParallelLaunch(task, count, block_size, content); | |||
| double cost_time = GetTime() - start_time; | |||
| parallel_search_info->tmp_sum_cost_time += cost_time; | |||
| parallel_search_info->search_count++; | |||
| if (parallel_search_info->search_count % AVG_COUNT == 0) { | |||
| double avg_time = parallel_search_info->tmp_sum_cost_time / AVG_COUNT; | |||
| if (parallel_search_info->min_cost_time > avg_time) { | |||
| parallel_search_info->min_cost_time = avg_time; | |||
| parallel_search_info->best_block_size = block_size; | |||
| parallel_search_info->best_pow = current_pow; | |||
| } else if (current_pow - parallel_search_info->best_pow >= 2) { | |||
| parallel_search_info->search_count = AVG_COUNT * MAX_POW; | |||
| } | |||
| } | |||
| } else { | |||
| ParallelLaunch(task, count, parallel_search_info->best_block_size, content); | |||
| } | |||
| } | |||
| std::vector<size_t> CPUKernelUtils::FlatShapeByAxis(const std::vector<size_t> &shape, int axis) { | |||
| if (axis < 0) { | |||
| axis = axis + SizeToInt(shape.size()); | |||
| @@ -25,6 +25,8 @@ | |||
| #include "backend/session/anf_runtime_algorithm.h" | |||
| #include "backend/kernel_compiler/common_utils.h" | |||
| #include "ir/anf.h" | |||
| #include "runtime/framework/graph_scheduler.h" | |||
| #include "actor/actormgr.h" | |||
| using mindspore::kernel::Address; | |||
| using mindspore::kernel::AddressPtr; | |||
| @@ -119,6 +121,7 @@ enum OperateType { | |||
| ATAN2, | |||
| RINT, | |||
| ROUND, | |||
| EXP, | |||
| IDENTITY, | |||
| }; | |||
| @@ -209,6 +212,12 @@ class TransposeIterator { | |||
| std::vector<size_t> axes_; | |||
| size_t pos_{0}; | |||
| }; | |||
| ActorThreadPool *GetActorMgrInnerThreadPool(); | |||
| void ParallelLaunch(const CTask &task, size_t count, float block_size = 128.0, Content content = nullptr); | |||
| void ParallelLaunchAutoSearch(const CTask &task, size_t count, Content content, | |||
| ParallelSearchInfo *parallel_search_info); | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| @@ -259,9 +259,9 @@ bool EltWiseGradCPUKernel<T>::Launch(const std::vector<kernel::AddressPtr> &inpu | |||
| const auto input1 = reinterpret_cast<T *>(inputs[1]->addr); | |||
| auto output = reinterpret_cast<T *>(outputs[0]->addr); | |||
| CPUKernelUtils::ParallelForAutoSearch( | |||
| ParallelLaunchAutoSearch( | |||
| std::bind(elt_map.at(kernel_name_), this, input0, input1, output, std::placeholders::_1, std::placeholders::_2), | |||
| outputs[0]->size / sizeof(T), &parallel_search_info_); | |||
| outputs[0]->size / sizeof(T), this, &parallel_search_info_); | |||
| return true; | |||
| } | |||
| } // namespace kernel | |||
| @@ -52,8 +52,6 @@ MS_REG_CPU_KERNEL(Sigmoid, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutp | |||
| EltWiseCPUKernel); | |||
| MS_REG_CPU_KERNEL(Sqrt, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| EltWiseCPUKernel); | |||
| MS_REG_CPU_KERNEL(Square, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| EltWiseCPUKernel); | |||
| MS_REG_CPU_KERNEL(Tanh, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| EltWiseCPUKernel); | |||
| MS_REG_CPU_KERNEL(Softplus, KernelAttr().AddInputAttr(kNumberTypeFloat32).AddOutputAttr(kNumberTypeFloat32), | |||
| @@ -36,9 +36,6 @@ class MulCPUKernel : public MKLCPUKernel { | |||
| private: | |||
| bool need_swap_{false}; | |||
| }; | |||
| MS_REG_CPU_KERNEL(Mul, KernelAttr(), MulCPUKernel); | |||
| MS_REG_CPU_KERNEL_T(Mul, KernelAttr(), ArithmeticCPUKernel, int32_t); | |||
| } // namespace kernel | |||
| } // namespace mindspore | |||
| @@ -26,6 +26,12 @@ void ExpFp32(const float *src, float *dst, int num) { | |||
| for (; i < count; i += C4NUM) { | |||
| simd_exp(vld1q_f32(src + i), dst + i); | |||
| } | |||
| #endif | |||
| #ifdef ENABLE_AVX | |||
| int count = (num / C8NUM) * C8NUM; | |||
| for (; i < count; i += C8NUM) { | |||
| simd_exp_avx(_mm256_loadu_ps(src + i), dst + i); | |||
| } | |||
| #endif | |||
| for (; i < num; ++i) { | |||
| single_exp(src[i], dst + i); | |||
| @@ -66,7 +66,7 @@ bool OneHotCPUKernel::Launch(const std::vector<kernel::AddressPtr> &inputs, cons | |||
| } | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, elem_num, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, elem_num, this, &parallel_search_info_); | |||
| return true; | |||
| } | |||
| @@ -139,7 +139,7 @@ bool ReduceCPUKernel<T>::Launch(const std::vector<kernel::AddressPtr> &inputs, c | |||
| } | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, output_size, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, output_size, this, &parallel_search_info_); | |||
| return true; | |||
| } | |||
| } | |||
| @@ -113,7 +113,7 @@ bool SliceCPUKernel::Launch(const std::vector<kernel::AddressPtr> &inputs, const | |||
| auto dst = static_cast<int8_t *>(output_addr) + data_size_ * slice_param_.size_[1] * start; | |||
| SliceSimpleDim2(src, dst, &slice_param_, data_size_, end - start); | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, slice_param_.size_[0], ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, slice_param_.size_[0], this, &parallel_search_info_); | |||
| return true; | |||
| } | |||
| DoSliceNoParallel(input_addr, output_addr, &slice_param_, data_size_); | |||
| @@ -67,7 +67,7 @@ void SplitCPUKernel<T>::LaunchSplit(T *input, T **output, size_t size) { | |||
| (void)DoSplit(input, reinterpret_cast<void **>(output), &input_shape_[0], SizeToInt(start), SizeToInt(end - start), | |||
| ¶m, SizeToInt(sizeof(T))); | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, param.split_count_ * param.num_split_, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, param.split_count_ * param.num_split_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| @@ -42,7 +42,7 @@ bool TensorAddCPUKernel<T>::Launch(const std::vector<kernel::AddressPtr> &inputs | |||
| output_addr[i] = input_addr_a[i] + input_addr_b[i]; | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, output_size, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, output_size, this, &parallel_search_info_); | |||
| } else { // Broadcast | |||
| BroadcastIterator base_iter(input_shape_a_, input_shape_b_, output_shape_); | |||
| auto task = [&base_iter, output_addr, input_addr_a, input_addr_b](size_t start, size_t end) { | |||
| @@ -53,7 +53,7 @@ bool TensorAddCPUKernel<T>::Launch(const std::vector<kernel::AddressPtr> &inputs | |||
| iter.GenNextPos(); | |||
| } | |||
| }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, output_size, ¶llel_search_info_); | |||
| ParallelLaunchAutoSearch(task, output_size, this, &parallel_search_info_); | |||
| } | |||
| return true; | |||
| } | |||
| @@ -112,7 +112,7 @@ void TileCPUKernel::LaunchKernel(const std::vector<AddressPtr> &inputs, const st | |||
| if (one_dim_tile_) { | |||
| auto task = [&](size_t start, size_t end) { TileSimple(x_addr, y_addr, start, end, &tile_parameter_); }; | |||
| CPUKernelUtils::ParallelForAutoSearch(task, tile_parameter_.fast_outer_size_, &parallel_search_info_); | |||
| ParallelLaunchAutoSearch(task, tile_parameter_.fast_outer_size_, this, &parallel_search_info_); | |||
| return; | |||
| } | |||
| @@ -21,11 +21,12 @@ | |||
| namespace mindspore { | |||
| namespace runtime { | |||
| void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num) { | |||
| void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t *max_thread_num) { | |||
| MS_EXCEPTION_IF_NULL(actor_thread_num); | |||
| MS_EXCEPTION_IF_NULL(OMP_thread_num); | |||
| size_t cpu_core_num = std::thread::hardware_concurrency(); | |||
| MS_EXCEPTION_IF_NULL(max_thread_num); | |||
| size_t cpu_core_num = std::thread::hardware_concurrency() - 1; | |||
| const size_t kMaxThreadNum = 23; | |||
| const size_t kActorThreadMaxNum = 5; | |||
| // The MemoryManagerActor binds single thread, and the other actors share one thread at least, so the min num is 2. | |||
| const size_t kActorThreadMinNum = 2; | |||
| @@ -41,6 +42,10 @@ void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num) { | |||
| const size_t kOMPThreadMaxNum = 8; | |||
| *OMP_thread_num = cpu_core_num < kOMPThreadMaxNum ? cpu_core_num : kOMPThreadMaxNum; | |||
| *max_thread_num = cpu_core_num > *actor_thread_num ? cpu_core_num : (*actor_thread_num + 1); | |||
| if (*max_thread_num > kMaxThreadNum) { | |||
| *max_thread_num = kMaxThreadNum; | |||
| } | |||
| } | |||
| bool IsDeviceQueueDSActor(const AnfNodePtr &node, GraphExecutionStrategy strategy) { | |||
| @@ -66,7 +66,7 @@ enum class GraphExecutionStrategy { | |||
| return; \ | |||
| } | |||
| void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num); | |||
| void ComputeThreadNums(size_t *actor_thread_num, size_t *OMP_thread_num, size_t *max_thread_num); | |||
| bool IsDeviceQueueDSActor(const AnfNodePtr &node, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline); | |||
| @@ -528,12 +528,11 @@ void GraphScheduler::Initialize() { | |||
| // Create the thread pool of actor runtime and Set the OMP_NUM_THREADS env. | |||
| size_t actor_thread_num = 0; | |||
| size_t OMP_thread_num = 0; | |||
| ComputeThreadNums(&actor_thread_num, &OMP_thread_num); | |||
| size_t max_thread_num = 0; | |||
| ComputeThreadNums(&actor_thread_num, &OMP_thread_num, &max_thread_num); | |||
| auto actor_manager = ActorMgr::GetActorMgrRef(); | |||
| MS_EXCEPTION_IF_NULL(actor_manager); | |||
| actor_manager->Initialize(true, actor_thread_num); | |||
| actor_manager->Initialize(true, actor_thread_num, max_thread_num); | |||
| std::string OMP_env = std::to_string(OMP_thread_num); | |||
| (void)common::SetEnv("OMP_NUM_THREADS", OMP_env.c_str(), 0); | |||
| auto OMP_thread_num_used = common::GetEnv("OMP_NUM_THREADS"); | |||
| @@ -44,9 +44,14 @@ ActorMgr::ActorMgr() : actors(), procotols(), urls() { | |||
| urls.clear(); | |||
| } | |||
| ActorMgr::~ActorMgr() {} | |||
| ActorMgr::~ActorMgr() { | |||
| if (inner_pool_ != nullptr) { | |||
| delete inner_pool_; | |||
| inner_pool_ = nullptr; | |||
| } | |||
| } | |||
| void ActorMgr::Initialize(bool use_inner_pool, size_t thread_num) { | |||
| void ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t max_thread_num) { | |||
| bool expected = false; | |||
| if (!initialized_.compare_exchange_strong(expected, true)) { | |||
| MS_LOG(DEBUG) << "Actor Manager has been initialized before"; | |||
| @@ -54,7 +59,14 @@ void ActorMgr::Initialize(bool use_inner_pool, size_t thread_num) { | |||
| } | |||
| // create inner thread pool only when specified use_inner_pool | |||
| if (use_inner_pool) { | |||
| inner_pool_ = ActorThreadPool::CreateThreadPool(thread_num); | |||
| if (max_thread_num <= actor_thread_num) { | |||
| inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num); | |||
| } else { | |||
| inner_pool_ = ActorThreadPool::CreateThreadPool(actor_thread_num, max_thread_num, {}); | |||
| inner_pool_->SetActorThreadNum(actor_thread_num); | |||
| inner_pool_->DisableOccupiedActorThread(); | |||
| inner_pool_->SetKernelThreadNum(max_thread_num - actor_thread_num); | |||
| } | |||
| } | |||
| } | |||
| @@ -48,12 +48,14 @@ class ActorMgr { | |||
| (void)ActorMgr::GetActorMgrRef()->Send(AID(to), std::move(msg)); | |||
| } | |||
| ActorThreadPool *GetActorThreadPool() { return inner_pool_; } | |||
| ActorMgr(); | |||
| ~ActorMgr(); | |||
| void Finalize(); | |||
| // initialize actor manager resource, do not create inner thread pool by default | |||
| void Initialize(bool use_inner_pool = false, size_t thread_num = 1); | |||
| void Initialize(bool use_inner_pool = false, size_t actor_thread_num = 1, size_t max_thread_num = 1); | |||
| void RemoveActor(const std::string &name); | |||
| ActorBase *GetActor(const AID &id); | |||
| @@ -140,7 +140,7 @@ int ActorThreadPool::CreateThreads(size_t actor_thread_num, size_t all_thread_nu | |||
| size_t core_num = std::thread::hardware_concurrency(); | |||
| THREAD_INFO("ThreadInfo, Actor: [%zu], All: [%zu], CoreNum: [%zu]", actor_thread_num, all_thread_num, core_num); | |||
| actor_thread_num_ = actor_thread_num < core_num ? actor_thread_num : core_num; | |||
| if (actor_thread_num_ <= 0 || actor_thread_num > all_thread_num) { | |||
| if (actor_thread_num > all_thread_num) { | |||
| THREAD_ERROR("thread num is invalid"); | |||
| return THREAD_ERROR; | |||
| } | |||
| @@ -23,6 +23,7 @@ | |||
| #include <atomic> | |||
| #include <condition_variable> | |||
| #include "thread/threadpool.h" | |||
| #include "thread/core_affinity.h" | |||
| #include "actor/actor.h" | |||
| #include "thread/hqueue.h" | |||
| #define USE_HQUEUE | |||
| @@ -197,7 +197,11 @@ void ThreadPool::DistributeTask(Task *task, int task_num) const { | |||
| int sum_frequency = 0; | |||
| std::vector<Worker *> assigned; | |||
| int num = static_cast<int>(workers_.size()) - 1; | |||
| for (int i = num; i >= 0 && count < num_assigned; --i) { | |||
| int offset = 0; | |||
| if (!occupied_actor_thread_) { | |||
| offset = static_cast<int>(actor_thread_num_); | |||
| } | |||
| for (int i = num; i >= offset && count < num_assigned; --i) { | |||
| if (workers_[i]->available()) { | |||
| assigned.push_back(workers_[i]); | |||
| sum_frequency += workers_[i]->frequency(); | |||
| @@ -24,6 +24,7 @@ | |||
| #include <atomic> | |||
| #include <condition_variable> | |||
| #include <mutex> | |||
| #include <functional> | |||
| #include "thread/threadlog.h" | |||
| #include "thread/core_affinity.h" | |||
| @@ -40,7 +41,7 @@ enum ThreadStatus { | |||
| // used in scenarios with unequal division of task | |||
| // the parameters indicate the start and end coefficients | |||
| using Func = int (*)(void *, int, float, float); | |||
| using Func = std::function<int(void *, int, float, float)>; | |||
| using Content = void *; | |||
| typedef struct Task { | |||
| @@ -113,6 +114,10 @@ class ThreadPool { | |||
| int SetProcessAffinity(BindMode bind_mode) const; | |||
| int ParallelLaunch(const Func &func, Content content, int task_num) const; | |||
| void DisableOccupiedActorThread() { occupied_actor_thread_ = false; } | |||
| void SetActorThreadNum(size_t actor_thread_num) { actor_thread_num_ = actor_thread_num; } | |||
| void SetKernelThreadNum(size_t kernel_thread_num) { kernel_thread_num_ = kernel_thread_num; } | |||
| size_t GetKernelThreadNum() const { return kernel_thread_num_; } | |||
| protected: | |||
| ThreadPool() = default; | |||
| @@ -132,6 +137,9 @@ class ThreadPool { | |||
| std::mutex pool_mutex_; | |||
| std::vector<Worker *> workers_; | |||
| CoreAffinity *affinity_{nullptr}; | |||
| size_t actor_thread_num_{0}; | |||
| size_t kernel_thread_num_{0}; | |||
| bool occupied_actor_thread_{true}; | |||
| }; | |||
| } // namespace mindspore | |||