| @@ -104,6 +104,8 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { | |||||
| // Common code init and error checking in the base class. | // Common code init and error checking in the base class. | ||||
| RETURN_IF_NOT_OK(IteratorBase::FetchNextTensorRow(out_row)); | RETURN_IF_NOT_OK(IteratorBase::FetchNextTensorRow(out_row)); | ||||
| bool isProfilingEnable = root_->Tree()->GetProfilingManager()->IsProfilingEnable(); | |||||
| // Once eof is handled, always return empty row. Class must be destroyed and recreated if you | // Once eof is handled, always return empty row. Class must be destroyed and recreated if you | ||||
| // want to iterate again. | // want to iterate again. | ||||
| if (eof_handled_) { | if (eof_handled_) { | ||||
| @@ -127,6 +129,9 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { | |||||
| if (curr_buffer_->eoe()) { | if (curr_buffer_->eoe()) { | ||||
| MS_LOG(INFO) << "End of data iteration."; | MS_LOG(INFO) << "End of data iteration."; | ||||
| curr_buffer_.reset(); // explicitly free the eoe buffer | curr_buffer_.reset(); // explicitly free the eoe buffer | ||||
| if (isProfilingEnable) { | |||||
| root_->Tree()->SetEpochEnd(); | |||||
| } | |||||
| return Status::OK(); | return Status::OK(); | ||||
| } | } | ||||
| @@ -136,6 +141,7 @@ Status DatasetIterator::FetchNextTensorRow(TensorRow *out_row) { | |||||
| if (curr_buffer_->eof()) { | if (curr_buffer_->eof()) { | ||||
| eof_handled_ = true; | eof_handled_ = true; | ||||
| curr_buffer_.reset(); // explicitly free the eof buffer | curr_buffer_.reset(); // explicitly free the eof buffer | ||||
| root_->Tree()->SetFinished(); | |||||
| std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs."; | std::string err = "EOF buffer encountered. Users try to fetch data beyond the specified number of epochs."; | ||||
| RETURN_STATUS_UNEXPECTED(err); | RETURN_STATUS_UNEXPECTED(err); | ||||
| } | } | ||||
| @@ -170,6 +170,7 @@ Status DeviceQueueOp::SendDataToAscend() { | |||||
| if (isProfilingEnable) { | if (isProfilingEnable) { | ||||
| connector_size = ChildOpConnectorSize(); | connector_size = ChildOpConnectorSize(); | ||||
| connector_capacity = ChildOpConnectorCapacity(); | connector_capacity = ChildOpConnectorCapacity(); | ||||
| tree_->SetEpochEnd(); | |||||
| } | } | ||||
| RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); | RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); | ||||
| } | } | ||||
| @@ -48,6 +48,7 @@ class ExecutionTree { | |||||
| kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare | kDeTStatePrepare, // The tree has been assigned a root node and is pending prepare | ||||
| kDeTStateReady, // The tree has been prepared and is ready to be launched | kDeTStateReady, // The tree has been prepared and is ready to be launched | ||||
| kDeTStateExecuting, // The tree has been launched and is executing | kDeTStateExecuting, // The tree has been launched and is executing | ||||
| kDeTStateEpochEnd, // The tree has received the end-of-epoch signal (used only for profiling) | |||||
| kDeTStateFinished // The tree has been drained, dataset iterator received EOF | kDeTStateFinished // The tree has been drained, dataset iterator received EOF | ||||
| }; | }; | ||||
| @@ -207,6 +208,16 @@ class ExecutionTree { | |||||
| // @return raw pointer to the TaskGroup | // @return raw pointer to the TaskGroup | ||||
| TaskGroup *AllTasks() const { return tg_.get(); } | TaskGroup *AllTasks() const { return tg_.get(); } | ||||
| // Return whether the ExecutionTree is currently in the end-of-epoch state | |||||
| // @return bool - true if tree_state_ equals kDeTStateEpochEnd, false otherwise | |||||
| bool IsEpochEnd() const { return tree_state_ == TreeState::kDeTStateEpochEnd; } | |||||
| // Set the ExecutionTree to the end-of-epoch state (tree_state_ becomes kDeTStateEpochEnd) | |||||
| void SetEpochEnd() { tree_state_ = TreeState::kDeTStateEpochEnd; } | |||||
| // Set the ExecutionTree to the executing state. NOTE(review): unconditional write - could overwrite kDeTStateFinished if interleaved with SetFinished(); confirm | |||||
| void SetExecuting() { tree_state_ = TreeState::kDeTStateExecuting; } | |||||
| // Return whether the ExecutionTree is finished (iterator received EOF). | // Return whether the ExecutionTree is finished (iterator received EOF). | ||||
| // @return bool - true if the ExecutionTree is finished | // @return bool - true if the ExecutionTree is finished | ||||
| bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; } | bool isFinished() const { return tree_state_ == TreeState::kDeTStateFinished; } | ||||
| @@ -36,6 +36,10 @@ Status Monitor::operator()() { | |||||
| // 1) Monitor Task is not interrupted by TaskManager AND | // 1) Monitor Task is not interrupted by TaskManager AND | ||||
| // 2) Iterator has not received EOF | // 2) Iterator has not received EOF | ||||
| while (!this_thread::is_interrupted() && !(tree_->isFinished())) { | while (!this_thread::is_interrupted() && !(tree_->isFinished())) { | ||||
| if (tree_->IsEpochEnd()) { | |||||
| tree_->GetProfilingManager()->SaveProfilingData(); | |||||
| tree_->SetExecuting(); | |||||
| } | |||||
| for (auto &node : tree_->GetProfilingManager()->GetSamplingNodes()) { | for (auto &node : tree_->GetProfilingManager()->GetSamplingNodes()) { | ||||
| RETURN_IF_NOT_OK(node.second->Sample()); | RETURN_IF_NOT_OK(node.second->Sample()); | ||||
| } | } | ||||