From 05d3ae22191f7c4fce7bca6b99a3d2202285f8c8 Mon Sep 17 00:00:00 2001 From: chendongsheng Date: Tue, 27 Apr 2021 10:14:44 +0800 Subject: [PATCH] add http communicator --- mindspore/ccsrc/ps/CMakeLists.txt | 4 ++ .../ps/core/communicator/communicator_base.cc | 41 +++++++++++ .../ps/core/communicator/communicator_base.h | 68 ++++++++++++++++++ .../ps/core/communicator/http_communicator.cc | 70 +++++++++++++++++++ .../ps/core/communicator/http_communicator.h | 55 +++++++++++++++ .../ps/core/communicator/http_msg_handler.cc | 43 ++++++++++++ .../ps/core/communicator/http_msg_handler.h | 44 ++++++++++++ .../core/communicator/http_request_handler.cc | 6 +- .../core/communicator/http_request_handler.h | 2 +- .../ccsrc/ps/core/communicator/http_server.cc | 9 ++- .../ccsrc/ps/core/communicator/http_server.h | 2 +- .../ps/core/communicator/message_handler.h | 41 +++++++++++ .../ps/core/communicator/tcp_msg_handler.cc | 46 ++++++++++++ .../ps/core/communicator/tcp_msg_handler.h | 52 ++++++++++++++ tests/ut/cpp/CMakeLists.txt | 4 +- 15 files changed, 478 insertions(+), 9 deletions(-) create mode 100644 mindspore/ccsrc/ps/core/communicator/communicator_base.cc create mode 100644 mindspore/ccsrc/ps/core/communicator/communicator_base.h create mode 100644 mindspore/ccsrc/ps/core/communicator/http_communicator.cc create mode 100644 mindspore/ccsrc/ps/core/communicator/http_communicator.h create mode 100644 mindspore/ccsrc/ps/core/communicator/http_msg_handler.cc create mode 100644 mindspore/ccsrc/ps/core/communicator/http_msg_handler.h create mode 100644 mindspore/ccsrc/ps/core/communicator/message_handler.h create mode 100644 mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.cc create mode 100644 mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.h diff --git a/mindspore/ccsrc/ps/CMakeLists.txt b/mindspore/ccsrc/ps/CMakeLists.txt index 26189758da..b15d7d082f 100644 --- a/mindspore/ccsrc/ps/CMakeLists.txt +++ b/mindspore/ccsrc/ps/CMakeLists.txt @@ -12,6 +12,10 @@ if(NOT (ENABLE_CPU AND (ENABLE_D OR ENABLE_GPU))) list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_client.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_message_handler.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_server.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/communicator_base.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_communicator.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/http_msg_handler.cc") + list(REMOVE_ITEM _PS_SRC_FILES "core/communicator/tcp_msg_handler.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node.cc") list(REMOVE_ITEM _PS_SRC_FILES "core/node_manager.cc") list(REMOVE_ITEM _PS_SRC_FILES "ps_cache/ps_cache_manager.cc") diff --git a/mindspore/ccsrc/ps/core/communicator/communicator_base.cc b/mindspore/ccsrc/ps/core/communicator/communicator_base.cc new file mode 100644 index 0000000000..0ecfd968fa --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/communicator_base.cc @@ -0,0 +1,41 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/communicator_base.h" +#include + +namespace mindspore { +namespace ps { +namespace core { +bool CommunicatorBase::SendResponse(const void *rsp_data, size_t rsp_len, std::shared_ptr msg_handler) { + // The rsp_len could be 0 because of ProtoBuffer's feature. + if (rsp_data == nullptr || msg_handler == nullptr) { + MS_LOG(ERROR) << "SendResponse inputs are invalid."; + return false; + } + return msg_handler->SendResponse(rsp_data, rsp_len); +} +void CommunicatorBase::Join() { + if (!running_thread_.joinable()) { + MS_LOG(EXCEPTION) << "The running thread of communicator is not joinable."; + return; + } + running_thread_.join(); + return; +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/communicator_base.h b/mindspore/ccsrc/ps/core/communicator/communicator_base.h new file mode 100644 index 0000000000..ce35420e75 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/communicator_base.h @@ -0,0 +1,68 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_ + +#include +#include +#include +#include +#include + +#include "ps/core/communicator/message_handler.h" +#include "utils/log_adapter.h" +#include "ps/core/communicator/http_message_handler.h" +#include "ps/core/communicator/tcp_server.h" +#include "ps/core/node_info.h" +#include "ps/constants.h" + +namespace mindspore { +namespace ps { +namespace core { +// CommunicatorBase is used to receive request and send response for server. +// It is the base class of HttpCommunicator and TcpCommunicator. +class CommunicatorBase { + public: + using MessageCallback = std::function)>; + using HttpMsgCallback = std::function)>; + using OnNodeEventCallback = std::function; + using TcpMsgCallBack = std::function conn, + std::shared_ptr meta, DataPtr data, size_t size)>; + using CertainEventCallBack = std::function; + + CommunicatorBase() = default; + + virtual ~CommunicatorBase() = default; + + virtual bool Start() = 0; + virtual bool Stop() = 0; + // You need to call the Start() function before calling the Join() function, it will block server's main thread. + // if you want to exit the Join() function, then you should call the Stop() function in another thread. + void Join(); + + virtual void RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) = 0; + + bool SendResponse(const void *rsp_data, size_t rsp_len, std::shared_ptr msg_handler); + + protected: + std::unordered_map msg_callbacks_; + std::thread running_thread_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_COMMUNICATOR_BASE_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/http_communicator.cc b/mindspore/ccsrc/ps/core/communicator/http_communicator.cc new file mode 100644 index 0000000000..f4ed071977 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/http_communicator.cc @@ -0,0 +1,70 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/http_communicator.h" +#include +#include "common/thread_pool.h" + +namespace mindspore { +namespace ps { +namespace core { +bool HttpCommunicator::Start() { + MS_LOG(INFO) << "Initialize http server IP:" << ip_ << ", PORT:" << port_; + http_server_ = std::make_shared(ip_, port_, 32); + http_server_->InitServer(); + MS_EXCEPTION_IF_NULL(http_server_); + if (!http_server_->Start()) { + MS_LOG(EXCEPTION) << "Http server starting failed."; + } + MS_LOG(INFO) << "Http communicator started."; + + running_thread_ = std::thread([&]() { + try { + http_server_->Wait(); + } catch (const std::exception &e) { + MsException::Instance().SetException(); + } + }); + return true; +} + +bool HttpCommunicator::Stop() { + MS_EXCEPTION_IF_NULL(http_server_); + return http_server_->Stop(); +} + +void HttpCommunicator::RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) { + msg_callbacks_[msg_type] = cb; + http_msg_callbacks_[msg_type] = std::bind( + [&](std::shared_ptr http_msg) -> void { + std::shared_ptr http_msg_handler = std::make_shared(http_msg); + MS_EXCEPTION_IF_NULL(http_msg_handler); + msg_callbacks_[msg_type](http_msg_handler); + return; + }, + std::placeholders::_1); + + std::string url = "/"; + url += msg_type; + bool is_succeed = http_server_->RegisterRoute(url, &http_msg_callbacks_[msg_type]); + if (!is_succeed) { + MS_LOG(EXCEPTION) << "Http server register handler for url " << url << " failed."; + } + return; +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/http_communicator.h b/mindspore/ccsrc/ps/core/communicator/http_communicator.h new file mode 100644 index 0000000000..64ff99cb9c --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/http_communicator.h @@ -0,0 +1,55 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_ + +#include +#include +#include +#include "ps/core/communicator/http_server.h" +#include "ps/core/communicator/http_message_handler.h" +#include "ps/core/communicator/task_executor.h" +#include "ps/core/communicator/communicator_base.h" +#include "ps/core/communicator/http_msg_handler.h" +#include "utils/ms_exception.h" + +namespace mindspore { +namespace ps { +namespace core { +class HttpCommunicator : public CommunicatorBase { + public: + explicit HttpCommunicator(const std::string &ip, std::int16_t port, + const std::shared_ptr &task_executor) + : task_executor_(task_executor), http_server_(nullptr), ip_(ip), port_(port) {} + ~HttpCommunicator() = default; + + bool Start() override; + bool Stop() override; + void RegisterMsgCallBack(const std::string &msg_type, const MessageCallback &cb) override; + + private: + std::shared_ptr task_executor_; + std::shared_ptr http_server_; + std::unordered_map http_msg_callbacks_; + + std::string ip_; + std::int16_t port_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_COMMUNICATOR_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/http_msg_handler.cc b/mindspore/ccsrc/ps/core/communicator/http_msg_handler.cc new file mode 100644 index 0000000000..06b9ef794b --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/http_msg_handler.cc @@ -0,0 +1,43 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/http_msg_handler.h" +#include + +namespace mindspore { +namespace ps { +namespace core { +HttpMsgHandler::HttpMsgHandler(std::shared_ptr http_msg) + : http_msg_(http_msg), data_(nullptr), len_(0) { + len_ = http_msg_->GetPostMsg(&data_); +} + +void *HttpMsgHandler::data() const { + if (data_ == nullptr) { + MS_LOG(ERROR) << "HttpMsgHandler data is nullptr."; + } + return data_; +} + +size_t HttpMsgHandler::len() const { return len_; } + +bool HttpMsgHandler::SendResponse(const void *data, const size_t &len) { + http_msg_->QuickResponse(200, reinterpret_cast(const_cast(data)), len); + return true; +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/http_msg_handler.h b/mindspore/ccsrc/ps/core/communicator/http_msg_handler.h new file mode 100644 index 0000000000..28e228aff8 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/http_msg_handler.h @@ -0,0 +1,44 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_ + +#include +#include "ps/core/communicator/http_message_handler.h" +#include "ps/core/communicator/message_handler.h" + +namespace mindspore { +namespace ps { +namespace core { +class HttpMsgHandler : public MessageHandler { + public: + explicit HttpMsgHandler(std::shared_ptr http_msg); + ~HttpMsgHandler() override = default; + + void *data() const override; + size_t len() const override; + bool SendResponse(const void *data, const size_t &len) override; + + private: + std::shared_ptr http_msg_; + unsigned char *data_; + size_t len_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_HTTP_MSG_HANDLER_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc b/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc index d38d76db69..9e00334640 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_request_handler.cc @@ -99,13 +99,15 @@ void HttpRequestHandler::Run() { } } -void HttpRequestHandler::Stop() { +bool HttpRequestHandler::Stop() { MS_LOG(INFO) << "Stop http server!"; int ret = event_base_loopbreak(evbase_); if (ret != 0) { - MS_LOG(EXCEPTION) << "event base loop break failed!"; + MS_LOG(ERROR) << "event base loop break failed!"; + return false; } + return true; } bufferevent *HttpRequestHandler::BuffereventCallback(event_base *base, void *arg) { diff --git a/mindspore/ccsrc/ps/core/communicator/http_request_handler.h b/mindspore/ccsrc/ps/core/communicator/http_request_handler.h index d0d53164d1..0bf47562e8 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_request_handler.h +++ b/mindspore/ccsrc/ps/core/communicator/http_request_handler.h @@ -47,7 +47,7 @@ class HttpRequestHandler { bool Initialize(int fd, const std::unordered_map &handlers); void Run(); - void Stop(); + bool Stop(); static bufferevent *BuffereventCallback(event_base *base, void *arg); private: diff --git a/mindspore/ccsrc/ps/core/communicator/http_server.cc b/mindspore/ccsrc/ps/core/communicator/http_server.cc index ce26ee11bf..60a78335f3 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_server.cc +++ b/mindspore/ccsrc/ps/core/communicator/http_server.cc @@ -134,15 +134,20 @@ bool HttpServer::Wait() { return true; } -void HttpServer::Stop() { +bool HttpServer::Stop() { MS_LOG(INFO) << "Stop http server!"; + bool result = true; if (!is_stop_.load()) { for (size_t i = 0; i < thread_num_; i++) { - http_request_handlers[i]->Stop(); + bool res = http_request_handlers[i]->Stop(); + if (res == false) { + result = false; + } } is_stop_ = true; } + return result; } } // namespace core } // namespace ps diff --git a/mindspore/ccsrc/ps/core/communicator/http_server.h b/mindspore/ccsrc/ps/core/communicator/http_server.h index 1d5ab88d86..62736822ed 100644 --- a/mindspore/ccsrc/ps/core/communicator/http_server.h +++ b/mindspore/ccsrc/ps/core/communicator/http_server.h @@ -66,7 +66,7 @@ class HttpServer { bool Start(); bool Wait(); - void Stop(); + bool Stop(); private: std::string server_address_; diff --git a/mindspore/ccsrc/ps/core/communicator/message_handler.h b/mindspore/ccsrc/ps/core/communicator/message_handler.h new file mode 100644 index 0000000000..b98e19619f --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/message_handler.h @@ -0,0 +1,41 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_ + +namespace mindspore { +namespace ps { +namespace core { +// MessageHandler class is used to handle requests from clients and send response from server. +// It's the base class of HttpMsgHandler and TcpMsgHandler. +class MessageHandler { + public: + MessageHandler() = default; + virtual ~MessageHandler() = default; + + // Raw data of this message in bytes. + virtual void *data() const = 0; + + // Raw data size of this message.(Number of bytes) + virtual size_t len() const = 0; + + virtual bool SendResponse(const void *data, const size_t &len) = 0; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_MESSAGE_HANDLER_H_ diff --git a/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.cc b/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.cc new file mode 100644 index 0000000000..ef2a14e5b6 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.cc @@ -0,0 +1,46 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ps/core/communicator/tcp_msg_handler.h" +#include + +namespace mindspore { +namespace ps { +namespace core { +TcpMsgHandler::TcpMsgHandler(ServerNode *server_node, std::shared_ptr conn, + std::shared_ptr meta, DataPtr data, size_t size) + : server_node_(server_node), tcp_conn_(conn), meta_(meta), data_ptr_(data), data_(nullptr), len_(size) { + if (data_ptr_ != nullptr) { + data_ = data_ptr_.get(); + } +} + +void *TcpMsgHandler::data() const { + if (data_ == nullptr) { + MS_LOG(ERROR) << "TcpMsgHandler data is nullptr."; + } + return data_; +} + +size_t TcpMsgHandler::len() const { return len_; } + +bool TcpMsgHandler::SendResponse(const void *data, const size_t &len) { + server_node_->Response(tcp_conn_, meta_, const_cast(data), len); + return true; +} +} // namespace core +} // namespace ps +} // namespace mindspore diff --git a/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.h b/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.h new file mode 100644 index 0000000000..6cebfe62f0 --- /dev/null +++ b/mindspore/ccsrc/ps/core/communicator/tcp_msg_handler.h @@ -0,0 +1,52 @@ +/** + * Copyright 2021 Huawei Technologies Co., Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_ +#define MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_ + +#include +#include "proto/ps.pb.h" +#include "ps/core/server_node.h" +#include "ps/core/communicator/message_handler.h" +#include "ps/constants.h" + +namespace mindspore { +namespace ps { +namespace core { +class TcpMsgHandler : public MessageHandler { + public: + TcpMsgHandler(ServerNode *server_node, std::shared_ptr conn, std::shared_ptr meta, + DataPtr data, size_t size); + ~TcpMsgHandler() override = default; + + void *data() const override; + size_t len() const override; + bool SendResponse(const void *data, const size_t &len) override; + + private: + ServerNode *server_node_; + std::shared_ptr tcp_conn_; + // core::MessageMeta is used for server to get the user command and to find communication peer when responding. + std::shared_ptr meta_; + // We use data of shared_ptr array so that the raw pointer won't be released until the reference is 0. + DataPtr data_ptr_; + void *data_; + size_t len_; +}; +} // namespace core +} // namespace ps +} // namespace mindspore +#endif // MINDSPORE_CCSRC_PS_CORE_COMMUNICATOR_TCP_MSG_HANDLER_H_ diff --git a/tests/ut/cpp/CMakeLists.txt b/tests/ut/cpp/CMakeLists.txt index 11d017da30..323fe53b1f 100644 --- a/tests/ut/cpp/CMakeLists.txt +++ b/tests/ut/cpp/CMakeLists.txt @@ -161,8 +161,6 @@ list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/scheduler.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/optimizer_info_builder.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/worker.cc") -list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_request_handler.cc") -list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/core/communicator/http_server.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/parameter_server.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/gpu/gpu_ps_cache.cc") list(REMOVE_ITEM MINDSPORE_SRC_LIST "../../../mindspore/ccsrc/ps/ps_cache/ascend/ascend_ps_cache.cc") @@ -191,7 +189,7 @@ endif() if(CMAKE_SYSTEM_NAME MATCHES "Linux") target_link_libraries(ut_tests PRIVATE mindspore::gtest mindspore::event mindspore::event_pthreads - mindspore_gvar ${PYTHON_LIBRARIES} pthread util dl) + mindspore::event_openssl mindspore_gvar ${PYTHON_LIBRARIES} pthread util dl) if(ENABLE_MINDDATA) # AUX_SOURCE_DIRECTORY(LITE_CV_FILES)