Browse Source

implement step based and time based api for autotune

tags/v1.6.0
harshvardhangupta 4 years ago
parent
commit
9a352c4f5b
6 changed files with 469 additions and 63 deletions
  1. +10
    -4
      mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc
  2. +2
    -1
      mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h
  3. +12
    -7
      mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc
  4. +2
    -1
      mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h
  5. +233
    -33
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc
  6. +210
    -17
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h

+ 10
- 4
mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.cc View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,10 +27,11 @@
namespace mindspore {
namespace dataset {

constexpr int32_t CONNECTOR_CAPACITY_OFFSET = 0;
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 0;

Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).ToString();
(void)ts_.emplace_back(0);
return Status::OK();
}

@@ -47,7 +48,7 @@ Status DatasetIteratorTracing::GetBatchTime(int32_t start_step, int32_t end_step
}

Status DatasetIteratorTracing::GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntry(start_step, end_step, CONNECTOR_CAPACITY_OFFSET, result);
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "value", result);
}

Status DatasetIteratorTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) {
@@ -67,12 +68,17 @@ Status DatasetIteratorTracing::GetEmptyQueueFrequency(int32_t start_step, int32_
uint32_t total = end_step - start_step + 1;
uint32_t count = 0U;
for (auto step_num = start_step; step_num <= end_step; step_num++) {
auto idx = (step_num - 1) * records_per_step_ + CONNECTOR_CAPACITY_OFFSET;
auto idx = (step_num - 1) * records_per_step_ + CONNECTOR_DEPTH_OFFSET;
count += static_cast<uint32_t>(records_[idx].value == 0);
}
*empty_queue_freq = static_cast<float_t>(count) / static_cast<float_t>(total);
return Status::OK();
}

Status DatasetIteratorTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step,
std::vector<int32_t> *result) {
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
}

} // namespace dataset
} // namespace mindspore

+ 2
- 1
mindspore/ccsrc/minddata/dataset/engine/perf/dataset_iterator_tracing.h View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ class DatasetIteratorTracing : public Tracing {
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
};
} // namespace dataset


+ 12
- 7
mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.cc View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,27 +31,28 @@ namespace dataset {
constexpr int32_t PUSH_TIME_OFFSET = 0;
constexpr int32_t BATCH_TIME_OFFSET = 1;
constexpr int32_t PIPELINE_TIME_OFFSET = 2;
constexpr int32_t CONNECTOR_CAPACITY_OFFSET = 3;
constexpr int32_t CONNECTOR_DEPTH_OFFSET = 3;

Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("device_queue_profiling_" + device_id + ".txt")).ToString();
(void)ts_.emplace_back(0);
return Status::OK();
}

Status DeviceQueueTracing::GetPipelineTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntry(start_step, end_step, PIPELINE_TIME_OFFSET, result);
return GetRecordEntryFieldValue(start_step, end_step, PIPELINE_TIME_OFFSET, "value", result);
}

Status DeviceQueueTracing::GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntry(start_step, end_step, PUSH_TIME_OFFSET, result);
return GetRecordEntryFieldValue(start_step, end_step, PUSH_TIME_OFFSET, "value", result);
}

Status DeviceQueueTracing::GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntry(start_step, end_step, BATCH_TIME_OFFSET, result);
return GetRecordEntryFieldValue(start_step, end_step, BATCH_TIME_OFFSET, "value", result);
}

Status DeviceQueueTracing::GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntry(start_step, end_step, CONNECTOR_CAPACITY_OFFSET, result);
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "value", result);
}

Status DeviceQueueTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) {
@@ -71,11 +72,15 @@ Status DeviceQueueTracing::GetEmptyQueueFrequency(int32_t start_step, int32_t en
uint32_t total = end_step - start_step + 1;
uint32_t count = 0U;
for (auto step_num = start_step; step_num <= end_step; step_num++) {
auto idx = (step_num - 1) * records_per_step_ + CONNECTOR_CAPACITY_OFFSET;
auto idx = (step_num - 1) * records_per_step_ + CONNECTOR_DEPTH_OFFSET;
count += static_cast<uint32_t>(records_[idx].value == 0);
}
*empty_queue_freq = static_cast<float_t>(count) / static_cast<float_t>(total);
return Status::OK();
}

Status DeviceQueueTracing::GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
return GetRecordEntryFieldValue(start_step, end_step, CONNECTOR_DEPTH_OFFSET, "extra_info", result);
}
} // namespace dataset
} // namespace mindspore

+ 2
- 1
mindspore/ccsrc/minddata/dataset/engine/perf/device_queue_tracing.h View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ class DeviceQueueTracing : public Tracing {
Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) override;
Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) override;
};
} // namespace dataset


+ 233
- 33
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
#include <sys/stat.h>
#include <cstdlib>
#include <fstream>
#include <algorithm>
#include "utils/ms_utils.h"
#include "minddata/dataset/util/path.h"
#ifdef ENABLE_GPUQUE
@@ -80,10 +81,52 @@ void Tracing::Record(const int32_t type, const int32_t extra_info, const int32_t
std::lock_guard<std::mutex> guard(lock_);
(void)records_.emplace_back(record);
(void)value_.emplace_back(record.ToString());
// save timestamp per batch
if (records_.size() % records_per_step_ == 0) {
(void)ts_.emplace_back(time_stamp);
}
}

Status Tracing::TimeIntervalForStepRange(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts) {
std::lock_guard<std::mutex> guard(lock_);
MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
CHECK_FAIL_RETURN_UNEXPECTED(start_step > 0,
"Expected start_step > 0. Got start_step: " + std::to_string(start_step));
CHECK_FAIL_RETURN_UNEXPECTED(end_step >= start_step,
"Expected end_step >= start_step. Got start_step: " + std::to_string(start_step) +
" end_step: " + std::to_string(end_step));
CHECK_FAIL_RETURN_UNEXPECTED(end_step < ts_.size(),
"Expected end_step < ts_.size(). Got end_step: " + std::to_string(end_step) +
" ts_.size: " + std::to_string(ts_.size()));
// end timestamp of (start_step - 1) step
*start_ts = ts_[start_step - 1];
*end_ts = ts_[end_step];
return Status::OK();
}

Status Tracing::StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step) {
CHECK_FAIL_RETURN_UNEXPECTED(start_ts < end_ts, "Expected start_ts < end_ts. Got start_ts: " +
std::to_string(start_ts) + " end_ts: " + std::to_string(end_ts));
std::lock_guard<std::mutex> guard(lock_);
CHECK_FAIL_RETURN_UNEXPECTED(ts_.size() > 1, "No tracing data available yet.");
// find first ts that is not less than start_ts
auto lower = std::lower_bound(ts_.begin(), ts_.end(), start_ts);
CHECK_FAIL_RETURN_UNEXPECTED(lower != ts_.end(),
"No data available for time >= start_ts. start_ts: " + std::to_string(start_ts));
// there is no 0th step. If start_ts == 0, then lower == ts_.begin()
*start_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), lower)));
// find first ts that is greater than end_ts
auto upper = std::upper_bound(ts_.begin(), ts_.end(), end_ts);
if (upper == ts_.end()) {
*end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper) - 1));
} else {
*end_step = std::max(1, static_cast<int32_t>(std::distance(ts_.begin(), upper)));
}
return Status::OK();
}

Status Tracing::GetRecordEntry(int32_t start_step, int32_t end_step, int32_t record_offset,
std::vector<int32_t> *result) {
Status Tracing::GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, int32_t record_offset,
const std::string field, std::vector<int32_t> *result) {
std::lock_guard<std::mutex> guard(lock_);
auto total_steps = records_.size() / records_per_step_;
MS_LOG(DEBUG) << "start_step: " << start_step << " end_step: " << end_step;
@@ -98,10 +141,15 @@ Status Tracing::GetRecordEntry(int32_t start_step, int32_t end_step, int32_t rec
" end_step: " + std::to_string(end_step));

for (auto step_num = start_step; step_num <= end_step; step_num++) {
// each step has 4 entries in device queue tracing
auto idx = (step_num - 1) * records_per_step_ + record_offset;
assert(idx < records_.size());
(void)result->emplace_back(records_[idx].value);
if (field == "value") {
(void)result->emplace_back(records_[idx].value);
} else if (field == "extra_info") {
(void)result->emplace_back(records_[idx].extra_info);
} else {
return {StatusCode::kMDUnexpectedError,
"Received unexpected field: " + field + R"(. Expected: ["value", "extra_info"].)"};
}
}
return Status::OK();
}
@@ -301,41 +349,84 @@ Status ProfilingManager::ChangeFileMode() {
}

#ifndef ENABLE_ANDROID
Status ProfilingManager::GetUserCpuUtil(int32_t epoch_num, std::vector<uint8_t> *result) {
std::shared_ptr<CpuSampler> cpu_node;
Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(PopulateCpuSamplerAPIInputs(epoch_num, &start_ts, &end_ts, &cpu_node));
return cpu_node->GetSystemUserCpuUtil(start_ts, end_ts, result);
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
return GetUserCpuUtilByTime(start_ts, end_ts, result);
}

Status ProfilingManager::GetSysCpuUtil(int32_t epoch_num, std::vector<uint8_t> *result) {
std::shared_ptr<CpuSampler> cpu_node;
Status ProfilingManager::GetUserCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(PopulateCpuSamplerAPIInputs(epoch_num, &start_ts, &end_ts, &cpu_node));
return cpu_node->GetSystemSysCpuUtil(start_ts, end_ts, result);
RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
return GetUserCpuUtilByTime(start_ts, end_ts, result);
}

Status ProfilingManager::GetUserCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
std::shared_ptr<Sampling> sampling_node;
RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
return node->GetSystemUserCpuUtil(start_ts, end_ts, result);
}

Status ProfilingManager::GetUserCpuUtil(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
std::shared_ptr<CpuSampler> cpu_node;
Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(PopulateCpuSamplerAPIInputs(epoch_num, &start_ts, &end_ts, &cpu_node));
return cpu_node->GetOpUserCpuUtil(op_id, start_ts, end_ts, result);
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
return GetSysCpuUtilByTime(start_ts, end_ts, result);
}

Status ProfilingManager::GetSysCpuUtil(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
std::shared_ptr<CpuSampler> cpu_node;
Status ProfilingManager::GetSysCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(PopulateCpuSamplerAPIInputs(epoch_num, &start_ts, &end_ts, &cpu_node));
return cpu_node->GetOpSysCpuUtil(op_id, start_ts, end_ts, result);
RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
return GetSysCpuUtilByTime(start_ts, end_ts, result);
}

Status ProfilingManager::PopulateCpuSamplerAPIInputs(int32_t epoch_num, uint64_t *start_ts, uint64_t *end_ts,
std::shared_ptr<CpuSampler> *node) {
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, start_ts, end_ts));
Status ProfilingManager::GetSysCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result) {
std::shared_ptr<Sampling> sampling_node;
RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
*node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
return Status::OK();
auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
return node->GetSystemSysCpuUtil(start_ts, end_ts, result);
}

Status ProfilingManager::GetUserCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetUserCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
std::vector<uint16_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
return GetUserCpuUtilByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetUserCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
std::vector<uint16_t> *result) {
std::shared_ptr<Sampling> sampling_node;
RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
return node->GetOpUserCpuUtil(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetSysCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetSysCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step,
std::vector<uint16_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
return GetSysCpuUtilByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetSysCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
std::vector<uint16_t> *result) {
std::shared_ptr<Sampling> sampling_node;
RETURN_IF_NOT_OK(GetSamplingNode(kCpuSamplerName, &sampling_node));
auto node = std::dynamic_pointer_cast<CpuSampler>(sampling_node);
return node->GetOpSysCpuUtil(op_id, start_ts, end_ts, result);
}
#endif

@@ -361,18 +452,58 @@ Status ProfilingManager::EpochToStepInterval(int32_t epoch_num, uint32_t *start_
return Status::OK();
}

Status ProfilingManager::GetConnectorSize(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result) {
Status ProfilingManager::StepToTimeInterval(int32_t start_step, int32_t end_step, uint64_t *start_ts,
uint64_t *end_ts) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
return node->TimeIntervalForStepRange(start_step, end_step, start_ts, end_ts);
} else {
return {StatusCode::kMDUnexpectedError,
"Cannot find appropriate tracing node to convert step range to time interval."};
}
}

Status ProfilingManager::TimeToStepInterval(uint64_t start_ts, uint64_t end_ts, int32_t *start_step,
int32_t *end_step) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
return node->StepIntervalForTimeRange(start_ts, end_ts, start_step, end_step);
} else {
return {StatusCode::kMDUnexpectedError,
"Cannot find appropriate tracing node to convert step range to time interval."};
}
}

Status ProfilingManager::GetConnectorSizeByEpoch(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(EpochToTimeInterval(epoch_num, &start_ts, &end_ts));
return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetConnectorSizeByStep(int32_t op_id, int32_t start_step, int32_t end_step,
std::vector<int32_t> *result) {
uint64_t start_ts, end_ts;
RETURN_IF_NOT_OK(StepToTimeInterval(start_step, end_step, &start_ts, &end_ts));
return GetConnectorSizeByTime(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetConnectorSizeByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts,
std::vector<int32_t> *result) {
std::shared_ptr<Sampling> node;
RETURN_IF_NOT_OK(GetSamplingNode(kConnectorSizeSamplingName, &node));
auto connector_node = std::dynamic_pointer_cast<ConnectorSize>(node);
return connector_node->GetOpConnectorSize(op_id, start_ts, end_ts, result);
}

Status ProfilingManager::GetPipelineTime(int32_t epoch_num, std::vector<int32_t> *result) {
Status ProfilingManager::GetPipelineTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetPipelineTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetPipelineTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
@@ -382,9 +513,19 @@ Status ProfilingManager::GetPipelineTime(int32_t epoch_num, std::vector<int32_t>
}
}

Status ProfilingManager::GetPushTime(int32_t epoch_num, std::vector<int32_t> *result) {
Status ProfilingManager::GetPipelineTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetPipelineTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetPushTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetPushTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetPushTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
@@ -394,9 +535,19 @@ Status ProfilingManager::GetPushTime(int32_t epoch_num, std::vector<int32_t> *re
}
}

Status ProfilingManager::GetBatchTime(int32_t epoch_num, std::vector<int32_t> *result) {
Status ProfilingManager::GetPushTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetPushTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetBatchTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetBatchTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetBatchTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
@@ -406,9 +557,19 @@ Status ProfilingManager::GetBatchTime(int32_t epoch_num, std::vector<int32_t> *r
}
}

Status ProfilingManager::GetConnectorSize(int32_t epoch_num, std::vector<int32_t> *result) {
Status ProfilingManager::GetBatchTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetBatchTimeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetConnectorSizeByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetConnectorSizeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetConnectorSizeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
@@ -418,9 +579,19 @@ Status ProfilingManager::GetConnectorSize(int32_t epoch_num, std::vector<int32_t
}
}

Status ProfilingManager::GetEmptyQueueFrequency(int32_t epoch_num, float_t *result) {
Status ProfilingManager::GetConnectorSizeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetConnectorSizeByStep(start_step, end_step, result);
}

Status ProfilingManager::GetEmptyQueueFrequencyByEpoch(int32_t epoch_num, float_t *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
}

Status ProfilingManager::GetEmptyQueueFrequencyByStep(int32_t start_step, int32_t end_step, float_t *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
@@ -430,6 +601,35 @@ Status ProfilingManager::GetEmptyQueueFrequency(int32_t epoch_num, float_t *resu
}
}

Status ProfilingManager::GetEmptyQueueFrequencyByTime(uint64_t start_ts, uint64_t end_ts, float_t *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetEmptyQueueFrequencyByStep(start_step, end_step, result);
}

Status ProfilingManager::GetConnectorCapacityByEpoch(int32_t epoch_num, std::vector<int32_t> *result) {
uint32_t start_step, end_step;
RETURN_IF_NOT_OK(EpochToStepInterval(epoch_num, &start_step, &end_step));
return GetConnectorCapacityByStep(start_step, end_step, result);
}

Status ProfilingManager::GetConnectorCapacityByStep(int32_t start_step, int32_t end_step,
std::vector<int32_t> *result) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
return node->GetConnectorCapacity(start_step, end_step, result);
} else {
return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
}
}

Status ProfilingManager::GetConnectorCapacityByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result) {
int32_t start_step, end_step;
RETURN_IF_NOT_OK(TimeToStepInterval(start_ts, end_ts, &start_step, &end_step));
return GetConnectorCapacityByStep(start_step, end_step, result);
}

void ProfilingManager::RecordEndOfEpoch(uint32_t step_num) {
MS_LOG(INFO) << "Recording end of epoch. step_num: " << step_num;
(void)epoch_end_ts_.emplace_back(ProfilingTime::GetCurMilliSecond());


+ 210
- 17
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h View File

@@ -1,5 +1,5 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
* Copyright 2020-2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -98,16 +98,21 @@ class Tracing : public Profiling {
virtual Status GetPushTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetBatchTime(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetConnectorSize(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetConnectorCapacity(int32_t start_step, int32_t end_step, std::vector<int32_t> *result) = 0;
virtual Status GetEmptyQueueFrequency(int32_t start_step, int32_t end_step, float_t *empty_queue_freq) = 0;
void Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value,
const uint64_t time_stamp);
Status TimeIntervalForStepRange(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts);
Status StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step);

protected:
explicit Tracing(int32_t records_per_step);
const int32_t records_per_step_;
std::vector<std::string> value_;
std::vector<TracingRecord> records_;
Status GetRecordEntry(int32_t start_step, int32_t end_step, int32_t record_offset, std::vector<int32_t> *result);
std::vector<uint64_t> ts_; // End time of each step or batch
Status GetRecordEntryFieldValue(int32_t start_step, int32_t end_step, int32_t record_offset, std::string field,
std::vector<int32_t> *result);
};

// ProfilingManager is a class manages all profiling infrastructure
@@ -167,27 +172,87 @@ class ProfilingManager {
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the sampled User CPU Utilization for the entire system
/// \return Status object with the error code
Status GetUserCpuUtil(int32_t epoch_num, std::vector<uint8_t> *result);
Status GetUserCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result);

/// \brief API to get User CPU utilization for the system
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the sampled User CPU Utilization for the entire system
/// \return Status object with the error code
Status GetUserCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result);

/// \brief API to get User CPU utilization for the system
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the sampled User CPU Utilization for the entire system
/// \return Status object with the error code
Status GetUserCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result);

/// \brief API to get System CPU utilization for the system
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the sampled System CPU Utilization for the entire system
/// \return Status object with the error code
Status GetSysCpuUtil(int32_t epoch_num, std::vector<uint8_t> *result);
Status GetSysCpuUtilByEpoch(int32_t epoch_num, std::vector<uint8_t> *result);

/// \brief API to get System CPU utilization for the system
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the sampled System CPU Utilization for the entire system
/// \return Status object with the error code
Status GetSysCpuUtilByStep(int32_t start_step, int32_t end_step, std::vector<uint8_t> *result);

/// \brief API to get System CPU utilization for the system
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the sampled System CPU Utilization for the entire system
/// \return Status object with the error code
Status GetSysCpuUtilByTime(uint64_t start_ts, uint64_t end_ts, std::vector<uint8_t> *result);

/// \brief API to get User CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the sampled User CPU Utilization of the operator.
/// \return Status object with the error code
Status GetUserCpuUtil(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result);
Status GetUserCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result);

/// \brief API to get User CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the sampled User CPU Utilization of the operator.
/// \return Status object with the error code
Status GetUserCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step, std::vector<uint16_t> *result);

/// \brief API to get User CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the sampled User CPU Utilization of the operator.
/// \return Status object with the error code
Status GetUserCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result);

/// \brief API to get System CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the sampled System CPU Utilization of the operator.
/// \return Status object with the error code
Status GetSysCpuUtil(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result);
Status GetSysCpuUtilByEpoch(int32_t op_id, int32_t epoch_num, std::vector<uint16_t> *result);

/// \brief API to get System CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the sampled System CPU Utilization of the operator.
/// \return Status object with the error code
Status GetSysCpuUtilByStep(int32_t op_id, int32_t start_step, int32_t end_step, std::vector<uint16_t> *result);

/// \brief API to get System CPU Utilization of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the sampled System CPU Utilization of the operator.
/// \return Status object with the error code
Status GetSysCpuUtilByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<uint16_t> *result);
#endif

/// \brief API to get the connector size of an MD operator
@@ -195,37 +260,143 @@ class ProfilingManager {
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the sampled connector sizes of the operator
/// \return Status object with the error code
Status GetConnectorSize(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result);
Status GetConnectorSizeByEpoch(int32_t op_id, int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the connector size of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the sampled connector sizes of the operator
/// \return Status object with the error code
Status GetConnectorSizeByStep(int32_t op_id, int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the connector size of an MD operator
/// \param [in] op_id The id of the operator
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the sampled connector sizes of the operator
/// \return Status object with the error code
Status GetConnectorSizeByTime(int32_t op_id, uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSize(int32_t epoch_num, std::vector<int32_t> *result);
Status GetConnectorSizeByEpoch(int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSizeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the connector size of DatasetIterator or DeviceQueueOp
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with connector size at each step
/// \return Status object with the error code
Status GetConnectorSizeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with connector capacity at each step
/// \return Status object with the error code
Status GetConnectorCapacityByEpoch(int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with connector capacity at each step
/// \return Status object with the error code
Status GetConnectorCapacityByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the connector capacity of DatasetIterator or DeviceQueueOp
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with connector capacity for steps in the given time range
/// \return Status object with the error code
Status GetConnectorCapacityByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get the pipeline time of batches
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the pipeline time for each step
/// \return Status object with the error code
Status GetPipelineTime(int32_t epoch_num, std::vector<int32_t> *result);
Status GetPipelineTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the pipeline time of batches
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the pipeline time for each step
/// \return Status object with the error code
Status GetPipelineTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the pipeline time of batches
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the pipeline time for steps in the given time range
/// \return Status object with the error code
Status GetPipelineTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get the push time of batches
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the push time for each each step
/// \return Status object with the error code
Status GetPushTime(int32_t epoch_num, std::vector<int32_t> *result);
Status GetPushTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the push time of batches
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the push time for each each step
/// \return Status object with the error code
Status GetPushTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the push time of batches
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the push time for steps in the given time range
/// \return Status object with the error code
Status GetPushTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get the batch time of batches
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result A vector with the batch time for each step
/// \return Status object with the error code
Status GetBatchTime(int32_t epoch_num, std::vector<int32_t> *result);
Status GetBatchTimeByEpoch(int32_t epoch_num, std::vector<int32_t> *result);

/// \brief API to get the batch time of batches
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result A vector with the batch time for each step
/// \return Status object with the error code
Status GetBatchTimeByStep(int32_t start_step, int32_t end_step, std::vector<int32_t> *result);

/// \brief API to get the batch time of batches
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result A vector with the batch time for steps in the given time range
/// \return Status object with the error code
Status GetBatchTimeByTime(uint64_t start_ts, uint64_t end_ts, std::vector<int32_t> *result);

/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \param [in] epoch_num The epoch number for which results are requested
/// \param [out] result The empty queue frequency
/// \return Status object with the error code
Status GetEmptyQueueFrequency(int32_t epoch_num, float_t *result);
Status GetEmptyQueueFrequencyByEpoch(int32_t epoch_num, float_t *result);

/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] result The empty queue frequency
/// \return Status object with the error code
Status GetEmptyQueueFrequencyByStep(int32_t start_step, int32_t end_step, float_t *result);

/// \brief API to get fraction of steps that DatasetIterator or DeviceQueueOp connector was empty
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] result The empty queue frequency
/// \return Status object with the error code
Status GetEmptyQueueFrequencyByTime(uint64_t start_ts, uint64_t end_ts, float_t *result);

private:
std::unique_ptr<Monitor> perf_monitor_;
@@ -249,13 +420,35 @@ class ProfilingManager {
// @return Status The status code returned
Status RegisterSamplingNode(std::shared_ptr<Sampling> node);

/// \brief Helper to convert a given epoch number to a step interval
/// \param [in] epoch_num The epoch number to be converted
/// \param [out] start_step The corresponding start step for the given epoch
/// \param [out] end_step The corresponding end step for the given epoch
/// \return
Status EpochToStepInterval(int32_t epoch_num, uint32_t *start_step, uint32_t *end_step);
// get start and ending timestamp of an epoch

/// \brief Helper to convert a given epoch number to a time interval
/// \param [in] epoch_num The epoch number to be converted
/// \param [out] start_ts The corresponding starting timestamp in ms for the given epoch
/// \param [out] end_ts The corresponding ending timestamp in ms for the given epoch
/// \return Status object with the error code
Status EpochToTimeInterval(int32_t epoch_num, uint64_t *start_ts, uint64_t *end_ts);
#ifndef ENABLE_ANDROID
Status PopulateCpuSamplerAPIInputs(int32_t epoch_num, uint64_t *start_ts, uint64_t *end_ts,
std::shared_ptr<CpuSampler> *node);
#endif

/// \brief Helper to convert step interval to a time interval
/// \param [in] start_step The step interval start range
/// \param [in] end_step The step interval end range
/// \param [out] start_ts The corresponding starting timestamp in ms for the given step interval
/// \param [out] end_ts The corresponding ending timestamp in ms for the given step interval
/// \return Status object with the error code
Status StepToTimeInterval(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts);

/// \brief Helper to convert time interval to a step interval
/// \param [in] start_ts The time interval start range in ms
/// \param [in] end_ts The time interval end range in ms
/// \param [out] start_step The corresponding start step for the given time interval
/// \param [out] end_step The corresponding end step for the given time interval
/// \return Status object with the error code
Status TimeToStepInterval(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step);
};

enum ProfilingType { TIME, CONNECTOR_DEPTH };


Loading…
Cancel
Save