Browse Source

!4950 Fixing Minddata profiling monitor

Merge pull request !4950 from anthonyaje/monitor_fix
tags/v1.0.0
mindspore-ci-bot Gitee 5 years ago
parent
commit
5b4642d10a
9 changed files with 104 additions and 50 deletions
  1. +3
    -6
      mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
  2. +7
    -10
      mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
  3. +1
    -2
      mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
  4. +0
    -2
      mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
  5. +26
    -6
      mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc
  6. +31
    -7
      mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc
  7. +13
    -0
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc
  8. +5
    -2
      mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h
  9. +18
    -15
      tests/ut/python/dataset/test_profiling.py

+ 3
- 6
mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc View File

@@ -32,8 +32,8 @@
namespace mindspore {
namespace dataset {
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, bool send_epoch_end)
: PipelineOp(op_connector_size),
bool send_epoch_end)
: PipelineOp(1),
channel_name_(channel_name),
device_type_(device_type),
device_id_(device_id),
@@ -55,10 +55,7 @@ DeviceQueueOp::Builder::Builder(int32_t prefetch_size)
: builder_prefetch_size_(prefetch_size),
builder_device_id_(0),
builder_device_type_(DeviceType::CPU),
builder_channel_name_("") {
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
builder_op_connector_size_ = cfg->op_connector_size();
}
builder_channel_name_("") {}

Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
state_ = OpState::kDeOpIdle;


+ 7
- 10
mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h View File

@@ -65,11 +65,6 @@ class DeviceQueueOp : public PipelineOp {
return *this;
}

Builder &SetOpConnectorSize(int32_t op_connector_size) {
builder_op_connector_size_ = op_connector_size;
return *this;
}

Builder &SetDeviceType(const std::string &device_type) {
if (device_type == "Ascend") {
builder_device_type_ = DeviceType::Ascend;
@@ -96,9 +91,8 @@ class DeviceQueueOp : public PipelineOp {
// to call this Build() method. It will instantiate the DeviceQueueOp
// and return it to caller as a shared pointer.
Status Build(std::shared_ptr<DeviceQueueOp> *ptr) {
*ptr =
std::make_shared<DeviceQueueOp>(builder_channel_name_, builder_device_type_, builder_device_id_,
builder_prefetch_size_, builder_op_connector_size_, builder_send_epoch_end_);
*ptr = std::make_shared<DeviceQueueOp>(builder_channel_name_, builder_device_type_, builder_device_id_,
builder_prefetch_size_, builder_send_epoch_end_);
return Status::OK();
}

@@ -107,19 +101,22 @@ class DeviceQueueOp : public PipelineOp {
int32_t builder_device_id_;
DeviceType builder_device_type_;
std::string builder_channel_name_;
int32_t builder_op_connector_size_;
bool builder_send_epoch_end_;
};

// Name: constructor
// Description
DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, bool send_epoch_end);
bool send_epoch_end);

// Name: destructor
// Description
~DeviceQueueOp();

/// \brief Getter function
/// \return connector size of current op
int32_t ConnectorSize() const { return ChildOpConnectorSize(); }

Status EoeReceived(int32_t worker_id) override;

const int32_t get_prefetch_size() { return prefetch_size_; }


+ 1
- 2
mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc View File

@@ -37,7 +37,6 @@ ExecutionTree::ExecutionTree() : id_count_(0) {
tg_ = std::make_unique<TaskGroup>();
tree_state_ = kDeTStateInit;
prepare_flags_ = kDePrepNone;
perf_monitor_ = std::make_unique<Monitor>(this);
profiling_manager_ = std::make_unique<ProfilingManager>(this);
optimize_ = common::GetEnv("OPTIMIZE") == "true" ? true : false;
}
@@ -139,7 +138,7 @@ Status ExecutionTree::Launch() {
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->Initialize());
// Launch Monitor Thread
RETURN_IF_NOT_OK(tg_->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
RETURN_IF_NOT_OK(profiling_manager_->LaunchMonitor());
}

MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str();


+ 0
- 2
mindspore/ccsrc/minddata/dataset/engine/execution_tree.h View File

@@ -30,7 +30,6 @@ namespace dataset {
// Forward declares
class TaskGroup;
class DatasetOp;
class Monitor;

class ExecutionTree {
public:
@@ -269,7 +268,6 @@ class ExecutionTree {
uint32_t prepare_flags_; // Flags used during tree prepare
TreeState tree_state_; // Tracking the current tree state
int32_t num_epochs_; // Total number of epochs to run for this tree
std::unique_ptr<Monitor> perf_monitor_; // Performance Monitor
std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager
bool optimize_; // Flag to enable optional optimizations
};


+ 26
- 6
mindspore/ccsrc/minddata/dataset/engine/perf/connector_size.cc View File

@@ -62,24 +62,44 @@ json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t
}

// Save profiling data to file
// If the file already exists (created by another sampling node), simply add the data to the metrics field.
Status ConnectorSize::SaveToFile() {
std::ofstream os(file_path_, std::ios::trunc);
uint32_t idx = 0;
Path path = Path(file_path_);
json output;
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
output["sampling_interval"] = cfg->monitor_sampling_interval();
if (path.Exists()) {
MS_LOG(DEBUG) << file_path_ << " exists";
std::ifstream file(file_path_);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}

uint32_t idx = 0;
// Traverse the ExecutionTree for JSON node generation
for (auto &node : *tree_) {
std::vector<int32_t> cur_queue_size;
std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
if (!path.Exists()) {
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
} else {
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
auto &ops_data = output["op_info"];
ops_data[idx]["metrics"]["output_queue"]["size"] = cur_queue_size;
ops_data[idx]["metrics"]["output_queue"]["length"] = node.ConnectorCapacity();
}
}

idx++;
}

// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}

Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();


+ 31
- 7
mindspore/ccsrc/minddata/dataset/engine/perf/connector_throughput.cc View File

@@ -20,6 +20,7 @@
#include <memory>
#include <string>
#include <nlohmann/json.hpp>
#include "minddata/dataset/core/config_manager.h"
#include "minddata/dataset/engine/perf/connector_throughput.h"
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/path.h"
@@ -75,8 +76,11 @@ json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<d
json_node["op_type"] = node.Name();
json_node["num_workers"] = node.num_workers();
json metrics;
metrics["output_queue"] = {{"throughput", thr}};

// DeviceQueueOp is a special op; it is not inlined, but its output queue is invalid.
// So we should not output its connector throughput.
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
metrics["output_queue"] = {{"throughput", thr}};
}
json_node["metrics"] = metrics;
if (!children_id.empty()) {
json_node["children"] = children_id;
@@ -86,10 +90,18 @@ json ConnectorThroughput::ParseOpInfo(const DatasetOp &node, const std::vector<d
}

// Save profiling data to file
// If the file already exists (created by another sampling node), simply add the data to the metrics field.
Status ConnectorThroughput::SaveToFile() {
std::ofstream os(file_path_);
Path path = Path(file_path_);
json output;
output["sampling_interval"] = 10;
if (path.Exists()) {
MS_LOG(DEBUG) << file_path_ << " exists";
std::ifstream file(file_path_);
file >> output;
} else {
output["sampling_interval"] = GlobalContext::config_manager()->monitor_sampling_interval();
}

// Traverse the ExecutionTree for JSON node generation
int col = 0;
for (auto &node : *tree_) {
@@ -97,15 +109,27 @@ Status ConnectorThroughput::SaveToFile() {
for (auto i = 0; i < throughput_.size(); i++) {
throughput.push_back(throughput_[col][i]);
}
json json_node = ParseOpInfo(node, throughput);
output["op_info"].push_back(json_node);

if (!path.Exists()) {
json json_node = ParseOpInfo(node, throughput);
output["op_info"].push_back(json_node);
} else {
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
auto &ops_data = output["op_info"];
ops_data[col]["metrics"]["output_queue"]["throughput"] = throughput;
}
}
col++;
}

// Discard the content of the file when opening.
std::ofstream os(file_path_, std::ios::trunc);
os << output;
return Status::OK();
}

Status ConnectorThroughput::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + Name() + "_" + device_id + ".json")).toString();
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();
}
} // namespace dataset


+ 13
- 0
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.cc View File

@@ -29,6 +29,11 @@
namespace mindspore {
namespace dataset {

// Constructor
ProfilingManager::ProfilingManager(ExecutionTree *tree) : tree_(tree) {
perf_monitor_ = std::make_unique<Monitor>(tree_);
}

bool ProfilingManager::IsProfilingEnable() const {
auto profiling = common::GetEnv("PROFILING_MODE");
if (profiling.empty() || profiling != "true") {
@@ -68,6 +73,7 @@ Status ProfilingManager::Initialize() {
// device_queue node is used for graph mode
std::shared_ptr<Tracing> device_queue_tracing = std::make_shared<DeviceQueueTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing));

// dataset_iterator node is used for graph mode
std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));
@@ -77,6 +83,13 @@ Status ProfilingManager::Initialize() {

std::shared_ptr<Sampling> connector_thr_sampling = std::make_shared<ConnectorThroughput>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(connector_thr_sampling));

return Status::OK();
}

// Launch monitoring thread.
Status ProfilingManager::LaunchMonitor() {
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
return Status::OK();
}



+ 5
- 2
mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h View File

@@ -78,7 +78,7 @@ class Tracing : public Profiling {
// 4) Manage profiling data serialization process
class ProfilingManager {
public:
explicit ProfilingManager(ExecutionTree *tree) : tree_(tree) {}
explicit ProfilingManager(ExecutionTree *tree);

~ProfilingManager() = default;

@@ -105,7 +105,11 @@ class ProfilingManager {

const std::unordered_map<std::string, std::shared_ptr<Sampling>> &GetSamplingNodes() { return sampling_nodes_; }

// Launch monitoring thread.
Status LaunchMonitor();

private:
std::unique_ptr<Monitor> perf_monitor_;
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;

std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;
@@ -138,7 +142,6 @@ class ProfilingTime {
public:
static int64_t GetCurMilliSecond();
};

} // namespace dataset
} // namespace mindspore
#endif

+ 18
- 15
tests/ut/python/dataset/test_profiling.py View File

@@ -15,6 +15,7 @@
"""
Testing profiling support in DE
"""
import json
import os
import numpy as np
import mindspore.dataset as ds
@@ -23,8 +24,7 @@ FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

PIPELINE_FILE_SIZE = "./pipeline_profiling_1.json"
PIPELINE_FILE_THR = "./pipeline_profiling_Connector_Throughput_Sampling_1.json"
PIPELINE_FILE = "./pipeline_profiling_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"


@@ -44,10 +44,8 @@ def test_profiling_simple_pipeline():
for _ in data1:
pass

assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
@@ -57,7 +55,7 @@ def test_profiling_simple_pipeline():
def test_profiling_complex_pipeline():
"""
Generator -> Map ->
-> Zip -> Batch
-> Zip
TFReader -> Shuffle ->
"""
os.environ['PROFILING_MODE'] = 'true'
@@ -77,10 +75,17 @@ def test_profiling_complex_pipeline():
for _ in data3:
pass

assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
with open(PIPELINE_FILE) as f:
data = json.load(f)
op_info = data["op_info"]
assert len(op_info) == 5
for i in range(5):
assert "size" in op_info[i]["metrics"]["output_queue"]
assert "length" in op_info[i]["metrics"]["output_queue"]
assert "throughput" in op_info[i]["metrics"]["output_queue"]

assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
@@ -108,10 +113,8 @@ def test_profiling_sampling_iterval():
for _ in data1:
pass

assert os.path.exists(PIPELINE_FILE_SIZE) is True
os.remove(PIPELINE_FILE_SIZE)
assert os.path.exists(PIPELINE_FILE_THR) is True
os.remove(PIPELINE_FILE_THR)
assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)



Loading…
Cancel
Save