| @@ -209,7 +209,9 @@ void Connection::Disconnect(int fd) { | |||
| void Connection::Close() { | |||
| if (recv_event_loop != nullptr) { | |||
| recv_event_loop->DeleteEpollEvent(socket_fd); | |||
| if (recv_event_loop->DeleteEpollEvent(socket_fd) == RPC_ERROR) { | |||
| MS_LOG(ERROR) << "Failed to delete epoll event " << socket_fd; | |||
| } | |||
| } | |||
| if (!destination.empty()) { | |||
| if (recv_message != nullptr) { | |||
| @@ -329,7 +331,7 @@ std::string Connection::GenerateHttpMessage(MessageBase *msg) { | |||
| if (msg->Body().size() > 0) { | |||
| std::ostringstream bodyLine; | |||
| bodyLine << std::hex << msg->Body().size() << "\r\n"; | |||
| bodyLine.write(msg->Body().data(), msg->Body().size()); | |||
| (void)bodyLine.write(msg->Body().data(), msg->Body().size()); | |||
| return postLine + userAgentLine + fromLine + connectLine + hostLine + chunkedBeginLine + bodyLine.str() + | |||
| chunkedEndLine; | |||
| } | |||
| @@ -364,9 +366,9 @@ void Connection::FillSendMessage(MessageBase *msg, const std::string &advertiseU | |||
| send_io_vec[index].iov_len = msg->body.size(); | |||
| ++index; | |||
| send_kernel_msg.msg_iov = send_io_vec; | |||
| send_kernel_msg.msg_iovlen = index; | |||
| send_kernel_msg.msg_iovlen = IntToSize(index); | |||
| total_send_len = | |||
| sizeof(send_msg_header) + msg->name.size() + send_to.size() + send_from.size() + msg->body.size(); | |||
| UlongToUint(sizeof(send_msg_header)) + msg->name.size() + send_to.size() + send_from.size() + msg->body.size(); | |||
| send_message = msg; | |||
| // update metrics | |||
| @@ -390,8 +392,8 @@ void Connection::FillSendMessage(MessageBase *msg, const std::string &advertiseU | |||
| send_io_vec[index].iov_len = msg->body.size(); | |||
| ++index; | |||
| send_kernel_msg.msg_iov = send_io_vec; | |||
| send_kernel_msg.msg_iovlen = index; | |||
| total_send_len = msg->body.size(); | |||
| send_kernel_msg.msg_iovlen = IntToSize(index); | |||
| total_send_len = UlongToUint(msg->body.size()); | |||
| send_message = msg; | |||
| // update metrics | |||
| @@ -435,8 +437,8 @@ void Connection::FillRecvMessage() { | |||
| ++i; | |||
| recv_kernel_msg.msg_iov = recv_io_vec; | |||
| recv_kernel_msg.msg_iovlen = i; | |||
| total_recv_len = msg->name.size() + recv_to.size() + recv_from.size() + msg->body.size(); | |||
| recv_kernel_msg.msg_iovlen = IntToSize(i); | |||
| total_recv_len = UlongToUint(msg->name.size()) + recv_to.size() + recv_from.size() + msg->body.size(); | |||
| recv_message = msg; | |||
| } | |||
| @@ -488,7 +490,7 @@ bool Connection::ParseMessage() { | |||
| state = ConnectionState::kDisconnecting; | |||
| return false; | |||
| } | |||
| total_recv_len -= retval; | |||
| total_recv_len -= IntToSize(retval); | |||
| return false; | |||
| } | |||
| recv_state = State::kMsgHeader; | |||
| @@ -499,7 +501,7 @@ bool Connection::ParseMessage() { | |||
| return true; | |||
| } | |||
| void Connection::ReorderHeader(MessageHeader *header) { | |||
| void Connection::ReorderHeader(MessageHeader *header) const { | |||
| header->name_len = ntohl(header->name_len); | |||
| header->to_len = ntohl(header->to_len); | |||
| header->from_len = ntohl(header->from_len); | |||
| @@ -56,7 +56,7 @@ struct MessageHeader { | |||
| */ | |||
| struct SendMetrics { | |||
| // Records the message number and max body size. | |||
| void UpdateMax(int size) { | |||
| void UpdateMax(size_t size) { | |||
| accum_msg_count++; | |||
| if (size > max_msg_size) { | |||
| max_msg_size = size; | |||
| @@ -84,10 +84,10 @@ struct SendMetrics { | |||
| } | |||
| // The total number of bytes sent already. | |||
| int accum_msg_count{0}; | |||
| size_t accum_msg_count{0}; | |||
| // The max message body size sent in bytes. | |||
| int max_msg_size{0}; | |||
| size_t max_msg_size{0}; | |||
| int error_code{0}; | |||
| std::string last_succ_msg_name; | |||
| @@ -222,7 +222,7 @@ struct Connection { | |||
| std::string GenerateHttpMessage(MessageBase *msg); | |||
| // Change the header body from network byte order to host byte order. | |||
| void ReorderHeader(MessageHeader *header); | |||
| void ReorderHeader(MessageHeader *header) const; | |||
| std::string advertise_addr_; | |||
| }; | |||
| @@ -37,9 +37,9 @@ void ConnectionPool::CloseConnection(Connection *conn) { | |||
| if (!conn->destination.empty()) { | |||
| if (conn->is_remote) { | |||
| remote_conns_.erase(conn->destination); | |||
| (void)remote_conns_.erase(conn->destination); | |||
| } else { | |||
| local_conns_.erase(conn->destination); | |||
| (void)local_conns_.erase(conn->destination); | |||
| } | |||
| } | |||
| conn->Close(); | |||
| @@ -98,7 +98,7 @@ void ConnectionPool::ResetAllConnMetrics() { | |||
| Connection *ConnectionPool::FindMaxConnection() { | |||
| Connection *conn = nullptr; | |||
| int count = 0; | |||
| size_t count = 0; | |||
| for (const auto &iter : local_conns_) { | |||
| if (iter.second->send_metrics->accum_msg_count > count) { | |||
| count = iter.second->send_metrics->accum_msg_count; | |||
| @@ -116,7 +116,7 @@ Connection *ConnectionPool::FindMaxConnection() { | |||
| Connection *ConnectionPool::FindFastConnection() { | |||
| Connection *conn = nullptr; | |||
| int size = 0; | |||
| size_t size = 0; | |||
| for (const auto &iter : local_conns_) { | |||
| if (iter.second->send_metrics->max_msg_size > size) { | |||
| size = iter.second->send_metrics->max_msg_size; | |||
| @@ -165,9 +165,9 @@ void ConnectionPool::AddConnection(Connection *conn) { | |||
| } | |||
| if (conn->is_remote) { | |||
| remote_conns_.emplace(conn->destination, conn); | |||
| (void)remote_conns_.emplace(conn->destination, conn); | |||
| } else { | |||
| local_conns_.emplace(conn->destination, conn); | |||
| (void)local_conns_.emplace(conn->destination, conn); | |||
| } | |||
| } | |||
| @@ -194,7 +194,7 @@ void ConnectionPool::DeleteConnInfo(int fd) { | |||
| iter2 = conn_infos.erase(iter2); | |||
| delete linkInfo; | |||
| } | |||
| conn_infos_.erase(fd); | |||
| (void)conn_infos_.erase(fd); | |||
| } | |||
| void ConnectionPool::DeleteConnInfo(const std::string &to, int fd) { | |||
| @@ -276,7 +276,7 @@ void ConnectionPool::AddConnInfo(int fd, const AID &sAid, const AID &dAid, Delet | |||
| linker->to = dAid; | |||
| linker->socket_fd = fd; | |||
| linker->delete_callback = callback; | |||
| conn_infos_[fd].insert(linker); | |||
| (void)conn_infos_[fd].insert(linker); | |||
| } | |||
| bool ConnectionPool::ReverseConnInfo(int fromFd, int toFd) { | |||
| @@ -285,7 +285,7 @@ bool ConnectionPool::ReverseConnInfo(int fromFd, int toFd) { | |||
| return false; | |||
| } | |||
| auto conn_infos = iter->second; | |||
| conn_infos_.erase(fromFd); | |||
| (void)conn_infos_.erase(fromFd); | |||
| conn_infos_[toFd] = conn_infos; | |||
| return true; | |||
| } | |||
| @@ -48,7 +48,7 @@ int EventLoopRun(EventLoop *evloop, int timeout) { | |||
| if (memset_s(events, size, 0, size)) { | |||
| MS_LOG(ERROR) << "Failed to call memset_s."; | |||
| free(events); | |||
| return false; | |||
| return RPC_ERROR; | |||
| } | |||
| while (!evloop->is_stop_) { | |||
| @@ -116,11 +116,15 @@ void QueueReadyCallback(int fd, uint32_t events, void *arg) { | |||
| void EventLoop::ReleaseResource() { | |||
| if (task_queue_event_fd_ != -1) { | |||
| close(task_queue_event_fd_); | |||
| if (close(task_queue_event_fd_) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close task queue event fd: " << task_queue_event_fd_; | |||
| } | |||
| task_queue_event_fd_ = -1; | |||
| } | |||
| if (epoll_fd_ != -1) { | |||
| close(epoll_fd_); | |||
| if (close(epoll_fd_) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close epoll fd: " << epoll_fd_; | |||
| } | |||
| epoll_fd_ = -1; | |||
| } | |||
| } | |||
| @@ -128,10 +132,10 @@ void EventLoop::ReleaseResource() { | |||
| int EventLoop::AddTask(std::function<void()> &&task) { | |||
| // put func to the queue | |||
| task_queue_mutex_.lock(); | |||
| task_queue_.emplace(std::move(task)); | |||
| (void)task_queue_.emplace(std::move(task)); | |||
| // return the queque size to send's caller. | |||
| int result = task_queue_.size(); | |||
| // return the queue size to send's caller. | |||
| auto result = task_queue_.size(); | |||
| task_queue_mutex_.unlock(); | |||
| if (result == 1) { | |||
| @@ -212,7 +216,7 @@ void EventLoop::DeleteEvent(int fd) { | |||
| if (eventData != nullptr) { | |||
| delete eventData; | |||
| } | |||
| events_.erase(fd); | |||
| (void)events_.erase(fd); | |||
| } | |||
| Event *EventLoop::FindEvent(int fd) { | |||
| @@ -258,7 +262,7 @@ int EventLoop::SetEventHandler(int fd, uint32_t events, EventHandler handler, vo | |||
| if (memset_s(&ev, sizeof(ev), 0, sizeof(ev))) { | |||
| MS_LOG(ERROR) << "Failed to call memset_s."; | |||
| return false; | |||
| return RPC_ERROR; | |||
| } | |||
| ev.events = events; | |||
| @@ -298,7 +302,7 @@ void EventLoop::AddEvent(Event *event) { | |||
| return; | |||
| } | |||
| DeleteEvent(event->fd); | |||
| events_.emplace(event->fd, event); | |||
| (void)events_.emplace(event->fd, event); | |||
| } | |||
| int EventLoop::DeleteEpollEvent(int fd) { | |||
| @@ -312,7 +316,7 @@ int EventLoop::DeleteEpollEvent(int fd) { | |||
| event_lock_.unlock(); | |||
| return RPC_ERROR; | |||
| } | |||
| events_.erase(tev->fd); | |||
| (void)events_.erase(tev->fd); | |||
| // Don't delete tev immediately, let's push it into deleted_events_, before next epoll_wait,we will free | |||
| // all events in deleted_events_. | |||
| @@ -342,7 +346,7 @@ int EventLoop::UpdateEpollEvent(int fd, uint32_t events) { | |||
| } | |||
| if (memset_s(&ev, sizeof(ev), 0, sizeof(ev))) { | |||
| MS_LOG(ERROR) << "Failed to call memset_s."; | |||
| return false; | |||
| return RPC_ERROR; | |||
| } | |||
| ev.events = events; | |||
| @@ -401,7 +405,7 @@ void EventLoop::RemoveDeletedEvents() { | |||
| deleteEv = nullptr; | |||
| ++eventIter; | |||
| } | |||
| deleted_events_.erase(fdIter++); | |||
| (void)deleted_events_.erase(fdIter++); | |||
| } | |||
| deleted_events_.clear(); | |||
| } | |||
| @@ -103,7 +103,9 @@ int SocketOperation::CreateServerSocket(sa_family_t family) { | |||
| ret = SetSocketOptions(fd); | |||
| if (ret < 0) { | |||
| close(fd); | |||
| if (close(fd) != 0) { | |||
| MS_LOG(EXCEPTION) << "Failed to close fd: " << fd; | |||
| } | |||
| return -1; | |||
| } | |||
| return fd; | |||
| @@ -247,7 +249,9 @@ std::string SocketOperation::GetPeer(int sock_fd) { | |||
| char ipdotdec[IP_LEN_MAX]; | |||
| if (isa.sa.sa_family == AF_INET) { | |||
| inet_ntop(AF_INET, reinterpret_cast<void *>(&isa.saIn.sin_addr), ipdotdec, IP_LEN_MAX); | |||
| if (inet_ntop(AF_INET, reinterpret_cast<void *>(&isa.saIn.sin_addr), ipdotdec, IP_LEN_MAX) == nullptr) { | |||
| MS_LOG(EXCEPTION) << "Failed to call inet_ntop kernel func."; | |||
| } | |||
| peer = std::string(ipdotdec) + ":" + std::to_string(ntohs(isa.saIn.sin_port)); | |||
| } else if (isa.sa.sa_family == AF_INET6) { | |||
| inet_ntop(AF_INET6, reinterpret_cast<void *>(&isa.saIn6.sin6_addr), ipdotdec, IP_LEN_MAX); | |||
| @@ -297,14 +301,18 @@ int SocketOperation::Listen(const std::string &url) { | |||
| // bind | |||
| if (::bind(listenFd, (struct sockaddr *)&addr, sizeof(SocketAddress))) { | |||
| MS_LOG(ERROR) << "Failed to call bind, url: " << url.c_str(); | |||
| close(listenFd); | |||
| if (close(listenFd) != 0) { | |||
| MS_LOG(EXCEPTION) << "Failed to close fd:" << listenFd; | |||
| } | |||
| return -1; | |||
| } | |||
| // listen | |||
| if (::listen(listenFd, SOCKET_LISTEN_BACKLOG)) { | |||
| MS_LOG(ERROR) << "Failed to call listen, fd: " << listenFd << ", errno: " << errno << ", url: " << url.c_str(); | |||
| close(listenFd); | |||
| if (close(listenFd) != 0) { | |||
| MS_LOG(EXCEPTION) << "Failed to close fd:" << listenFd; | |||
| } | |||
| return -1; | |||
| } | |||
| return listenFd; | |||
| @@ -20,6 +20,8 @@ | |||
| #include <netinet/in.h> | |||
| #include <string> | |||
| #include "utils/convert_utils_base.h" | |||
| namespace mindspore { | |||
| namespace distributed { | |||
| namespace rpc { | |||
| @@ -53,7 +53,9 @@ int DoConnect(const std::string &to, Connection *conn, ConnectionCallBack event_ | |||
| int ret = TCPComm::Connect(conn, (struct sockaddr *)&addr, sizeof(addr)); | |||
| if (ret < 0) { | |||
| close(sock_fd); | |||
| if (close(sock_fd) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close fd:" << sock_fd; | |||
| } | |||
| conn->socket_fd = -1; | |||
| return -1; | |||
| } | |||
| @@ -116,8 +118,9 @@ void OnAccept(int server, uint32_t events, void *arg) { | |||
| if (conn == nullptr) { | |||
| MS_LOG(ERROR) << "Failed to create new connection, server fd:" << server << ", events: " << events | |||
| << ", accept fd: " << acceptFd; | |||
| close(acceptFd); | |||
| acceptFd = -1; | |||
| if (close(acceptFd) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close fd: " << acceptFd; | |||
| } | |||
| return; | |||
| } | |||
| @@ -126,8 +129,9 @@ void OnAccept(int server, uint32_t events, void *arg) { | |||
| if (conn->send_metrics == nullptr) { | |||
| MS_LOG(ERROR) << "Failed to create connection metrics, server fd: " << server << ", events: " << events | |||
| << ", accept fd: " << acceptFd; | |||
| close(acceptFd); | |||
| acceptFd = -1; | |||
| if (close(acceptFd) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close fd: " << acceptFd; | |||
| } | |||
| delete conn; | |||
| return; | |||
| } | |||
| @@ -148,7 +152,9 @@ void OnAccept(int server, uint32_t events, void *arg) { | |||
| if (retval != RPC_OK) { | |||
| MS_LOG(ERROR) << "Failed to add accept fd event, server fd: " << server << ", events: " << events | |||
| << ", accept fd: " << acceptFd; | |||
| close(acceptFd); | |||
| if (close(acceptFd) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close fd: " << acceptFd; | |||
| } | |||
| acceptFd = -1; | |||
| delete conn->send_metrics; | |||
| delete conn; | |||
| @@ -373,7 +379,7 @@ int TCPComm::SetConnectedHandler(Connection *conn) { | |||
| int TCPComm::Connect(Connection *conn, const struct sockaddr *sa, socklen_t saLen) { | |||
| int retval = 0; | |||
| uint16_t localPort = -1; | |||
| uint16_t localPort = 0; | |||
| retval = SocketOperation::Connect(conn->socket_fd, sa, saLen, &localPort); | |||
| if (retval != RPC_OK) { | |||
| @@ -451,7 +457,7 @@ void TCPComm::Send(MessageBase *msg, const TCPComm *tcpmgr, bool remoteLink, boo | |||
| if (conn->total_send_len == 0) { | |||
| conn->FillSendMessage(msg, advertise_url_.data(), is_http_msg_); | |||
| } else { | |||
| conn->send_message_queue.emplace(msg); | |||
| (void)conn->send_message_queue.emplace(msg); | |||
| } | |||
| // Send the message. | |||
| @@ -461,7 +467,7 @@ void TCPComm::Send(MessageBase *msg, const TCPComm *tcpmgr, bool remoteLink, boo | |||
| } | |||
| void TCPComm::SendByRecvLoop(MessageBase *msg, const TCPComm *tcpmgr, bool remoteLink, bool isExactNotRemote) { | |||
| recv_event_loop_->AddTask( | |||
| (void)recv_event_loop_->AddTask( | |||
| [msg, tcpmgr, remoteLink, isExactNotRemote] { TCPComm::Send(msg, tcpmgr, remoteLink, isExactNotRemote); }); | |||
| } | |||
| @@ -511,7 +517,7 @@ int TCPComm::Send(MessageBase *msg, bool remoteLink, bool isExactNotRemote) { | |||
| if (conn->total_send_len == 0) { | |||
| conn->FillSendMessage(msg, advertise_url_.data(), is_http_msg_); | |||
| } else { | |||
| conn->send_message_queue.emplace(msg); | |||
| (void)conn->send_message_queue.emplace(msg); | |||
| } | |||
| if (conn->state == ConnectionState::kConnected) { | |||
| @@ -521,7 +527,7 @@ int TCPComm::Send(MessageBase *msg, bool remoteLink, bool isExactNotRemote) { | |||
| } | |||
| void TCPComm::CollectMetrics() { | |||
| send_event_loop_->AddTask([this] { | |||
| (void)send_event_loop_->AddTask([this] { | |||
| Connection::conn_mutex.lock(); | |||
| Connection *maxConn = ConnectionPool::GetConnectionPool()->FindMaxConnection(); | |||
| Connection *fastConn = ConnectionPool::GetConnectionPool()->FindFastConnection(); | |||
| @@ -560,7 +566,7 @@ int TCPComm::Send(std::unique_ptr<MessageBase> &&msg, bool remoteLink, bool isEx | |||
| } | |||
| void TCPComm::Link(const AID &source, const AID &destination) { | |||
| recv_event_loop_->AddTask([source, destination, this] { | |||
| (void)recv_event_loop_->AddTask([source, destination, this] { | |||
| std::string to = destination.Url(); | |||
| std::lock_guard<std::mutex> lock(Connection::conn_mutex); | |||
| @@ -605,7 +611,7 @@ void TCPComm::Link(const AID &source, const AID &destination) { | |||
| } | |||
| void TCPComm::UnLink(const AID &destination) { | |||
| recv_event_loop_->AddTask([destination] { | |||
| (void)recv_event_loop_->AddTask([destination] { | |||
| std::string to = destination.Url(); | |||
| std::lock_guard<std::mutex> lock(Connection::conn_mutex); | |||
| if (is_http_msg_) { | |||
| @@ -642,7 +648,9 @@ void TCPComm::DoReConnectConn(Connection *conn, std::string to, const AID &sourc | |||
| *oldFd = conn->socket_fd; | |||
| conn->recv_event_loop->DeleteEpollEvent(conn->socket_fd); | |||
| if (conn->recv_event_loop->DeleteEpollEvent(conn->socket_fd) == RPC_ERROR) { | |||
| MS_LOG(ERROR) << "Failed to delete epoll event: " << conn->socket_fd; | |||
| } | |||
| conn->socket_operation->Close(conn); | |||
| conn->socket_fd = -1; | |||
| @@ -681,7 +689,7 @@ Connection *TCPComm::CreateDefaultConn(std::string to) { | |||
| } | |||
| void TCPComm::Reconnect(const AID &source, const AID &destination) { | |||
| send_event_loop_->AddTask([source, destination, this] { | |||
| (void)send_event_loop_->AddTask([source, destination, this] { | |||
| std::string to = destination.Url(); | |||
| std::lock_guard<std::mutex> lock(Connection::conn_mutex); | |||
| Connection *conn = ConnectionPool::GetConnectionPool()->FindConnection(to, false, is_http_msg_); | |||
| @@ -689,7 +697,7 @@ void TCPComm::Reconnect(const AID &source, const AID &destination) { | |||
| conn->state = ConnectionState::kClose; | |||
| } | |||
| recv_event_loop_->AddTask([source, destination, this] { | |||
| (void)recv_event_loop_->AddTask([source, destination, this] { | |||
| std::string to = destination.Url(); | |||
| int oldFd = -1; | |||
| std::lock_guard<std::mutex> lock(Connection::conn_mutex); | |||
| @@ -748,7 +756,9 @@ void TCPComm::Finalize() { | |||
| } | |||
| if (server_fd_ > 0) { | |||
| close(server_fd_); | |||
| if (close(server_fd_) != 0) { | |||
| MS_LOG(ERROR) << "Failed to close fd: " << server_fd_; | |||
| } | |||
| server_fd_ = -1; | |||
| } | |||
| } | |||
| @@ -31,29 +31,29 @@ int TCPSocketOperation::Receive(Connection *connection, char *recvBuf, uint32_t | |||
| *recvLen = 0; | |||
| while (*recvLen != totalRecvLen) { | |||
| int retval = recv(fd, curRecvBuf, totalRecvLen - *recvLen, 0); | |||
| int retval = recv(fd, curRecvBuf, totalRecvLen - *recvLen, static_cast<int>(0)); | |||
| if (retval > 0) { | |||
| *recvLen += retval; | |||
| *recvLen += IntToUint(retval); | |||
| if (*recvLen == totalRecvLen) { | |||
| return totalRecvLen; | |||
| return UintToInt(totalRecvLen); | |||
| } | |||
| curRecvBuf = curRecvBuf + retval; | |||
| // Failed to receive message. | |||
| } else if (retval < 0) { | |||
| if (EAGAIN == errno) { | |||
| return *recvLen; | |||
| return UintToInt(*recvLen); | |||
| } else if (ECONNRESET == errno || ECONNABORTED == errno || ENOTCONN == errno || EPIPE == errno) { | |||
| connection->error_code = errno; | |||
| return -1; | |||
| } else { | |||
| return *recvLen; | |||
| return UintToInt(*recvLen); | |||
| } | |||
| } else { | |||
| connection->error_code = errno; | |||
| return -1; | |||
| } | |||
| } | |||
| return *recvLen; | |||
| return UintToInt(*recvLen); | |||
| } | |||
| int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *recvMsg, uint32_t recvLen) { | |||
| @@ -64,10 +64,9 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re | |||
| } | |||
| while (totalRecvLen) { | |||
| int retval = recvmsg(connection->socket_fd, recvMsg, 0); | |||
| auto retval = recvmsg(connection->socket_fd, recvMsg, 0); | |||
| if (retval > 0) { | |||
| totalRecvLen -= retval; | |||
| totalRecvLen -= IntToSize(retval); | |||
| if (totalRecvLen == 0) { | |||
| recvMsg->msg_iovlen = 0; | |||
| break; | |||
| @@ -75,12 +74,12 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re | |||
| unsigned int iovlen = recvMsg->msg_iovlen; | |||
| if (iovlen > 0) { | |||
| unsigned int tmpLen = 0; | |||
| size_t tmpLen = 0; | |||
| for (unsigned int i = 0; i < iovlen; ++i) { | |||
| if (recvMsg->msg_iov[i].iov_len + tmpLen <= (size_t)retval) { | |||
| tmpLen += recvMsg->msg_iov[i].iov_len; | |||
| } else { | |||
| recvMsg->msg_iov[i].iov_len -= (retval - tmpLen); | |||
| recvMsg->msg_iov[i].iov_len -= IntToSize(retval - tmpLen); | |||
| recvMsg->msg_iov[i].iov_base = | |||
| reinterpret_cast<char *>(recvMsg->msg_iov[i].iov_base) + static_cast<unsigned int>(retval) - tmpLen; | |||
| @@ -91,15 +90,15 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re | |||
| } | |||
| } | |||
| } else if (retval == 0) { | |||
| return -1; | |||
| return UintToInt(-1); | |||
| } else { | |||
| if (EAGAIN == errno) { | |||
| return recvLen - totalRecvLen; | |||
| } else if (ECONNRESET == errno || ECONNABORTED == errno || ENOTCONN == errno || EPIPE == errno) { | |||
| connection->error_code = errno; | |||
| connection->error_code = UintToInt(errno); | |||
| return -1; | |||
| } else { | |||
| return recvLen - totalRecvLen; | |||
| return UintToInt(recvLen - totalRecvLen); | |||
| } | |||
| } | |||
| } | |||
| @@ -109,7 +108,7 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re | |||
| int TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *sendMsg, uint32_t *sendLen) { | |||
| int eagainCount = EAGAIN_RETRY; | |||
| uint32_t totalLen = *sendLen; | |||
| uint32_t unsendLen = *sendLen; | |||
| int32_t unsendLen = *sendLen; | |||
| while (*sendLen != 0) { | |||
| int retval = sendmsg(connection->socket_fd, sendMsg, MSG_NOSIGNAL); | |||
| @@ -131,9 +130,9 @@ int TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *sendM | |||
| break; | |||
| } | |||
| unsigned int tmpBytes = 0; | |||
| size_t tmpBytes = 0; | |||
| for (unsigned int i = 0; i < sendMsg->msg_iovlen; ++i) { | |||
| if (sendMsg->msg_iov[i].iov_len + tmpBytes < (size_t)retval) { | |||
| if (sendMsg->msg_iov[i].iov_len + tmpBytes < IntToSize(retval)) { | |||
| tmpBytes += sendMsg->msg_iov[i].iov_len; | |||
| } else { | |||
| sendMsg->msg_iov[i].iov_len -= (retval - tmpBytes); | |||
| @@ -149,7 +148,7 @@ int TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *sendM | |||
| } | |||
| } | |||
| if (unsendLen > 0) { | |||
| unsendLen = totalLen - *sendLen; | |||
| unsendLen = UintToInt(totalLen - *sendLen); | |||
| } | |||
| return unsendLen; | |||
| } | |||