From: @anancds Reviewed-by: @wilfchen,@cristoval Signed-off-by: @cristovalpull/15023/MERGE
| @@ -526,8 +526,8 @@ void AbstractNode::ProcessSendDataResp(std::shared_ptr<MessageMeta> meta, const | |||||
| auto it = receive_messages_.find(request_id); | auto it = receive_messages_.find(request_id); | ||||
| VectorPtr received_data = std::make_shared<std::vector<unsigned char>>(size, 0); | VectorPtr received_data = std::make_shared<std::vector<unsigned char>>(size, 0); | ||||
| if (size > 0) { | if (size > 0) { | ||||
| int ret = memcpy_s(received_data.get()->data(), size, data, size); | |||||
| if (ret != 0) { | |||||
| auto ret = memcpy_s(received_data.get()->data(), size, data, size); | |||||
| if (ret != EOK) { | |||||
| MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; | MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; | ||||
| } | } | ||||
| } | } | ||||
| @@ -130,7 +130,7 @@ class AbstractNode : public Node { | |||||
| // the key is rank_id, the value is rank_id's actual request_id | // the key is rank_id, the value is rank_id's actual request_id | ||||
| std::unordered_map<uint32_t, uint64_t> actual_rank_request_ids_; | std::unordered_map<uint32_t, uint64_t> actual_rank_request_ids_; | ||||
| std::mutex rank_request_ids_mutex; | std::mutex rank_request_ids_mutex; | ||||
| timeval scheduler_time_; | |||||
| timeval scheduler_time_{0, 0}; | |||||
| std::unordered_map<NodeCommand, ResponseHandler> handlers_; | std::unordered_map<NodeCommand, ResponseHandler> handlers_; | ||||
| }; | }; | ||||
| } // namespace core | } // namespace core | ||||
| @@ -48,7 +48,7 @@ Status HttpClient::Post(const std::string &url, const void *body, size_t len, st | |||||
| struct evhttp_connection *connection = | struct evhttp_connection *connection = | ||||
| evhttp_connection_base_new(event_base_, dns_base_, handler->GetHostByUri(), handler->GetUriPort()); | evhttp_connection_base_new(event_base_, dns_base_, handler->GetHostByUri(), handler->GetUriPort()); | ||||
| if (!connection) { | |||||
| if (connection == nullptr) { | |||||
| MS_LOG(ERROR) << "Create http connection failed!"; | MS_LOG(ERROR) << "Create http connection failed!"; | ||||
| return Status::BADREQUEST; | return Status::BADREQUEST; | ||||
| } | } | ||||
| @@ -78,7 +78,7 @@ Status HttpClient::Get(const std::string &url, std::shared_ptr<std::vector<char> | |||||
| struct evhttp_connection *connection = | struct evhttp_connection *connection = | ||||
| evhttp_connection_base_new(event_base_, dns_base_, handler->GetHostByUri(), handler->GetUriPort()); | evhttp_connection_base_new(event_base_, dns_base_, handler->GetHostByUri(), handler->GetUriPort()); | ||||
| if (!connection) { | |||||
| if (connection == nullptr) { | |||||
| MS_LOG(ERROR) << "Create http connection failed!"; | MS_LOG(ERROR) << "Create http connection failed!"; | ||||
| return Status::BADREQUEST; | return Status::BADREQUEST; | ||||
| } | } | ||||
| @@ -94,7 +94,7 @@ void HttpClient::ReadCallback(struct evhttp_request *request, void *arg) { | |||||
| MS_EXCEPTION_IF_NULL(request); | MS_EXCEPTION_IF_NULL(request); | ||||
| MS_EXCEPTION_IF_NULL(arg); | MS_EXCEPTION_IF_NULL(arg); | ||||
| auto handler = static_cast<HttpMessageHandler *>(arg); | auto handler = static_cast<HttpMessageHandler *>(arg); | ||||
| if (event_base_loopexit(handler->http_base(), nullptr) != 0) { | |||||
| if (event_base_loopexit(const_cast<event_base *>(handler->http_base()), nullptr) != 0) { | |||||
| MS_LOG(EXCEPTION) << "event base loop exit failed!"; | MS_LOG(EXCEPTION) << "event base loop exit failed!"; | ||||
| } | } | ||||
| } | } | ||||
| @@ -104,9 +104,8 @@ int HttpClient::ReadHeaderDoneCallback(struct evhttp_request *request, void *arg | |||||
| MS_EXCEPTION_IF_NULL(arg); | MS_EXCEPTION_IF_NULL(arg); | ||||
| auto handler = static_cast<HttpMessageHandler *>(arg); | auto handler = static_cast<HttpMessageHandler *>(arg); | ||||
| handler->set_request(request); | handler->set_request(request); | ||||
| MS_LOG(DEBUG) << "The http response code is:" << evhttp_request_get_response_code(request) | |||||
| << ", The request code line is:" << evhttp_request_get_response_code_line(request); | |||||
| struct evkeyvalq *headers = evhttp_request_get_input_headers(request); | struct evkeyvalq *headers = evhttp_request_get_input_headers(request); | ||||
| MS_EXCEPTION_IF_NULL(headers); | |||||
| struct evkeyval *header; | struct evkeyval *header; | ||||
| TAILQ_FOREACH(header, headers, next) { | TAILQ_FOREACH(header, headers, next) { | ||||
| MS_LOG(DEBUG) << "The key:" << header->key << ",The value:" << header->value; | MS_LOG(DEBUG) << "The key:" << header->key << ",The value:" << header->value; | ||||
| @@ -136,7 +135,7 @@ void HttpClient::RequestErrorCallback(enum evhttp_request_error error, void *arg | |||||
| MS_EXCEPTION_IF_NULL(arg); | MS_EXCEPTION_IF_NULL(arg); | ||||
| auto handler = static_cast<HttpMessageHandler *>(arg); | auto handler = static_cast<HttpMessageHandler *>(arg); | ||||
| MS_LOG(ERROR) << "The request failed, the error is:" << error; | MS_LOG(ERROR) << "The request failed, the error is:" << error; | ||||
| if (event_base_loopexit(handler->http_base(), nullptr) != 0) { | |||||
| if (event_base_loopexit(const_cast<event_base *>(handler->http_base()), nullptr) != 0) { | |||||
| MS_LOG(EXCEPTION) << "event base loop exit failed!"; | MS_LOG(EXCEPTION) << "event base loop exit failed!"; | ||||
| } | } | ||||
| } | } | ||||
| @@ -150,28 +149,30 @@ void HttpClient::ConnectionCloseCallback(struct evhttp_connection *connection, v | |||||
| } | } | ||||
| } | } | ||||
| void HttpClient::AddHeaders(const std::map<std::string, std::string> &headers, struct evhttp_request *request, | |||||
| void HttpClient::AddHeaders(const std::map<std::string, std::string> &headers, const struct evhttp_request *request, | |||||
| std::shared_ptr<HttpMessageHandler> handler) { | std::shared_ptr<HttpMessageHandler> handler) { | ||||
| MS_EXCEPTION_IF_NULL(request); | MS_EXCEPTION_IF_NULL(request); | ||||
| if (evhttp_add_header(evhttp_request_get_output_headers(request), "Host", handler->GetHostByUri()) != 0) { | |||||
| if (evhttp_add_header(evhttp_request_get_output_headers(const_cast<evhttp_request *>(request)), "Host", | |||||
| handler->GetHostByUri()) != 0) { | |||||
| MS_LOG(EXCEPTION) << "Add header failed!"; | MS_LOG(EXCEPTION) << "Add header failed!"; | ||||
| } | } | ||||
| for (auto &header : headers) { | for (auto &header : headers) { | ||||
| if (evhttp_add_header(evhttp_request_get_output_headers(request), header.first.data(), header.second.data()) != 0) { | |||||
| if (evhttp_add_header(evhttp_request_get_output_headers(const_cast<evhttp_request *>(request)), header.first.data(), | |||||
| header.second.data()) != 0) { | |||||
| MS_LOG(EXCEPTION) << "Add header failed!"; | MS_LOG(EXCEPTION) << "Add header failed!"; | ||||
| } | } | ||||
| } | } | ||||
| } | } | ||||
| void HttpClient::InitRequest(std::shared_ptr<HttpMessageHandler> handler, const std::string &url, | void HttpClient::InitRequest(std::shared_ptr<HttpMessageHandler> handler, const std::string &url, | ||||
| struct evhttp_request *request) { | |||||
| const struct evhttp_request *request) { | |||||
| MS_EXCEPTION_IF_NULL(request); | MS_EXCEPTION_IF_NULL(request); | ||||
| MS_EXCEPTION_IF_NULL(handler); | MS_EXCEPTION_IF_NULL(handler); | ||||
| handler->set_http_base(event_base_); | handler->set_http_base(event_base_); | ||||
| handler->ParseUrl(url); | handler->ParseUrl(url); | ||||
| evhttp_request_set_header_cb(request, ReadHeaderDoneCallback); | |||||
| evhttp_request_set_chunked_cb(request, ReadChunkDataCallback); | |||||
| evhttp_request_set_error_cb(request, RequestErrorCallback); | |||||
| evhttp_request_set_header_cb(const_cast<evhttp_request *>(request), ReadHeaderDoneCallback); | |||||
| evhttp_request_set_chunked_cb(const_cast<evhttp_request *>(request), ReadChunkDataCallback); | |||||
| evhttp_request_set_error_cb(const_cast<evhttp_request *>(request), RequestErrorCallback); | |||||
| MS_LOG(DEBUG) << "The url is:" << url << ", The host is:" << handler->GetHostByUri() | MS_LOG(DEBUG) << "The url is:" << url << ", The host is:" << handler->GetHostByUri() | ||||
| << ", The port is:" << handler->GetUriPort() << ", The request_url is:" << handler->GetRequestPath(); | << ", The port is:" << handler->GetUriPort() << ", The request_url is:" << handler->GetRequestPath(); | ||||
| @@ -196,8 +197,6 @@ Status HttpClient::CreateRequest(std::shared_ptr<HttpMessageHandler> handler, st | |||||
| } | } | ||||
| if (handler->request()) { | if (handler->request()) { | ||||
| MS_LOG(DEBUG) << "The http response code is:" << evhttp_request_get_response_code(handler->request()) | |||||
| << ", The request code line is:" << evhttp_request_get_response_code_line(handler->request()); | |||||
| return Status(evhttp_request_get_response_code(handler->request())); | return Status(evhttp_request_get_response_code(handler->request())); | ||||
| } | } | ||||
| return Status::INTERNAL; | return Status::INTERNAL; | ||||
| @@ -76,9 +76,10 @@ class HttpClient { | |||||
| static void RequestErrorCallback(enum evhttp_request_error error, void *arg); | static void RequestErrorCallback(enum evhttp_request_error error, void *arg); | ||||
| static void ConnectionCloseCallback(struct evhttp_connection *connection, void *arg); | static void ConnectionCloseCallback(struct evhttp_connection *connection, void *arg); | ||||
| void AddHeaders(const std::map<std::string, std::string> &headers, struct evhttp_request *request, | |||||
| void AddHeaders(const std::map<std::string, std::string> &headers, const struct evhttp_request *request, | |||||
| std::shared_ptr<HttpMessageHandler> handler); | std::shared_ptr<HttpMessageHandler> handler); | ||||
| void InitRequest(std::shared_ptr<HttpMessageHandler> handler, const std::string &url, struct evhttp_request *request); | |||||
| void InitRequest(std::shared_ptr<HttpMessageHandler> handler, const std::string &url, | |||||
| const struct evhttp_request *request); | |||||
| Status CreateRequest(std::shared_ptr<HttpMessageHandler> handler, struct evhttp_connection *connection, | Status CreateRequest(std::shared_ptr<HttpMessageHandler> handler, struct evhttp_connection *connection, | ||||
| struct evhttp_request *request, HttpMethod method); | struct evhttp_request *request, HttpMethod method); | ||||
| @@ -43,7 +43,7 @@ void HttpMessageHandler::InitHttpMessage() { | |||||
| MS_EXCEPTION_IF_NULL(event_uri_); | MS_EXCEPTION_IF_NULL(event_uri_); | ||||
| const char *query = evhttp_uri_get_query(event_uri_); | const char *query = evhttp_uri_get_query(event_uri_); | ||||
| if (query) { | |||||
| if (query != nullptr) { | |||||
| MS_LOG(WARNING) << "The query is:" << query; | MS_LOG(WARNING) << "The query is:" << query; | ||||
| evhttp_parse_query_str(query, &path_params_); | evhttp_parse_query_str(query, &path_params_); | ||||
| } | } | ||||
| @@ -142,7 +142,7 @@ std::string HttpMessageHandler::GetRequestPath() { | |||||
| } | } | ||||
| std::string path_res(path); | std::string path_res(path); | ||||
| const char *query = evhttp_uri_get_query(event_uri_); | const char *query = evhttp_uri_get_query(event_uri_); | ||||
| if (query) { | |||||
| if (query != nullptr) { | |||||
| path_res.append("?"); | path_res.append("?"); | ||||
| path_res.append(query); | path_res.append(query); | ||||
| } | } | ||||
| @@ -248,7 +248,7 @@ void HttpMessageHandler::set_content_len(const uint64_t &len) { content_len_ = l | |||||
| uint64_t HttpMessageHandler::content_len() { return content_len_; } | uint64_t HttpMessageHandler::content_len() { return content_len_; } | ||||
| event_base *HttpMessageHandler::http_base() { return event_base_; } | |||||
| const event_base *HttpMessageHandler::http_base() { return event_base_; } | |||||
| void HttpMessageHandler::set_http_base(const struct event_base *base) { | void HttpMessageHandler::set_http_base(const struct event_base *base) { | ||||
| MS_EXCEPTION_IF_NULL(base); | MS_EXCEPTION_IF_NULL(base); | ||||
| @@ -260,9 +260,7 @@ void HttpMessageHandler::set_request(const struct evhttp_request *req) { | |||||
| event_request_ = const_cast<evhttp_request *>(req); | event_request_ = const_cast<evhttp_request *>(req); | ||||
| } | } | ||||
| struct evhttp_request *HttpMessageHandler::request() { | |||||
| return event_request_; | |||||
| } | |||||
| const struct evhttp_request *HttpMessageHandler::request() { return event_request_; } | |||||
| void HttpMessageHandler::InitBodySize() { body_->resize(content_len()); } | void HttpMessageHandler::InitBodySize() { body_->resize(content_len()); } | ||||
| @@ -100,10 +100,10 @@ class HttpMessageHandler { | |||||
| void ReceiveMessage(const void *buffer, size_t num); | void ReceiveMessage(const void *buffer, size_t num); | ||||
| void set_content_len(const uint64_t &len); | void set_content_len(const uint64_t &len); | ||||
| uint64_t content_len(); | uint64_t content_len(); | ||||
| event_base *http_base(); | |||||
| const event_base *http_base(); | |||||
| void set_http_base(const struct event_base *base); | void set_http_base(const struct event_base *base); | ||||
| void set_request(const struct evhttp_request *req); | void set_request(const struct evhttp_request *req); | ||||
| struct evhttp_request *request(); | |||||
| const struct evhttp_request *request(); | |||||
| void InitBodySize(); | void InitBodySize(); | ||||
| VectorPtr body(); | VectorPtr body(); | ||||
| void set_body(VectorPtr body); | void set_body(VectorPtr body); | ||||
| @@ -42,7 +42,7 @@ void TcpConnection::SendMessage(const void *buffer, size_t num) const { | |||||
| } | } | ||||
| } | } | ||||
| TcpServer *TcpConnection::GetServer() const { return server_; } | |||||
| const TcpServer *TcpConnection::GetServer() const { return server_; } | |||||
| const evutil_socket_t &TcpConnection::GetFd() const { return fd_; } | const evutil_socket_t &TcpConnection::GetFd() const { return fd_; } | ||||
| @@ -283,7 +283,7 @@ void TcpServer::ListenerCallback(struct evconnlistener *, evutil_socket_t fd, st | |||||
| MS_EXCEPTION_IF_NULL(sockaddr); | MS_EXCEPTION_IF_NULL(sockaddr); | ||||
| struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); | struct bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE); | ||||
| if (!bev) { | |||||
| if (bev == nullptr) { | |||||
| MS_LOG(ERROR) << "Error constructing buffer event!"; | MS_LOG(ERROR) << "Error constructing buffer event!"; | ||||
| int ret = event_base_loopbreak(base); | int ret = event_base_loopbreak(base); | ||||
| if (ret != 0) { | if (ret != 0) { | ||||
| @@ -361,7 +361,7 @@ void TcpServer::EventCallback(struct bufferevent *bev, std::int16_t events, void | |||||
| struct evbuffer *output = bufferevent_get_output(bev); | struct evbuffer *output = bufferevent_get_output(bev); | ||||
| size_t remain = evbuffer_get_length(output); | size_t remain = evbuffer_get_length(output); | ||||
| auto conn = static_cast<class TcpConnection *>(data); | auto conn = static_cast<class TcpConnection *>(data); | ||||
| auto srv = conn->GetServer(); | |||||
| auto srv = const_cast<TcpServer *>(conn->GetServer()); | |||||
| if (events & BEV_EVENT_EOF) { | if (events & BEV_EVENT_EOF) { | ||||
| MS_LOG(INFO) << "Event buffer end of file!"; | MS_LOG(INFO) << "Event buffer end of file!"; | ||||
| @@ -57,7 +57,7 @@ class TcpConnection { | |||||
| bool SendMessage(std::shared_ptr<CommMessage> message) const; | bool SendMessage(std::shared_ptr<CommMessage> message) const; | ||||
| bool SendMessage(std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) const; | bool SendMessage(std::shared_ptr<MessageMeta> meta, const Protos &protos, const void *data, size_t size) const; | ||||
| virtual void OnReadHandler(const void *buffer, size_t numBytes); | virtual void OnReadHandler(const void *buffer, size_t numBytes); | ||||
| TcpServer *GetServer() const; | |||||
| const TcpServer *GetServer() const; | |||||
| const evutil_socket_t &GetFd() const; | const evutil_socket_t &GetFd() const; | ||||
| void set_callback(const Callback &callback); | void set_callback(const Callback &callback); | ||||
| @@ -42,7 +42,7 @@ bool ServerNode::Start(const uint32_t &timeout) { | |||||
| void ServerNode::set_handler(const RequestHandler &handler) { request_handler_ = handler; } | void ServerNode::set_handler(const RequestHandler &handler) { request_handler_ = handler; } | ||||
| void ServerNode::Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, void *data, | |||||
| void ServerNode::Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, const void *data, | |||||
| size_t size) { | size_t size) { | ||||
| MS_EXCEPTION_IF_NULL(conn); | MS_EXCEPTION_IF_NULL(conn); | ||||
| MS_EXCEPTION_IF_NULL(meta); | MS_EXCEPTION_IF_NULL(meta); | ||||
| @@ -102,8 +102,8 @@ void ServerNode::ProcessSendData(std::shared_ptr<TcpConnection> conn, std::share | |||||
| MS_EXCEPTION_IF_NULL(meta); | MS_EXCEPTION_IF_NULL(meta); | ||||
| MS_EXCEPTION_IF_NULL(data); | MS_EXCEPTION_IF_NULL(data); | ||||
| std::shared_ptr<unsigned char[]> res(new unsigned char[size]); | std::shared_ptr<unsigned char[]> res(new unsigned char[size]); | ||||
| int ret = memcpy_s(res.get(), size, data, size); | |||||
| if (ret != 0) { | |||||
| auto ret = memcpy_s(res.get(), size, data, size); | |||||
| if (ret != EOK) { | |||||
| MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; | MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; | ||||
| } | } | ||||
| MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) | MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) | ||||
| @@ -46,7 +46,7 @@ class ServerNode : public AbstractNode { | |||||
| DataPtr data, size_t size)>; | DataPtr data, size_t size)>; | ||||
| void set_handler(const RequestHandler &handler); | void set_handler(const RequestHandler &handler); | ||||
| void Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, void *data, size_t size); | |||||
| void Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, const void *data, size_t size); | |||||
| private: | private: | ||||
| void CreateTcpServer(); | void CreateTcpServer(); | ||||
| @@ -947,6 +947,7 @@ void Worker::SendForPush(int cmd, const KVMessage &send, const KVPartitioner &pa | |||||
| void Worker::SendForPull(int cmd, const KVMessage &send, const KVPartitioner &partitioner, | void Worker::SendForPull(int cmd, const KVMessage &send, const KVPartitioner &partitioner, | ||||
| const std::map<int64_t, int64_t> &attrs, std::vector<float> *vals, std::vector<int> *lens) { | const std::map<int64_t, int64_t> &attrs, std::vector<float> *vals, std::vector<int> *lens) { | ||||
| MS_EXCEPTION_IF_NULL(vals); | |||||
| PartitionKVMessages messages; | PartitionKVMessages messages; | ||||
| partitioner(send, &messages, {}); | partitioner(send, &messages, {}); | ||||
| std::vector<uint32_t> rank_ids; | std::vector<uint32_t> rank_ids; | ||||
| @@ -81,7 +81,7 @@ class Worker { | |||||
| void Finalize(); | void Finalize(); | ||||
| private: | private: | ||||
| Worker() : running_(false), key_cnt_(0) {} | |||||
| Worker() : server_num_(-1), running_(false), key_cnt_(0) {} | |||||
| ~Worker() = default; | ~Worker() = default; | ||||
| Worker(const Worker &) = delete; | Worker(const Worker &) = delete; | ||||
| Worker &operator=(const Worker &) = delete; | Worker &operator=(const Worker &) = delete; | ||||