Browse Source

Use num_workers as the minimum size of queue_capacity

tags/v1.6.0
hesham 4 years ago
parent
commit
46613e6425
2 changed files with 42 additions and 7 deletions
  1. +41
    -6
      mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc
  2. +1
    -1
      mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h

+ 41
- 6
mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.cc View File

@@ -20,6 +20,10 @@
#include <functional>
#include <memory>
#include <vector>
#ifndef ENABLE_ANDROID
#include "minddata/dataset/engine/datasetops/source/nonmappable_leaf_op.h"
#endif

#include "minddata/dataset/util/task_manager.h"

namespace mindspore {
@@ -40,6 +44,7 @@ Status AutoTune::Main() {
rc = RunIteration();
if (rc.IsError()) {
MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
RETURN_IF_NOT_OK(profiling_manager_->Stop());
break;
}
rc = cv_.WaitFor(&_lock, GlobalContext::config_manager()->autotune_interval());
@@ -71,7 +76,12 @@ void AutoTune::PrintTreeConfiguration() {

// Launch the AutoTune worker thread.
// Collects pipeline op info first; if that fails, AutoTune is disabled gracefully
// rather than failing the pipeline: the error is logged, profiling is stopped,
// and OK is returned so the caller continues without the AutoTune thread.
// \return Status code
Status AutoTune::LaunchThread() {
  MS_LOG(INFO) << "Launching Dataset AutoTune thread";
  // NOTE(review): the diff residue kept both the old unconditional
  // RETURN_IF_NOT_OK(CollectOpsInfo()) call and the new error-handled call;
  // only the single error-handled invocation below is correct.
  Status rc = CollectOpsInfo();
  if (rc.IsError()) {
    MS_LOG(ERROR) << "Dataset AutoTune failed and will exit with the following error: " << rc;
    RETURN_IF_NOT_OK(profiling_manager_->Stop());
    return Status::OK();
  }
  // Register the condition variable with the task group's interrupt service so
  // the AutoTune thread can be woken up, then start the async worker task.
  RETURN_IF_NOT_OK(cv_.Register(tree_adapter_->AllTasks()->GetIntrpService()));
  RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this)));
  return Status::OK();
}
@@ -86,6 +96,11 @@ Status AutoTune::CollectOpsInfo() {
if (itr->NumWorkers() > 0) {
parallel_ops_ids_.push_back(itr->id());
}
CHECK_FAIL_RETURN_UNEXPECTED(itr->Name() != "GeneratorOp",
"GeneratorDataset is in the pipeline, AutoTune is not supported.");
CHECK_FAIL_RETURN_UNEXPECTED(!itr->IsPython(), "Pyfunc is in the pipeline, AutoTune is not supported.");
CHECK_FAIL_RETURN_UNEXPECTED(itr->Name() != "GeneratorOp",
"GeneratorDataset is in the pipeline, AutoTune is not supported.");
}
// sort parallel ops in reverse order of IDs (i.e., bottommost op is first)
std::sort(parallel_ops_ids_.begin(), parallel_ops_ids_.end(), std::greater<>());
@@ -248,12 +263,14 @@ Status AutoTune::IsDSaBottleneck(bool *isBottleneck) {
return Status::OK();
}

// Request an increase of "num_parallel_workers" for the given operator via the
// tree modifier. The new value is old_workers + INCREMENT_WORKER, clamped to
// the range [MIN_NUM_WORKERS, max_workers_].
// \param op_id Operator ID to send the change request to
// \param old_workers Current number of workers; used to compute the new value and for logging
// \param[out] num_workers_requested Receives the worker count that was actually requested
// \return Status code
Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested) {
  int32_t new_workers = old_workers + INCREMENT_WORKER;
  // Clamp the requested value to the configured bounds.
  new_workers = std::min(new_workers, max_workers_);
  new_workers = std::max(new_workers, MIN_NUM_WORKERS);
  RETURN_IF_NOT_OK(tree_modifier_->AddChangeRequest(op_id, std::make_shared<ChangeNumWorkersRequest>(new_workers)));
  // Fixed missing separator: message previously rendered as "...NameWithIDFrom old value...".
  MS_LOG(WARNING) << "Added request to change \"num_parallel_workers\" of Operator: " << ops_[op_id]->NameWithID()
                  << " from old value: [" << old_workers << "] to new value: [" << new_workers << "].";
  *num_workers_requested = new_workers;
  return Status::OK();
}

@@ -280,6 +297,16 @@ Status AutoTune::Analyse() {

// check parallel ops in loop
for (const auto &op_id : parallel_ops_ids_) {
// MindRecordOp and NonMappableDataset is not supported in AutoTune
if (ops_[op_id]->Name() == "MindRecordOp") {
continue;
}
#ifndef ENABLE_ANDROID
if (std::dynamic_pointer_cast<NonMappableLeafOp>(ops_[op_id]) != nullptr) {
continue;
}
#endif

// op specifics
double output_queue_util = out_ops_queue_util[op_id];
double input_queue_util = in_ops_queue_util[op_id];
@@ -289,8 +316,11 @@ Status AutoTune::Analyse() {
CHECK_FAIL_RETURN_UNEXPECTED(num_workers != 0, "ParallelOp with num_workers=0");
// derived metrics
double queue_diff = input_queue_util - output_queue_util;
int64_t queue_capacity = 0;
int64_t queue_capacity;
RETURN_IF_NOT_OK(GetOpConnectorCapacity(op_id, &queue_capacity));
int64_t new_queue_capacity = queue_capacity;

int32_t requested_workers = 0;

MS_LOG(DEBUG) << "Op (" << ops_[op_id]->NameWithID() << ") CPU=" << cpu_util / num_workers
<< ", in=" << input_queue_util << "out=" << output_queue_util;
@@ -300,18 +330,23 @@ Status AutoTune::Analyse() {
<< ") is slow, input connector utilization=" << input_queue_util
<< ", output connector utilization=" << output_queue_util << ", diff= " << queue_diff << " > "
<< INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD << " threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
} else if ((cpu_util / num_workers) > MAP_OP_WORKER_HIGH_THRESHOLD) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting high average worker cpu utilization "
<< (cpu_util / num_workers) << "% > " << MAP_OP_WORKER_HIGH_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, num_workers + INCREMENT_WORKER));
RETURN_IF_NOT_OK(RequestNumWorkerChange(op_id, num_workers, &requested_workers));
}
if ((cpu_util / num_workers) < MAP_OP_WORKER_LOW_THRESHOLD &&
((input_queue_util < INPUT_QUEUE_LOW) || (-1 * queue_diff > INPUT_OUTPUT_QUEUE_DIFF_THRESHOLD))) {
MS_LOG(WARNING) << "Op (" << ops_[op_id]->NameWithID() << ") getting low average worker cpu utilization "
<< (cpu_util / num_workers) << "% < " << MAP_OP_WORKER_LOW_THRESHOLD << "% threshold.";
RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, queue_capacity + INCREMENT_QUEUE_SIZE));
new_queue_capacity = queue_capacity + INCREMENT_QUEUE_SIZE;
}
if (requested_workers == 0) {
requested_workers = num_workers;
}
new_queue_capacity = std::max(new_queue_capacity, static_cast<int64_t>(requested_workers));
RETURN_IF_NOT_OK(RequestConnectorCapacityChange(op_id, queue_capacity, new_queue_capacity));
}
return Status::OK();
}


+ 1
- 1
mindspore/ccsrc/minddata/dataset/engine/perf/auto_tune.h View File

@@ -125,7 +125,7 @@ class AutoTune {
/// \param old_workers Old number of workers for logging purposes
/// \param new_workers new number of worker
/// \return Status code
Status RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t new_workers);
Status RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int32_t *num_workers_requested);

/// Send a ChangeRequest to the operator to update the connector capacity
/// \param op_id operator ID


Loading…
Cancel
Save