Browse Source

!23529 move profiling

Merge pull request !23529 from shenwei41/move_profiling
tags/v1.5.0-rc1
i-robot Gitee 4 years ago
parent
commit
526c400065
11 changed files with 129 additions and 31 deletions
  1. +17
    -5
      mindspore/ccsrc/minddata/dataset/CMakeLists.txt
  2. +10
    -2
      mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt
  3. +14
    -4
      mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc
  4. +6
    -4
      mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.h
  5. +30
    -0
      mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
  6. +4
    -3
      mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h
  7. +6
    -0
      mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc
  8. +11
    -5
      mindspore/ccsrc/minddata/dataset/engine/execution_tree.h
  9. +9
    -1
      mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc
  10. +14
    -1
      mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc
  11. +8
    -6
      mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h

+ 17
- 5
mindspore/ccsrc/minddata/dataset/CMakeLists.txt View File

@@ -88,7 +88,6 @@ add_dependencies(engine-datasetops-source-sampler core)
add_dependencies(engine-datasetops core)
add_dependencies(engine-datasetops-mapop core)
add_dependencies(engine-opt core)
add_dependencies(engine-perf core)
add_dependencies(engine-gnn core)
add_dependencies(engine core)
add_dependencies(callback core)
@@ -116,6 +115,9 @@ endif()
if(ENABLE_PYTHON)
add_dependencies(APItoPython core)
endif()
if(NOT ENABLE_SECURITY)
add_dependencies(engine-perf core)
endif()
if(ENABLE_TDTQUE)
add_dependencies(engine-tdt core)
endif()
@@ -127,7 +129,9 @@ if(ENABLE_CACHE)
add_dependencies(engine-ir-datasetops-source engine-cache-client)
add_dependencies(engine-opt engine-cache-client)
add_dependencies(engine-datasetops engine-cache-client)
add_dependencies(engine-perf engine-cache-client)
if(NOT ENABLE_SECURITY)
add_dependencies(engine-perf engine-cache-client)
endif()
add_dependencies(engine-cache-client core)
add_dependencies(engine-cache-server core)
endif()
@@ -151,7 +155,6 @@ set(submodules
$<TARGET_OBJECTS:engine-datasetops-source-sampler>
$<TARGET_OBJECTS:engine-datasetops-mapop>
$<TARGET_OBJECTS:engine-gnn>
$<TARGET_OBJECTS:engine-perf>
$<TARGET_OBJECTS:engine-datasetops>
$<TARGET_OBJECTS:engine-opt>
$<TARGET_OBJECTS:engine-cache-client>
@@ -169,6 +172,7 @@ set(submodules
$<TARGET_OBJECTS:md_plugin>
)


if(ENABLE_ACL)
set(submodules
${submodules}
@@ -183,9 +187,17 @@ if(ENABLE_PYTHON)
endif()

if(ENABLE_TDTQUE)
add_library(_c_dataengine SHARED ${submodules} $<TARGET_OBJECTS:engine-tdt>)
if(NOT ENABLE_SECURITY)
add_library(_c_dataengine SHARED ${submodules} $<TARGET_OBJECTS:engine-perf> $<TARGET_OBJECTS:engine-tdt>)
else()
add_library(_c_dataengine SHARED ${submodules} $<TARGET_OBJECTS:engine-tdt>)
endif()
else()
add_library(_c_dataengine SHARED ${submodules})
if(NOT ENABLE_SECURITY)
add_library(_c_dataengine SHARED ${submodules} $<TARGET_OBJECTS:engine-perf>)
else()
add_library(_c_dataengine SHARED ${submodules})
endif()
endif()

if(ENABLE_PYTHON)


+ 10
- 2
mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt View File

@@ -2,7 +2,12 @@ add_subdirectory(datasetops)
add_subdirectory(opt)
add_subdirectory(gnn)
add_subdirectory(ir)
add_subdirectory(perf)

message("ENABLE_SECURITY = ${ENABLE_SECURITY}")
if(NOT ENABLE_SECURITY)
add_subdirectory(perf)
endif()

add_subdirectory(cache)

if(ENABLE_TDTQUE)
@@ -41,11 +46,14 @@ add_dependencies(engine engine-datasetops
engine-datasetops-source
engine-opt
engine-gnn
engine-perf
engine-cache-client
engine-datasetops-mapop
)

if(NOT ENABLE_SECURITY)
add_dependencies(engine engine-perf)
endif()

if(ENABLE_TDTQUE)
add_dependencies(engine engine-tdt)
endif()

+ 14
- 4
mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc View File

@@ -24,7 +24,9 @@
#include "minddata/dataset/engine/execution_tree.h"
#include "minddata/dataset/util/status.h"
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/profiling.h"
#endif

namespace mindspore {
namespace dataset {
@@ -78,16 +80,20 @@ Status DatasetIterator::GetNextAsMap(TensorMap *out_map) {
// Constructor of the DatasetIterator
DatasetIterator::DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree)
: root_(exe_tree->root()),
#ifndef ENABLE_SECURITY
tracing_(nullptr),
#endif
cur_batch_num_(0),
cur_connector_size_(0),
cur_connector_capacity_(0),
eof_handled_(false) {
std::shared_ptr<Tracing> node;
#ifndef ENABLE_SECURITY
Status s = exe_tree->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
if (s.IsOk()) {
tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
}
#endif
}

DatasetIterator::~DatasetIterator() = default;
@@ -100,20 +106,21 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
}
// clear the old tensor row
out_row->clear();
#ifndef ENABLE_SECURITY
bool isProfilingEnable = root_->Tree()->GetProfilingManager()->IsProfilingEnable();
#endif
// Once eof is handled, always return empty row. Class must be destroyed and recreated if you
// want to iterate again.
if (eof_handled_) {
std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
RETURN_STATUS_UNEXPECTED(err);
}
#ifndef ENABLE_SECURITY
if (tracing_ != nullptr) {
cur_connector_size_ = root_->ConnectorSize();
cur_connector_capacity_ = root_->ConnectorCapacity();
}
#endif
RETURN_IF_NOT_OK(root_->GetNextRow(out_row));

// Since GetNextRow was used rather than GetNextInput(), it means we need to manually
@@ -123,9 +130,11 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
// The next row in the pipeline might be an EOF or a TensorRow for next epoch
if (out_row->eoe()) {
MS_LOG(INFO) << "End of data iteration.";
#ifndef ENABLE_SECURITY
if (isProfilingEnable) {
root_->Tree()->SetEpochEnd();
}
#endif
return Status::OK();
}

@@ -138,12 +147,13 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) {
std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs.";
RETURN_STATUS_UNEXPECTED(err);
}
#ifndef ENABLE_SECURITY
if (tracing_ != nullptr) {
cur_batch_num_++;
RETURN_IF_NOT_OK(tracing_->Record(static_cast<int32_t>(CONNECTOR_DEPTH), cur_connector_capacity_, cur_batch_num_,
cur_connector_size_, ProfilingTime::GetCurMilliSecond()));
}
#endif
return Status::OK();
}



+ 6
- 4
mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.h View File

@@ -69,11 +69,13 @@ class DatasetIterator {
private:
std::shared_ptr<DatasetOp> root_; // saves the root of the executionTree
TensorRow device_queue_row_;
#ifndef ENABLE_SECURITY
std::shared_ptr<DatasetIteratorTracing> tracing_; // trace profiling data
int32_t cur_batch_num_; // current batch number,used for profiling
int32_t cur_connector_size_; // current connector size of root op,used for profiling
int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling
bool eof_handled_; // T/F if this op got an eof
#endif
int32_t cur_batch_num_; // current batch number,used for profiling
int32_t cur_connector_size_; // current connector size of root op,used for profiling
int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling
bool eof_handled_; // T/F if this op got an eof
std::unordered_map<std::string, int32_t> col_name_id_map_;
std::vector<std::pair<std::string, int32_t>> column_order_; // key: column name, val: column id
};


+ 30
- 0
mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc View File

@@ -126,8 +126,10 @@ Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
}

Status DeviceQueueOp::operator()() {
#ifndef ENABLE_SECURITY
RETURN_IF_NOT_OK(tree_->AllTasks()->CreateAsyncTask(
"Detect first batch", std::bind(&DeviceQueueOp::DetectFirstBatch, this), nullptr, id()));
#endif
TaskManager::FindMe()->Post();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);

@@ -168,17 +170,22 @@ Status DeviceQueueOp::operator()() {
#ifdef ENABLE_TDTQUE
Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend.";
#ifndef ENABLE_SECURITY
uint64_t batch_start_time, end_time;
uint64_t batch_record_start, batch_record_end;
#endif
int64_t send_batch = 0;
int32_t tdt_cost = 0;
#ifndef ENABLE_SECURITY
int32_t connector_size = 0;
int32_t connector_capacity = 0;
#endif
bool is_break_loop = false;

std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
int64_t sending_num = cfg->sending_batches(); // Get the current sending_num

#ifndef ENABLE_SECURITY
std::shared_ptr<DeviceQueueTracing> profiling_node;
bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable();
if (isProfilingEnable) {
@@ -188,11 +195,16 @@ Status DeviceQueueOp::SendDataToAscend() {
batch_start_time = ProfilingTime::GetCurMilliSecond();
connector_capacity = ChildOpConnectorCapacity();
}
#else
bool isProfilingEnable = false;
#endif
#ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
md_channel_info_->RecordPreprocessBatch(0);
#endif
#ifndef ENABLE_SECURITY
batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
TensorRow curr_row;
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
first_fetch_flag_ = true;
@@ -211,9 +223,11 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Loading dataset and push first batch into device successful.";
first_push_flag_ = true;
}
#ifndef ENABLE_SECURITY
DetectPerBatchTime(&batch_record_start, &batch_record_end);
ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
connector_capacity, connector_size);
#endif
send_batch++;
#ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
@@ -229,10 +243,12 @@ Status DeviceQueueOp::SendDataToAscend() {
// wait when sending num is not 0, and sending num no larger than already sending batch
LimitSendingBatches(send_batch, &sending_num, cfg);

#ifndef ENABLE_SECURITY
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
}
#endif
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
}
if (curr_row.eoe() && send_epoch_end_) {
@@ -256,11 +272,13 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
stop_send_ = true;
}
#ifndef ENABLE_SECURITY
if (isProfilingEnable) {
connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity();
tree_->SetEpochEnd();
}
#endif
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
}

@@ -383,8 +401,11 @@ Status DeviceQueueOp::PushDataToGPU() {
TaskManager::FindMe()->Post();
uint64_t batch_start_time = 0;
int32_t push_cost = 0;
#ifndef ENABLE_SECURITY
int32_t connector_size = 0;
int32_t connector_capacity = 0;
#endif
#ifndef ENABLE_SECURITY
std::shared_ptr<DeviceQueueTracing> profiling_node;
bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable();
if (isProfilingEnable) {
@@ -394,6 +415,7 @@ Status DeviceQueueOp::PushDataToGPU() {
batch_start_time = ProfilingTime::GetCurMilliSecond();
connector_capacity = gpu_item_connector_->capacity();
}
#endif
#ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
md_channel_info_->RecordPreprocessBatch(0);
@@ -431,6 +453,7 @@ Status DeviceQueueOp::PushDataToGPU() {
}
RETURN_IF_NOT_OK(RetryPushData(handle, items));
send_batch++;
#ifndef ENABLE_SECURITY
if (isProfilingEnable) {
uint64_t end_time = ProfilingTime::GetCurMilliSecond();
// record push data time
@@ -446,6 +469,7 @@ Status DeviceQueueOp::PushDataToGPU() {
connector_size = gpu_item_connector_->size();
connector_capacity = gpu_item_connector_->capacity();
}
#endif
#ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(gpu_item_connector_->size());
md_channel_info_->RecordPreprocessBatch(send_batch);
@@ -538,8 +562,10 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
Status DeviceQueueOp::SendDataToGPU() {
RETURN_IF_NOT_OK(LaunchParallelCopyThread());
MS_LOG(INFO) << "Device queue, sending data to GPU.";
#ifndef ENABLE_SECURITY
uint64_t batch_record_start, batch_record_end;
batch_record_start = ProfilingTime::GetCurMilliSecond();
#endif
TensorRow current_row;
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
first_fetch_flag_ = true;
@@ -554,7 +580,9 @@ Status DeviceQueueOp::SendDataToGPU() {
MS_LOG(INFO) << "Loading dataset and push first batch into device successful.";
first_push_flag_ = true;
}
#ifndef ENABLE_SECURITY
DetectPerBatchTime(&batch_record_start, &batch_record_end);
#endif
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
} else {
@@ -642,6 +670,7 @@ void DeviceQueueOp::Print(std::ostream &out, bool show_all) const {
}
}

#ifndef ENABLE_SECURITY
void DeviceQueueOp::ProfilingRecorder(bool isProfilingEnable, std::shared_ptr<DeviceQueueTracing> profiling_node,
int64_t send_batch, int32_t tdt_cost, uint64_t *batch_start_time,
uint64_t *end_time, int32_t connector_capacity, int32_t connector_size) {
@@ -692,5 +721,6 @@ void DeviceQueueOp::DetectPerBatchTime(uint64_t *start_time, uint64_t *end_time)
}
*start_time = *end_time;
}
#endif
} // namespace dataset
} // namespace mindspore

+ 4
- 3
mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.h View File

@@ -103,11 +103,12 @@ class DeviceQueueOp : public PipelineOp {
}

Status operator()() override;
#ifndef ENABLE_SECURITY
// Record the pipeline profiling info
void ProfilingRecorder(bool isProfilingEnable, std::shared_ptr<DeviceQueueTracing> profiling_node, int64_t send_batch,
int32_t tdt_cost, uint64_t *batch_start_time, uint64_t *end_time, int32_t connector_capacity,
int32_t connector_size);
#endif

// Op name getter
// @return Name of the current Op
@@ -156,13 +157,13 @@ class DeviceQueueOp : public PipelineOp {
#endif

Status SendDataToCPU();
#ifndef ENABLE_SECURITY
// Create async thread to detect whether it takes too long and unable to fetch first batch
Status DetectFirstBatch();

// Detect the cost time of each batch, present alarm message if cost too long
void DetectPerBatchTime(uint64_t *start_time, uint64_t *end_time);
#endif
std::atomic<bool> first_fetch_flag_;

std::unique_ptr<ChildIterator> child_iterator_;


+ 6
- 0
mindspore/ccsrc/minddata/dataset/engine/execution_tree.cc View File

@@ -19,8 +19,10 @@
#include <limits>
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/engine/datasetops/device_queue_op.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/profiling.h"
#include "minddata/dataset/engine/perf/monitor.h"
#endif
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
#include "minddata/dataset/util/numa_interface.h"
#endif
@@ -31,7 +33,9 @@ namespace dataset {
// Constructor
ExecutionTree::ExecutionTree() : id_count_(0), tree_state_(kDeTStateInit) {
tg_ = std::make_unique<TaskGroup>();
#ifndef ENABLE_SECURITY
profiling_manager_ = std::make_unique<ProfilingManager>(this);
#endif
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
std::shared_ptr<ConfigManager> cfg = GlobalContext::config_manager();
rank_id_ = cfg->rank_id();
@@ -183,12 +187,14 @@ Status ExecutionTree::Launch() {
}

// Profiling infrastructures need to be initialized before Op launching
#ifndef ENABLE_SECURITY
if (profiling_manager_->IsProfilingEnable()) {
// Setup profiling manager
RETURN_IF_NOT_OK(profiling_manager_->Initialize());
// Launch Monitor Thread
RETURN_IF_NOT_OK(profiling_manager_->LaunchMonitor());
}
#endif

std::ostringstream ss;
ss << *this;


+ 11
- 5
mindspore/ccsrc/minddata/dataset/engine/execution_tree.h View File

@@ -29,7 +29,9 @@
#endif
#include "minddata/dataset/engine/datasetops/dataset_op.h"
#include "minddata/dataset/util/status.h"
#ifndef ENABLE_SECURITY
#include "mindspore/ccsrc/minddata/dataset/engine/perf/profiling.h"
#endif
namespace mindspore {
namespace dataset {
// Forward declares
@@ -190,7 +192,9 @@ class ExecutionTree {
}

/// \brief Getter for profiling manager, no ownership
#ifndef ENABLE_SECURITY
ProfilingManager *GetProfilingManager() { return profiling_manager_.get(); }
#endif

private:
/// \brief A helper functions for doing the recursive printing
@@ -201,12 +205,14 @@ class ExecutionTree {
void PrintNode(std::ostream &out, const std::shared_ptr<DatasetOp> &dataset_op, std::string indent, bool last,
bool detailed) const;

std::unique_ptr<TaskGroup> tg_; // Class for worker management
std::shared_ptr<DatasetOp> root_; // The root node of the tree
int32_t id_count_; // Counter for generating operator id's
uint32_t prepare_flags_; // Flags used during tree prepare
TreeState tree_state_; // Tracking the current tree state
std::unique_ptr<TaskGroup> tg_; // Class for worker management
std::shared_ptr<DatasetOp> root_; // The root node of the tree
int32_t id_count_; // Counter for generating operator id's
uint32_t prepare_flags_; // Flags used during tree prepare
TreeState tree_state_; // Tracking the current tree state
#ifndef ENABLE_SECURITY
std::unique_ptr<ProfilingManager> profiling_manager_; // Profiling manager
#endif
#if defined(ENABLE_GPUQUE) || defined(ENABLE_TDTQUE)
// This rank_id is for numa and device_queue, one process work with only one rank_id,
// for standalone scenario, this rank_id may come from env 'CUDA_VISIBLE_DEVICES',


+ 9
- 1
mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_plugin.cc View File

@@ -15,7 +15,9 @@
*/
#include "minddata/dataset/engine/tdt/tdt_plugin.h"
#include "utils/ms_utils.h"
#ifndef ENABLE_SECURITY
#include "minddata/dataset/engine/perf/profiling.h"
#endif
#include "minddata/dataset/util/log_adapter.h"
#if ENABLE_D
#include "ps/ps_cache/ps_data/ps_data_prefetch.h"
@@ -58,16 +60,20 @@ Status TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_n
MS_LOG(DEBUG) << "TDT channel name is " << channel_name << ".";

acltdtDataset *acl_dataset = nullptr;
#ifndef ENABLE_SECURITY
double start_time;
#endif
auto ret = translate(tdt_type, ts_row, &acl_dataset);
if (ret != Status::OK()) {
DestroyAclDataset(acl_dataset);
RETURN_STATUS_UNEXPECTED("Converting into TDT tensor failed!");
}
#ifndef ENABLE_SECURITY
if (profiling) {
start_time = ProfilingTime::GetCurMilliSecond();
}
#endif

#if ENABLE_D
// Data prefetch only when PS mode enables cache.
if (acltdtGetDatasetSize(acl_dataset) > 0) {
@@ -88,10 +94,12 @@ Status TdtPlugin::hostPush(TensorRow ts_row, bool is_wait, std::string channel_n
ReportErrorMessage();
RETURN_STATUS_UNEXPECTED("Tdt Send data failed.");
}
#ifndef ENABLE_SECURITY
if (profiling) {
double end_time = ProfilingTime::GetCurMilliSecond();
time = (int32_t)(end_time - start_time);
}
#endif
return Status::OK();
}



+ 14
- 1
mindspore/ccsrc/minddata/dataset/engine/tree_adapter.cc View File

@@ -153,7 +153,9 @@ Status TreeAdapter::Build(std::shared_ptr<DatasetNode> root_ir) {
// This will evolve in the long run
tree_ = std::make_unique<ExecutionTree>();
// disable profiling if this is only a getter pass
#ifndef ENABLE_SECURITY
if (usage_ == kDeGetter) tree_->GetProfilingManager()->DisableProfiling();
#endif
// Build the Execution tree from the child of the IR root node, which represent the root of the input IR tree
std::shared_ptr<DatasetOp> root_op;
RETURN_IF_NOT_OK(BuildExecutionTreeRecur(root_ir->Children()[0], &root_op));
@@ -218,8 +220,9 @@ Status TreeAdapter::GetNext(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(tree_);
RETURN_UNEXPECTED_IF_NULL(row);
row->clear(); // make sure row is empty
#ifndef ENABLE_SECURITY
bool isProfilingEnable = tree_->GetProfilingManager()->IsProfilingEnable();
#endif

// When cur_db_ is a nullptr, it means this is the first call to get_next, launch ExecutionTree
if (!launched_) {
@@ -229,9 +232,11 @@ Status TreeAdapter::GetNext(TensorRow *row) {
RETURN_IF_NOT_OK(tree_->root()->GetNextRow(row)); // first buf can't be eof or empty buf with none flag
if (row->eoe()) { // return empty tensor if 1st buf is a ctrl buf (no rows)
MS_LOG(INFO) << "End of data iteration.";
#ifndef ENABLE_SECURITY
if (isProfilingEnable) {
tree_->SetEpochEnd();
}
#endif
return Status::OK();
}
if (row->eof()) {
@@ -241,6 +246,7 @@ Status TreeAdapter::GetNext(TensorRow *row) {
}

// Record profiling info
#ifndef ENABLE_SECURITY
if (tracing_ != nullptr) {
uint64_t end_time = ProfilingTime::GetCurMilliSecond();
cur_batch_num_++;
@@ -249,6 +255,7 @@ Status TreeAdapter::GetNext(TensorRow *row) {
RETURN_IF_NOT_OK(
tracing_->Record(CONNECTOR_DEPTH, cur_connector_capacity_, cur_batch_num_, cur_connector_size_, end_time));
}
#endif
return Status::OK();
}

@@ -258,9 +265,15 @@ Status TreeAdapter::Launch() {
launched_ = true;
// Profiling
std::shared_ptr<Tracing> node;
#ifndef ENABLE_SECURITY
Status s = tree_->GetProfilingManager()->GetTracingNode(kDatasetIteratorTracingName, &node);
#else
Status s = Status::OK();
#endif
if (s.IsOk()) {
#ifndef ENABLE_SECURITY
tracing_ = std::dynamic_pointer_cast<DatasetIteratorTracing>(node);
#endif
cur_connector_size_ = tree_->root()->ConnectorSize();
cur_connector_capacity_ = tree_->root()->ConnectorCapacity();
}


+ 8
- 6
mindspore/ccsrc/minddata/dataset/engine/tree_adapter.h View File

@@ -90,13 +90,15 @@ class TreeAdapter {

std::unordered_map<std::string, int32_t> column_name_map_;
std::shared_ptr<DatasetNode> root_ir_;
std::unique_ptr<ExecutionTree> tree_; // current connector capacity of root op, used for profiling
bool optimize_; // Flag to enable optional optimization pass
std::unique_ptr<ExecutionTree> tree_; // current connector capacity of root op, used for profiling
bool optimize_; // Flag to enable optional optimization pass
#ifndef ENABLE_SECURITY
std::shared_ptr<DatasetIteratorTracing> tracing_; // trace profiling data
int32_t cur_batch_num_; // current batch number, used for profiling
int32_t cur_connector_size_; // current connector size of root op, used for profiling
int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling
UsageFlag usage_; // usage of this tree adapter (type of consumer)
#endif
int32_t cur_batch_num_; // current batch number, used for profiling
int32_t cur_connector_size_; // current connector size of root op, used for profiling
int32_t cur_connector_capacity_; // current connector capacity of root op, used for profiling
UsageFlag usage_; // usage of this tree adapter (type of consumer)
bool launched_;
// State flags for the lifecycle of the tree
enum CompileState {


Loading…
Cancel
Save