|
|
|
@@ -29,6 +29,7 @@ AutoTune::AutoTune(TreeAdapter *tree_adap, ProfilingManager *profiling_mgr) |
|
|
|
tree_modifier_ = std::make_unique<TreeModifier>(tree_adapter_); |
|
|
|
max_workers_ = GlobalContext::config_manager()->num_cpu_threads(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::Main() { |
|
|
|
TaskManager::FindMe()->Post(); |
|
|
|
MS_LOG(INFO) << "AutoTune thread has started."; |
|
|
|
@@ -49,8 +50,21 @@ Status AutoTune::Main() { |
|
|
|
} |
|
|
|
RETURN_IF_NOT_OK(profiling_manager_->Stop()); |
|
|
|
MS_LOG(INFO) << "AutoTune thread is finished."; |
|
|
|
MS_LOG(INFO) << "Printing final tree configuration"; |
|
|
|
PrintTreeConfiguration(); |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
void AutoTune::PrintTreeConfiguration() { |
|
|
|
ExecutionTree *tree = tree_adapter_->tree_.get(); |
|
|
|
for (auto itr = tree->begin(); itr != tree->end(); itr++) { |
|
|
|
if (!itr->inlined() && itr->Name() != "DeviceQueueOp") { |
|
|
|
MS_LOG(INFO) << itr->NameWithID() << " num_workers: " << itr->NumWorkers() |
|
|
|
<< " connector_capacity: " << itr->ConnectorCapacity(); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::LaunchThread() { |
|
|
|
MS_LOG(INFO) << "Launching AutoTune thread"; |
|
|
|
RETURN_IF_NOT_OK(CollectOpsInfo()); |
|
|
|
@@ -58,6 +72,7 @@ Status AutoTune::LaunchThread() { |
|
|
|
RETURN_IF_NOT_OK(tree_adapter_->AllTasks()->CreateAsyncTask("AutoTune Thread", std::bind(&AutoTune::Main, this))); |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::CollectOpsInfo() { |
|
|
|
ExecutionTree *tree = tree_adapter_->tree_.get(); |
|
|
|
RETURN_UNEXPECTED_IF_NULL(tree); |
|
|
|
@@ -102,6 +117,7 @@ Status AutoTune::GetOpsCpuUtil(std::map<int32_t, double> *ops_cpu_util) { |
|
|
|
} |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util, |
|
|
|
std::map<int32_t, double> *in_ops_queue_util) { |
|
|
|
// loop over all itr keys in the ops_ and get output_queue usage |
|
|
|
@@ -139,16 +155,19 @@ Status AutoTune::GetOpsQueueUtil(std::map<int32_t, double> *out_ops_queue_util, |
|
|
|
|
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
// Records the current worker count of every operator tracked in ops_.
//
// \param[out] ops_num_workers Map that receives one entry per element of
//   ops_, using the same key as ops_ and the value returned by that
//   operator's NumWorkers(). An existing entry with the same key is
//   overwritten; entries under other keys are left untouched. Must not be null.
// \return Status::OK() once every operator has been recorded. If
//   ops_num_workers is null, the function returns early through
//   RETURN_UNEXPECTED_IF_NULL (presumably with an unexpected-error Status -
//   confirm against the macro definition) instead of dereferencing it.
Status AutoTune::GetOpsNumWorker(std::map<int32_t, int32_t> *ops_num_workers) {
  // Validate the out-parameter before the first dereference, the same way
  // CollectOpsInfo validates its tree pointer.
  RETURN_UNEXPECTED_IF_NULL(ops_num_workers);
  for (const auto &op : ops_) {
    (*ops_num_workers)[op.first] = op.second->NumWorkers();
  }
  return Status::OK();
}
|
|
|
|
|
|
|
bool AutoTune::IsSink() { |
|
|
|
std::shared_ptr<Tracing> node; |
|
|
|
return profiling_manager_->GetTracingNode(kDeviceQueueTracingName, &node).IsOk(); |
|
|
|
} |
|
|
|
|
|
|
|
template <typename T> |
|
|
|
double AutoTune::Mean(const std::vector<T> &items) { |
|
|
|
if (items.size() == 0) { |
|
|
|
@@ -166,6 +185,7 @@ Status AutoTune::RunIteration() { |
|
|
|
} |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::RecordPipelineTime() { |
|
|
|
std::vector<int32_t> times; |
|
|
|
RETURN_IF_NOT_OK(profiling_manager_->GetPipelineTimeByEpoch(cur_epoch_, &times)); |
|
|
|
@@ -175,6 +195,7 @@ Status AutoTune::RecordPipelineTime() { |
|
|
|
<< " ms. The avg pipeline time for all epochs is " << Mean(avg_pipeline_times_) << "ms"; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::RunIterationEpoch() { |
|
|
|
RETURN_IF_NOT_OK(RecordPipelineTime()); |
|
|
|
bool isBottleneck = false; |
|
|
|
@@ -224,6 +245,7 @@ Status AutoTune::RequestNumWorkerChange(int32_t op_id, int32_t old_workers, int3 |
|
|
|
<< " New value: " << new_workers << " Old value: " << old_workers; |
|
|
|
return Status::OK(); |
|
|
|
} |
|
|
|
|
|
|
|
Status AutoTune::RequestConnectorCapacityChange(int32_t op_id, int32_t old_size, int32_t new_size) { |
|
|
|
new_size = std::min(new_size, MAX_QUEUE_SIZE); |
|
|
|
new_size = std::max(new_size, MIN_QUEUE_SIZE); |
|
|
|
|