Browse Source

autotune step

fix testing script default

updates and changed sampling function

comments addressed

comments addressed

comments addressed

moving isSinkCheckAbove

update - move CheckSinkFunction

fix
tags/v1.6.0
danishfarid 4 years ago
parent
commit
faaa5d5f0a
8 changed files with 161 additions and 36 deletions
  1. +2
    -2
      mindspore/ccsrc/minddata/dataset/core/config_manager.h
  2. +100
    -18
      mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc
  3. +28
    -5
      mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h
  4. +14
    -1
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc
  5. +5
    -0
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h
  6. +1
    -1
      mindspore/ccsrc/minddata/dataset/include/dataset/constants.h
  7. +10
    -8
      mindspore/dataset/core/config.py
  8. +1
    -1
      tests/ut/python/dataset/test_autotune.py

+ 2
- 2
mindspore/ccsrc/minddata/dataset/core/config_manager.h View File

@@ -239,11 +239,11 @@ class ConfigManager {
bool enable_autotune() { return enable_autotune_; }

// getter function
// @return - autotune interval in millisecods
// @return - autotune interval in steps
int64_t autotune_interval() { return autotune_interval_; }

// setter function
// @param interval - autotune interval in millisecods
// @param interval - autotune interval in steps
void set_autotune_interval(int64_t interval) { autotune_interval_ = interval; }

private:


+ 100
- 18
mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc View File

@@ -25,24 +25,38 @@
namespace mindspore {
namespace dataset {
AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr)
: tree_adapter_(tree_adap), profiling_manager_(profiling_mgr), leaf_op_id_(-1), cur_epoch_(1) {
: tree_adapter_(tree_adap),
profiling_manager_(profiling_mgr),
leaf_op_id_(-1),
cur_epoch_(1),
skip_bool_(true),
last_step_profiled_(0) {
tree_modifier_ = std::make_unique<TreeModifier>(tree_adapter_);
max_workers_ = GlobalContext::config_manager()->num_cpu_threads();
step_gap_ = GlobalContext::config_manager()->autotune_interval();
}

Status AutoTune::Main() {
TaskManager::FindMe()->Post();
MS_LOG(INFO) << "Dataset AutoTune thread has started.";
std::unique_lock<std::mutex> _lock(mux_);
cur_epoch_ = 1;
if (step_gap_) {
mode_ = AutoTuneMode::kAutoTuneModeStep;
} else if (step_gap_ == 0) {
mode_ = AutoTuneMode::kAutoTuneModeEpoch;
}
Status rc;
while (!this_thread::is_interrupted() && !(tree_adapter_->tree_->isFinished())) {
rc = RunIteration();
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
rc = RunIterationEpoch();
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
rc = RunIterationStep();
}
if (rc.IsError()) {
MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
break;
}
rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval());
rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->monitor_sampling_interval());
// the thread may be interrupted for tree termination when waiting (we should not report error in this case)
if (rc.IsError() && rc != StatusCode::kMDInterrupted) {
return rc;
@@ -101,7 +115,7 @@ Status AutoTune::CollectOpsInfo() {

Status AutoTune::GetOpConnectorCapacity(int32_t op_id, int64_t *capacity) {
auto item = ops_.find(op_id);
CHECK_FAIL_RETURN_UNEXPECTED(item != ops_.end(), "Invalid Operator ID");
CHECK_FAIL_RETURN_UNEXPECTED(item != ops_.end(), "Invalid Operator ID.");
*capacity = item->second->ConnectorCapacity();
return Status::OK();
}
@@ -112,8 +126,15 @@ Status AutoTune::GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util) {
std::vector<uint16_t> sys_util;
std::vector<uint16_t> user_util;
#ifndef ENABLE_ANDROID
RETURN_IF_NOT_OK(profiling_manager_->GetSysCpuUtilByEpoch(itr->first, cur_epoch_, &sys_util));
RETURN_IF_NOT_OK(profiling_manager_->GetUserCpuUtilByEpoch(itr->first, cur_epoch_, &user_util));
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetSysCpuUtilByEpoch(itr->first, cur_epoch_, &sys_util));
RETURN_IF_NOT_OK(profiling_manager_->GetUserCpuUtilByEpoch(itr->first, cur_epoch_, &user_util));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(
profiling_manager_->GetSysCpuUtilByStep(itr->first, last_step_profiled_, cur_step_ - 1, &sys_util));
RETURN_IF_NOT_OK(
profiling_manager_->GetUserCpuUtilByStep(itr->first, last_step_profiled_, cur_step_ - 1, &user_util));
}
#endif
double sys_cpu_util = Mean(sys_util);
double user_cpu_util = Mean(user_util);
@@ -131,7 +152,12 @@ Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util,
continue;
}
std::vector<int32_t> sizes;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(itr->first, cur_epoch_, &sizes));
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(itr->first, cur_epoch_, &sizes));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(
profiling_manager_->GetConnectorSizeByStep(itr->first, last_step_profiled_, cur_step_ - 1, &sizes));
}
double avg_size = Mean(sizes);
int64_t capacity = itr->second->ConnectorCapacity();
CHECK_FAIL_RETURN_UNEXPECTED(capacity != 0, "Capacity of connector should not be 0");
@@ -180,34 +206,63 @@ double AutoTune::Mean(const std::vector<T> &items) {
return std::accumulate(items.begin(), items.end(), 0.0) / static_cast<double>(items.size());
}

Status AutoTune::RunIteration() {
Status IsSinkCheck(bool sink_type) {
// Close AutoTune in Non-sink mode, since it's not ready for test.
if (!IsSink()) {
if (sink_type == true) {
return Status::OK();
} else {
MS_LOG(ERROR) << "Dataset AutoTune doesn't support non-sink pipeline.";
return Status(StatusCode::kMDUnexpectedError,
"Dataset AutoTune hasn't been supported in non-sink mode(dataset_sink_mode=False), check training "
"config or set dataset_sink_mode to True.");
}
}

Status AutoTune::RunIterationEpoch() {
RETURN_IF_NOT_OK(IsSinkCheck(IsSink()));
// Run every epoch
if ((profiling_manager_->GetNumOfProfiledEpochs()) >= cur_epoch_) {
MS_LOG(INFO) << "Run Dataset AutoTune at epoch #" << cur_epoch_;
RETURN_IF_NOT_OK(RunIterationEpoch());
RETURN_IF_NOT_OK(RunIteration());
++cur_epoch_;
}
return Status::OK();
}

Status AutoTune::RunIterationStep() {
RETURN_IF_NOT_OK(IsSinkCheck(IsSink()));
// Run at autotune step interval
int32_t step_temp = 0;
profiling_manager_->GetNumberOfProfiledSteps(&step_temp);
cur_step_ = step_temp;
if (cur_step_ - last_step_profiled_ >= step_gap_) {
if (skip_bool_) {
skip_bool_ = false;
last_step_profiled_ = cur_step_;
return Status::OK();
}
MS_LOG(INFO) << "Run AutoTune at step#" << cur_step_;
RETURN_IF_NOT_OK(RunIteration());
last_step_profiled_ = cur_step_;
}
return Status::OK();
}

Status AutoTune::RecordPipelineTime() {
std::vector<int32_t> times;
RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_, &times));
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_, &times));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByStep(last_step_profiled_, cur_step_ - 1, &times));
}
double avg_time = Mean(times);
avg_pipeline_times_.push_back(avg_time);
MS_LOG(INFO) << "Epoch #" << cur_epoch_ << ", Average Pipeline time is " << avg_time
<< " ms. The avg pipeline time for all epochs is " << Mean(avg_pipeline_times_) << "ms";
MS_LOG(INFO) << "Average Pipeline time is " << avg_time << " ms. The avg pipeline time for all epochs is "
<< Mean(avg_pipeline_times_) << "ms";
return Status::OK();
}

Status AutoTune::RunIterationEpoch() {
Status AutoTune::RunIteration() {
RETURN_IF_NOT_OK(RecordPipelineTime());
bool isBottleneck = false;
RETURN_IF_NOT_OK(IsDSaBottleneck(&isBottleneck));
@@ -217,17 +272,44 @@ Status AutoTune::RunIterationEpoch() {
return Status::OK();
}

Status AutoTune::GetConnectorSize(std::vector<int32_t> *sizes) {
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(cur_epoch_, sizes));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByStep(last_step_profiled_, cur_step_ - 1, sizes));
}
return Status::OK();
}

Status AutoTune::GetConnectorCapacity(std::vector<int32_t> *capacities) {
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByEpoch(cur_epoch_, capacities));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByStep(last_step_profiled_, cur_step_ - 1, capacities));
}
return Status::OK();
}

Status AutoTune::GetEmptyQueueFrequency(float *empty_freq) {
if (mode_ == AutoTuneMode::kAutoTuneModeEpoch) {
RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByEpoch(cur_epoch_, empty_freq));
} else if (mode_ == AutoTuneMode::kAutoTuneModeStep) {
RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByStep(last_step_profiled_, cur_step_ - 1, empty_freq));
}
return Status::OK();
}

Status AutoTune::IsDSaBottleneck(bool *isBottleneck) {
std::vector<int32_t> sizes;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorSizeByEpoch(cur_epoch_, &sizes));
RETURN_IF_NOT_OK(GetConnectorSize(&sizes));
double avg_size = Mean(sizes);
std::vector<int32_t> capacities;
RETURN_IF_NOT_OK(profiling_manager_->GetConnectorCapacityByEpoch(cur_epoch_, &capacities));
RETURN_IF_NOT_OK(GetConnectorCapacity(&capacities));
double avg_capacity = Mean(capacities);
CHECK_FAIL_RETURN_UNEXPECTED(avg_capacity != 0, "Capacities of connectors should not be 0");
double usage_avg_last = (avg_size / avg_capacity);
float empty_freq = 0;
RETURN_IF_NOT_OK(profiling_manager_->GetEmptyQueueFrequencyByEpoch(cur_epoch_, &empty_freq));
RETURN_IF_NOT_OK(GetEmptyQueueFrequency(&empty_freq));

// Reporting values
MS_LOG(INFO) << "Epoch #" << cur_epoch_ << ", Device Connector Size: " << avg_size


+ 28
- 5
mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h View File

@@ -56,14 +56,30 @@ class AutoTune {
/// \return Status code
Status CollectOpsInfo();

/// The AutoTune logic that executes every iteration
/// Function to check for current step and execute logic
/// \return status code
Status RunIteration();
Status RunIterationStep();

/// The AutoTune logic for pipelines that executes every epoch
/// Function to check for current epoch and execute logic
/// \return status code
Status RunIterationEpoch();

/// The AutoTune logic for pipelines that executes every epoch
/// \return status code
Status RunIteration();

/// Fetches connector size for steps or epoch based on mode
/// \return status code
Status GetConnectorSize(std::vector<int32_t> *sizes);

/// Fetches connector capacity for steps or epoch based on mode
/// \return status code
Status GetConnectorCapacity(std::vector<int32_t> *capacities);

/// Fetches Connector Queue empty frequency for steps or epoch based on mode
/// \return status code
Status GetEmptyQueueFrequency(float *empty_freq);

/// Check if the dataset pipeline is the bottleneck
/// \param[out] isBottleneck bool
/// \return Status code
@@ -79,7 +95,6 @@ class AutoTune {
const int32_t MIN_NUM_WORKERS = 1;
const int32_t MAX_QUEUE_SIZE = 128;
const int32_t MIN_QUEUE_SIZE = 1;

// Worker specifics
const int32_t INCREMENT_WORKER = 2;
const int32_t DECREMENT_WORKER = -1;
@@ -92,6 +107,8 @@ class AutoTune {
// CPU Specifics
const float_t MAP_OP_WORKER_HIGH_THRESHOLD = 75;
const float_t MAP_OP_WORKER_LOW_THRESHOLD = 35;
// Running mode specifics
enum AutoTuneMode { kAutoTuneModeEpoch, kAutoTuneModeStep };

/// Get the out connector capacity of the operator
/// \param[in] op_id operator id
@@ -166,8 +183,14 @@ class AutoTune {
/// vector of pipeline time per epoch
std::vector<double> avg_pipeline_times_;

/// the current epoch index (starts from 1)
/// the current epoch and step indices (starts from 1)
int32_t cur_epoch_;
// step based auto-tuning specifics
int32_t cur_step_;
int32_t mode_;
int64_t step_gap_;
int32_t last_step_profiled_;
bool skip_bool_;
};
} // namespace dataset
} // namespace mindspore


+ 14
- 1
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc View File

@@ -220,6 +220,8 @@ Status Tracing::Init() {
return Status::OK();
}

size_t Tracing::GetNumberSteps() { return ts_.size(); }

// Constructor
ProfilingManager::ProfilingManager()
: profiling_state_(ProfilingState::kProfilingStateUnBegun), tree_(nullptr), autotuning_(false), profiling_(false) {}
@@ -465,7 +467,7 @@ Status ProfilingManager::TimeToStepInterval(uint64_t start_ts, uint64_t end_ts,
return node->StepIntervalForTimeRange(start_ts, end_ts, start_step, end_step);
} else {
return {StatusCode::kMDUnexpectedError,
"Cannot find appropriate tracing node to convert step range to time interval."};
"Cannot find appropriate tracing node to convert time interval to step range."};
}
}

@@ -623,6 +625,17 @@ Status ProfilingManager::GetConnectorCapacityByTime(uint64_t start_ts, uint64_t
return GetConnectorCapacityByStep(start_step, end_step, result);
}

Status ProfilingManager::GetNumberOfProfiledSteps(int32_t *steps) {
std::shared_ptr<Tracing> node;
if (GetTracingNode(kDeviceQueueTracingName, &node).IsOk() ||
GetTracingNode(kDatasetIteratorTracingName, &node).IsOk()) {
*steps = node->GetNumberSteps();
return Status::OK();
} else {
return {StatusCode::kMDUnexpectedError, "Cannot find appropriate tracing node"};
}
}

void ProfilingManager::RecordEndOfEpoch(uint32_t step_num) {
if (profiling_state_ != ProfilingState::kProfilingStateRunning) {
return;


+ 5
- 0
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h View File

@@ -113,6 +113,7 @@ class Tracing : public Profiling {
const uint64_t time_stamp);
Status TimeIntervalForStepRange(int32_t start_step, int32_t end_step, uint64_t *start_ts, uint64_t *end_ts);
Status StepIntervalForTimeRange(uint64_t start_ts, uint64_t end_ts, int32_t *start_step, int32_t *end_step);
size_t GetNumberSteps();

protected:
Tracing() = default;
@@ -437,6 +438,10 @@ class ProfilingManager {
/// \return number of epochs
int32_t GetNumOfProfiledEpochs() { return epoch_end_step_.size() - 1; }

// Get number of steps taken in pipeline
/// \return number of steps
Status GetNumberOfProfiledSteps(int32_t *size);

/// Determine if the Profiler is being used for autotuning.
/// \return boolean
bool IsAutotuning() { return autotuning_; }


+ 1
- 1
mindspore/ccsrc/minddata/dataset/include/dataset/constants.h View File

@@ -286,7 +286,7 @@ using connection_id_type = uint64_t;
using session_id_type = uint32_t;
using row_id_type = int64_t;

constexpr uint32_t kCfgAutoTuneInterval = 100; // ms
constexpr uint32_t kCfgAutoTuneInterval = 0; // default number of steps
} // namespace dataset
} // namespace mindspore



+ 10
- 8
mindspore/dataset/core/config.py View File

@@ -447,35 +447,37 @@ def get_enable_autotune():

def set_autotune_interval(interval):
"""
Set the default interval (in milliseconds) for data pipeline autotuning.
Set the interval (in steps) for data pipeline autotuning. Setting interval to 0
configures autotune to run after every epoch instead of after a certain number of steps.
Default value is set to 0, meaning epoch-based autotuning.

Args:
interval (int): Interval (in milliseconds) to be used for data pipeline autotuning.
interval (int): Interval (in steps) to serve as the gap for consecutive AutoTune runs.

Raises:
ValueError: If interval is invalid when interval <= 0 or interval > MAX_INT_32.
ValueError: If interval is invalid when interval < 0 or interval > MAX_INT_32.

Examples:
>>> # Set a new global configuration value for the autotuning interval.
>>> ds.config.set_autotune_interval(100)
>>> ds.config.set_autotune_interval(30)
"""
if not isinstance(interval, int):
raise TypeError("interval must be of type int.")
if interval <= 0 or interval > INT32_MAX:
if interval < 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range.")
_config.set_autotune_interval(interval)


def get_autotune_interval():
"""
Get the global configuration of sampling interval of pipeline autotuning.
Get the global configuration of pipeline autotuning step interval.

Returns:
int, interval (in milliseconds) for data pipeline autotuning.
int, interval (in steps) for data pipeline autotuning.

Examples:
>>> # Get the global configuration of the autotuning interval.
>>> # If set_autotune_interval() is never called before, the default value(100) will be returned.
>>> # If set_autotune_interval() is never called before, the default value (0) will be returned.
>>> autotune_interval = ds.config.get_autotune_interval()
"""
return _config.get_autotune_interval()


+ 1
- 1
tests/ut/python/dataset/test_autotune.py View File

@@ -221,7 +221,7 @@ class TestAutotuneWithProfiler:
ds.config.set_enable_autotune(1)

autotune_interval = ds.config.get_autotune_interval()
assert autotune_interval == 100
assert autotune_interval == 0

ds.config.set_autotune_interval(200)
autotune_interval = ds.config.get_autotune_interval()


Loading…
Cancel
Save