Browse Source

!1978 MindData Profiler Infrastructure

Merge pull request !1978 from JunhanHu/perf_monitor
tags/v0.5.0-beta
mindspore-ci-bot Gitee 5 years ago
parent
commit
4aed2bf9ef
56 changed files with 1166 additions and 261 deletions
  1. +2
    -0
      mindspore/ccsrc/dataset/CMakeLists.txt
  2. +2
    -0
      mindspore/ccsrc/dataset/api/python_bindings.cc
  3. +2
    -0
      mindspore/ccsrc/dataset/core/config_manager.cc
  4. +9
    -0
      mindspore/ccsrc/dataset/core/config_manager.h
  5. +1
    -0
      mindspore/ccsrc/dataset/core/constants.h
  6. +3
    -2
      mindspore/ccsrc/dataset/engine/CMakeLists.txt
  7. +20
    -2
      mindspore/ccsrc/dataset/engine/dataset_iterator.cc
  8. +5
    -0
      mindspore/ccsrc/dataset/engine/dataset_iterator.h
  9. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/batch_op.h
  10. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/concat_op.h
  11. +1
    -0
      mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc
  12. +24
    -4
      mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h
  13. +17
    -33
      mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc
  14. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h
  15. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/filter_op.h
  16. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/map_op.h
  17. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/project_op.h
  18. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/rename_op.h
  19. +3
    -3
      mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h
  20. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h
  21. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/skip_op.h
  22. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h
  23. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h
  24. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h
  25. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h
  26. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h
  27. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h
  28. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h
  29. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h
  30. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h
  31. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h
  32. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h
  33. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/take_op.h
  34. +4
    -0
      mindspore/ccsrc/dataset/engine/datasetops/zip_op.h
  35. +15
    -3
      mindspore/ccsrc/dataset/engine/execution_tree.cc
  36. +21
    -6
      mindspore/ccsrc/dataset/engine/execution_tree.h
  37. +6
    -0
      mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt
  38. +89
    -0
      mindspore/ccsrc/dataset/engine/perf/connector_size.cc
  39. +68
    -0
      mindspore/ccsrc/dataset/engine/perf/connector_size.h
  40. +64
    -0
      mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.cc
  41. +51
    -0
      mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h
  42. +64
    -0
      mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.cc
  43. +52
    -0
      mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.h
  44. +50
    -0
      mindspore/ccsrc/dataset/engine/perf/monitor.cc
  45. +52
    -0
      mindspore/ccsrc/dataset/engine/perf/monitor.h
  46. +153
    -0
      mindspore/ccsrc/dataset/engine/perf/profiling.cc
  47. +140
    -0
      mindspore/ccsrc/dataset/engine/perf/profiling.h
  48. +10
    -1
      mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc
  49. +1
    -1
      mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h
  50. +1
    -2
      mindspore/ccsrc/dataset/util/CMakeLists.txt
  51. +0
    -112
      mindspore/ccsrc/dataset/util/profiling.cc
  52. +0
    -92
      mindspore/ccsrc/dataset/util/profiling.h
  53. +3
    -0
      mindspore/ccsrc/dataset/util/status.cc
  54. +1
    -0
      mindspore/ccsrc/dataset/util/status.h
  55. +29
    -0
      mindspore/dataset/core/configuration.py
  56. +119
    -0
      tests/ut/python/dataset/test_profiling.py

+ 2
- 0
mindspore/ccsrc/dataset/CMakeLists.txt View File

@@ -62,6 +62,7 @@ add_dependencies(engine-datasetops-source core)
add_dependencies(engine-datasetops-source-sampler core)
add_dependencies(engine-datasetops core)
add_dependencies(engine-opt core)
add_dependencies(engine-perf core)
add_dependencies(engine-gnn core)
add_dependencies(engine core)
add_dependencies(text core)
@@ -81,6 +82,7 @@ set(submodules
$<TARGET_OBJECTS:engine-datasetops-source>
$<TARGET_OBJECTS:engine-datasetops-source-sampler>
$<TARGET_OBJECTS:engine-gnn>
$<TARGET_OBJECTS:engine-perf>
$<TARGET_OBJECTS:engine-datasetops>
$<TARGET_OBJECTS:engine-opt>
$<TARGET_OBJECTS:engine>


+ 2
- 0
mindspore/ccsrc/dataset/api/python_bindings.cc View File

@@ -239,11 +239,13 @@ void bindTensor(py::module *m) {
.def("set_worker_connector_size", &ConfigManager::set_worker_connector_size)
.def("set_op_connector_size", &ConfigManager::set_op_connector_size)
.def("set_seed", &ConfigManager::set_seed)
.def("set_monitor_sampling_interval", &ConfigManager::set_monitor_sampling_interval)
.def("get_rows_per_buffer", &ConfigManager::rows_per_buffer)
.def("get_num_parallel_workers", &ConfigManager::num_parallel_workers)
.def("get_worker_connector_size", &ConfigManager::worker_connector_size)
.def("get_op_connector_size", &ConfigManager::op_connector_size)
.def("get_seed", &ConfigManager::seed)
.def("get_monitor_sampling_interval", &ConfigManager::monitor_sampling_interval)
.def("load", [](ConfigManager &c, std::string s) { (void)c.LoadFile(s); });

(void)py::class_<Tensor, std::shared_ptr<Tensor>>(*m, "Tensor", py::buffer_protocol())


+ 2
- 0
mindspore/ccsrc/dataset/core/config_manager.cc View File

@@ -88,5 +88,7 @@ void ConfigManager::set_op_connector_size(int32_t connector_size) { op_connector
uint32_t ConfigManager::seed() const { return seed_; }

void ConfigManager::set_seed(uint32_t seed) { seed_ = seed; }

void ConfigManager::set_monitor_sampling_interval(uint32_t interval) { monitor_sampling_interval_ = interval; }
} // namespace dataset
} // namespace mindspore

+ 9
- 0
mindspore/ccsrc/dataset/core/config_manager.h View File

@@ -111,12 +111,21 @@ class ConfigManager {
// @param seed - The default seed to use
void set_seed(uint32_t seed);

// setter function
// @param interval - The setting to apply to the config
void set_monitor_sampling_interval(uint32_t interval);

// getter function
// @return The iterval of monitor sampling
int32_t monitor_sampling_interval() const { return monitor_sampling_interval_; }

private:
int32_t rows_per_buffer_{kCfgRowsPerBuffer};
int32_t num_parallel_workers_{kCfgParallelWorkers};
int32_t worker_connector_size_{kCfgWorkerConnectorSize};
int32_t op_connector_size_{kCfgOpConnectorSize};
uint32_t seed_{kCfgDefaultSeed};
uint32_t monitor_sampling_interval_{kCfgMonitorSamplingInterval};

// Private helper function that taks a nlohmann json format and populates the settings
// @param j - The json nlohmann json info


+ 1
- 0
mindspore/ccsrc/dataset/core/constants.h View File

@@ -47,6 +47,7 @@ constexpr uint32_t kCfgParallelWorkers = 4;
constexpr uint32_t kCfgWorkerConnectorSize = 16;
constexpr uint32_t kCfgOpConnectorSize = 16;
constexpr uint32_t kCfgDefaultSeed = std::mt19937::default_seed;
constexpr uint32_t kCfgMonitorSamplingInterval = 10;

// Invalid OpenCV type should not be from 0 to 7 (opencv4/opencv2/core/hal/interface.h)
constexpr uint8_t kCVInvalidType = 255;


+ 3
- 2
mindspore/ccsrc/dataset/engine/CMakeLists.txt View File

@@ -1,6 +1,7 @@
add_subdirectory(datasetops)
add_subdirectory(opt)
add_subdirectory(gnn)
add_subdirectory(perf)
if (ENABLE_TDTQUE)
add_subdirectory(tdt)
endif ()
@@ -16,7 +17,7 @@ add_library(engine OBJECT
target_include_directories(engine PRIVATE ${pybind11_INCLUDE_DIRS})

if (ENABLE_TDTQUE)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-tdt engine-opt engine-gnn engine-perf)
else()
add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn)
add_dependencies(engine engine-datasetops engine-datasetops-source engine-opt engine-gnn engine-perf)
endif ()

+ 20
- 2
mindspore/ccsrc/dataset/engine/dataset_iterator.cc View File

@@ -83,7 +83,14 @@ Status IteratorBase::FetchNextTensorRow(TensorRow *out_row) {
}

// Constructor of the DatasetIterator
DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree) : IteratorBase(), root_(exe_tree->root()) {}
DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
: IteratorBase(), root_(exe_tree->root()), tracing_(nullptr), cur_batch_num_(0), cur_connector_size_(0) {
std::shared_ptr<Tracing> node;
Status s = exe_tree->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
if (s.IsOk()) {
tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
}
}

DatasetIterator::~DatasetIterator() = default;

@@ -101,6 +108,10 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {

// Check if we need to get a new DataBuffer to iterate.
if (curr_buffer_ == nullptr || curr_buffer_->NumRows() == 0) {
if (tracing_ != nullptr) {
cur_connector_size_ = root_->ConnectorSize();
cur_connector_capacity_ = root_->ConnectorCapacity();
}
RETURN_IF_NOT_OK(root_->GetNextBuffer(&curr_buffer_));

// Since GetNextBuffer was used rather than GetNextInput(), it means we need to manually
@@ -121,6 +132,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
}
eof_handled_ = true;
curr_buffer_.reset(); // explicitly free the eof buffer
// Set tree to Finished state
root_->Tree()->SetFinished();

return Status::OK();
}
@@ -131,13 +144,18 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
// flow of an eof up the pipeline by itself.
eof_handled_ = true;
curr_buffer_.reset(); // explicitly free the eof buffer
// Set tree to Finished state
root_->Tree()->SetFinished();
return Status::OK();
}
}

// If we got this far, now it's time to pop that next row for return to caller
RETURN_IF_NOT_OK(curr_buffer_->PopRow(out_row));

if (tracing_ != nullptr) {
cur_batch_num_++;
tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_);
}
return Status::OK();
}



+ 5
- 0
mindspore/ccsrc/dataset/engine/dataset_iterator.h View File

@@ -24,6 +24,7 @@
#include "dataset/core/tensor.h"
#include "dataset/engine/datasetops/dataset_op.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/engine/perf/dataset_iterator_tracing.h"

namespace mindspore {
namespace dataset {
@@ -109,6 +110,10 @@ class DatasetIterator : public IteratorBase {
private:
std::shared_ptr<DatasetOp> root_; // saves the root of the executionTree
TensorRow device_queue_row_;
std::shared_ptr<DatasetIteratorTracing> tracing_; // trace profiling data
int32_t cur_batch_num_; // current batch number,used for profiling
int32_t cur_connector_size_; // current connector size of root op,used for profiling
int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling
};

// The ChildIterator derived class is for fetching rows from intermediate nodes of execution tree.


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/batch_op.h View File

@@ -189,6 +189,10 @@ class BatchOp : public ParallelOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "BatchOp"; }

private:
// Worker thread for doing the memcpy of batch
// @param int32_t param workerId


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/concat_op.h View File

@@ -81,6 +81,10 @@ class ConcatOp : public PipelineOp {
// before providing their own implementations.
Status PrepareNodePostAction() override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ConcatOp"; }

private:
Status Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf);



+ 1
- 0
mindspore/ccsrc/dataset/engine/datasetops/dataset_op.cc View File

@@ -38,6 +38,7 @@ DatasetOp::DatasetOp(int32_t op_connector_size)
tree_(nullptr),
state_(OpState::kDeOpIdle),
op_ctrl_flags_(kDeOpNone),
out_connector_(nullptr),
first_fetch_(true) {
// The operator starts out with an invalid operator id. The only way to
// get it out of invalid state is to assign the operator to an execution tree.


+ 24
- 4
mindspore/ccsrc/dataset/engine/datasetops/dataset_op.h View File

@@ -51,7 +51,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
};

// Flags that control operator runtime behaviours
enum OpState { kDeOpRunning = 0, kDeOpIdle = 1 };
enum OpState { kDeOpRunning = 0, kDeOpIdle = 1, kDeOpTerminated };

// Constructor
// @param op_connector_size - The size for the output connector of this operator.
@@ -213,11 +213,23 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {

// Getter function
// @return connector size of current op
virtual int32_t ConnectorSize() const { return out_connector_->size(); }
int32_t ConnectorSize() const {
if (!inlined()) {
return out_connector_->size();
}
// Return -1 for inlined op
return -1;
}

// Getter function
// @return connector size of current op
virtual int32_t ConnectorCapacity() const { return out_connector_->capacity(); }
int32_t ConnectorCapacity() const {
if (!inlined()) {
return out_connector_->size();
}
// Return -1 for inlined op
return -1;
}

// Getter function
// @return connector size of child op
@@ -228,7 +240,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
int32_t ChildOpConnectorCapacity(int32_t child_index = 0) const { return child_[child_index]->ConnectorCapacity(); }

// Children Getter
// @return Vector or Children
// @return Vector of Children
std::vector<std::shared_ptr<DatasetOp>> Children() const { return child_; }

// Base method for NodePass visit.
@@ -237,6 +249,14 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
// @return Statue of the node visit
virtual Status Accept(NodePass *p, bool *modified);

// Op name getter
// @return Name of the current Op
virtual std::string Name() const { return "DatasetOp"; }

// Execution Tree getter
// @return Pointer to the ExecutionTree the current op belongs to, no ownership
ExecutionTree *Tree() { return tree_; }

protected:
// Adds a parent operator to this operator
// @notes External callers do not have access to this function.


+ 17
- 33
mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.cc View File

@@ -13,25 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/engine/datasetops/device_queue_op.h"
#include <iomanip>
#include <iostream>
#include <memory>

#include "dataset/core/config_manager.h"
#include "dataset/core/global_context.h"
#include "dataset/engine/datasetops/device_queue_op.h"
#include "dataset/engine/data_buffer.h"
#include "dataset/engine/dataset_iterator.h"
#include "dataset/engine/opt/pass.h"
#include "dataset/engine/perf/profiling.h"
#include "dataset/engine/perf/device_queue_tracing.h"
#include "dataset/util/status.h"
#include "dataset/util/task_manager.h"
#include "dataset/engine/opt/pass.h"
#include "dataset/util/profiling.h"

namespace mindspore {
namespace dataset {
#define DEVICE_QUEUE_PROFILING_DATA(type, subtype, batch_num, value) \
std::to_string(type) + " " + std::to_string(subtype) + " " + std::to_string(batch_num) + " " + std::to_string(value)

DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
int32_t op_connector_size, int64_t num_batch)
: PipelineOp(op_connector_size),
@@ -101,22 +99,16 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend.";
int64_t total_batch = 0;
bool is_break_loop = false;
double batch_start_time, tdt_start_time, end_time;
double batch_start_time, end_time;
int32_t batch_cost, tdt_cost;
int32_t connector_size = 0;
int32_t connector_capacity;
std::shared_ptr<Profiling> profiling_node;
bool isProfilingEnable = ProfilingManager::GetInstance().IsProfilingEnable();
std::shared_ptr<DeviceQueueTracing> profiling_node;
bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable();
if (isProfilingEnable) {
std::string file_name = "critical_point_profiling";
// Here can determine performance bottleneck is in pipeline or in tdt.
// Context format of this file "type subtype batchnum value"
// type:0: time, 1: queue depth
// subtype:0: pipeline time, 1: push tdt time, 2: all time
// batchnum: batch number
// value: value of time(ms) or queue depth
profiling_node = std::make_shared<Profiling>(file_name, device_id_);
RETURN_IF_NOT_OK(ProfilingManager::GetInstance().RegisterProfilingNode(&profiling_node));
std::shared_ptr<Tracing> node;
RETURN_IF_NOT_OK(tree_->GetProfilingManager()->GetTracingNode(kDeviceQueueTracingName, &node));
profiling_node = std::dynamic_pointer_cast<DeviceQueueTracing>(node);
batch_start_time = ProfilingTime::GetCurMilliSecond();
connector_capacity = ChildOpConnectorCapacity();
}
@@ -129,29 +121,23 @@ Status DeviceQueueOp::SendDataToAscend() {
TensorRow currRow;
for (int row_id = 0; row_id < current_buffer->NumRows() && !is_break_loop; row_id++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
if (isProfilingEnable) {
tdt_start_time = ProfilingTime::GetCurMilliSecond();
}
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_);
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost);
if (status == TdtStatus::FAILED) {
return Status(StatusCode::kTDTPushFailure, "TDT Push Failed");
}

if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
tdt_cost = (int32_t)(end_time - tdt_start_time);
// record push tdt time
profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost));
profiling_node->Record(TIME, TDT_PUSH_TIME, total_batch + 1, tdt_cost);
batch_cost = (int32_t)(end_time - batch_start_time);
// record batch time
profiling_node->Record(DEVICE_QUEUE_PROFILING_DATA(TIME, BATCH_TIME, total_batch + 1, batch_cost));
profiling_node->Record(TIME, BATCH_TIME, total_batch + 1, batch_cost);
// record pipeline time
profiling_node->Record(
DEVICE_QUEUE_PROFILING_DATA(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost));
profiling_node->Record(TIME, PIPELINE_TIME, total_batch + 1, batch_cost - tdt_cost);
batch_start_time = end_time;
// record connector depth
profiling_node->Record(
DEVICE_QUEUE_PROFILING_DATA(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size));
profiling_node->Record(CONNECTOR_DEPTH, connector_capacity, total_batch + 1, connector_size);
}
total_batch++;
if (num_batch_ > 0 && total_batch == num_batch_) {
@@ -171,9 +157,7 @@ Status DeviceQueueOp::SendDataToAscend() {
RETURN_IF_NOT_OK(GetNextInput(&current_buffer));
}

if (isProfilingEnable) {
profiling_node->SaveToFile();
}
tree_->SetFinished();
MS_LOG(INFO) << "Device queue total batch is " << total_batch << ", number of batches is " << num_batch_ << ".";

return Status::OK();


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/device_queue_op.h View File

@@ -140,6 +140,10 @@ class DeviceQueueOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "DeviceQueueOp"; }

private:
// Name: checkExceptions(DataBuffer);
// Description: Check whether the dataBuffer meets the condition for performing DeviceQueueOp


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/filter_op.h View File

@@ -127,6 +127,10 @@ class FilterOp : public ParallelOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "FilterOp"; }

private:
// predicate_func python callable which returns a boolean value.
py::function predicate_func_;


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/map_op.h View File

@@ -177,6 +177,10 @@ class MapOp : public ParallelOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "MapOp"; }

private:
// Local queues where worker threads can pop from.
// Popping directly from the Connector can block if the previous designated threads haven't pop.


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/project_op.h View File

@@ -107,6 +107,10 @@ class ProjectOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ProjectOp"; }

private:
std::vector<std::string> columns_to_project_;
std::vector<int32_t> projected_column_indices_;


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/rename_op.h View File

@@ -116,6 +116,10 @@ class RenameOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "RenameOp"; }

protected:
// Rename core functionality
Status RenameColumns();


+ 3
- 3
mindspore/ccsrc/dataset/engine/datasetops/repeat_op.h View File

@@ -124,9 +124,9 @@ class RepeatOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

virtual int32_t ConnectorSize() const { return child_[0]->ConnectorSize(); }
virtual int32_t ConnectorCapacity() const { return child_[0]->ConnectorCapacity(); }
// Op name getter
// @return Name of the current Op
std::string Name() const override { return "RepeatOp"; }

private:
int32_t max_repeats_; // The number of repeats that the user requested


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/shuffle_op.h View File

@@ -161,6 +161,10 @@ class ShuffleOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ShuffleOp"; }

private:
// Private function to add a new row to the shuffle buffer.
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/skip_op.h View File

@@ -80,6 +80,10 @@ class SkipOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "SkipOp"; }

private:
int32_t max_skips_; // The number of skips that the user requested
int32_t skip_count_; // A counter for the current number of executed skips


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/celeba_op.h View File

@@ -169,6 +169,10 @@ class CelebAOp : public ParallelOp, RandomAccessOp {
// @return Status - The error code return
Status AddIOBlock(std::unique_ptr<DataBuffer> *data_buffer);

// Op name getter
// @return Name of the current Op
std::string Name() const { return "CelebAOp"; }

private:
// Called first when function is called
// @return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/cifar_op.h View File

@@ -155,6 +155,10 @@ class CifarOp : public ParallelOp, public RandomAccessOp {
// @return
static Status CountTotalRows(const std::string &dir, bool isCIFAR10, int64_t *count);

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "CifarOp"; }

private:
// Initialize Sampler, calls sampler->Init() within
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/generator_op.h View File

@@ -127,6 +127,10 @@ class GeneratorOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "GeneratorOp"; }

private:
py::function generator_function_;
std::vector<std::string> column_names_;


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/image_folder_op.h View File

@@ -210,6 +210,10 @@ class ImageFolderOp : public ParallelOp, public RandomAccessOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ImageFolderOp"; }

private:
// Initialize Sampler, calls sampler->Init() within
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/manifest_op.h View File

@@ -172,6 +172,10 @@ class ManifestOp : public ParallelOp, public RandomAccessOp {
static Status GetClassIndexing(const std::string &file, const py::dict &dict, const std::string &usage,
std::map<std::string, int32_t> *output_class_indexing);

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ManifestOp"; }

private:
// Initialize Sampler, calls sampler->Init() within
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/mindrecord_op.h View File

@@ -218,6 +218,10 @@ class MindRecordOp : public ParallelOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "MindRecordOp"; }

private:
Status GetBufferFromReader(std::unique_ptr<DataBuffer> *fetched_buffer, int64_t buffer_id, int32_t worker_id);



+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/mnist_op.h View File

@@ -152,6 +152,10 @@ class MnistOp : public ParallelOp, public RandomAccessOp {
// @return
static Status CountTotalRows(const std::string &dir, int64_t *count);

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "MnistOp"; }

private:
// Initialize Sampler, calls sampler->Init() within
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/random_data_op.h View File

@@ -189,6 +189,10 @@ class RandomDataOp : public ParallelOp {
*/
int64_t GetTotalRows() const { return total_rows_; }

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "RandomDataOp"; }

private:
/**
* The entry point code for when workers are launched


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/text_file_op.h View File

@@ -169,6 +169,10 @@ class TextFileOp : public ParallelOp {
// @return Status - the error coed returned.
static Status CountAllFileRows(const std::vector<std::string> &files, int64_t *count);

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "TextFileOp"; }

private:
// The entry point for when workers are launched.
// @param worker_id - the id of the worker that is executing this function.


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/tf_reader_op.h View File

@@ -228,6 +228,10 @@ class TFReaderOp : public ParallelOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "TFReaderOp"; }

private:
// The entry point for when workers are launched.
// @param worker_id - the id of the worker that is executing this function.


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/source/voc_op.h View File

@@ -205,6 +205,10 @@ class VOCOp : public ParallelOp, public RandomAccessOp {
static Status GetClassIndexing(const std::string &dir, const std::string &task_type, const std::string &task_mode,
const py::dict &dict, std::map<std::string, int32_t> *output_class_indexing);

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "VOCOp"; }

private:
// Initialize Sampler, calls sampler->Init() within
// @return Status - The error code return


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/take_op.h View File

@@ -90,6 +90,10 @@ class TakeOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "TakeOp"; }

private:
int32_t max_takes_; // The number of takes that the user requested
int32_t take_count_; // A counter for the current number of executed takes


+ 4
- 0
mindspore/ccsrc/dataset/engine/datasetops/zip_op.h View File

@@ -110,6 +110,10 @@ class ZipOp : public PipelineOp {
// @return - Status of the node visit.
Status Accept(NodePass *p, bool *modified) override;

// Op name getter
// @return Name of the current Op
std::string Name() const override { return "ZipOp"; }

private:
// Handles preprocessing of the main loop, used when starting new epoch
Status prepare(TensorQTable *const table);


+ 15
- 3
mindspore/ccsrc/dataset/engine/execution_tree.cc View File

@@ -19,9 +19,8 @@
#include "dataset/engine/datasetops/dataset_op.h"
#include "dataset/engine/datasetops/shuffle_op.h"
#include "dataset/util/task_manager.h"
#include "dataset/util/profiling.h"

#include "dataset/engine/opt/util/printer_pass.h"
#include "dataset/engine/perf/profiling.h"
#include "dataset/engine/perf/monitor.h"

namespace mindspore {
namespace dataset {
@@ -30,6 +29,8 @@ ExecutionTree::ExecutionTree() : id_count_(0) {
tg_ = std::make_unique<TaskGroup>();
tree_state_ = kDeTStateInit;
prepare_flags_ = kDePrepNone;
perf_monitor_ = std::make_unique<Monitor>(this);
profiling_manager_ = std::make_unique<ProfilingManager>(this);
}

// Destructor
@@ -121,6 +122,15 @@ Status ExecutionTree::Launch() {
}
std::ostringstream ss;
ss << *this;

// Profiling infrastructures need to be initialized before Op launching
if (profiling_manager_->IsProfilingEnable()) {
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->Initialize());
// Launch Monitor Thread
RETURN_IF_NOT_OK(tg_->CreateAsyncTask("Monitor Thread launched", std::ref(*perf_monitor_)));
}

MS_LOG(DEBUG) << "Printing the tree before launch tasks:\n" << ss.str();
for (auto itr = this->begin(); itr != this->end(); ++itr) {
// An inlined operator is one that has an output connector size of 0, and it does not
@@ -133,7 +143,9 @@ Status ExecutionTree::Launch() {
// Set the state of the Operator as running. This only matters in Leaf ops, CacheOp and TakeOp
}
}

tree_state_ = kDeTStateExecuting;

return Status::OK();
}



+ 21
- 6
mindspore/ccsrc/dataset/engine/execution_tree.h View File

@@ -23,12 +23,14 @@
#include <vector>
#include "dataset/engine/datasetops/dataset_op.h"
#include "dataset/util/status.h"
#include "mindspore/ccsrc/dataset/engine/perf/profiling.h"

namespace mindspore {
namespace dataset {
// Forward declares
class TaskGroup;
class DatasetOp;
class Monitor;

class ExecutionTree {
public:
@@ -40,11 +42,12 @@ class ExecutionTree {

// State flags for the lifecycle of the tree
enum TreeState {
kDeTStateInit = 0, // The freshly initialized state after construction
kDeTStateBuilding, // The tree is being built, nodes are being added
kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare
kDeTStateReady, // The tree has been prepared and is ready to be launched
kDeTStateExecuting // The tree has been launched and is executing
kDeTStateInit = 0, // The freshly initialized state after construction
kDeTStateBuilding, // The tree is being built, nodes are being added
kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare
kDeTStateReady, // The tree has been prepared and is ready to be launched
kDeTStateExecuting, // The tree has been launched and is executing
kDeTStateFinished // The tree has been drained, dataset iterator received EOF
};

class Iterator {
@@ -120,7 +123,7 @@ class ExecutionTree {
// Returns an iterator positioned at the start
// @return Iterator - The iterator
ExecutionTree::Iterator begin(const std::shared_ptr<DatasetOp> &root = nullptr) const {
return Iterator((root == nullptr) ? root_ : root);
return Iterator(root == nullptr ? root_ : root);
}

// Returns an iterator positioned at the end
@@ -207,6 +210,16 @@ class ExecutionTree {
// @return raw pointer to the TaskGroup
TaskGroup *AllTasks() const { return tg_.get(); }

// Return if the ExecutionTree is finished (iterator receives EOF).
// @return Bool - true is ExecutionTree is finished
bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; }

// Set the ExecutionTree to Finished state.
void SetFinished() { tree_state_ = TreeState::kDeTStateFinished; }

// Getter for profiling manager, no ownership
ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); }

private:
// A helper functions for doing the recursive printing
// @param dataset_op - The dataset op to print
@@ -222,6 +235,8 @@ class ExecutionTree {
uint32_t prepare_flags_; // Flags used during tree prepare
TreeState tree_state_; // Tracking the current tree state
std::stack<std::shared_ptr<DatasetOp>> repeat_stack_; // A stack used during prepare phase
std::unique_ptr<Monitor> perf_monitor_; // Performance Monitor
std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager
};
} // namespace dataset
} // namespace mindspore


+ 6
- 0
mindspore/ccsrc/dataset/engine/perf/CMakeLists.txt View File

@@ -0,0 +1,6 @@
add_library(engine-perf OBJECT
profiling.cc
monitor.cc
device_queue_tracing.cc
connector_size.cc
dataset_iterator_tracing.cc)

+ 89
- 0
mindspore/ccsrc/dataset/engine/perf/connector_size.cc View File

@@ -0,0 +1,89 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/engine/perf/connector_size.h"

#include <algorithm>
#include <fstream>
#include <memory>
#include <string>
#include "dataset/core/config_manager.h"
#include "dataset/engine/execution_tree.h"
#include "dataset/util/path.h"

using json = nlohmann::json;
namespace mindspore {
namespace dataset {
using Qrow = std::vector<int>;

// Sample action
Status ConnectorSize::Sample() {
Qrow cur_row;
std::transform(tree_->begin(), tree_->end(), std::back_inserter(cur_row),
[](DatasetOp &op) { return op.ConnectorSize(); });
// Push new row of sample
sample_table_.push_back(cur_row);
return Status::OK();
}

// JSON serializer helper function
json ConnectorSize::ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size) {
auto children = node.Children();
std::vector<int32_t> children_id;
std::transform(children.begin(), children.end(), std::back_inserter(children_id),
[](std::shared_ptr<DatasetOp> op) -> int32_t { return op->id(); });
json json_node;
json_node["op_id"] = node.id();
json_node["op_type"] = node.Name();
json_node["num_workers"] = node.num_workers();
json metrics;
// DeviceQueueOp is a special op,it is not inlined but its output queue is invalid.
// So we should not output its queue size.
if (!node.inlined() && node.Name() != "DeviceQueueOp") {
metrics["output_queue"] = {{"size", size}, {"length", node.ConnectorCapacity()}};
}
json_node["metrics"] = metrics;
if (!children_id.empty()) {
json_node["children"] = children_id;
}

return json_node;
}

// Save profiling data to file
Status ConnectorSize::SaveToFile() {
std::ofstream os(file_path_, std::ios::trunc);
uint32_t idx = 0;
json output;
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
output["sampling_interval"] = cfg->monitor_sampling_interval();
// Traverse the ExecutionTree for JSON node generation
for (auto &node : *tree_) {
std::vector<int32_t> cur_queue_size;
std::transform(sample_table_.begin(), sample_table_.end(), std::back_inserter(cur_queue_size),
[&](const ConnectorSizeSample &sample) { return sample[idx]; });
json json_node = ParseOpInfo(node, cur_queue_size);
output["op_info"].push_back(json_node);
idx++;
}
os << output;
return Status::OK();
}
Status ConnectorSize::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("pipeline_profiling_" + device_id + ".json")).toString();
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

+ 68
- 0
mindspore/ccsrc/dataset/engine/perf/connector_size.h View File

@@ -0,0 +1,68 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_QUEUE_DEPTH_H
#define MINDSPORE_QUEUE_DEPTH_H

#include <string>
#include <vector>
#include <nlohmann/json.hpp>
#include "dataset/engine/perf/profiling.h"
#include "dataset/engine/datasetops/dataset_op.h"

using json = nlohmann::json;

namespace mindspore {
namespace dataset {
class ExecutionTree;

// Connector size sampling samples the output connector size of each op in the pipeline.
// It support JSON serialization for external usage.
class ConnectorSize : public Sampling {
// Connecto size sampling data is stored as a 2D vector
// op_0 ... op_m
// sample_0 size_0_0 ... size_m_0
// ... ... ... ...
// sample_n size_0_m ... size_m_n
//
// A circular buffer will be implemented in the future to make this table more flexible.
using ConnectorSizeSample = std::vector<int>;
using ConnectorSizeSampleTable = std::vector<ConnectorSizeSample>;

public:
explicit ConnectorSize(ExecutionTree *tree) : tree_(tree) {}

// Driver function for connector size sampling.
// This function samples the connector size of every nodes within the ExecutionTree
Status Sample() override;

std::string Name() const override { return kDeviceQueueTracingName; };

// Save sampling data to file
// @return Status - The error code return
Status SaveToFile() override;

Status Init(const std::string &dir_path, const std::string &device_id);

// Parse op infomation and transform to json format
json ParseOpInfo(const DatasetOp &node, const std::vector<int32_t> &size);

private:
ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
ConnectorSizeSampleTable sample_table_; // Dataset structure to store all samples of connector size sampling
};
} // namespace dataset
} // namespace mindspore
#endif // MINDSPORE_QUEUE_DEPTH_H

+ 64
- 0
mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.cc View File

@@ -0,0 +1,64 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fstream>
#include <string>
#include "dataset/engine/perf/dataset_iterator_tracing.h"
#include "dataset/util/path.h"

namespace mindspore {
namespace dataset {

Status DatasetIteratorTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num,
const int32_t value) {
// Format: "type extra-info batch-num value"
// type: 0: time, 1: connector size
// extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
// if type is 1 - connector capacity
// batch-num: batch number
// value: if type is 0 - value is time(ms)
// if type is 1 - value is connector size
// Examples:
// 0 0 20 10 - The 20th batch took 10ms to get data from pipeline.
// 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64.
std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " +
std::to_string(value);
value_.emplace_back(data);
return Status::OK();
}

Status DatasetIteratorTracing::SaveToFile() {
if (value_.empty()) {
return Status::OK();
}

std::ofstream handle(file_path_, std::ios::trunc);
if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
}
for (auto value : value_) {
handle << value << "\n";
}
handle.close();

return Status::OK();
}

Status DatasetIteratorTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("dataset_iterator_profiling_" + device_id + ".txt")).toString();
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

+ 51
- 0
mindspore/ccsrc/dataset/engine/perf/dataset_iterator_tracing.h View File

@@ -0,0 +1,51 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_DATASET_ITERATOR_TRACING_H
#define MINDSPORE_DATASET_ITERATOR_TRACING_H

#include <string>
#include <vector>
#include "dataset/engine/perf/profiling.h"

namespace mindspore {
namespace dataset {
class DatasetIteratorTracing : public Tracing {
public:
// Constructor
DatasetIteratorTracing() = default;

// Destructor
~DatasetIteratorTracing() = default;

// Record tracing data
// @return Status - The error code return
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value);

std::string Name() const override { return kDatasetIteratorTracingName; };

// Save tracing data to file
// @return Status - The error code return
Status SaveToFile() override;

Status Init(const std::string &dir_path, const std::string &device_id);

private:
std::vector<std::string> value_;
};
} // namespace dataset
} // namespace mindspore

#endif // MINDSPORE_DATASET_ITERATOR_TRACING_H

+ 64
- 0
mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.cc View File

@@ -0,0 +1,64 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <fstream>
#include <string>
#include "dataset/engine/perf/device_queue_tracing.h"
#include "dataset/util/path.h"
namespace mindspore {
namespace dataset {

Status DeviceQueueTracing::Record(const int32_t type, const int32_t extra_info, const int32_t batch_num,
const int32_t value) {
// Format: "type extra-info batch-num value"
// type: 0: time, 1: connector size
// extra-info: if type is 0 - 0: pipeline time, 1: push tdt time, 2: batch time
// if type is 1 - connector capacity
// batch-num: batch number
// value: if type is 0 - value is time(ms)
// if type is 1 - value is connector size
// Examples:
// 0 0 20 10 - The 20th batch took 10ms to get data from pipeline.
// 1 64 20 5 - Connector size is 5 when get the 20th batch.Connector capacity is 64.
std::string data = std::to_string(type) + " " + std::to_string(extra_info) + " " + std::to_string(batch_num) + " " +
std::to_string(value);
value_.emplace_back(data);
return Status::OK();
}

Status DeviceQueueTracing::SaveToFile() {
if (value_.empty()) {
return Status::OK();
}

std::ofstream handle(file_path_, std::ios::trunc);
if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
}
for (auto value : value_) {
handle << value << "\n";
}
handle.close();

return Status::OK();
}

Status DeviceQueueTracing::Init(const std::string &dir_path, const std::string &device_id) {
file_path_ = (Path(dir_path) / Path("critical_point_profiling_" + device_id + ".txt")).toString();
return Status::OK();
}
} // namespace dataset
} // namespace mindspore

+ 52
- 0
mindspore/ccsrc/dataset/engine/perf/device_queue_tracing.h View File

@@ -0,0 +1,52 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_DEVICE_QUEUE_TRACING_H
#define MINDSPORE_DEVICE_QUEUE_TRACING_H

#include <string>
#include <vector>
#include "dataset/engine/perf/profiling.h"

namespace mindspore {
namespace dataset {
class DeviceQueueTracing : public Tracing {
public:
// Constructor
DeviceQueueTracing() = default;

// Destructor
~DeviceQueueTracing() = default;

// Record tracing data
// @return Status - The error code return
Status Record(const int32_t type, const int32_t extra_info, const int32_t batch_num, const int32_t value);

std::string Name() const override { return "Device Queue Tracing"; };

// Save tracing data to file
// @return Status - The error code return
Status SaveToFile() override;

Status Init(const std::string &dir_path, const std::string &device_id);

private:
std::vector<std::string> value_;
};
} // namespace dataset
} // namespace mindspore

#endif // MINDSPORE_DEVICE_QUEUE_TRACING_H

+ 50
- 0
mindspore/ccsrc/dataset/engine/perf/monitor.cc View File

@@ -0,0 +1,50 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <vector>
#include "dataset/core/config_manager.h"
#include "dataset/engine/perf/monitor.h"
#include "dataset/engine/execution_tree.h"

namespace mindspore {
namespace dataset {

Monitor::Monitor(ExecutionTree *tree) : tree_(tree) {
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
sampling_interval_ = cfg->monitor_sampling_interval();
}

Status Monitor::operator()() {
// Register this thread with TaskManager to receive proper interrupt signal.
TaskManager::FindMe()->Post();

// Keep sampling if
// 1) Monitor Task is not interrupted by TaskManager AND
// 2) Iterator has not received EOF
while (!this_thread::is_interrupted() && !(tree_->isFinished())) {
for (auto &node : tree_->GetProfilingManager()->GetSamplingNodes()) {
RETURN_IF_NOT_OK(node.second->Sample());
std::this_thread::sleep_for(std::chrono::milliseconds(sampling_interval_));
}
}

// Output all profiling data upon request.
tree_->GetProfilingManager()->SaveProfilingData();
return Status::OK();
}

} // namespace dataset
} // namespace mindspore

+ 52
- 0
mindspore/ccsrc/dataset/engine/perf/monitor.h View File

@@ -0,0 +1,52 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef MINDSPORE_MONITOR_H
#define MINDSPORE_MONITOR_H

#include <memory>
#include <unordered_map>
#include <vector>
#include "dataset/util/status.h"
#include "dataset/engine/perf/profiling.h"

namespace mindspore {
namespace dataset {
class ExecutionTree;
class Monitor {
public:
// Monitor object constructor
explicit Monitor(ExecutionTree *tree);

Monitor() = default;

// Functor for Perf Monitor main loop.
// This function will be the entry point of Mindspore::Dataset::Task
Status operator()();

int64_t GetSamplingInterval() { return sampling_interval_; }

private:
int64_t cur_row_;
int64_t max_samples_;
int64_t sampling_interval_;
ExecutionTree *tree_;
std::vector<std::shared_ptr<Sampling>> sampling_list_;
};
} // namespace dataset
} // namespace mindspore

#endif // MINDSPORE_MONITOR_H

+ 153
- 0
mindspore/ccsrc/dataset/engine/perf/profiling.cc View File

@@ -0,0 +1,153 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/engine/perf/profiling.h"

#include <sys/time.h>
#include <cstdlib>
#include <fstream>
#include "common/utils.h"
#include "dataset/util/path.h"
#include "dataset/engine/perf/monitor.h"
#include "dataset/engine/perf/device_queue_tracing.h"
#include "dataset/engine/perf/connector_size.h"
#include "dataset/engine/perf/dataset_iterator_tracing.h"
#include "utils/log_adapter.h"

namespace mindspore {
namespace dataset {

bool ProfilingManager::IsProfilingEnable() const {
auto profiling = common::GetEnv("PROFILING_MODE");
if (profiling.empty() || profiling != "true") {
return false;
}
return true;
}

Status ProfilingManager::Initialize() {
// Register nodes based on config
std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR");
if (dir.empty()) {
RETURN_STATUS_UNEXPECTED("Profiling dir is not set.");
}
char real_path[PATH_MAX] = {0};
if (dir.size() >= PATH_MAX) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#else
if (realpath(common::SafeCStr(dir), real_path) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#endif
dir_path_ = real_path;

// If DEVICE_ID is not set,defult value is 0
device_id_ = common::GetEnv("DEVICE_ID");
if (device_id_.empty()) {
device_id_ = "0";
}

// Register all profiling node.
// device_queue node is used for graph mode
std::shared_ptr<Tracing> device_queue_tracing = std::make_shared<DeviceQueueTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(device_queue_tracing));
// dataset_iterator node is used for graph mode
std::shared_ptr<Tracing> dataset_iterator_tracing = std::make_shared<DatasetIteratorTracing>();
RETURN_IF_NOT_OK(RegisterTracingNode(dataset_iterator_tracing));

std::shared_ptr<Sampling> monitor_sampling = std::make_shared<ConnectorSize>(tree_);
RETURN_IF_NOT_OK(RegisterSamplingNode(monitor_sampling));

return Status::OK();
}

// Profiling node registration
Status ProfilingManager::RegisterTracingNode(std::shared_ptr<Tracing> node) {
// Check if node with the same name has already been registered.
auto exist = tracing_nodes_.find(node->Name());
if (exist != tracing_nodes_.end()) {
return Status(StatusCode::kProfilingError, "Profiling node already exist: " + node->Name());
}
// Register the node with its name as key.
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
tracing_nodes_[node->Name()] = node;
return Status::OK();
}

// Profiling node getter
Status ProfilingManager::GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node) {
// Check if node with the same name has already been registered.
auto exist = tracing_nodes_.find(name);
if (exist == tracing_nodes_.end()) {
return Status(StatusCode::kProfilingError, "Profiling node does not exist: " + name);
}
// Fetch node.
*node = tracing_nodes_[name];
return Status::OK();
}

// Profiling node registration
Status ProfilingManager::RegisterSamplingNode(std::shared_ptr<Sampling> node) {
// Check if node with the same name has already been registered.
auto exist = sampling_nodes_.find(node->Name());
if (exist != sampling_nodes_.end()) {
return Status(StatusCode::kProfilingError, "Profiling node already exist: " + node->Name());
}
// Register the node with its name as key.
RETURN_IF_NOT_OK(node->Init(dir_path_, device_id_));
sampling_nodes_[node->Name()] = node;
return Status::OK();
}

// Profiling node getter
Status ProfilingManager::GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node) {
// Check if node with the same name has already been registered.
auto exist = sampling_nodes_.find(name);
if (exist == sampling_nodes_.end()) {
return Status(StatusCode::kProfilingError, "Profiling node does not exist: " + name);
}
// Fetch node.
*node = sampling_nodes_[name];
return Status::OK();
}

Status ProfilingManager::SaveProfilingData() {
if (!IsProfilingEnable()) {
return Status::OK();
}
MS_LOG(INFO) << "Start to save profiling data.";
for (auto node : tracing_nodes_) {
RETURN_IF_NOT_OK(node.second->SaveToFile());
}
for (auto node : sampling_nodes_) {
RETURN_IF_NOT_OK(node.second->SaveToFile());
}
MS_LOG(INFO) << "Save profiling data end.";

return Status::OK();
}

double ProfilingTime::GetCurMilliSecond() {
struct timeval tv = {0, 0};
(void)gettimeofday(&tv, nullptr);
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
} // namespace dataset
} // namespace mindspore

+ 140
- 0
mindspore/ccsrc/dataset/engine/perf/profiling.h View File

@@ -0,0 +1,140 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_PROFILE_H_
#define DATASET_UTIL_PROFILE_H_

#include <string>
#include <vector>
#include <unordered_map>
#include <memory>
#include "dataset/util/status.h"

namespace mindspore {
namespace dataset {

class Monitor;
class ExecutionTree;

const char kDeviceQueueTracingName[] = "Device Queue Tracing";
const char kDatasetIteratorTracingName[] = "Dataset Iterator Tracing";
const char kConnectorSizeSamplingName[] = "Connector Size Sampling";

// Profiling is a class of basic unit of profiling action
// This base class encapsulate the serialization output logic
class Profiling : std::enable_shared_from_this<Profiling> {
public:
// Constructor
Profiling() = default;

// Destructor
virtual ~Profiling() = default;

virtual Status Init(const std::string &dir_path, const std::string &device_id) = 0;

// Default serialization file generator
virtual Status SaveToFile() = 0;

// Profiling name
virtual std::string Name() const = 0;

protected:
std::string file_path_;
};

// Sampling is a class of profiling which generate samples periodically.
class Sampling : public Profiling {
public:
// Sampling action function. This function will be invoked by performance monitor thread.
virtual Status Sample() = 0;
};

// Tracing is class of profiling which record samples upon request.
class Tracing : public Profiling {
// Tracing does not define a fixed interface to provide flexible on data recording.
};

// ProfilingManager is a class manages all profiling infrastructure
// It serves the following purposes:
// 1) Fetch profiling configs from global contexts
// 2) Setup all profiling node based on config
// 3) Provide access of profiling nodes for profiling actions
// 4) Manage profiling data serialization process
class ProfilingManager {
public:
explicit ProfilingManager(ExecutionTree *tree) : tree_(tree) {}

~ProfilingManager() = default;

Status Initialize();

// Save profile data to file
// @return Status - The error code return
Status SaveProfilingData();

// Sampling node getter
// @param name - The name of the requested node
// @param node - Pointer to the shared pointer for the Sampling node
// @return Status - The error code return
Status GetSamplingNode(const std::string &name, std::shared_ptr<Sampling> *node);

// Tracing node getter
// @param name - The name of the requested node
// @param node - Pointer to the shared pointer for the Tracing node
// @return Status - The error code return
Status GetTracingNode(const std::string &name, std::shared_ptr<Tracing> *node);

// If profiling is enabled.
bool IsProfilingEnable() const;

std::unordered_map<std::string, std::shared_ptr<Sampling>> &GetSamplingNodes() { return sampling_nodes_; }

private:
std::unordered_map<std::string, std::shared_ptr<Tracing>> tracing_nodes_;

std::unordered_map<std::string, std::shared_ptr<Sampling>> sampling_nodes_;

// Register profile node to tree
// @param node - Profiling node
// @return Status - The error code return
Status RegisterTracingNode(std::shared_ptr<Tracing> node);

// Register profile node to tree
// @param node - Profiling node
// @return Status - The error code return
Status RegisterSamplingNode(std::shared_ptr<Sampling> node);

ExecutionTree *tree_ = nullptr; // ExecutionTree pointer
std::string dir_path_; // where to create profiling file
std::string device_id_; // used when create profiling file,filename_deviceid.suffix
};

enum ProfilingType { TIME, CONNECTOR_DEPTH };

enum ProfilingTimeSubType {
PIPELINE_TIME,
TDT_PUSH_TIME,
BATCH_TIME,
INVALID_TIME,
};

class ProfilingTime {
public:
static double GetCurMilliSecond();
};

} // namespace dataset
} // namespace mindspore
#endif

+ 10
- 1
mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.cc View File

@@ -16,6 +16,7 @@
#include "dataset/engine/tdt/tdt_plugin.h"
#include "common/utils.h"
#include "utils/log_adapter.h"
#include "dataset/engine/perf/profiling.h"

namespace mindspore {
namespace dataset {
@@ -28,18 +29,26 @@ std::shared_ptr<TdtPlugin> TdtPlugin::GetInstance() {
return instance_ptr_;
}

TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name) {
TdtStatus TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profiling, int32_t &time) {
MS_LOG(INFO) << "TDT channel name is " << channel_name << ".";
std::vector<DataItem> items;
double start_time;
auto ret = translate(ts_row, items);
if (ret != SUCCESS) {
MS_LOG(ERROR) << "TDT converting tensor failed!";
return FAILED;
}
if (profiling) {
start_time = ProfilingTime::GetCurMilliSecond();
}
if (tdt::TdtHostPushData(channel_name, items) != 0) {
MS_LOG(ERROR) << "TDT pushing data failed!";
return FAILED;
}
if (profiling) {
double end_time = ProfilingTime::GetCurMilliSecond();
time = (int32_t)(end_time - start_time);
}
return SUCCESS;
}



+ 1
- 1
mindspore/ccsrc/dataset/engine/tdt/tdt_plugin.h View File

@@ -37,7 +37,7 @@ class TdtPlugin {
public:
static std::shared_ptr<TdtPlugin> GetInstance();

TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name);
TdtStatus hostPush(TensorRow ts_row, bool is_wait, std::string channel_name, bool profilig, int32_t &time);

private:
TdtPlugin() {}


+ 1
- 2
mindspore/ccsrc/dataset/util/CMakeLists.txt View File

@@ -14,5 +14,4 @@ add_library(utils OBJECT
status.cc
path.cc
wait_post.cc
sig_handler.cc
profiling.cc)
sig_handler.cc)

+ 0
- 112
mindspore/ccsrc/dataset/util/profiling.cc View File

@@ -1,112 +0,0 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "dataset/util/profiling.h"

#include <sys/time.h>
#include <cstdlib>
#include <fstream>
#include "dataset/util/path.h"
#include "common/utils.h"
#include "utils/log_adapter.h"

namespace mindspore {
namespace dataset {
Profiling::Profiling(const std::string &file_name, const int32_t device_id)
: file_name_(file_name), device_id_(device_id) {}

Status Profiling::Init() {
std::string dir = common::GetEnv("MINDDATA_PROFILING_DIR");
if (dir.empty()) {
RETURN_STATUS_UNEXPECTED("Profiling dir is not set.");
}
char real_path[PATH_MAX] = {0};
if (dir.size() >= PATH_MAX) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#if defined(_WIN32) || defined(_WIN64)
if (_fullpath(real_path, common::SafeCStr(dir), PATH_MAX) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#else
if (realpath(common::SafeCStr(dir), real_path) == nullptr) {
RETURN_STATUS_UNEXPECTED("Profiling dir is invalid.");
}
#endif
file_path_ = (Path(real_path) / Path(file_name_ + "_" + std::to_string(device_id_) + ".txt")).toString();
return Status::OK();
}

Status Profiling::Record(const std::string &data) {
value_.emplace_back(data);
return Status::OK();
}

Status Profiling::SaveToFile() {
if (file_name_.empty()) {
RETURN_STATUS_UNEXPECTED("Profiling file name has not been set.");
}
std::ofstream handle(file_path_, std::ios::app);
if (!handle.is_open()) {
RETURN_STATUS_UNEXPECTED("Profiling file can not be opened.");
}
for (auto value : value_) {
handle << value << "\n";
}
handle.close();

return Status::OK();
}

ProfilingManager &ProfilingManager::GetInstance() {
static ProfilingManager instance;
return instance;
}

bool ProfilingManager::IsProfilingEnable() const {
auto profiling = common::GetEnv("PROFILING_MODE");
if (profiling.empty() || profiling != "true") {
return false;
}

return true;
}

Status ProfilingManager::RegisterProfilingNode(std::shared_ptr<Profiling> *node) {
RETURN_IF_NOT_OK((*node)->Init());
profiling_node_.emplace_back(*node);
return Status::OK();
}

Status ProfilingManager::SaveProfilingData() {
if (!IsProfilingEnable()) {
return Status::OK();
}
MS_LOG(INFO) << "Start to save profile data.";
for (auto node : profiling_node_) {
RETURN_IF_NOT_OK(node->SaveToFile());
}
MS_LOG(INFO) << "Save profile data end.";

return Status::OK();
}

double ProfilingTime::GetCurMilliSecond() {
struct timeval tv = {0, 0};
(void)gettimeofday(&tv, nullptr);
return tv.tv_sec * 1000 + tv.tv_usec / 1000;
}
} // namespace dataset
} // namespace mindspore

+ 0
- 92
mindspore/ccsrc/dataset/util/profiling.h View File

@@ -1,92 +0,0 @@
/**
* Copyright 2020 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DATASET_UTIL_PROFILE_H_
#define DATASET_UTIL_PROFILE_H_

#include <string>
#include <vector>
#include <memory>
#include "dataset/util/status.h"

namespace mindspore {
namespace dataset {
enum ProfilingType {
TIME,
CONNECTOR_DEPTH,
};

enum ProfilingTimeSubType {
PIPELINE_TIME,
TDT_PUSH_TIME,
BATCH_TIME,
INVALID_TIME,
};

class Profiling {
public:
// Constructor
Profiling() = default;

// Constructor if need save profile data to file
Profiling(const std::string &file_name, const int32_t device_id);

// Destructor
~Profiling() = default;

Status Init();

// Record profile data
Status Record(const std::string &data);

// Save profile data to file if necessary
Status SaveToFile();

private:
std::vector<std::string> value_;
std::string file_name_;
std::string file_path_;
int32_t device_id_;
};

class ProfilingManager {
public:
ProfilingManager() = default;
~ProfilingManager() = default;

static ProfilingManager &GetInstance();

// Save profile data to file
// @return Status - The error code return
Status SaveProfilingData();

// Register profile node to tree
// @param node - Profiling node
// @return Status - The error code return
Status RegisterProfilingNode(std::shared_ptr<Profiling> *node);

bool IsProfilingEnable() const;

private:
std::vector<std::shared_ptr<Profiling>> profiling_node_;
};

class ProfilingTime {
public:
static double GetCurMilliSecond();
};
} // namespace dataset
} // namespace mindspore
#endif

+ 3
- 0
mindspore/ccsrc/dataset/util/status.cc View File

@@ -45,6 +45,9 @@ std::string CodeAsString(const StatusCode c) {
case StatusCode::kDuplicateKey:
s = "Duplicate key";
break;
case StatusCode::kProfilingError:
s = "Error encountered while profiling";
break;
case StatusCode::kUnexpectedError:
default:
s = "Unexpected error";


+ 1
- 0
mindspore/ccsrc/dataset/util/status.h View File

@@ -70,6 +70,7 @@ enum class StatusCode : char {
kPythonInterpreterFailure = 7,
kTDTPushFailure = 8,
kFileNotExist = 9,
kProfilingError = 10,
// Make this error code the last one. Add new error code above it.
kUnexpectedError = 127
};


+ 29
- 0
mindspore/dataset/core/configuration.py View File

@@ -125,6 +125,35 @@ class ConfigurationManager:
"""
return self.config.get_num_parallel_workers()

def set_monitor_sampling_interval(self, interval):
"""
Set the default interval(ms) of monitor sampling.

Args:
interval: interval(ms) to be used to performance monitor sampling.

Raises:
ValueError: If interval is invalid (<= 0 or > MAX_INT_32).

Examples:
>>> import mindspore.dataset as ds
>>> con = ds.engine.ConfigurationManager()
>>> # sets the new interval value.
>>> con.set_monitor_sampling_interval(100)
"""
if interval <= 0 or interval > INT32_MAX:
raise ValueError("Interval given is not within the required range")
self.config.set_monitor_sampling_interval(interval)

def get_monitor_sampling_interval(self):
"""
Get the default interval of performance monitor sampling.

Returns:
Interval: interval(ms) of performance monitor sampling.
"""
return self.config.get_monitor_sampling_interval()

def __str__(self):
"""
String representation of the configurations.


+ 119
- 0
tests/ut/python/dataset/test_profiling.py View File

@@ -0,0 +1,119 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Testing profiling support in DE
"""
import os
import numpy as np
import mindspore.dataset as ds

FILES = ["../data/dataset/testTFTestAllTypes/test.data"]
DATASET_ROOT = "../data/dataset/testTFTestAllTypes/"
SCHEMA_FILE = "../data/dataset/testTFTestAllTypes/datasetSchema.json"

PIPELINE_FILE = "./pipeline_profiling_1.json"
DATASET_ITERATOR_FILE = "./dataset_iterator_profiling_1.txt"


def test_profiling_simple_pipeline():
"""
Generator -> Shuffle -> Batch
"""
os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['DEVICE_ID'] = '1'

source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["data"])
data1 = data1.shuffle(64)
data1 = data1.batch(32)

for _ in data1:
pass

assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
del os.environ['MINDDATA_PROFILING_DIR']


def test_profiling_complex_pipeline():
"""
Generator -> Map ->
-> Zip -> Batch
TFReader -> Shuffle ->
"""
os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['DEVICE_ID'] = '1'

source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["gen"])
data1 = data1.map("gen", operations=[(lambda x: x + 1)])

pattern = DATASET_ROOT + "/test.data"
data2 = ds.TFRecordDataset(pattern, SCHEMA_FILE, shuffle=ds.Shuffle.FILES)
data2 = data2.shuffle(4)

data3 = ds.zip((data1, data2))

for _ in data3:
pass

assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)
del os.environ['PROFILING_MODE']
del os.environ['MINDDATA_PROFILING_DIR']


def test_profiling_sampling_iterval():
"""
Test non-default monitor sampling interval
"""
os.environ['PROFILING_MODE'] = 'true'
os.environ['MINDDATA_PROFILING_DIR'] = '.'
os.environ['DEVICE_ID'] = '1'
interval_origin = ds.config.get_monitor_sampling_interval()

ds.config.set_monitor_sampling_interval(30)
interval = ds.config.get_monitor_sampling_interval()
assert interval == 30

source = [(np.array([x]),) for x in range(1024)]
data1 = ds.GeneratorDataset(source, ["data"])
data1 = data1.shuffle(64)
data1 = data1.batch(32)

for _ in data1:
pass

assert os.path.exists(PIPELINE_FILE) is True
os.remove(PIPELINE_FILE)
assert os.path.exists(DATASET_ITERATOR_FILE) is True
os.remove(DATASET_ITERATOR_FILE)

ds.config.set_monitor_sampling_interval(interval_origin)
del os.environ['PROFILING_MODE']
del os.environ['MINDDATA_PROFILING_DIR']


if __name__ == "__main__":
test_profiling_simple_pipeline()
test_profiling_complex_pipeline()
test_profiling_sampling_iterval()

Loading…
Cancel
Save