Browse Source

!14891 dataset: fix codex

From: @ms_yan
Reviewed-by: @heleiwang,@liucunwei
Signed-off-by: @liucunwei
pull/14891/MERGE
mindspore-ci-bot Gitee 4 years ago
parent
commit
16fa94f45a
4 changed files with 55 additions and 39 deletions
  1. +16
    -4
      mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc
  2. +28
    -25
      mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc
  3. +10
    -10
      mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h
  4. +1
    -0
      mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h

+ 16
- 4
mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc View File

@@ -142,9 +142,9 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "Device queue, sending data to Ascend.";
uint64_t batch_start_time, end_time;
int64_t send_batch = 0;
int32_t tdt_cost;
int32_t tdt_cost = 0;
int32_t connector_size = 0;
int32_t connector_capacity;
int32_t connector_capacity = 0;
bool is_break_loop = false;

std::shared_ptr<DeviceQueueTracing> profiling_node;
@@ -202,7 +202,13 @@ Status DeviceQueueOp::SendDataToAscend() {
MS_LOG(INFO) << "stop_send received";
return Status::OK();
}
return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed");
return Status(StatusCode::kMDTDTPushFailure,
"TDT Push data into device Failed, please check the first error or TraceBack first, following are"
" several possible checking way: 1) if training is not ready, still in network graph compiling"
" stage, please check error raised by Network used operator or environment configuration. 2) if"
" interrupt in middle process of training, may check whether dataset sending num and network"
" training num mismatch. 3) if this error raised in end of training, ignore this. 4) other cases,"
" try find ascend host log or checking info log ects.");
}
MS_LOG(INFO) << "an epoch has already sent, now stop send data.";
stop_send_ = true;
@@ -237,7 +243,13 @@ Status DeviceQueueOp::SendRowToTdt(TensorRow currRow, bool isProfilingEnable, in
MS_LOG(INFO) << "stop_send received";
return Status::OK();
}
return Status(StatusCode::kMDTDTPushFailure, "TDT Push Failed");
return Status(StatusCode::kMDTDTPushFailure,
"TDT Push data into device Failed, please check the first error or TraceBack first, following are"
" several possible checking way: 1) if training is not ready, still in network graph compiling"
" stage, please check error raised by Network used operator or environment configuration. 2) if"
" interrupt in middle process of training, may check whether dataset sending num and network"
" training num mismatch. 3) if this error raised in end of training, ignore this. 4) other cases,"
" try find ascend host log or checking info log ects.");
}
if (create_data_info_queue_) {
DATA_INFO data_info;


+ 28
- 25
mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.cc View File

@@ -44,6 +44,9 @@ BaseCpu::BaseCpu() {
pre_cpu_stat_.io_stat_ = 0;
pre_cpu_stat_.idle_stat_ = 0;
pre_cpu_stat_.total_stat_ = 0;
fetched_all_process = false;
pre_fetched_state = false;
cpu_processor_num_ = 0;
}

Status DeviceCpu::ParseCpuInfo(const std::string &str) {
@@ -51,8 +54,8 @@ Status DeviceCpu::ParseCpuInfo(const std::string &str) {
uint64_t nice = 0;
uint64_t irq = 0;
uint64_t softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_,
&cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) {
if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &cpu_stat.user_stat_, &nice, &cpu_stat.sys_stat_,
&cpu_stat.idle_stat_, &cpu_stat.io_stat_, &irq, &softirq) == EOF) {
return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
}

@@ -86,7 +89,7 @@ Status DeviceCpu::ParseCpuInfo(const std::string &str) {

Status DeviceCpu::ParseCtxt(const std::string &str) {
uint64_t ctxt;
if (std::sscanf(str.c_str(), "%*s %lu", &ctxt) == EOF) {
if (sscanf_s(str.c_str(), "%*s %lu", &ctxt) == EOF) {
return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
}
// Calculate the utilization from the second sampling
@@ -99,7 +102,7 @@ Status DeviceCpu::ParseCtxt(const std::string &str) {

Status DeviceCpu::ParseRunningProcess(const std::string &str) {
uint32_t running_process;
if (std::sscanf(str.c_str(), "%*s %ud", &running_process) == EOF) {
if (sscanf_s(str.c_str(), "%*s %ud", &running_process) == EOF) {
return Status(StatusCode::kMDUnexpectedError, "Get context switch count failed.");
}
// Drop the first value in order to collect same amount of CPU utilization
@@ -110,7 +113,7 @@ Status DeviceCpu::ParseRunningProcess(const std::string &str) {
return Status::OK();
}

Status DeviceCpu::Collect(ExecutionTree *tree) {
Status DeviceCpu::Collect(const ExecutionTree *tree) {
std::ifstream file("/proc/stat");
if (!file.is_open()) {
MS_LOG(INFO) << "Open CPU file failed when collect CPU information";
@@ -222,8 +225,8 @@ Status OperatorCpu::ParseCpuInfo(int32_t op_id, int64_t thread_id,
getline(file, str);
uint64_t utime;
uint64_t stime;
if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime,
&stime) == EOF) {
if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &utime, &stime) ==
EOF) {
file.close();
return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
}
@@ -243,7 +246,7 @@ Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) {
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
EOF) {
file.close();
return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
@@ -254,15 +257,15 @@ Status OperatorCpu::GetTotalCpuTime(uint64_t *total_stat) {
return Status::OK();
}

Status OperatorCpu::Collect(ExecutionTree *tree) {
Status OperatorCpu::Collect(const ExecutionTree *tree) {
if (first_collect_) {
for (auto iter = tree->begin(); iter != tree->end(); ++iter) {
id_count++;
id_count_++;
op_name[iter->id()] = iter->NameWithID();
op_parallel_workers[iter->id()] = iter->num_workers();
}
#if defined(USING_LINUX)
cpu_processor_num = get_nprocs_conf();
cpu_processor_num_ = get_nprocs_conf();
#endif
}

@@ -313,7 +316,7 @@ Status OperatorCpu::Collect(ExecutionTree *tree) {
}

// iter all the op, and obtain the CPU utilization of each operator
for (auto op_id = -1; op_id < id_count; op_id++) {
for (auto op_id = -1; op_id < id_count_; op_id++) {
float user_util = 0, sys_util = 0;
auto iter = std::find(total_op_id.begin(), total_op_id.end(), op_id);
if (iter != total_op_id.end()) {
@@ -362,7 +365,7 @@ Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string
*utilization = 0;

// start loop from 0 was as don't want to analyze op -1
for (auto op_id = 0; op_id < id_count; op_id++) {
for (auto op_id = 0; op_id < id_count_; op_id++) {
int sum = 0;
int index = op_id + 1;
for (int i = start_analyze; i < end_analyze; i++) {
@@ -370,7 +373,7 @@ Status OperatorCpu::Analyze(std::string *name, double *utilization, std::string
sum += cpu_op_util_[i][index].sys_utilization_;
}
if ((end_analyze - start_analyze) > 0) {
op_util = 1.0 * sum * cpu_processor_num / (op_parallel_workers[op_id] * (end_analyze - start_analyze));
op_util = 1.0 * sum * cpu_processor_num_ / (op_parallel_workers[op_id] * (end_analyze - start_analyze));
}
if (op_util > *utilization) {
*utilization = op_util;
@@ -394,15 +397,15 @@ Status OperatorCpu::SaveToFile(const std::string &file_path) {

uint8_t index = 0;
json OpWriter;
for (auto op_id = -1; op_id < id_count; op_id++) {
for (auto op_id = -1; op_id < id_count_; op_id++) {
std::vector<uint16_t> user_util;
std::vector<uint16_t> sys_util;
std::transform(
cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(user_util),
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num); });
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].user_utilization_ * cpu_processor_num_); });
std::transform(
cpu_op_util_.begin(), cpu_op_util_.end(), std::back_inserter(sys_util),
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num); });
[&](const std::vector<CpuOpUtil> &info) { return int16_t(info[index].sys_utilization_ * cpu_processor_num_); });

json per_op_info = {{"metrics", {{"user_utilization", user_util}, {"sys_utilization", sys_util}}},
{"op_id", op_id}};
@@ -451,8 +454,8 @@ Status ProcessCpu::ParseCpuInfo() {
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0;
if (std::sscanf(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user,
&sys) == EOF) {
if (sscanf_s(str.c_str(), "%*d %*s %*s %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %*lu %lu %lu", &user, &sys) ==
EOF) {
file.close();
return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
}
@@ -487,7 +490,7 @@ Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) {
std::string str;
getline(file, str);
uint64_t user = 0, sys = 0, idle = 0, iowait = 0, nice = 0, irq = 0, softirq = 0;
if (std::sscanf(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
if (sscanf_s(str.c_str(), "%*s %lu %lu %lu %lu %lu %lu %lu", &user, &nice, &sys, &idle, &iowait, &irq, &softirq) ==
EOF) {
file.close();
return Status(StatusCode::kMDUnexpectedError, "Get device CPU failed.");
@@ -498,10 +501,10 @@ Status ProcessCpu::GetTotalCpuTime(uint64_t *total_stat) {
return Status::OK();
}

Status ProcessCpu::Collect(ExecutionTree *tree) {
Status ProcessCpu::Collect(const ExecutionTree *tree) {
if (first_collect_) {
#if defined(USING_LINUX)
cpu_processor_num = get_nprocs_conf();
cpu_processor_num_ = get_nprocs_conf();
#endif
}
RETURN_IF_NOT_OK(ParseCpuInfo());
@@ -543,13 +546,13 @@ Status ProcessCpu::SaveToFile(const std::string &file_path) {

std::vector<int16_t> user_util;
std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(user_util),
[&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num); });
[&](const CpuProcessUtil &info) { return uint16_t(info.user_utilization_ * cpu_processor_num_); });
std::vector<int16_t> sys_util;
std::transform(process_util_.begin(), process_util_.end(), std::back_inserter(sys_util),
[&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num); });
[&](const CpuProcessUtil &info) { return uint16_t(info.sys_utilization_ * cpu_processor_num_); });

output["process_info"] = {{"user_utilization", user_util}, {"sys_utilization", sys_util}};
output["cpu_processor_num"] = cpu_processor_num;
output["cpu_processor_num"] = cpu_processor_num_;
// Discard the content of the file when opening.
std::ofstream os(file_path, std::ios::trunc);
os << output;


+ 10
- 10
mindspore/ccsrc/minddata/dataset/engine/perf/cpu_sampling.h View File

@@ -69,7 +69,7 @@ class BaseCpu {
BaseCpu();
~BaseCpu() = default;
// Collect CPU information
virtual Status Collect(ExecutionTree *tree) = 0;
virtual Status Collect(const ExecutionTree *tree) = 0;
virtual Status SaveToFile(const std::string &file_path) = 0;
virtual Status Analyze(std::string *name, double *utilization, std::string *extra_message) = 0;

@@ -78,10 +78,10 @@ class BaseCpu {
CpuStat pre_cpu_stat_;
static bool fetched_all_process_shared;
static std::unordered_map<int32_t, std::vector<pid_t>> op_process_shared;
bool fetched_all_process = false;
bool pre_fetched_state = false;
bool fetched_all_process;
bool pre_fetched_state;
std::unordered_map<int32_t, std::vector<pid_t>> op_process;
int32_t cpu_processor_num;
int32_t cpu_processor_num_;
};

// Collect device CPU information
@@ -89,7 +89,7 @@ class DeviceCpu : public BaseCpu {
public:
DeviceCpu() : pre_running_process_(0), pre_context_switch_count_(0), first_collect_(true) {}
~DeviceCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status Collect(const ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
Status Analyze(std::string *name, double *utilization, std::string *extra_message) override;

@@ -113,9 +113,9 @@ class DeviceCpu : public BaseCpu {
// Collect operator CPU information
class OperatorCpu : public BaseCpu {
public:
OperatorCpu() : first_collect_(true) {}
OperatorCpu() : first_collect_(true), pre_total_stat_(0), id_count_(0) {}
~OperatorCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status Collect(const ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
// Analyze will output the name of the metric, the avg utiliization of highest
// object within the class and any extra message that would be useful for the user.
@@ -142,15 +142,15 @@ class OperatorCpu : public BaseCpu {
std::unordered_map<int32_t, int32_t> op_parallel_workers;
std::unordered_map<int32_t, std::unordered_map<int64_t, CpuOpStat>> pre_op_stat_;
uint64_t pre_total_stat_;
int32_t id_count = 0;
int32_t id_count_;
};

// Collect operator CPU information
class ProcessCpu : public BaseCpu {
public:
ProcessCpu() : first_collect_(true) {}
ProcessCpu() : first_collect_(true), pre_total_stat_(0) {}
~ProcessCpu() = default;
Status Collect(ExecutionTree *tree) override;
Status Collect(const ExecutionTree *tree) override;
Status SaveToFile(const std::string &file_path) override;
Status Analyze(std::string *name, double *utilization, std::string *extra_message) override;



+ 1
- 0
mindspore/ccsrc/minddata/dataset/engine/tdt/tdt_handle.h View File

@@ -32,6 +32,7 @@ class TdtHandle {

private:
TdtHandle() {}
~TdtHandle() = default;
};
} // namespace dataset
} // namespace mindspore


Loading…
Cancel
Save