Browse Source

Fix lizard check for engine/cache/perf

tags/v1.2.0-rc1
Lixia Chen 4 years ago
parent
commit
fc35d54fe0
4 changed files with 234 additions and 194 deletions
  1. +202
    -184
      mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc
  2. +4
    -0
      mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h
  3. +26
    -10
      mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc
  4. +2
    -0
      mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h

+ 202
- 184
mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc View File

@@ -33,6 +33,10 @@
namespace mindspore {
namespace dataset {
const char CachePerfRun::kCachePipelineBinary[] = "cache_pipeline";
const int32_t port_opt = 1000; // there is no short option for port
const int32_t hostname_opt = 1001; // there is no short option for hostname
const int32_t connect_opt = 1002; // there is no short option for connect

void CachePerfRun::PrintHelp() {
std::cout << "Options:\n"
" -h,--help: Show this usage message\n"
@@ -62,16 +66,136 @@ void CachePerfRun::PrintHelp() {
<< " --hostname: Hostname of the cache server. Default = " << kCfgDefaultCacheHost << "\n";
}

// Handle a single option returned by getopt_long.
// @param opt Option character (or long-option constant) from getopt_long.
// @return 0 on success, -1 on a bad/unknown option or an unparsable value.
// Side effects: updates the corresponding member/builder setting; prints the
// help text to stdout and diagnostics to stderr on failure.
int32_t CachePerfRun::ProcessArgsHelper(int32_t opt) {
  int32_t rc = 0;
  try {
    switch (opt) {
      case 'n': {
        num_pipelines_ = std::stoi(optarg);
        break;
      }

      case 'e': {
        num_epoches_ = std::stoi(optarg);
        break;
      }

      case 'p': {
        int32_t prefetch_sz = std::stoi(optarg);
        cache_builder_.SetPrefetchSize(prefetch_sz);
        break;
      }

      case 'a': {
        int32_t cache_sz = std::stoi(optarg);
        cache_builder_.SetCacheMemSz(cache_sz);
        break;
      }

      case 's': {
        num_rows_ = std::stoi(optarg);
        break;
      }

      case 'r': {
        row_size_ = std::stoi(optarg);
        break;
      }

      case 'w': {
        cfg_.set_num_parallel_workers(std::stoi(optarg));
        break;
      }

      case connect_opt: {
        int32_t connection_sz = std::stoi(optarg);
        cache_builder_.SetNumConnections(connection_sz);
        break;
      }

      case port_opt: {
        int32_t port = std::stoi(optarg);
        cache_builder_.SetPort(port);
        break;
      }

      case hostname_opt: {
        std::string hostname = optarg;
        cache_builder_.SetHostname(hostname);
        break;
      }

      case 'h':  // -h or --help
        PrintHelp();
        rc = -1;
        break;

      case ':':
        std::cerr << "Missing argument for option " << static_cast<char>(optopt) << std::endl;
        rc = -1;
        break;

      case '?':  // Unrecognized option
      default:
        std::cerr << "Unknown option " << static_cast<char>(optopt) << std::endl;
        PrintHelp();
        rc = -1;
        break;
    }
  } catch (const std::exception &e) {
    // std::stoi throws invalid_argument/out_of_range on bad numeric input.
    // Tell the user what went wrong instead of silently printing the help text.
    std::cerr << "Invalid value for option: " << e.what() << std::endl;
    PrintHelp();
    rc = -1;
  }
  return rc;
}

// Validate the parsed command-line settings before the run starts.
// @param seen_opts Map of option characters that appeared on the command line
//        (passed by value to match the header declaration).
// @return 0 when all settings are usable, -1 otherwise (message on stderr).
int32_t CachePerfRun::SanityCheck(std::map<int32_t, int32_t> seen_opts) {
  // We have all the defaults except sample size and average row size which the user must specify.
  auto it = seen_opts.find('s');
  if (it == seen_opts.end()) {
    std::cerr << "Missing sample size." << std::endl;
    return -1;
  }

  it = seen_opts.find('r');
  if (it == seen_opts.end()) {
    std::cerr << "Missing average row size." << std::endl;
    return -1;
  }

  if (num_rows_ <= 0) {
    std::cerr << "Sample size must be positive." << std::endl;
    return -1;
  }

  if (row_size_ <= 0) {
    std::cerr << "Average row size must be positive." << std::endl;
    return -1;
  }

  if (num_pipelines_ <= 0) {
    std::cerr << "Number of pipelines must be positive." << std::endl;
    return -1;
  }

  if (num_epoches_ <= 0) {
    // Fixed user-visible spelling: "epochs", not "epoches".
    std::cerr << "Number of epochs must be positive." << std::endl;
    return -1;
  }

  // Each pipeline must get at least one row to work on.
  if (num_rows_ < num_pipelines_) {
    std::cerr << "Sample size is smaller than the number of pipelines." << std::endl;
    return -1;
  }
  return 0;
}

int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
if (argc == 1) {
PrintHelp();
return -1;
}

const int32_t port_opt = 1000; // there is no short option for port
const int32_t hostname_opt = 1001; // there is no short option for hostname
const int32_t connect_opt = 1002; // there is no short option for connect

int shuffle = 0;
int spill = 0;

@@ -98,7 +222,7 @@ int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
int32_t option_indxex;
const auto opt = getopt_long(argc, argv, short_opts, long_opts, &option_indxex);

if (-1 == opt) {
if (opt == -1) {
if (optind < argc) {
rc = -1;
std::cerr << "Unknown arguments: ";
@@ -120,138 +244,27 @@ int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
}
}

switch (opt) {
case 0: {
if (long_opts[option_indxex].flag == &shuffle) {
shuffle_ = true;
} else if (long_opts[option_indxex].flag == &spill) {
cache_builder_.SetSpill(true);
}
break;
}

case 'n': {
num_pipelines_ = std::stoi(optarg);
break;
}

case 'e': {
num_epoches_ = std::stoi(optarg);
break;
}

case 'p': {
int32_t prefetch_sz = std::stoi(optarg);
cache_builder_.SetPrefetchSize(prefetch_sz);
break;
}

case 'a': {
int32_t cache_sz = std::stoi(optarg);
cache_builder_.SetCacheMemSz(cache_sz);
break;
}

case 's': {
num_rows_ = std::stoi(optarg);
break;
}

case 'r': {
row_size_ = std::stoi(optarg);
break;
}

case 'w': {
cfg_.set_num_parallel_workers(std::stoi(optarg));
break;
}

case connect_opt: {
int32_t connection_sz = std::stoi(optarg);
cache_builder_.SetNumConnections(connection_sz);
break;
}

case port_opt: {
int32_t port = std::stoi(optarg);
cache_builder_.SetPort(port);
break;
}

case hostname_opt: {
std::string hostname = optarg;
cache_builder_.SetHostname(hostname);
break;
if (opt == 0) {
if (long_opts[option_indxex].flag == &shuffle) {
shuffle_ = true;
} else if (long_opts[option_indxex].flag == &spill) {
cache_builder_.SetSpill(true);
}

case 'h': // -h or --help
PrintHelp();
rc = -1;
break;

case ':':
std::cerr << "Missing argument for option " << char(optopt) << std::endl;
rc = -1;
break;

case '?': // Unrecognized option
default:
std::cerr << "Unknown option " << char(optopt) << std::endl;
PrintHelp();
rc = -1;
break;
continue;
}

rc = ProcessArgsHelper(opt);
}
} catch (const std::exception &e) {
PrintHelp();
rc = -1;
}
if (rc < 0) return rc;

if (rc < 0) {
return rc;
}

// We have all the defaults except sample size and average row size which the user must specify.
auto it = seen_opts.find('s');
if (it == seen_opts.end()) {
std::cerr << "Missing sample size." << std::endl;
return -1;
}

it = seen_opts.find('r');
if (it == seen_opts.end()) {
std::cerr << "Missing average row size." << std::endl;
return -1;
}

if (num_rows_ <= 0) {
std::cerr << "Sample size must be positive." << std::endl;
return -1;
}

if (row_size_ <= 0) {
std::cerr << "Average row size must be positive." << std::endl;
return -1;
}

if (num_pipelines_ <= 0) {
std::cerr << "Number of pipelines must be positive." << std::endl;
return -1;
}

if (num_epoches_ <= 0) {
std::cerr << "Number of epoches must be positive." << std::endl;
return -1;
}

if (num_rows_ < num_pipelines_) {
std::cerr << "Sample size is smaller than the number of pipelines." << std::endl;
return -1;
}
rc = SanityCheck(seen_opts);
if (rc < 0) return rc;

pid_lists_.reserve(num_pipelines_);

return 0;
}

@@ -392,42 +405,7 @@ Status CachePerfRun::ListenToPipeline(int32_t workerId) {
return Status::OK();
}

Status CachePerfRun::Run() {
// Now we bring up TaskManager.
RETURN_IF_NOT_OK(Services::CreateInstance());
// Handle Control-C
RegisterHandlers();

// Get a session from the server.
RETURN_IF_NOT_OK(GetSession());

// Generate a random crc.
auto mt = GetRandomDevice();
std::uniform_int_distribution<session_id_type> distribution(0, std::numeric_limits<int32_t>::max());
crc_ = distribution(mt);
std::cout << "CRC: " << crc_ << std::endl;

// Create all the resources required by the pipelines before we fork.
for (auto i = 0; i < num_pipelines_; ++i) {
// We will use shared message queues for communication between parent (this process)
// and each pipelines.
auto access_mode = S_IRUSR | S_IWUSR;
int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_send_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_send_lists_.push_back(msg_send_qid);
int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_recv_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_recv_lists_.push_back(msg_recv_qid);
}

// Now we create the children knowing all two sets of message queues are constructed.
auto start_tick = std::chrono::steady_clock::now();
Status CachePerfRun::StartPipelines() {
for (auto i = 0; i < num_pipelines_; ++i) {
auto pid = fork();
if (pid == 0) {
@@ -492,10 +470,64 @@ Status CachePerfRun::Run() {
RETURN_STATUS_UNEXPECTED(errMsg);
}
}
return Status::OK();
}

// Tear down the server-side state owned by this run: destroy the cache,
// then drop (unreserve) the session and reset the local session id.
Status CachePerfRun::Cleanup() {
  // The cache itself is no longer needed once the run is done.
  RETURN_IF_NOT_OK(cc_->DestroyCache());

  // Ask the server to release the session we reserved earlier.
  CacheClientInfo session_info;
  session_info.set_session_id(session_);
  auto drop_rq = std::make_shared<DropSessionRequest>(session_info);
  RETURN_IF_NOT_OK(cc_->PushRequest(drop_rq));
  RETURN_IF_NOT_OK(drop_rq->Wait());
  std::cout << "Drop session " << session_ << " successful" << std::endl;
  session_ = 0;
  return Status::OK();
}

Status CachePerfRun::Run() {
// Now we bring up TaskManager.
RETURN_IF_NOT_OK(Services::CreateInstance());
// Handle Control-C
RegisterHandlers();

// Get a session from the server.
RETURN_IF_NOT_OK(GetSession());

// Generate a random crc.
auto mt = GetRandomDevice();
std::uniform_int_distribution<session_id_type> distribution(0, std::numeric_limits<int32_t>::max());
crc_ = distribution(mt);
std::cout << "CRC: " << crc_ << std::endl;

// Create all the resources required by the pipelines before we fork.
for (auto i = 0; i < num_pipelines_; ++i) {
// We will use shared message queues for communication between parent (this process)
// and each pipelines.
auto access_mode = S_IRUSR | S_IWUSR;
int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_send_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_send_lists_.push_back(msg_send_qid);
int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
if (msg_recv_qid == -1) {
std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
RETURN_STATUS_UNEXPECTED(errMsg);
}
msg_recv_lists_.push_back(msg_recv_qid);
}

// Now we create the children knowing all two sets of message queues are constructed.
auto start_tick = std::chrono::steady_clock::now();
RETURN_IF_NOT_OK(StartPipelines());

// Spawn a few threads to monitor the communications from the pipeline.
RETURN_IF_NOT_OK(vg_.ServiceStart());

auto f = std::bind(&CachePerfRun::ListenToPipeline, this, std::placeholders::_1);
for (auto i = 0; i < num_pipelines_; ++i) {
RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Queue listener", std::bind(f, i)));
@@ -526,14 +558,10 @@ Status CachePerfRun::Run() {
std::cout << "Get statistics for this session:\n";
std::cout << std::setw(12) << "Mem cached" << std::setw(12) << "Disk cached" << std::setw(16) << "Avg cache size"
<< std::setw(10) << "Numa hit" << std::endl;
std::string stat_mem_cached;
std::string stat_disk_cached;
std::string stat_avg_cached;
std::string stat_numa_hit;
stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);
std::string stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
std::string stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
std::string stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
std::string stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);

std::cout << std::setw(12) << stat_mem_cached << std::setw(12) << stat_disk_cached << std::setw(16) << stat_avg_cached
<< std::setw(10) << stat_numa_hit << std::endl;
@@ -566,18 +594,8 @@ Status CachePerfRun::Run() {
++epoch_num;
}

// Destroy the cache. We no longer need it around.
RETURN_IF_NOT_OK(cc_->DestroyCache());

// Unreserve the session
CacheClientInfo cinfo;
cinfo.set_session_id(session_);
auto rq = std::make_shared<DropSessionRequest>(cinfo);
RETURN_IF_NOT_OK(cc_->PushRequest(rq));
RETURN_IF_NOT_OK(rq->Wait());
std::cout << "Drop session " << session_ << " successful" << std::endl;
session_ = 0;

// Destroy the cache client and drop the session
RETURN_IF_NOT_OK(Cleanup());
return Status::OK();
}
} // namespace dataset


+ 4
- 0
mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h View File

@@ -93,6 +93,10 @@ class CachePerfRun {
Status GetSession();
Status ListenToPipeline(int32_t workerId);
void PrintEpochSummary() const;
Status StartPipelines();
Status Cleanup();
int32_t SanityCheck(std::map<int32_t, int32_t> seen_opts);
int32_t ProcessArgsHelper(int32_t opt);
};
} // namespace dataset
} // namespace mindspore


+ 26
- 10
mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc View File

@@ -31,14 +31,9 @@ namespace mindspore {
namespace dataset {
// This binary is an internal worker forked by cache_perf; it is not meant
// to be invoked directly, so the help text just redirects the user.
void CachePipelineRun::PrintHelp() {
  std::cout << "Please run the executable cache_perf instead." << std::endl;
}

int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
if (argc != 3) {
PrintHelp();
return -1;
}

int32_t CachePipelineRun::ProcessPipelineArgs(char *argv) {
try {
std::stringstream cfg_ss(argv[1]);
std::stringstream cfg_ss(argv);
std::string s;
int32_t numArgs = 0;
while (std::getline(cfg_ss, s, ',')) {
@@ -72,8 +67,18 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
std::cerr << "Incomplete arguments. Expect 11. But get " << numArgs << std::endl;
return -1;
}
std::stringstream client_ss(argv[2]);
numArgs = 0;
} catch (const std::exception &e) {
std::cerr << "Parse error: " << e.what() << std::endl;
return -1;
}
return 0;
}

int32_t CachePipelineRun::ProcessClientArgs(char *argv) {
try {
std::stringstream client_ss(argv);
std::string s;
int32_t numArgs = 0;
while (std::getline(client_ss, s, ',')) {
if (numArgs == 0) {
cache_builder_.SetHostname(s);
@@ -101,6 +106,17 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
return 0;
}

// Parse the two packed argument strings handed to us by the parent
// cache_perf process: argv[1] carries the pipeline configuration and
// argv[2] the cache-client configuration.
// @return 0 on success, -1 on any parse failure.
int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
  // Expect exactly: program name + pipeline config + client config.
  if (argc != 3) {
    PrintHelp();
    return -1;
  }
  int32_t ret = ProcessPipelineArgs(argv[1]);
  if (ret < 0) {
    return ret;
  }
  return ProcessClientArgs(argv[2]);
}

CachePipelineRun::CachePipelineRun()
: my_pipeline_(-1),
num_pipelines_(kDftNumOfPipelines),
@@ -282,7 +298,7 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
std::shared_ptr<Tensor> element;
RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, col_desc->type(), &element));
row.setId(id);
// CreateEmpty allocates the memory but in virutal address. Let's commit the memory
// CreateEmpty allocates the memory but in virtual address. Let's commit the memory
// so we can get an accurate timing.
auto it = element->begin<int64_t>();
for (auto i = 0; i < num_elements; ++i, ++it) {


+ 2
- 0
mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h View File

@@ -52,6 +52,8 @@ class CachePipelineRun {
~CachePipelineRun();
static void PrintHelp();
int32_t ProcessArgs(int argc, char **argv);
int32_t ProcessPipelineArgs(char *argv);
int32_t ProcessClientArgs(char *argv);

void Print(std::ostream &out) const {
out << "Number of pipelines: " << num_pipelines_ << "\n"


Loading…
Cancel
Save