Browse Source

fixed communicator

pull/15908/head
chendongsheng 4 years ago
parent
commit
3f3588a035
5 changed files with 17 additions and 7 deletions
  1. +5
    -3
      mindspore/ccsrc/ps/constants.h
  2. +1
    -1
      mindspore/ccsrc/ps/core/communicator/task_executor.cc
  3. +3
    -2
      mindspore/ccsrc/ps/core/communicator/task_executor.h
  4. +7
    -1
      mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc
  5. +1
    -0
      mindspore/ccsrc/ps/core/communicator/tcp_communicator.h

+ 5
- 3
mindspore/ccsrc/ps/constants.h View File

@@ -76,9 +76,11 @@ constexpr uint32_t kMaxMessageSize = static_cast<uint32_t>(100 * (uint32_t(1) <<
constexpr char kServerNum[] = "server_num";
constexpr char kWorkerNum[] = "worker_num";

constexpr int64_t kSubmitTaskInterval = 1;
constexpr int64_t kMaxTaskNum = 1024;
constexpr int64_t kSubmitTimeOut = 3000;
constexpr int64_t kSubmitTaskIntervalInMs = 1;
constexpr int64_t kMaxTaskNum = 10240;
constexpr int64_t kSubmitTimeOutInMs = 30000;
constexpr int64_t kRetryCount = 60;
constexpr int64_t kRetryIntervalInMs = 10;

using DataPtr = std::shared_ptr<unsigned char[]>;
using VectorPtr = std::shared_ptr<std::vector<unsigned char>>;


+ 1
- 1
mindspore/ccsrc/ps/core/communicator/task_executor.cc View File

@@ -67,7 +67,7 @@ TaskExecutor::TaskExecutor(size_t thread_num, size_t max_task_num, size_t submit
cv_.notify_one();
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval));
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskIntervalInMs));
}
});
notify_thread_.detach();


+ 3
- 2
mindspore/ccsrc/ps/core/communicator/task_executor.h View File

@@ -41,7 +41,8 @@ namespace core {
*/
class TaskExecutor {
public:
explicit TaskExecutor(size_t thread_num, size_t max_task_num = kMaxTaskNum, size_t submit_timeout = kSubmitTimeOut);
explicit TaskExecutor(size_t thread_num, size_t max_task_num = kMaxTaskNum,
size_t submit_timeout = kSubmitTimeOutInMs);
~TaskExecutor();

// If the number of submitted tasks is greater than the size of the queue, it will block the submission of subsequent
@@ -55,7 +56,7 @@ class TaskExecutor {
std::unique_lock<std::mutex> lock(mtx_);
if (task_num_ >= max_task_num_) {
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskInterval));
std::this_thread::sleep_for(std::chrono::milliseconds(kSubmitTaskIntervalInMs));
index++;
} else {
break;


+ 7
- 1
mindspore/ccsrc/ps/core/communicator/tcp_communicator.cc View File

@@ -42,7 +42,13 @@ bool TcpCommunicator::Start() {
std::shared_ptr<MessageHandler> tcp_msg_handler =
std::make_shared<TcpMsgHandler>(server_node_, conn, meta, data, size);
MS_EXCEPTION_IF_NULL(tcp_msg_handler);
task_executor_->Submit(msg_callbacks_[msg_type], tcp_msg_handler);
// The Submit function timed out for 30s, if it returns false, it will retry 60 times.
bool res = CommUtil::Retry([&] { return task_executor_->Submit(msg_callbacks_[msg_type], tcp_msg_handler); },
kRetryCount, kRetryIntervalInMs);
if (res == false) {
MS_LOG(EXCEPTION) << "Submit tcp msg handler failed.";
}

return;
},
std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4);


+ 1
- 0
mindspore/ccsrc/ps/core/communicator/tcp_communicator.h View File

@@ -29,6 +29,7 @@
#include "ps/core/communicator/task_executor.h"
#include "ps/core/communicator/communicator_base.h"
#include "ps/core/communicator/tcp_msg_handler.h"
#include "ps/core/comm_util.h"

namespace mindspore {
namespace ps {


Loading…
Cancel
Save