diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc
index 90d821addc..43ab2b817e 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.cc
@@ -33,6 +33,10 @@ namespace mindspore {
 namespace dataset {
 const char CachePerfRun::kCachePipelineBinary[] = "cache_pipeline";
+const int32_t port_opt = 1000;      // there is no short option for port
+const int32_t hostname_opt = 1001;  // there is no short option for hostname
+const int32_t connect_opt = 1002;   // there is no short option for connect
+
 void CachePerfRun::PrintHelp() {
   std::cout << "Options:\n"
                " -h,--help: Show this usage message\n"
@@ -62,16 +66,136 @@ void CachePerfRun::PrintHelp() {
             << " --hostname: Hostname of the cache server. Default = " << kCfgDefaultCacheHost << "\n";
 }
+int32_t CachePerfRun::ProcessArgsHelper(int32_t opt) {
+  int32_t rc = 0;
+  try {
+    switch (opt) {
+      case 'n': {
+        num_pipelines_ = std::stoi(optarg);
+        break;
+      }
+
+      case 'e': {
+        num_epoches_ = std::stoi(optarg);
+        break;
+      }
+
+      case 'p': {
+        int32_t prefetch_sz = std::stoi(optarg);
+        cache_builder_.SetPrefetchSize(prefetch_sz);
+        break;
+      }
+
+      case 'a': {
+        int32_t cache_sz = std::stoi(optarg);
+        cache_builder_.SetCacheMemSz(cache_sz);
+        break;
+      }
+
+      case 's': {
+        num_rows_ = std::stoi(optarg);
+        break;
+      }
+
+      case 'r': {
+        row_size_ = std::stoi(optarg);
+        break;
+      }
+
+      case 'w': {
+        cfg_.set_num_parallel_workers(std::stoi(optarg));
+        break;
+      }
+
+      case connect_opt: {
+        int32_t connection_sz = std::stoi(optarg);
+        cache_builder_.SetNumConnections(connection_sz);
+        break;
+      }
+
+      case port_opt: {
+        int32_t port = std::stoi(optarg);
+        cache_builder_.SetPort(port);
+        break;
+      }
+
+      case hostname_opt: {
+        std::string hostname = optarg;
+        cache_builder_.SetHostname(hostname);
+        break;
+      }
+
+      case 'h':  // -h or --help
+        PrintHelp();
+        rc = -1;
+        break;
+
+      case ':':
+        std::cerr << "Missing argument for option " << char(optopt) << std::endl;
+        rc = -1;
+        break;
+
+      case '?':  // Unrecognized option
+      default:
+        std::cerr << "Unknown option " << char(optopt) << std::endl;
+        PrintHelp();
+        rc = -1;
+        break;
+    }
+  } catch (const std::exception &e) {
+    PrintHelp();
+    rc = -1;
+  }
+  return rc;
+}
+
+int32_t CachePerfRun::SanityCheck(std::map seen_opts) {
+  // We have all the defaults except sample size and average row size which the user must specify.
+  auto it = seen_opts.find('s');
+  if (it == seen_opts.end()) {
+    std::cerr << "Missing sample size." << std::endl;
+    return -1;
+  }
+
+  it = seen_opts.find('r');
+  if (it == seen_opts.end()) {
+    std::cerr << "Missing average row size." << std::endl;
+    return -1;
+  }
+
+  if (num_rows_ <= 0) {
+    std::cerr << "Sample size must be positive." << std::endl;
+    return -1;
+  }
+
+  if (row_size_ <= 0) {
+    std::cerr << "Average row size must be positive." << std::endl;
+    return -1;
+  }
+
+  if (num_pipelines_ <= 0) {
+    std::cerr << "Number of pipelines must be positive." << std::endl;
+    return -1;
+  }
+
+  if (num_epoches_ <= 0) {
+    std::cerr << "Number of epoches must be positive." << std::endl;
+    return -1;
+  }
+
+  if (num_rows_ < num_pipelines_) {
+    std::cerr << "Sample size is smaller than the number of pipelines." << std::endl;
+    return -1;
+  }
+  return 0;
+}
+
 int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
   if (argc == 1) {
     PrintHelp();
     return -1;
   }
-  const int32_t port_opt = 1000;      // there is no short option for port
-  const int32_t hostname_opt = 1001;  // there is no short option for hostname
-  const int32_t connect_opt = 1002;   // there is no short option for connect
-
   int shuffle = 0;
   int spill = 0;
@@ -98,7 +222,7 @@ int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
       int32_t option_indxex;
       const auto opt = getopt_long(argc, argv, short_opts, long_opts, &option_indxex);
-      if (-1 == opt) {
+      if (opt == -1) {
        if (optind < argc) {
          rc = -1;
          std::cerr << "Unknown arguments: ";
@@ -120,138 +244,27 @@ int32_t CachePerfRun::ProcessArgs(int argc, char **argv) {
        }
      }
-      switch (opt) {
-        case 0: {
-          if (long_opts[option_indxex].flag == &shuffle) {
-            shuffle_ = true;
-          } else if (long_opts[option_indxex].flag == &spill) {
-            cache_builder_.SetSpill(true);
-          }
-          break;
-        }
-
-        case 'n': {
-          num_pipelines_ = std::stoi(optarg);
-          break;
-        }
-
-        case 'e': {
-          num_epoches_ = std::stoi(optarg);
-          break;
-        }
-
-        case 'p': {
-          int32_t prefetch_sz = std::stoi(optarg);
-          cache_builder_.SetPrefetchSize(prefetch_sz);
-          break;
-        }
-
-        case 'a': {
-          int32_t cache_sz = std::stoi(optarg);
-          cache_builder_.SetCacheMemSz(cache_sz);
-          break;
-        }
-
-        case 's': {
-          num_rows_ = std::stoi(optarg);
-          break;
-        }
-
-        case 'r': {
-          row_size_ = std::stoi(optarg);
-          break;
-        }
-
-        case 'w': {
-          cfg_.set_num_parallel_workers(std::stoi(optarg));
-          break;
-        }
-
-        case connect_opt: {
-          int32_t connection_sz = std::stoi(optarg);
-          cache_builder_.SetNumConnections(connection_sz);
-          break;
-        }
-
-        case port_opt: {
-          int32_t port = std::stoi(optarg);
-          cache_builder_.SetPort(port);
-          break;
-        }
-
-        case hostname_opt: {
-          std::string hostname = optarg;
-          cache_builder_.SetHostname(hostname);
-          break;
+      if (opt == 0) {
+        if (long_opts[option_indxex].flag == &shuffle) {
+          shuffle_ = true;
+        } else if (long_opts[option_indxex].flag == &spill) {
+          cache_builder_.SetSpill(true);
        }
-
-        case 'h':  // -h or --help
-          PrintHelp();
-          rc = -1;
-          break;
-
-        case ':':
-          std::cerr << "Missing argument for option " << char(optopt) << std::endl;
-          rc = -1;
-          break;
-
-        case '?':  // Unrecognized option
-        default:
-          std::cerr << "Unknown option " << char(optopt) << std::endl;
-          PrintHelp();
-          rc = -1;
-          break;
+        continue;
      }
+
+      rc = ProcessArgsHelper(opt);
    }
  } catch (const std::exception &e) {
    PrintHelp();
    rc = -1;
  }
+  if (rc < 0) return rc;
-  if (rc < 0) {
-    return rc;
-  }
-
-  // We have all the defaults except sample size and average row size which the user must specify.
-  auto it = seen_opts.find('s');
-  if (it == seen_opts.end()) {
-    std::cerr << "Missing sample size." << std::endl;
-    return -1;
-  }
-
-  it = seen_opts.find('r');
-  if (it == seen_opts.end()) {
-    std::cerr << "Missing average row size." << std::endl;
-    return -1;
-  }
-
-  if (num_rows_ <= 0) {
-    std::cerr << "Sample size must be positive." << std::endl;
-    return -1;
-  }
-
-  if (row_size_ <= 0) {
-    std::cerr << "Average row size must be positive." << std::endl;
-    return -1;
-  }
-
-  if (num_pipelines_ <= 0) {
-    std::cerr << "Number of pipelines must be positive." << std::endl;
-    return -1;
-  }
-
-  if (num_epoches_ <= 0) {
-    std::cerr << "Number of epoches must be positive." << std::endl;
-    return -1;
-  }
-
-  if (num_rows_ < num_pipelines_) {
-    std::cerr << "Sample size is smaller than the number of pipelines." << std::endl;
-    return -1;
-  }
+  rc = SanityCheck(seen_opts);
+  if (rc < 0) return rc;
   pid_lists_.reserve(num_pipelines_);
-
   return 0;
 }
@@ -392,42 +405,7 @@ Status CachePerfRun::ListenToPipeline(int32_t workerId) {
   return Status::OK();
 }
-Status CachePerfRun::Run() {
-  // Now we bring up TaskManager.
-  RETURN_IF_NOT_OK(Services::CreateInstance());
-  // Handle Control-C
-  RegisterHandlers();
-
-  // Get a session from the server.
-  RETURN_IF_NOT_OK(GetSession());
-
-  // Generate a random crc.
-  auto mt = GetRandomDevice();
-  std::uniform_int_distribution distribution(0, std::numeric_limits::max());
-  crc_ = distribution(mt);
-  std::cout << "CRC: " << crc_ << std::endl;
-
-  // Create all the resources required by the pipelines before we fork.
-  for (auto i = 0; i < num_pipelines_; ++i) {
-    // We will use shared message queues for communication between parent (this process)
-    // and each pipelines.
-    auto access_mode = S_IRUSR | S_IWUSR;
-    int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
-    if (msg_send_qid == -1) {
-      std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
-      RETURN_STATUS_UNEXPECTED(errMsg);
-    }
-    msg_send_lists_.push_back(msg_send_qid);
-    int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
-    if (msg_recv_qid == -1) {
-      std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
-      RETURN_STATUS_UNEXPECTED(errMsg);
-    }
-    msg_recv_lists_.push_back(msg_recv_qid);
-  }
-
-  // Now we create the children knowing all two sets of message queues are constructed.
-  auto start_tick = std::chrono::steady_clock::now();
+Status CachePerfRun::StartPipelines() {
   for (auto i = 0; i < num_pipelines_; ++i) {
     auto pid = fork();
     if (pid == 0) {
@@ -492,10 +470,64 @@ Status CachePerfRun::Run() {
       RETURN_STATUS_UNEXPECTED(errMsg);
     }
   }
+  return Status::OK();
+}
+
+Status CachePerfRun::Cleanup() {
+  // Destroy the cache. We no longer need it around.
+  RETURN_IF_NOT_OK(cc_->DestroyCache());
+
+  // Unreserve the session
+  CacheClientInfo cinfo;
+  cinfo.set_session_id(session_);
+  auto rq = std::make_shared(cinfo);
+  RETURN_IF_NOT_OK(cc_->PushRequest(rq));
+  RETURN_IF_NOT_OK(rq->Wait());
+  std::cout << "Drop session " << session_ << " successful" << std::endl;
+  session_ = 0;
+  return Status::OK();
+}
+
+Status CachePerfRun::Run() {
+  // Now we bring up TaskManager.
+  RETURN_IF_NOT_OK(Services::CreateInstance());
+  // Handle Control-C
+  RegisterHandlers();
+
+  // Get a session from the server.
+  RETURN_IF_NOT_OK(GetSession());
+
+  // Generate a random crc.
+  auto mt = GetRandomDevice();
+  std::uniform_int_distribution distribution(0, std::numeric_limits::max());
+  crc_ = distribution(mt);
+  std::cout << "CRC: " << crc_ << std::endl;
+
+  // Create all the resources required by the pipelines before we fork.
+  for (auto i = 0; i < num_pipelines_; ++i) {
+    // We will use shared message queues for communication between parent (this process)
+    // and each pipelines.
+    auto access_mode = S_IRUSR | S_IWUSR;
+    int32_t msg_send_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
+    if (msg_send_qid == -1) {
+      std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
+      RETURN_STATUS_UNEXPECTED(errMsg);
+    }
+    msg_send_lists_.push_back(msg_send_qid);
+    int32_t msg_recv_qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | access_mode);
+    if (msg_recv_qid == -1) {
+      std::string errMsg = "Unable to create a message queue. Errno = " + std::to_string(errno);
+      RETURN_STATUS_UNEXPECTED(errMsg);
+    }
+    msg_recv_lists_.push_back(msg_recv_qid);
+  }
+
+  // Now we create the children knowing all two sets of message queues are constructed.
+  auto start_tick = std::chrono::steady_clock::now();
+  RETURN_IF_NOT_OK(StartPipelines());
   // Spawn a few threads to monitor the communications from the pipeline.
   RETURN_IF_NOT_OK(vg_.ServiceStart());
-
   auto f = std::bind(&CachePerfRun::ListenToPipeline, this, std::placeholders::_1);
   for (auto i = 0; i < num_pipelines_; ++i) {
     RETURN_IF_NOT_OK(vg_.CreateAsyncTask("Queue listener", std::bind(f, i)));
@@ -526,14 +558,10 @@ Status CachePerfRun::Run() {
   std::cout << "Get statistics for this session:\n";
   std::cout << std::setw(12) << "Mem cached" << std::setw(12) << "Disk cached" << std::setw(16) << "Avg cache size"
             << std::setw(10) << "Numa hit" << std::endl;
-  std::string stat_mem_cached;
-  std::string stat_disk_cached;
-  std::string stat_avg_cached;
-  std::string stat_numa_hit;
-  stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
-  stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
-  stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
-  stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);
+  std::string stat_mem_cached = (stat.num_mem_cached == 0) ? "n/a" : std::to_string(stat.num_mem_cached);
+  std::string stat_disk_cached = (stat.num_disk_cached == 0) ? "n/a" : std::to_string(stat.num_disk_cached);
+  std::string stat_avg_cached = (stat.avg_cache_sz == 0) ? "n/a" : std::to_string(stat.avg_cache_sz);
+  std::string stat_numa_hit = (stat.num_numa_hit == 0) ? "n/a" : std::to_string(stat.num_numa_hit);
   std::cout << std::setw(12) << stat_mem_cached << std::setw(12) << stat_disk_cached << std::setw(16)
             << stat_avg_cached << std::setw(10) << stat_numa_hit << std::endl;
@@ -566,18 +594,8 @@ Status CachePerfRun::Run() {
     ++epoch_num;
   }
-  // Destroy the cache. We no longer need it around.
-  RETURN_IF_NOT_OK(cc_->DestroyCache());
-
-  // Unreserve the session
-  CacheClientInfo cinfo;
-  cinfo.set_session_id(session_);
-  auto rq = std::make_shared(cinfo);
-  RETURN_IF_NOT_OK(cc_->PushRequest(rq));
-  RETURN_IF_NOT_OK(rq->Wait());
-  std::cout << "Drop session " << session_ << " successful" << std::endl;
-  session_ = 0;
-
+  // Destroy the cache client and drop the session
+  RETURN_IF_NOT_OK(Cleanup());
   return Status::OK();
 }
 }  // namespace dataset
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h
index 8b21954bb8..dac9c8012e 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_perf_run.h
@@ -93,6 +93,10 @@ class CachePerfRun {
   Status GetSession();
   Status ListenToPipeline(int32_t workerId);
   void PrintEpochSummary() const;
+  Status StartPipelines();
+  Status Cleanup();
+  int32_t SanityCheck(std::map seen_opts);
+  int32_t ProcessArgsHelper(int32_t opt);
 };
 }  // namespace dataset
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc
index b1a48518ad..b2445d83cf 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc
@@ -31,14 +31,9 @@ namespace mindspore {
 namespace dataset {
 void CachePipelineRun::PrintHelp() { std::cout << "Please run the executable cache_perf instead." << std::endl; }
-int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
-  if (argc != 3) {
-    PrintHelp();
-    return -1;
-  }
-
+int32_t CachePipelineRun::ProcessPipelineArgs(char *argv) {
   try {
-    std::stringstream cfg_ss(argv[1]);
+    std::stringstream cfg_ss(argv);
     std::string s;
     int32_t numArgs = 0;
     while (std::getline(cfg_ss, s, ',')) {
@@ -72,8 +67,18 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
       std::cerr << "Incomplete arguments. Expect 11. But get " << numArgs << std::endl;
       return -1;
     }
-    std::stringstream client_ss(argv[2]);
-    numArgs = 0;
+  } catch (const std::exception &e) {
+    std::cerr << "Parse error: " << e.what() << std::endl;
+    return -1;
+  }
+  return 0;
+}
+
+int32_t CachePipelineRun::ProcessClientArgs(char *argv) {
+  try {
+    std::stringstream client_ss(argv);
+    std::string s;
+    int32_t numArgs = 0;
     while (std::getline(client_ss, s, ',')) {
       if (numArgs == 0) {
         cache_builder_.SetHostname(s);
@@ -101,6 +106,17 @@ int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
   return 0;
 }
+int32_t CachePipelineRun::ProcessArgs(int argc, char **argv) {
+  if (argc != 3) {
+    PrintHelp();
+    return -1;
+  }
+  int32_t rc = ProcessPipelineArgs(argv[1]);
+  if (rc < 0) return rc;
+  rc = ProcessClientArgs(argv[2]);
+  return rc;
+}
+
 CachePipelineRun::CachePipelineRun()
     : my_pipeline_(-1),
       num_pipelines_(kDftNumOfPipelines),
@@ -282,7 +298,7 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
       std::shared_ptr element;
       RETURN_IF_NOT_OK(Tensor::CreateEmpty(shape, col_desc->type(), &element));
       row.setId(id);
-      // CreateEmpty allocates the memory but in virutal address. Let's commit the memory
+      // CreateEmpty allocates the memory but in virtual address. Let's commit the memory
       // so we can get an accurate timing.
       auto it = element->begin();
       for (auto i = 0; i < num_elements; ++i, ++it) {
diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h
index d13aec76de..d1d617133a 100644
--- a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h
+++ b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.h
@@ -52,6 +52,8 @@ class CachePipelineRun {
   ~CachePipelineRun();
   static void PrintHelp();
   int32_t ProcessArgs(int argc, char **argv);
+  int32_t ProcessPipelineArgs(char *argv);
+  int32_t ProcessClientArgs(char *argv);
   void Print(std::ostream &out) const {
     out << "Number of pipelines: " << num_pipelines_ << "\n"
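
The ProcessArgs/ProcessArgsHelper split above leans on the standard getopt_long contract: flag-style long options such as --shuffle and --spill make getopt_long return 0 and set the pointed-to flag, while long options with no short form are given return codes above the ASCII range (port_opt/hostname_opt/connect_opt = 1000..1002). The following is a minimal, self-contained sketch of that pattern; the option names and defaults here are stand-ins for illustration, not the exact table from the patch.

```cpp
#include <getopt.h>
#include <cstdint>
#include <iostream>
#include <string>

namespace {
const int32_t kPortOpt = 1000;  // long-only option: no short equivalent
int g_shuffle = 0;              // set by getopt_long itself via the flag pointer
}  // namespace

int main(int argc, char **argv) {
  int32_t port = 50052;
  int32_t num_pipelines = 1;

  // {name, has_arg, flag, val}: when `flag` is non-null getopt_long returns 0
  // and stores `val` into *flag; otherwise it returns `val`.
  static struct option long_opts[] = {{"pipeline", required_argument, nullptr, 'n'},
                                      {"port", required_argument, nullptr, kPortOpt},
                                      {"shuffle", no_argument, &g_shuffle, 1},
                                      {nullptr, 0, nullptr, 0}};
  // Leading ':' makes getopt_long return ':' for a missing argument instead of '?'.
  const char *short_opts = ":n:";

  int opt;
  int option_index = 0;
  while ((opt = getopt_long(argc, argv, short_opts, long_opts, &option_index)) != -1) {
    switch (opt) {
      case 0:  // a flag-style long option such as --shuffle was seen
        break;
      case 'n':
        num_pipelines = std::stoi(optarg);
        break;
      case kPortOpt:
        port = std::stoi(optarg);
        break;
      case ':':
        std::cerr << "Missing argument for option " << static_cast<char>(optopt) << std::endl;
        return -1;
      case '?':
      default:
        std::cerr << "Unknown option" << std::endl;
        return -1;
    }
  }
  std::cout << "pipelines=" << num_pipelines << " port=" << port << " shuffle=" << g_shuffle << std::endl;
  return 0;
}
```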
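Both before and after this refactor, Run() creates one pair of System V message queues per pipeline with msgget(IPC_PRIVATE, ...) before fork(), so the queue ids are inherited by the child processes that the parent later listens to. Below is a stripped-down sketch of that parent/child handshake under those assumptions; the PerfMsg layout is hypothetical and error handling is reduced to perror, unlike the Status-based handling in the patch.

```cpp
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cstdio>

// Hypothetical message layout; System V requires a leading `long mtype` > 0.
struct PerfMsg {
  long mtype;
  char mtext[64];
};

int main() {
  // IPC_PRIVATE gives a brand-new queue; the id is shared with children through fork().
  int qid = msgget(IPC_PRIVATE, IPC_CREAT | IPC_EXCL | S_IRUSR | S_IWUSR);
  if (qid == -1) {
    perror("msgget");
    return 1;
  }

  pid_t pid = fork();
  if (pid == 0) {
    // Child: report readiness to the parent through the inherited queue id.
    PerfMsg msg{};
    msg.mtype = 1;
    snprintf(msg.mtext, sizeof(msg.mtext), "pipeline ready, pid=%d", static_cast<int>(getpid()));
    if (msgsnd(qid, &msg, sizeof(msg.mtext), 0) == -1) perror("msgsnd");
    _exit(0);
  }

  // Parent: block until the child's message of type 1 arrives.
  PerfMsg msg{};
  if (msgrcv(qid, &msg, sizeof(msg.mtext), 1, 0) == -1) {
    perror("msgrcv");
  } else {
    printf("parent got: %s\n", msg.mtext);
  }
  waitpid(pid, nullptr, 0);
  msgctl(qid, IPC_RMID, nullptr);  // queues outlive processes unless removed explicitly
  return 0;
}
```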
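ProcessPipelineArgs and ProcessClientArgs each walk one comma-separated argv string with std::stringstream and std::getline, switching on the field position and converting numeric fields with std::stoi inside a try block. A small illustrative version of that loop follows; the three-field layout and function name are invented for the example, not the 11-field pipeline configuration the patch actually expects.

```cpp
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

// Parse a "host,port,num_workers" style argument; returns -1 on malformed input.
int32_t ParseClientConfig(const std::string &arg, std::string *host, int32_t *port, int32_t *workers) {
  std::stringstream ss(arg);
  std::string field;
  int32_t num_fields = 0;
  try {
    while (std::getline(ss, field, ',')) {
      if (num_fields == 0) {
        *host = field;
      } else if (num_fields == 1) {
        *port = std::stoi(field);
      } else if (num_fields == 2) {
        *workers = std::stoi(field);
      }
      ++num_fields;
    }
  } catch (const std::exception &e) {
    std::cerr << "Parse error: " << e.what() << std::endl;
    return -1;
  }
  if (num_fields != 3) {
    std::cerr << "Incomplete arguments. Expect 3. But get " << num_fields << std::endl;
    return -1;
  }
  return 0;
}

int main() {
  std::string host;
  int32_t port = 0;
  int32_t workers = 0;
  if (ParseClientConfig("127.0.0.1,50052,4", &host, &port, &workers) == 0) {
    std::cout << host << ":" << port << " workers=" << workers << std::endl;
  }
  return 0;
}
```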