From 3f3588a0354967876d7ad906a4bea66129c2f2f2 Mon Sep 17 00:00:00 2001 From: chendongsheng Date: Thu, 29 Apr 2021 17:34:53 +0800 Subject: [PATCH] fixed communicator --- mindspore/ccsrc/ps/constants.h | 8 +++++--- mindspore/ccsrc/ps/core/communicator/task_executor.cc | 2 +- mindspore/ccsrc/ps/core/communicator/task_executor.h | 5 +++-- mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc | 8 +++++++- mindspore/ccsrc/ps/core/communicator/tcp_communicator.h | 1 + 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/mindspore/ccsrc/ps/constants.h b/mindspore/ccsrc/ps/constants.h index 5b0386bca9..1189ebc227 100644 --- a/mindspore/ccsrc/ps/constants.h +++ b/mindspore/ccsrc/ps/constants.h @@ -76,9 +76,11 @@ constexpr uint32_t kMaxMessageSize = static_cast(100 * (uint32_t(1) << constexpr char kServerNum[] = "server_num"; constexpr char kWorkerNum[] = "worker_num"; -constexpr int64_t kSubmitTaskInterval = 1; -constexpr int64_t kMaxTaskNum = 1024; -constexpr int64_t kSubmitTimeOut = 3000; +constexpr int64_t kSubmitTaskIntervalInMs = 1; +constexpr int64_t kMaxTaskNum = 10240; +constexpr int64_t kSubmitTimeOutInMs = 30000; +constexpr int64_t kRetryCount = 60; +constexpr int64_t kRetryIntervalInMs = 10; using DataPtr = std::shared_ptr; using VectorPtr = std::shared_ptr>; diff --git a/mindspore/ccsrc/ps/core/communicator/task_executor.cc b/mindspore/ccsrc/ps/core/communicator/task_executor.cc index 25ce4d1b10..a91d5b6e54 100644 --- a/mindspore/ccsrc/ps/core/communicator/task_executor.cc +++ b/mindspore/ccsrc/ps/core/communicator/task_executor.cc @@ -67,7 +67,7 @@ TaskExecutor::TaskExecutor(size_t thread_num, size_t max_task_num, size_t submit cv_.notify_one(); } } - std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval)); + std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskIntervalInMs)); } }); notify_thread_.detach(); diff --git a/mindspore/ccsrc/ps/core/communicator/task_executor.h b/mindspore/ccsrc/ps/core/communicator/task_executor.h index 272484ffe2..f96371c0d6 100644 --- a/mindspore/ccsrc/ps/core/communicator/task_executor.h +++ b/mindspore/ccsrc/ps/core/communicator/task_executor.h @@ -41,7 +41,8 @@ namespace core { */ class TaskExecutor { public: - explicit TaskExecutor(size_t thread_num, size_t max_task_num = kMaxTaskNum, size_t submit_timeout = kSubmitTimeOut); + explicit TaskExecutor(size_t thread_num, size_t max_task_num = kMaxTaskNum, + size_t submit_timeout = kSubmitTimeOutInMs); ~TaskExecutor(); // If the number of submitted tasks is greater than the size of the queue, it will block the submission of subsequent @@ -55,7 +56,7 @@ class TaskExecutor { std::unique_lock lock(mtx_); if (task_num_ >= max_task_num_) { lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval)); + std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskIntervalInMs)); index++; } else { break; diff --git a/mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc b/mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc index 756077f540..e0d5491209 100644 --- a/mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc +++ b/mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc @@ -42,7 +42,13 @@ bool TcpCommunicator::Start() { std::shared_ptr tcp_msg_handler = std::make_shared(server_node_, conn, meta, data, size); MS_EXCEPTION_IF_NULL(tcp_msg_handler); - task_executor_->Submit(msg_callbacks_[msg_type], tcp_msg_handler); + // The Submit function timed out for 30s, if it returns false, it will retry 60 times. + bool res = CommUtil::Retry([&] { return task_executor_->Submit(msg_callbacks_[msg_type], tcp_msg_handler); }, + kRetryCount, kRetryIntervalInMs); + if (res == false) { + MS_LOG(EXCEPTION) << "Submit tcp msg handler failed."; + } + return; }, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4); diff --git a/mindspore/ccsrc/ps/core/communicator/tcp_communicator.h b/mindspore/ccsrc/ps/core/communicator/tcp_communicator.h index 17be6827f9..7d01408353 100644 --- a/mindspore/ccsrc/ps/core/communicator/tcp_communicator.h +++ b/mindspore/ccsrc/ps/core/communicator/tcp_communicator.h @@ -29,6 +29,7 @@ #include "ps/core/communicator/task_executor.h" #include "ps/core/communicator/communicator_base.h" #include "ps/core/communicator/tcp_msg_handler.h" +#include "ps/core/comm_util.h" namespace mindspore { namespace ps {