Browse Source

Merge 75e90b47c4 into d68ea6b4d5

pull/6197/merge
DC Technology GitHub 10 months ago
parent
commit
2b095b8acc
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
17 changed files with 560 additions and 4 deletions
  1. +1
    -0
      .gitignore
  2. +2
    -1
      CMakeLists.txt
  3. +12
    -0
      src/CMakeLists.txt
  4. +69
    -0
      src/TheadInfo.cpp
  5. +30
    -0
      src/TheadInfo.h
  6. +51
    -3
      src/cpu.cpp
  7. +5
    -0
      src/cpu.h
  8. +5
    -0
      src/layer.cpp
  9. +5
    -0
      src/layer.h
  10. +54
    -0
      src/layer/absval.cpp
  11. +1
    -0
      src/layer/absval.h
  12. +2
    -0
      src/layer/batchnorm.h
  13. +1
    -0
      src/platform.h.in
  14. +164
    -0
      src/thread.cpp
  15. +42
    -0
      src/thread.h
  16. +1
    -0
      tests/CMakeLists.txt
  17. +115
    -0
      tests/test_thread.cpp

+ 1
- 0
.gitignore View File

@@ -60,3 +60,4 @@ python/setup.py

# Xmake
.xmake/
CMakePresets.json

+ 2
- 1
CMakeLists.txt View File

@@ -81,7 +81,7 @@ option(NCNN_SIMPLEVK "minimal in-house vulkan loader" ON)
option(NCNN_SYSTEM_GLSLANG "use system glslang library" OFF)
option(NCNN_RUNTIME_CPU "runtime dispatch cpu routines" ON)
option(NCNN_DISABLE_PIC "disable position-independent code" OFF)
option(NCNN_BUILD_TESTS "build tests" OFF)
option(NCNN_BUILD_TESTS "build tests" ON)
option(NCNN_COVERAGE "build for coverage" OFF)
option(NCNN_ASAN "build for address sanitizer" OFF)
option(NCNN_BUILD_BENCHMARK "build benchmark" ON)
@@ -89,6 +89,7 @@ option(NCNN_PYTHON "build python api" OFF)
option(NCNN_INT8 "int8 inference" ON)
option(NCNN_BF16 "bf16 inference" ON)
option(NCNN_FORCE_INLINE "force inline some function" ON)
option(NCNN_MUTITHREAD "enable multi thread bata" ON)

if(ANDROID OR IOS OR NCNN_SIMPLESTL)
option(NCNN_DISABLE_RTTI "disable rtti" ON)


+ 12
- 0
src/CMakeLists.txt View File

@@ -48,6 +48,13 @@ if(ANDROID)
list(APPEND ncnn_SRCS mat_pixel_android.cpp)
endif()

if(NCNN_MUTITHREAD)
list(APPEND ncnn_SRCS thread.cpp)
if(WIN32)
list(APPEND ncnn_SRCS TheadInfo.cpp)
endif()
endif()

ncnn_src_group(ncnn_SRCS "sources")

include_directories("${CMAKE_CURRENT_SOURCE_DIR}/layer/${NCNN_TARGET_ARCH}")
@@ -267,6 +274,11 @@ if(NCNN_THREADS)
target_link_libraries(ncnn PUBLIC pthread)
endif()
endif()
if(NCNN_MUTITHREAD)
if(NOT WIN32 AND (NOT NCNN_SIMPLEOMP) AND (NOT NCNN_SIMPLESTL))
target_link_libraries(ncnn PUBLIC -pthread)
endif()
endif()

if(NCNN_VULKAN)
if(NCNN_SIMPLEVK)


+ 69
- 0
src/TheadInfo.cpp View File

@@ -0,0 +1,69 @@
#ifdef NCNN_MUTITHREAD
#ifdef _WIN32

#include "TheadInfo.h"
namespace ncnn {

// 初始化静态成员
ThreadInfo* ThreadInfo::thread_info = nullptr;

ThreadInfo::ThreadInfo(/* args */)
{
int groupCount = GetActiveProcessorGroupCount();
for (WORD group = 0; group < groupCount; group++)
{
DWORD processorsInGroup = GetActiveProcessorCount(group);
for (int i = 0; i < static_cast<int>(processorsInGroup); i++)
{
CoreInfo info;
info.group = group;
info.id = i + core_infos.size();
info.affinity = (static_cast<DWORD_PTR>(1) << i);
core_infos.push_back(info);
}
}
}

ThreadInfo* ThreadInfo::get()
{
static Mutex lock;
AutoLock guard(lock);

if (!thread_info)
{
thread_info = new ThreadInfo();
}
return thread_info;
}

CoreInfo ThreadInfo::getCurrentCore()
{
// 获取当前线程运行的CPU核心(支持多处理器组)
DWORD_PTR process_affinity, system_affinity;
GetProcessAffinityMask(GetCurrentProcess(), &process_affinity, &system_affinity);

// 使用扩展API获取处理器组信息
PROCESSOR_NUMBER proc_num;
GetCurrentProcessorNumberEx(&proc_num);

for (const auto& core : core_infos)
{
// 匹配组号和组内核心编号
if (core.group == proc_num.Group && (core.affinity & (1ULL << proc_num.Number)))
{
return core;
}
}

// 未找到时返回默认值
return {-1, -1, 0};
}

void ThreadInfo::getAllCore(std::vector<CoreInfo>& out)
{
out = core_infos;
}
} // namespace ncnn

#endif
#endif

+ 30
- 0
src/TheadInfo.h View File

@@ -0,0 +1,30 @@
#ifndef THREAD_INFO_H
#define THREAD_INFO_H
#ifdef NCNN_MUTITHREAD
#if defined _WIN32
#include "cpu.h"
namespace ncnn {
struct CoreInfo
{
public:
int id;
int group;
DWORD_PTR affinity;
};
class ThreadInfo
{
private:
static ThreadInfo* thread_info;
std::vector<CoreInfo> core_infos;
ThreadInfo(/* args */);

public:
static ThreadInfo* get();
CoreInfo getCurrentCore();
void getAllCore(std::vector<CoreInfo>& out);
};
} // namespace ncnn

#endif
#endif
#endif

+ 51
- 3
src/cpu.cpp View File

@@ -1432,12 +1432,21 @@ static std::vector<int> get_max_freq_mhz()

static int set_sched_affinity(const ncnn::CpuSet& thread_affinity_mask)
{
#ifdef _WIN32
GROUP_AFFINITY groupAffinity;
ZeroMemory(&groupAffinity, sizeof(groupAffinity));
groupAffinity.Group = static_cast<WORD>(thread_affinity_mask.cpu_group);
groupAffinity.Mask = thread_affinity_mask.mask;

SetThreadGroupAffinity(GetCurrentThread(), &groupAffinity, NULL);
#else
DWORD_PTR prev_mask = SetThreadAffinityMask(GetCurrentThread(), thread_affinity_mask.mask);
if (prev_mask == 0)
{
NCNN_LOGE("SetThreadAffinityMask failed %d", GetLastError());
return -1;
}
#endif

return 0;
}
@@ -2273,22 +2282,27 @@ CpuSet::CpuSet()

void CpuSet::enable(int cpu)
{
mask |= ((ULONG_PTR)1 << cpu);
cpu_group = cpu / 64;
mask |= ((ULONG_PTR)1 << (cpu - cpu_group * 64));
}

void CpuSet::disable(int cpu)
{
mask &= ~((ULONG_PTR)1 << cpu);
cpu_group = cpu / 64;
mask &= ~((ULONG_PTR)1 << (cpu - cpu_group * 64));
}

void CpuSet::disable_all()
{
cpu_group = 0;
mask = 0;
}

bool CpuSet::is_enabled(int cpu) const
{
return mask & ((ULONG_PTR)1 << cpu);
if (cpu_group != cpu / 64)
return false;
return mask & ((ULONG_PTR)1 << (cpu - cpu_group * 64));
}

int CpuSet::num_enabled() const
@@ -3273,4 +3287,38 @@ int set_flush_denormals(int flush_denormals)
#endif
}

int get_multi_thread_batch()
{
#if defined(_NCNN_MUTITHREAD)
#if defined _WIN32
DWORD length = 0;
GetLogicalProcessorInformation(NULL, &length);
if (GetLastError() != ERROR_INSUFFICIENT_BUFFER)
return 0;

PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buffer = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION)malloc(length);

int count = 0;
if (GetLogicalProcessorInformation(buffer, &length))
{
DWORD offset = 0;
while (offset < length)
{
if (buffer->Relationship == RelationProcessorCore)
count++;

offset += sizeof(SYSTEM_LOGICAL_PROCESSOR_INFORMATION);
buffer++;
}
}
free(buffer);
return count;
#else
return get_cpu_count();
#endif
#else
return get_cpu_count();
#endif
}

} // namespace ncnn

+ 5
- 0
src/cpu.h View File

@@ -8,6 +8,7 @@

#if defined _WIN32
#define WIN32_LEAN_AND_MEAN
#define _WIN32_WINNT 0x0601 // Windows 7+
#include <windows.h>
#endif
#if defined __ANDROID__ || defined __linux__
@@ -30,6 +31,7 @@ public:

public:
#if defined _WIN32
int cpu_group;
ULONG_PTR mask;
#endif
#if defined __ANDROID__ || defined __linux__
@@ -172,6 +174,9 @@ NCNN_EXPORT void set_kmp_blocktime(int time_ms);
NCNN_EXPORT int get_flush_denormals();
NCNN_EXPORT int set_flush_denormals(int flush_denormals);

// multi thread batch inference
NCNN_EXPORT int get_multi_thread_batch();

} // namespace ncnn

#endif // NCNN_CPU_H

+ 5
- 0
src/layer.cpp View File

@@ -98,6 +98,11 @@ int Layer::forward_inplace(Mat& /*bottom_top_blob*/, const Option& /*opt*/) cons
return -1;
}

int Layer::forward_thread(void* /*info*/) const
{
return -1;
}

#if NCNN_VULKAN
int Layer::upload_model(VkTransfer& /*cmd*/, const Option& /*opt*/)
{


+ 5
- 0
src/layer.h View File

@@ -94,6 +94,10 @@ public:
// return 0 if success
virtual int forward_inplace(std::vector<Mat>& bottom_top_blobs, const Option& opt) const;
virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;
/// @brief mutithread work function
/// @param workspace thread infomation
/// @return 0 if success
virtual int forward_thread(void* workspace);

#if NCNN_VULKAN
public:
@@ -139,6 +143,7 @@ public:
// layer factory function
typedef Layer* (*layer_creator_func)(void*);
typedef void (*layer_destroyer_func)(Layer*, void*);
typedef int (*layer_work_func)(Layer*, void*);

struct layer_registry_entry
{


+ 54
- 0
src/layer/absval.cpp View File

@@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

#include "absval.h"
#include "thread.h"

namespace ncnn {

@@ -17,6 +18,16 @@ int AbsVal::forward_inplace(Mat& bottom_top_blob, const Option& opt) const
int h = bottom_top_blob.h;
int channels = bottom_top_blob.c;
int size = w * h;
if (opt.num_threads > 64)
{
ThreadWorkspace workspace;
workspace.layer = (Layer*)this;
MutilThread thread(workspace, opt);
std::vector<Mat> workspace_blobs;
workspace_blobs.push_back(bottom_top_blob);
thread.join(workspace_blobs);
return 0;
}

#pragma omp parallel for num_threads(opt.num_threads)
for (int q = 0; q < channels; q++)
@@ -33,4 +44,47 @@ int AbsVal::forward_inplace(Mat& bottom_top_blob, const Option& opt) const
return 0;
}

int AbsVal::forward_thread(void* workspace)
{
ThreadInfoExc* info = (ThreadInfoExc*)workspace;
Mat& bottom_top_blob = info->mats->at(0);
if (bottom_top_blob.elemsize == 1)
{
int8_t* ptr = (int8_t*)bottom_top_blob.data;
const int8_t flag = 1 << 7;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] & flag)
{
ptr[i] = -ptr[i];
}
}
}
else if (bottom_top_blob.elemsize == 2)
{
int16_t* ptr = (int16_t*)bottom_top_blob.data;
const int16_t flag = 1 << 15;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] & flag)
{
ptr[i] = -ptr[i];
}
}
}
else
{
float* ptr = (float*)bottom_top_blob.data;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] < 0)
{
ptr[i] = -ptr[i];
}
}
}

return 0;
}

} // namespace ncnn

+ 1
- 0
src/layer/absval.h View File

@@ -14,6 +14,7 @@ public:
AbsVal();

virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;
virtual int forward_thread(void* workspace);
};

} // namespace ncnn


+ 2
- 0
src/layer/batchnorm.h View File

@@ -19,6 +19,8 @@ public:

virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt) const;

virtual int forward_thread(void* workspace);

public:
// param
int channels;


+ 1
- 0
src/platform.h.in View File

@@ -57,6 +57,7 @@
#cmakedefine01 NCNN_INT8
#cmakedefine01 NCNN_BF16
#cmakedefine01 NCNN_FORCE_INLINE
#cmakedefine01 NCNN_MUTITHREAD

#cmakedefine NCNN_VERSION_STRING "@NCNN_VERSION_STRING@"



+ 164
- 0
src/thread.cpp View File

@@ -0,0 +1,164 @@
#include "thread.h"
#include "cpu.h"
#if defined __ANDROID__ || defined __linux__
#include <sched.h>
#endif

#if defined _WIN32
DWORD WINAPI winWorker(LPVOID lpParam)
{
ncnn::ThreadInfoExc* info = (ncnn::ThreadInfoExc*)lpParam;
if (info->coreinfo->group >= 0 && info->coreinfo->affinity != 0)
{
GROUP_AFFINITY groupAffinity;
ZeroMemory(&groupAffinity, sizeof(groupAffinity));
groupAffinity.Group = static_cast<WORD>(info->coreinfo->group);
groupAffinity.Mask = info->coreinfo->affinity;

SetThreadGroupAffinity(GetCurrentThread(), &groupAffinity, NULL);
}
info->workspace->layer->forward_thread(info);
info->manager->threadsComplete[info->threadid] = true;
delete info;
return 0;
}
#else
void* pthreadWorker(void* lpParam)
{
ncnn::ThreadInfoExc* info = (ncnn::ThreadInfoExc*)lpParam;
#if defined __ANDROID__ || defined __linux__
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
CPU_SET(info->threadid, &cpuset);
// 绑定到指定核心
pthread_t current_thread = pthread_self();
pthread_setaffinity_np(current_thread, sizeof(cpu_set_t), &cpuset);
#endif
info->workspace->layer->forward_thread(info);
info->manager->threadsComplete[info->threadid] = true;
delete info;
return nullptr;
}
#endif
namespace ncnn {
MutilThread::MutilThread(ThreadWorkspace _workspace, const Option& opt)
{
workspace = _workspace;
m_opt = opt;
threadsComplete.resize(opt.num_threads);
for (int i = 0; i < opt.num_threads; i++)
{
threadsComplete[i] = false;
}
threadsComplete[helpid] = true;
}

MutilThread::~MutilThread()
{
threadsComplete.clear();
}

void MutilThread::join(std::vector<Mat>& mats)
{
#if defined _WIN32
Mat mat = mats[0];
CoreInfo cur = ThreadInfo::get()->getCurrentCore();
std::vector<CoreInfo> cores;
ThreadInfo::get()->getAllCore(cores);
std::vector<HANDLE> handles;
ThreadInfoExc* curinfo = nullptr;
size_t workersize = ((mat.w * mat.h * mat.d) / m_opt.num_threads + 1) * mat.c * mat.elemsize;
size_t matlen = mats.size();
for (int i = 0; i < m_opt.num_threads; i++)
{
ThreadInfoExc* info = new ThreadInfoExc();
info->threadid = i;
info->start_index = i * workersize;
info->end_index = (i + 1) * workersize;
if (info->end_index > matlen)
{
info->end_index = matlen;
}
info->workspace = &workspace;
info->mats = &mats;
info->opt = &m_opt;
info->coreinfo = &cores[i];
threadsComplete[i] = false;
info->manager = this;
if (cur.id == cores[i].id)
{
helpid = i;
threadsComplete[i] = true;
handles.push_back(nullptr);
curinfo = info;
continue;
}
handles.push_back(CreateThread(nullptr, 0, winWorker, info, 0, nullptr));
}
workspace.layer->forward_thread(curinfo);
delete curinfo;
bool check = true;
do
{
check = false;
for (int i = 0; i < m_opt.num_threads; i++)
{
if (threadsComplete[i] == false)
{
check = true;
break;
}
}
} while (check);
for (size_t i = 0; i < handles.size(); i++)
{
if (handles[i] != nullptr)
{
CloseHandle(handles[i]);
}
}
handles.clear();
#else
Mat mat = mats[0];
int curid = -1;
#if defined __ANDROID__ || defined __linux__
curid = sched_getcpu();
#endif

std::vector<pthread_t> pthread_handles;
ThreadInfoExc* curinfo = nullptr;
size_t workersize = ((mat.w * mat.h * mat.d) / m_opt.num_threads + 1) * mat.c * mat.elemsize;
size_t matlen = mats.size();
for (int i = 0; i < m_opt.num_threads; i++)
{
ThreadInfoExc* info = new ThreadInfoExc();
info->threadid = i;
info->start_index = i * workersize;
info->end_index = (i + 1) * workersize;
if (info->end_index > matlen)
{
info->end_index = matlen;
}
info->workspace = &workspace;
info->mats = &mats;
info->opt = &m_opt;
threadsComplete[i] = false;
info->manager = this;
if (curid == cores[i].id && curid > -1)
{
helpid = i;
threadsComplete[i] = true;
curinfo = info;
continue;
}
pthread_handles.push_back(pthread_create(&pthread_handles[i], nullptr, pthreadWorker, info));
}
workspace.layer->forward_thread(curinfo);
delete curinfo;
for (size_t i = 0; i < pthread_handles.size(); i++)
{
pthread_join(pthread_handles[i], nullptr);
}
#endif
}
} // namespace ncnn

+ 42
- 0
src/thread.h View File

@@ -0,0 +1,42 @@
#ifndef THREAD_H
#define THREAD_H
#include "layer.h"
#include "TheadInfo.h"
#if defined __ANDROID__ || defined __linux__ || defined __APPLE__
#include <pthread.h>
#endif
namespace ncnn {
class MutilThread;
struct ThreadWorkspace
{
Layer* layer;
};
struct ThreadInfoExc
{
int threadid;
size_t start_index;
size_t end_index;
ThreadWorkspace* workspace;
std::vector<ncnn::Mat>* mats;
Option* opt;
MutilThread* manager;
#if defined _WIN32
CoreInfo* coreinfo;
#endif
};
class MutilThread
{
private:
Option m_opt;
volatile int helpid;
ThreadWorkspace workspace;

public:
MutilThread(ThreadWorkspace _workspace, const Option& opt);
void join(std::vector<ncnn::Mat>& mats);
std::vector<bool> threadsComplete;
~MutilThread();
};

} // namespace ncnn
#endif

+ 1
- 0
tests/CMakeLists.txt View File

@@ -62,6 +62,7 @@ ncnn_add_test(c_api)
ncnn_add_test(cpu)
ncnn_add_test(expression)
ncnn_add_test(paramdict)
ncnn_add_test(thread)

if(NCNN_VULKAN)
ncnn_add_test(command)


+ 115
- 0
tests/test_thread.cpp View File

@@ -0,0 +1,115 @@
#include "testutil.h"
#include "thread.h"

class TestLayer : public ncnn::Layer
{
public:
virtual int forward_inplace(Mat& bottom_top_blob, const Option& opt)
{
ThreadWorkspace workspace;
workspace.layer = (Layer*)this;
MutilThread thread(workspace, opt);
std::vector<Mat> workspace_blobs;
workspace_blobs.push_back(bottom_top_blob);
thread.join(workspace_blobs);
return 0;
}
virtual int forward_thread(void* workspace)
{
ThreadInfoExc* info = (ThreadInfoExc*)workspace;
Mat& bottom_top_blob = info->mats->at(0);
if (bottom_top_blob.elemsize == 1)
{
int8_t* ptr = (int8_t*)bottom_top_blob.data;
const int8_t flag = 1 << 7;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] & flag)
{
ptr[i] = -ptr[i];
}
}
}
else if (bottom_top_blob.elemsize == 2)
{
int16_t* ptr = (int16_t*)bottom_top_blob.data;
const int16_t flag = 1 << 15;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] & flag)
{
ptr[i] = -ptr[i];
}
}
}
else
{
float* ptr = (float*)bottom_top_blob.data;
for (size_t i = info->start_index; i < info->end_index; i++)
{
if (ptr[i] < 0)
{
ptr[i] = -ptr[i];
}
}
}

return 0;
}
};

static int test_thread(const ncnn::Mat& a)
{
ncnn::ParamDict pd;

std::vector<ncnn::Mat> weights(0);

int ret = test_layer("TestLayer", pd, weights, a);
if (ret != 0)
{
fprintf(stderr, "test_thread failed a.dims=%d a=(%d %d %d %d)\n", a.dims, a.w, a.h, a.d, a.c);
}

return ret;
}

static int test_thread_0()
{
return 0
|| test_thread(RandomMat(5, 6, 7, 24))
|| test_thread(RandomMat(5, 6, 7, 12))
|| test_thread(RandomMat(5, 6, 7, 13));
}

static int test_thread_1()
{
return 0
|| test_thread(RandomMat(5, 7, 24))
|| test_thread(RandomMat(5, 6, 24))
|| test_thread(RandomMat(7, 9, 24));
}

static int test_thread_2()
{
return 0
|| test_thread(RandomMat(7, 12))
|| test_thread(RandomMat(5, 12))
|| test_thread(RandomMat(9, 12));
}

static int test_thread_3()
{
return 0
|| test_thread(RandomMat(7))
|| test_thread(RandomMat(128))
|| test_thread(RandomMat(256));
}

int main()
{
return 0
|| test_thread_0()
|| test_thread_1()
|| test_thread_2()
|| test_thread_3();
}

Loading…
Cancel
Save