From: @anancds Reviewed-by: @wilfchen,@limingqi107 Signed-off-by: @limingqi107tags/v1.2.0-rc1
| @@ -46,7 +46,7 @@ bool ServerNode::Start(const uint32_t &timeout) { | |||||
| void ServerNode::set_handler(const RequestHandler &handler) { request_handler_ = handler; } | void ServerNode::set_handler(const RequestHandler &handler) { request_handler_ = handler; } | ||||
| void ServerNode::Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, DataPtr data, | |||||
| void ServerNode::Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, void *data, | |||||
| size_t size) { | size_t size) { | ||||
| MS_EXCEPTION_IF_NULL(conn); | MS_EXCEPTION_IF_NULL(conn); | ||||
| MS_EXCEPTION_IF_NULL(meta); | MS_EXCEPTION_IF_NULL(meta); | ||||
| @@ -55,7 +55,7 @@ void ServerNode::Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<M | |||||
| meta->set_rank_id(node_info_.rank_id_); | meta->set_rank_id(node_info_.rank_id_); | ||||
| MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) | MS_LOG(DEBUG) << "The node role is:" << CommUtil::NodeRoleToString(node_info_.node_role_) | ||||
| << ", the node id is:" << node_info_.node_id_ << " send the request id is:" << meta->request_id(); | << ", the node id is:" << node_info_.node_id_ << " send the request id is:" << meta->request_id(); | ||||
| server_->SendMessage(conn, meta, Protos::RAW, data.get(), size); | |||||
| server_->SendMessage(conn, meta, Protos::RAW, data, size); | |||||
| } | } | ||||
| void ServerNode::CreateTcpServer() { | void ServerNode::CreateTcpServer() { | ||||
| @@ -46,7 +46,7 @@ class ServerNode : public AbstractNode { | |||||
| DataPtr data, size_t size)>; | DataPtr data, size_t size)>; | ||||
| void set_handler(const RequestHandler &handler); | void set_handler(const RequestHandler &handler); | ||||
| void Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, DataPtr data, size_t size); | |||||
| void Response(std::shared_ptr<TcpConnection> conn, std::shared_ptr<MessageMeta> meta, void *data, size_t size); | |||||
| private: | private: | ||||
| void CreateTcpServer(); | void CreateTcpServer(); | ||||
| @@ -505,16 +505,16 @@ void ParameterServer::ServerHandler::operator()(std::shared_ptr<core::TcpConnect | |||||
| auto &handler_ptr = handlers_[meta->user_cmd()]; | auto &handler_ptr = handlers_[meta->user_cmd()]; | ||||
| (this->*handler_ptr)(data, size, output); | (this->*handler_ptr)(data, size, output); | ||||
| std::shared_ptr<unsigned char[]> res(new unsigned char[output->size()]); | |||||
| MS_LOG(DEBUG) << "The output size is:" << output->size(); | MS_LOG(DEBUG) << "The output size is:" << output->size(); | ||||
| if (output->size() > 0) { | if (output->size() > 0) { | ||||
| int ret = memcpy_s(res.get(), output->size(), output->data(), output->size()); | |||||
| if (ret != 0) { | |||||
| MS_LOG(EXCEPTION) << "The memcpy_s error, errorno(" << ret << ")"; | |||||
| } | |||||
| ps_->server_node_->Response(conn, meta, output->data(), output->size()); | |||||
| } else { | |||||
| // If the size of the output is 0, then constructed an empty string, Because the Response function is a synchronous, | |||||
| // the res variable will be automatically recycled after calling the Response function | |||||
| std::string res; | |||||
| ps_->server_node_->Response(conn, meta, res.data(), res.length()); | |||||
| } | } | ||||
| ps_->server_node_->Response(conn, meta, res, output->size()); | |||||
| MS_LOG(DEBUG) << "The request id is:" << meta->request_id() << " the current time is:" | MS_LOG(DEBUG) << "The request id is:" << meta->request_id() << " the current time is:" | ||||
| << std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now()) | << std::chrono::time_point_cast<std::chrono::microseconds>(std::chrono::high_resolution_clock::now()) | ||||
| .time_since_epoch() | .time_since_epoch() | ||||
| @@ -313,7 +313,6 @@ void Worker::DoPSEmbeddingLookup(const Key &key, const std::vector<int> &lookup_ | |||||
| for (auto j = 0; j < message.values_size(); j++) { | for (auto j = 0; j < message.values_size(); j++) { | ||||
| values->push_back(message.values(j)); | values->push_back(message.values(j)); | ||||
| } | } | ||||
| MS_LOG(DEBUG) << "The embedding resp:" << *values; | |||||
| for (auto k = 0; k < message.keys_size(); k++) { | for (auto k = 0; k < message.keys_size(); k++) { | ||||
| const Key &key = message.keys(k); | const Key &key = message.keys(k); | ||||
| float *addr = values->data() + value_offset; | float *addr = values->data() + value_offset; | ||||