Browse Source

add parallel profiling

feature/build-system-rewrite
fangzehua 4 years ago
parent
commit
e64055d2ca
11 changed files with 222 additions and 77 deletions
  1. +3
    -3
      mindspore/ccsrc/plugin/device/cpu/hal/hardware/cpu_device_context.cc
  2. +79
    -10
      mindspore/ccsrc/profiler/device/cpu/cpu_profiling.cc
  3. +6
    -1
      mindspore/ccsrc/profiler/device/cpu/cpu_profiling.h
  4. +6
    -3
      mindspore/ccsrc/profiler/device/data_saver.cc
  5. +2
    -0
      mindspore/ccsrc/profiler/device/profiling.cc
  6. +4
    -0
      mindspore/ccsrc/profiler/device/profiling.h
  7. +1
    -0
      mindspore/core/mindrt/src/actor/actormgr.cc
  8. +8
    -0
      mindspore/core/mindrt/src/thread/threadpool.cc
  9. +4
    -0
      mindspore/core/mindrt/src/thread/threadpool.h
  10. +1
    -1
      mindspore/lite/test/config/cropped_size.cfg
  11. +108
    -59
      mindspore/python/mindspore/profiler/parser/integrator.py

+ 3
- 3
mindspore/ccsrc/plugin/device/cpu/hal/hardware/cpu_device_context.cc View File

@@ -317,7 +317,6 @@ bool CPUDeviceContext::LaunchKernelWithProfiling(const CNodePtr &kernel, const s
const std::vector<AddressPtr> &workspace,
const std::vector<AddressPtr> &outputs) const {
MS_EXCEPTION_IF_NULL(kernel);
std::lock_guard<std::mutex> locker(launch_mutex_);

auto profiler_inst = profiler::cpu::CPUProfiler::GetInstance();
MS_EXCEPTION_IF_NULL(profiler_inst);
@@ -326,9 +325,10 @@ bool CPUDeviceContext::LaunchKernelWithProfiling(const CNodePtr &kernel, const s
MS_EXCEPTION_IF_NULL(kernel_mod);

uint32_t pid = IntToUint(getpid());
profiler_inst->OpDataProducerBegin(kernel->fullname_with_scope(), pid);
// CPU supports multi-threaded kernel launch with mindrt, so use the parallel profiling API.
profiler_inst->OpDataProducerBeginParallel(kernel->fullname_with_scope(), pid);
bool ret = DoLaunchKernel(kernel_mod, inputs, workspace, outputs);
profiler_inst->OpDataProducerEnd();
profiler_inst->OpDataProducerEndParallel(kernel->fullname_with_scope());

return ret;
}


+ 79
- 10
mindspore/ccsrc/profiler/device/cpu/cpu_profiling.cc View File

@@ -44,19 +44,88 @@ void CPUProfiler::StepProfilingEnable(const bool enable_flag) {
enable_flag_ = enable_flag;
}

void CPUProfiler::SetRunTimeData(const std::string &op_name, const uint32_t pid) {
void CPUProfiler::SetRunTimeData(const std::string &op_name, const uint32_t pid, bool is_parallel) {
if (!is_parallel) {
op_name_ = op_name;
pid_ = pid;
}
{
std::shared_lock<std::shared_mutex> lock(op_map_mutex_);
auto iter = op_info_map_.find(op_name);
if (iter != op_info_map_.end()) {
iter->second.op_count += 1;
return;
}
}
std::unique_lock<std::shared_mutex> lock(op_map_mutex_);
OpInfo op_info;
op_info.op_name = op_name;
op_info.pid = pid;
op_info.op_count = 1;
op_info_map_[op_name] = op_info;
}

void CPUProfiler::SetRuntimeStart(const std::string op_name, const uint64_t start_timestamp) {
std::shared_lock<std::shared_mutex> lock(op_map_mutex_);
auto iter = op_info_map_.find(op_name);
if (iter != op_info_map_.end()) {
iter->second.op_count += 1;
} else {
OpInfo op_info;
op_info.op_name = op_name;
op_info.pid = pid;
op_info.op_count = 1;
op_info_map_[op_name] = op_info;
iter->second.tmp_start_duration.start_timestamp = start_timestamp;
auto actor_manager = ActorMgr::GetActorMgrRef();
MS_EXCEPTION_IF_NULL(actor_manager);
auto thread_pool = actor_manager->GetActorThreadPool();
auto worker_ids_map = thread_pool->GetWorkerIdMap();
auto id_iter = worker_ids_map.find(std::this_thread::get_id());
if (id_iter != worker_ids_map.end()) {
iter->second.tmp_start_duration.tid = id_iter->second;
}
}
op_name_ = op_name;
pid_ = pid;
}

// Records the end of one parallel op launch: computes the elapsed time for
// `op_name` from the start timestamp cached in tmp_start_duration, verifies the
// op ended on the same thread-pool worker it started on, and appends the
// completed record to start_duration.
// Returns the elapsed time (0 if the op was never registered). The divisor
// kNanosecondToMillisecond suggests timestamps are ns and the result is ms.
float CPUProfiler::SetRuntimeEnd(const std::string op_name, const uint64_t stop_timestamp) {
  float op_time_elapsed = 0;
  // Shared (reader) lock: the map structure is not modified here, only the
  // value for this op_name. NOTE(review): mutating the entry under a shared
  // lock assumes the same op_name is never profiled on two threads at once —
  // the tid check below enforces start/end on one thread, but confirm no
  // concurrent launches of the same op.
  std::shared_lock<std::shared_mutex> lock(op_map_mutex_);
  auto iter = op_info_map_.find(op_name);
  if (iter != op_info_map_.end()) {
    iter->second.tmp_start_duration.duration =
      (stop_timestamp - iter->second.tmp_start_duration.start_timestamp) / kNanosecondToMillisecond;
    // Map the current OS thread back to its thread-pool worker index so it can
    // be compared against the index recorded in SetRuntimeStart.
    auto actor_manager = ActorMgr::GetActorMgrRef();
    MS_EXCEPTION_IF_NULL(actor_manager);
    auto thread_pool = actor_manager->GetActorThreadPool();
    auto worker_ids_map = thread_pool->GetWorkerIdMap();
    auto id_iter = worker_ids_map.find(std::this_thread::get_id());
    if (id_iter != worker_ids_map.end()) {
      if (iter->second.tmp_start_duration.tid != id_iter->second) {
        MS_LOG(EXCEPTION) << "Op " << op_name << " start time thread id must be equal to end thread id.";
      }
    }
    iter->second.start_duration.emplace_back(iter->second.tmp_start_duration);
    op_time_elapsed = iter->second.tmp_start_duration.duration;
  }
  return op_time_elapsed;
}

// Marks the start of one op launch when kernels run in parallel (mindrt actor
// threads): registers/increments the op in op_info_map_ (is_parallel=true
// skips the single-op op_name_/pid_ bookkeeping) and caches the start
// timestamp plus the current worker-thread id for the matching
// OpDataProducerEndParallel call.
void CPUProfiler::OpDataProducerBeginParallel(const std::string op_name, const uint32_t pid) {
  auto start_timestamp = GetHostMonoTimeStamp();
  SetRunTimeData(op_name, pid, true);
  SetRuntimeStart(op_name, start_timestamp);

#if ENABLE_GPU
  if (MsContext::GetInstance()->get_param<bool>(MS_CTX_ENABLE_MINDRT)) {
    // For a heterogeneous scene, also record the op name to the GPU profiler.
    auto gpu_profiler_inst = profiler::gpu::GPUProfiler::GetInstance();
    // For a CPU-only network there is no GPU profiler; do not raise an exception.
    if (gpu_profiler_inst && gpu_profiler_inst->GetEnableFlag()) {
      gpu_profiler_inst->RecordOneStepStartEndInfo(op_name);
    }
  }
#endif
}

// Marks the end of one op launch in parallel (mindrt) mode: stamps the stop
// time, stores the per-launch duration record via SetRuntimeEnd, and
// accumulates the op's total host cost time in the base Profiler.
void CPUProfiler::OpDataProducerEndParallel(const std::string op_name) {
  auto stop_timestamp = GetHostMonoTimeStamp();
  float op_time_elapsed = SetRuntimeEnd(op_name, stop_timestamp);
  MS_LOG(DEBUG) << "Host Time Elapsed(ms)," << op_name << "," << op_time_elapsed;
  Profiler::SetRunTimeData(op_name, op_time_elapsed);
}

void CPUProfiler::OpDataProducerBegin(const std::string op_name, const uint32_t pid) {


+ 6
- 1
mindspore/ccsrc/profiler/device/cpu/cpu_profiling.h View File

@@ -27,6 +27,7 @@
#if ENABLE_GPU
#include "profiler/device/gpu/gpu_profiling.h"
#endif
#include "actor/actormgr.h"

namespace mindspore {
namespace profiler {
@@ -46,9 +47,13 @@ class CPUProfiler : public Profiler {
void StepProfilingEnable(const bool enable_flag) override;
void OpDataProducerBegin(const std::string op_name, const uint32_t pid);
void OpDataProducerEnd() override;
void OpDataProducerEndParallel(const std::string op_name);
void OpDataProducerBeginParallel(const std::string op_name, const uint32_t pid);
float SetRuntimeEnd(const std::string op_name, const uint64_t stop_timestamp);
void SetRuntimeStart(const std::string op_name, const uint64_t start_timestamp);

private:
void SetRunTimeData(const std::string &op_name, const uint32_t pid);
void SetRunTimeData(const std::string &op_name, const uint32_t pid, bool is_parallel = false);
void SaveProfileData() override;
void ClearInst() override;



+ 6
- 3
mindspore/ccsrc/profiler/device/data_saver.cc View File

@@ -167,11 +167,14 @@ void DataSaver::WriteOpTimestamp(const std::string &saver_base_dir) const {
for (const auto &op_timestamp_info : op_timestamps_map_) {
if (op_side_ == "cpu") {
ofs << op_timestamp_info.first << ";HostCpuOps;";
for (auto start_end : op_timestamp_info.second) {
ofs << start_end.start_timestamp << "," << start_end.duration << "," << start_end.tid << " ";
}
} else {
ofs << op_timestamp_info.first << ";GpuOps;";
}
for (auto start_end : op_timestamp_info.second) {
ofs << start_end.start_timestamp << "," << start_end.duration << " ";
for (auto start_end : op_timestamp_info.second) {
ofs << start_end.start_timestamp << "," << start_end.duration << " ";
}
}
ofs << std::endl;
}


+ 2
- 0
mindspore/ccsrc/profiler/device/profiling.cc View File

@@ -52,6 +52,7 @@ uint64_t Profiler::GetHostMonoTimeStamp() const {
}

void Profiler::SetRunTimeData(const std::string &op_name, const float time_elapsed) {
std::shared_lock<std::shared_mutex> lock(op_map_mutex_);
auto iter = op_info_map_.find(op_name);
if (iter != op_info_map_.end()) {
iter->second.op_host_cost_time += time_elapsed;
@@ -59,6 +60,7 @@ void Profiler::SetRunTimeData(const std::string &op_name, const float time_elaps
}

void Profiler::SetRunTimeData(const std::string &op_name, const uint64_t start, const float duration) {
std::shared_lock<std::shared_mutex> lock(op_map_mutex_);
auto iter = op_info_map_.find(op_name);
if (iter != op_info_map_.end()) {
iter->second.start_duration.emplace_back(StartDuration({start, duration}));


+ 4
- 0
mindspore/ccsrc/profiler/device/profiling.h View File

@@ -21,6 +21,7 @@
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
@@ -32,6 +33,7 @@ namespace profiler {
struct StartDuration {
uint64_t start_timestamp = 0l;
float duration = 0l;
size_t tid = 0;
};

struct OneStepStartEndInfo {
@@ -48,6 +50,7 @@ struct OpInfo {
int op_kernel_api_count = 0;
int op_kernel_count = 0;
int op_count = 0;
StartDuration tmp_start_duration;
std::vector<StartDuration> start_duration;
void *stream;
uint32_t pid;
@@ -101,6 +104,7 @@ class Profiler {
OneStepStartEndInfo step_start_end_info_;
std::vector<OneStepStartEndInfo> all_step_start_end_info_;
std::vector<std::string> step_start_end_info_vector_;
std::shared_mutex op_map_mutex_;
std::mutex record_mutex_;
bool init_flag_ = false;
};


+ 1
- 0
mindspore/core/mindrt/src/actor/actormgr.cc View File

@@ -77,6 +77,7 @@ int ActorMgr::Initialize(bool use_inner_pool, size_t actor_thread_num, size_t ma
inner_pool_->SetMaxSpinCount(kDefaultSpinCount);
inner_pool_->SetSpinCountMaxValue();
inner_pool_->SetKernelThreadMaxSpinCount(kDefaultKernelSpinCount);
inner_pool_->SetWorkerIdMap();
}
}
return MINDRT_OK;


+ 8
- 0
mindspore/core/mindrt/src/thread/threadpool.cc View File

@@ -409,4 +409,12 @@ ThreadPool *ThreadPool::CreateThreadPool(size_t thread_num, const std::vector<in
}
return pool;
}

void ThreadPool::SetWorkerIdMap() {
for (size_t i = 0; i < workers_.size(); ++i) {
auto thread_id = workers_[i]->thread_id();
worker_ids_[thread_id] = i;
}
return;
}
} // namespace mindspore

+ 4
- 0
mindspore/core/mindrt/src/thread/threadpool.h View File

@@ -19,6 +19,7 @@

#include <new>
#include <vector>
#include <unordered_map>
#include <memory>
#include <thread>
#include <atomic>
@@ -146,6 +147,8 @@ class MS_CORE_API ThreadPool {
void SetMaxSpinCount(int spin_count);
void SetMinSpinCount(int spin_count);
void ActiveWorkers() const;
void SetWorkerIdMap();
const std::unordered_map<std::thread::id, size_t> &GetWorkerIdMap() const { return worker_ids_; }

protected:
ThreadPool() = default;
@@ -164,6 +167,7 @@ class MS_CORE_API ThreadPool {

std::mutex pool_mutex_;
std::vector<Worker *> workers_;
std::unordered_map<std::thread::id, size_t> worker_ids_;
CoreAffinity *affinity_{nullptr};
size_t actor_thread_num_{0};
size_t kernel_thread_num_{0};


+ 1
- 1
mindspore/lite/test/config/cropped_size.cfg View File

@@ -1 +1 @@
839924
844020

+ 108
- 59
mindspore/python/mindspore/profiler/parser/integrator.py View File

@@ -541,6 +541,14 @@ class BaseTimelineGenerator:
_max_scope_name_num = 0
_host_cpu_op_label = 'HostCpuOps'

def __init__(self):
    # Maps each overlap/merged display row name to the (tid, pid) pair used
    # when that row is emitted into the timeline (consumed by
    # _get_merged_time_list to label its output rows).
    self._tid_dict = {
        "computation_op": (self._MERGED_COMPUTATION_TID, self._OP_OVERLAP_PID),
        "communication_not_overlapped": (self._PURE_COMMUNICATION_TID, self._OP_OVERLAP_PID),
        "communication": (self._MERGED_COMMUNICATION_TID, self._OP_OVERLAP_PID),
        "free_time": (self._FREE_TIME_TID, self._OP_OVERLAP_PID)
    }

def get_thread_label_name(self):
"""Get process and thread config."""
return [
@@ -579,6 +587,54 @@ class BaseTimelineGenerator:
"args": {"sort_index": self._FREE_TIME_TID}}
]

def _get_merged_time_list(self, time_list, get_interval_time=False, display_name="computation_op", factor=1):
    """
    Get merged time segment list.

    The process of merge is, for example, there is a list [[1,5], [2,6], [7,8]],
    each items in this list contains a start_time and end_time,
    the merged result is [[1,6], [7,8]].

    Args:
        time_list (list): timeline rows; columns self._start_time_idx and
            self._duration_idx hold each row's start time and duration.
        get_interval_time (bool): if True, the returned merged_res_list and
            interval_display_list describe the gaps between merged segments
            instead of the segments themselves.
        display_name (str): key into self._tid_dict selecting the (tid, pid)
            placed on every produced row.
        factor (int): divisor converting the duration column into the same
            time unit as the start-time column; defaults to 1 (same unit).

    Returns:
        tuple: (merged_res_list, interval_display_list, merged_display_list),
        each a list of [display_name, tid, start, end-or-duration, pid] rows.
    """
    # Flat list holding merged segments as [start0, end0, start1, end1, ...].
    time_merged_segment_list = []
    tid = self._tid_dict[display_name][0]
    pid = self._tid_dict[display_name][1]
    for time_item in time_list:
        time_segment = list(map(float, time_item[self._start_time_idx:self._duration_idx + 1]))
        # Convert [start, duration] to [start, end], rescaling duration by factor.
        time_segment[1] = time_segment[0] + time_segment[1] / factor
        if not time_merged_segment_list or \
                time_segment[0] > time_merged_segment_list[-1]:
            # Disjoint from the last merged segment: append a new (start, end) pair.
            # NOTE(review): assumes time_list is sorted by start time — confirm callers.
            time_merged_segment_list.extend(time_segment)
        else:
            # Overlaps the last merged segment: extend its end time.
            time_merged_segment_list[-1] = max(
                time_merged_segment_list[-1],
                time_segment[1]
            )

    # merged_display_list data used for ui page.
    # Rows are [name, tid, start, duration, pid]; duration is scaled back by factor.
    merged_display_list = [
        [display_name, tid, time_merged_segment_list[i * 2],
         (time_merged_segment_list[i * 2 + 1] - time_merged_segment_list[i * 2]) * factor, pid]
        for i in range(len(time_merged_segment_list) // 2)
    ]

    if get_interval_time:
        # Drop the first start and last end, so consecutive (end, start) pairs
        # now describe the idle intervals between merged segments.
        time_merged_segment_list = time_merged_segment_list[1:-1]

    # merged_res_list data used to compute overlap with other time_list.
    merged_res_list = [
        [display_name, tid, time_merged_segment_list[i * 2], time_merged_segment_list[i * 2 + 1], pid]
        for i in range(len(time_merged_segment_list) // 2)
    ]

    # interval_display_list is interval time used for ui page.
    interval_display_list = [
        [display_name, tid, time_merged_segment_list[i * 2],
         (time_merged_segment_list[i * 2 + 1] - time_merged_segment_list[i * 2]) * factor, pid]
        for i in range(len(time_merged_segment_list) // 2)
    ]

    return merged_res_list, interval_display_list, merged_display_list

def write_timeline(self, size_limit=SIZE_LIMIT_DEFAULT):
"""Load data according to the parsed profiling files."""
# Write timeline to file.
@@ -666,7 +722,7 @@ class BaseTimelineGenerator:
else:
return

if tid_name == self._host_cpu_op_label:
if self._host_cpu_op_label == tid_name[:len(self._host_cpu_op_label)]:
thread_name_meta_data['pid'] = self._HOST_CPU_PID

thread_name_meta_data["tid"] = tid
@@ -792,6 +848,7 @@ class GpuTimelineGenerator(BaseTimelineGenerator):
_activity_keys_list = []

def __init__(self, profiling_dir, device_id):
super().__init__()
self._profiling_dir = profiling_dir
self._device_id = device_id
self._timeline_meta = []
@@ -828,7 +885,7 @@ class GpuTimelineGenerator(BaseTimelineGenerator):
# remove the level of scope name which has a format like "0-conv2-Conv2d".
timeline_dict['name'] = "-".join(op_meta.op_name.split('-')[1:])
timeline_dict['scope_level'] = int(op_meta.op_name.split('-')[0])
elif op_meta.stream_id == self._host_cpu_op_label:
elif op_meta.stream_id[:len(self._host_cpu_op_label)] == self._host_cpu_op_label:
timeline_dict['pid'] = self._HOST_CPU_PID

if len(timeline) > 4:
@@ -929,7 +986,13 @@ class GpuTimelineGenerator(BaseTimelineGenerator):
time_arr = time_arr.split(" ")
for time in time_arr:
time = time.split(",")
line_list = op_list[:2] + time
if len(time) == 3:
# for time value is [start_timestamp, duration, tid]
# line_list[1] would be like "HostCpuOps" + str(tid)
line_list = op_list[:1] + [op_list[1] + str(time[-1])] + time[:-1]
else:
# for time value is [start_timestamp, duration]
line_list = op_list[:2] + time
op_timeline_list.append(line_list)
except (IOError, OSError) as err:
logger.critical('Error occurred when load operator timeline data intermediate file: %s', err)
@@ -1053,15 +1116,10 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
_cluster_analyse_filename = 'ascend_cluster_analyse_{}_{}_{}_{}.csv'

def __init__(self, profiling_dir, device_id, rank_id, rank_size):
super().__init__()
self._profiling_dir = profiling_dir
self._device_id = device_id
self._rank_id = rank_id
self._tid_dict = {
"computation_op": (self._MERGED_COMPUTATION_TID, self._OP_OVERLAP_PID),
"communication_not_overlapped": (self._PURE_COMMUNICATION_TID, self._OP_OVERLAP_PID),
"communication": (self._MERGED_COMMUNICATION_TID, self._OP_OVERLAP_PID),
"free_time": (self._FREE_TIME_TID, self._OP_OVERLAP_PID)
}
self._rank_size = rank_size
self._display_filename = self._display_filename.format(rank_id)
self._timeline_summary_filename = self._timeline_summary_filename.format(rank_id)
@@ -1117,7 +1175,7 @@ class AscendTimelineGenerator(BaseTimelineGenerator):
# remove the level of scope name which has a format like "0-conv2-Conv2d".
timeline_dict['name'] = "-".join(op_meta.op_name.split('-')[1:])
timeline_dict['scope_level'] = int(op_meta.op_name.split('-')[0])
elif op_meta.stream_id == self._host_cpu_op_label:
elif op_meta.stream_id[:len(self._host_cpu_op_label)] == self._host_cpu_op_label:
timeline_dict['pid'] = self._HOST_CPU_PID

self._update_format_meta_data(timeline_dict)
@@ -1456,54 +1514,6 @@ class AscendTimelineGenerator(BaseTimelineGenerator):

return per_step_time_list

def _get_merged_time_list(self, time_list, get_interval_time=False, display_name="computation_op"):
"""
Get merged time segment list.

The process of merge is, for example, there is a list [[1,5], [2,6], [7,8]],
each items in this list contains a start_time and end_time,
the merged result is [[1,6], [7,8]].
"""
time_merged_segment_list = []
tid = self._tid_dict[display_name][0]
pid = self._tid_dict[display_name][1]
for time_item in time_list:
time_segment = list(map(float, time_item[self._start_time_idx:self._duration_idx + 1]))
time_segment[1] += time_segment[0]
if not time_merged_segment_list or \
time_segment[0] > time_merged_segment_list[-1]:
time_merged_segment_list.extend(time_segment)
else:
time_merged_segment_list[-1] = max(
time_merged_segment_list[-1],
time_segment[1]
)

# merged_display_list data used for ui page.
merged_display_list = [
[display_name, tid, time_merged_segment_list[i * 2],
time_merged_segment_list[i * 2 + 1] - time_merged_segment_list[i * 2], pid]
for i in range(len(time_merged_segment_list) // 2)
]

if get_interval_time:
time_merged_segment_list = time_merged_segment_list[1:-1]

# merged_res_list data used to compute overlap with other time_list.
merged_res_list = [
[display_name, tid, time_merged_segment_list[i * 2], time_merged_segment_list[i * 2 + 1], pid]
for i in range(len(time_merged_segment_list) // 2)
]

# interval_display_list is interval time used for ui page.
interval_display_list = [
[display_name, tid, time_merged_segment_list[i * 2],
time_merged_segment_list[i * 2 + 1] - time_merged_segment_list[i * 2], pid]
for i in range(len(time_merged_segment_list) // 2)
]

return merged_res_list, interval_display_list, merged_display_list

def _get_intersection_time(self, first_time_list, second_time_list,
display_name="communication_not_overlapped"):
"""Get intersection time of two time list."""
@@ -1582,7 +1592,7 @@ class CpuTimelineGenerator(GpuTimelineGenerator):
"""Load timeline data from file."""
timeline_list = self.load_cpu_op_data()

timeline_list.sort(key=lambda x: float(x[2]))
timeline_list.sort(key=lambda x: float(x[self._start_time_idx]))
self._max_scope_name_num = self._get_max_scope_name_num(timeline_list)
self._timeline_summary['max_scope_name_num'] = self._max_scope_name_num

@@ -1592,6 +1602,12 @@ class CpuTimelineGenerator(GpuTimelineGenerator):

step_time_list = self._get_step_time_list(timeline_list, factor_start_time_uint_to_duration)

# Add merge compute time and free time
merge_compute_timeline = self._get_merged_time_list(
timeline_list, False, "computation_op", factor_start_time_uint_to_duration)[2]
free_time_timeline = self._get_merged_time_list(
timeline_list, True, "free_time", factor_start_time_uint_to_duration)[1]

# Add Scope Name.
default_scope_name_time_list = self._get_scope_name_time_list(timeline_list, "Default",
factor_start_time_uint_to_duration)
@@ -1606,9 +1622,42 @@ class CpuTimelineGenerator(GpuTimelineGenerator):

timeline_list.sort(key=lambda x: (float(x[self._start_time_idx]), x[self._tid_idx]))
timeline_list.sort(key=lambda x: float(x[2]))
timeline_list.extend(merge_compute_timeline)
timeline_list.extend(free_time_timeline)

return timeline_list

def _parse_timeline_data(self, timeline, min_cycle_counter):
    """
    Parse one timeline row into a trace-event dict and append it to
    self._timeline_meta.

    Args:
        timeline (list): one row, wrapped by TimelineContainer; a 5-column row
            is already-analysed data carrying its own pid.
        min_cycle_counter: smallest start time over the data set; subtracted
            so the displayed timeline starts at zero.
    """
    # factor to convert the time unit of start_time(ts) from 1ns to 1us for timeline display
    factor = 1000
    op_meta = TimelineContainer(timeline)
    timeline_dict = {}
    timeline_dict['name'] = op_meta.op_name.split('/')[-1]
    # 'X' marks a complete (duration) event in the trace format.
    timeline_dict['ph'] = 'X'
    timeline_dict['tid'] = op_meta.stream_id
    timeline_dict['ts'] = (op_meta.start_time - min_cycle_counter) / factor
    dur = op_meta.duration
    # NOTE(review): dur is emitted unscaled while ts is divided by factor —
    # presumably duration is already in display units; confirm upstream.
    timeline_dict['dur'] = dur
    timeline_dict['pid'] = int(self._device_id)
    if op_meta.stream_id == "Scope Name":
        # remove the level of scope name which has a format like "0-conv2-Conv2d".
        timeline_dict['name'] = "-".join(op_meta.op_name.split('-')[1:])
        timeline_dict['scope_level'] = int(op_meta.op_name.split('-')[0])
    elif self._host_cpu_op_label == op_meta.stream_id[:len(self._host_cpu_op_label)]:
        # Prefix match: parallel CPU profiling appends the worker tid to the
        # label (e.g. "HostCpuOps3"), so exact comparison would miss it.
        timeline_dict['pid'] = self._HOST_CPU_PID

    if len(timeline) == 5:
        # len(timeline) == 5 refers to analyse data.
        timeline_dict["pid"] = op_meta.pid
    elif op_meta.stream_id not in ["Scope Name", "Steps"]:
        # Update total time of operator execution.
        self._timeline_summary['total_time'] += dur / factor
        self._timeline_summary['op_exe_times'] += 1

    self._update_format_meta_data(timeline_dict)
    self._timeline_meta.append(timeline_dict)

def init_timeline(self):
"""Init timeline metadata, adding all collected info."""
timeline_list = self._load_timeline_data()


Loading…
Cancel
Save