Browse Source

!31670 Fix static code check issues for distributed communication

Merge pull request !31670 from chengang/fix_static_code_check
r1.7
i-robot Gitee 4 years ago
parent
commit
b60d3105c9
No known key found for this signature in database GPG Key ID: 173E9B9CA92EEF8F
9 changed files with 62 additions and 58 deletions
  1. +9
    -8
      mindspore/ccsrc/distributed/rpc/tcp/connection.cc
  2. +8
    -2
      mindspore/ccsrc/distributed/rpc/tcp/constants.h
  3. +1
    -1
      mindspore/ccsrc/distributed/rpc/tcp/event_loop.cc
  4. +1
    -1
      mindspore/ccsrc/distributed/rpc/tcp/event_loop.h
  5. +3
    -3
      mindspore/ccsrc/distributed/rpc/tcp/socket_operation.h
  6. +1
    -1
      mindspore/ccsrc/distributed/rpc/tcp/tcp_client.cc
  7. +7
    -4
      mindspore/ccsrc/distributed/rpc/tcp/tcp_comm.cc
  8. +29
    -35
      mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.cc
  9. +3
    -3
      mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.h

+ 9
- 8
mindspore/ccsrc/distributed/rpc/tcp/connection.cc View File

@@ -324,7 +324,7 @@ std::string Connection::GenerateHttpMessage(MessageBase *msg) {

void Connection::FillSendMessage(MessageBase *msg, const std::string &advertiseUrl, bool isHttpKmsg) {
if (msg->type == MessageBase::Type::KMSG) {
size_t index = 0;
int index = 0;
if (!isHttpKmsg) {
send_to = msg->to;
send_from = msg->from;
@@ -372,7 +372,7 @@ void Connection::FillSendMessage(MessageBase *msg, const std::string &advertiseU
send_io_vec[index].iov_len = msg->body.size();
++index;
send_kernel_msg.msg_iov = send_io_vec;
send_kernel_msg.msg_iovlen = IntToSize(index);
send_kernel_msg.msg_iovlen = index;
total_send_len = UlongToUint(msg->body.size());
send_message = msg;

@@ -429,7 +429,7 @@ int Connection::AddConnnectEventHandler() {

bool Connection::ParseMessage() {
int retval = 0;
uint32_t recvLen = 0;
size_t recvLen = 0;
char *recvBuf = nullptr;

switch (recv_state) {
@@ -437,7 +437,7 @@ bool Connection::ParseMessage() {
case State::kMsgHeader:
recvBuf = reinterpret_cast<char *>(&recv_msg_header) + recv_len;
retval = socket_operation->Receive(this, recvBuf, sizeof(MessageHeader) - recv_len, &recvLen);
if (retval < 0) {
if (retval != IO_RW_OK) {
state = ConnectionState::kDisconnecting;
recv_len += recvLen;
return false;
@@ -463,13 +463,14 @@ bool Connection::ParseMessage() {

// Parse message body.
case State::kBody:
retval = socket_operation->ReceiveMessage(this, &recv_kernel_msg, total_recv_len);
if (retval != static_cast<int>(total_recv_len)) {
if (retval < 0) {
recvLen = 0;
retval = socket_operation->ReceiveMessage(this, &recv_kernel_msg, total_recv_len, &recvLen);
if (recvLen != total_recv_len) {
if (retval != IO_RW_OK) {
state = ConnectionState::kDisconnecting;
return false;
}
total_recv_len -= IntToSize(retval);
total_recv_len -= recvLen;
return false;
}
recv_state = State::kMsgHeader;


+ 8
- 2
mindspore/ccsrc/distributed/rpc/tcp/constants.h View File

@@ -78,13 +78,19 @@ static const char URL_IP_PORT_SEPARATOR[] = ":";
static const char TCP_RECV_EVLOOP_THREADNAME[] = "RECV_EVENT_LOOP";
static const char TCP_SEND_EVLOOP_THREADNAME[] = "SEND_EVENT_LOOP";

constexpr int RPC_ERROR = -1;
constexpr int RPC_OK = 0;
constexpr int RPC_ERROR = -1;

constexpr int IO_RW_OK = 1;
constexpr int IO_RW_ERROR = -1;

constexpr int IP_LEN_MAX = 128;

// Kill the process for safe exiting.
inline void KillProcess(const std::string &ret) { (void)raise(SIGKILL); }
inline void KillProcess(const std::string &ret) {
MS_LOG(ERROR) << ret;
(void)raise(SIGKILL);
}

/*
* The MessageHeader contains the stats info about the message body.


+ 1
- 1
mindspore/ccsrc/distributed/rpc/tcp/event_loop.cc View File

@@ -129,7 +129,7 @@ void EventLoop::ReleaseResource() {
}
}

ssize_t EventLoop::AddTask(std::function<int()> &&task) {
size_t EventLoop::AddTask(std::function<int()> &&task) {
// put func to the queue
task_queue_mutex_.lock();
(void)task_queue_.emplace(std::move(task));


+ 1
- 1
mindspore/ccsrc/distributed/rpc/tcp/event_loop.h View File

@@ -68,7 +68,7 @@ class EventLoop {

// Add task (eg. send message, reconnect etc.) to task queue of the event loop.
// These tasks are executed asynchronously.
ssize_t AddTask(std::function<int()> &&task);
size_t AddTask(std::function<int()> &&task);

// Set event handler for events(read/write/..) occurred on the socket fd.
int SetEventHandler(int sock_fd, uint32_t events, EventHandler handler, void *data);


+ 3
- 3
mindspore/ccsrc/distributed/rpc/tcp/socket_operation.h View File

@@ -74,12 +74,12 @@ class SocketOperation {
virtual ssize_t ReceivePeek(Connection *connection, char *recvBuf, uint32_t recvLen) = 0;

// Try to receive messages up to totalRecvLen (for message header).
virtual int Receive(Connection *connection, char *recvBuf, uint32_t totalRecvLen, uint32_t *recvLen) = 0;
virtual int Receive(Connection *connection, char *recvBuf, size_t totalRecvLen, size_t *recvLen) = 0;

// Receive message (for message body).
virtual int ReceiveMessage(Connection *connection, struct msghdr *recvMsg, uint32_t recvLen) = 0;
virtual int ReceiveMessage(Connection *connection, struct msghdr *recvMsg, size_t totalRecvLen, size_t *recvLen) = 0;

virtual ssize_t SendMessage(Connection *connection, struct msghdr *sendMsg, size_t *sendLen) = 0;
virtual int SendMessage(Connection *connection, struct msghdr *sendMsg, size_t totalSendLen, size_t *sendLen) = 0;

// Handle connect and connected events.
virtual void NewConnEventHandler(void *context) = 0;


+ 1
- 1
mindspore/ccsrc/distributed/rpc/tcp/tcp_client.cc View File

@@ -80,7 +80,7 @@ bool TCPClient::Disconnect(const std::string &dst_url, size_t timeout_in_sec) {
} else {
break;
}
usleep(sleep_in_us);
(void)usleep(sleep_in_us);
}
return rt;
}


+ 7
- 4
mindspore/ccsrc/distributed/rpc/tcp/tcp_comm.cc View File

@@ -139,9 +139,11 @@ int DoSend(Connection *conn) {
conn->send_message_queue.pop();
}

int sendLen = conn->socket_operation->SendMessage(conn, &conn->send_kernel_msg, &conn->total_send_len);
if (sendLen > 0) {
size_t sendLen = 0;
int retval = conn->socket_operation->SendMessage(conn, &conn->send_kernel_msg, conn->total_send_len, &sendLen);
if (retval == IO_RW_OK && sendLen > 0) {
total_send_bytes += sendLen;
conn->total_send_len -= sendLen;
if (conn->total_send_len == 0) {
// update metrics
conn->send_metrics->UpdateError(false);
@@ -150,7 +152,7 @@ int DoSend(Connection *conn) {
delete conn->send_message;
conn->send_message = nullptr;
}
} else if (sendLen == 0) {
} else if (retval == IO_RW_OK && sendLen == 0) {
// EAGAIN
(void)conn->recv_event_loop->UpdateEpollEvent(conn->socket_fd, EPOLLOUT | EPOLLIN | EPOLLHUP | EPOLLERR);
break;
@@ -372,7 +374,8 @@ ssize_t TCPComm::Send(MessageBase *msg, bool sync) {
if (sync) {
return task();
} else {
return send_event_loop_->AddTask(task);
send_event_loop_->AddTask(task);
return true;
}
}



+ 29
- 35
mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.cc View File

@@ -25,7 +25,7 @@ ssize_t TCPSocketOperation::ReceivePeek(Connection *connection, char *recvBuf, u
return recv(connection->socket_fd, recvBuf, recvLen, MSG_PEEK);
}

int TCPSocketOperation::Receive(Connection *connection, char *recvBuf, uint32_t totalRecvLen, uint32_t *recvLen) {
int TCPSocketOperation::Receive(Connection *connection, char *recvBuf, size_t totalRecvLen, size_t *recvLen) {
char *curRecvBuf = recvBuf;
int fd = connection->socket_fd;

@@ -33,41 +33,40 @@ int TCPSocketOperation::Receive(Connection *connection, char *recvBuf, uint32_t
while (*recvLen != totalRecvLen) {
ssize_t retval = recv(fd, curRecvBuf, totalRecvLen - *recvLen, static_cast<int>(0));
if (retval > 0) {
*recvLen += static_cast<uint32_t>(retval);
*recvLen += retval;
if (*recvLen == totalRecvLen) {
return UintToInt(totalRecvLen);
return IO_RW_OK;
}
curRecvBuf = curRecvBuf + retval;
// Failed to receive message.
} else if (retval < 0) {
if (EAGAIN == errno) {
return UintToInt(*recvLen);
return IO_RW_OK;
} else if (ECONNRESET == errno || ECONNABORTED == errno || ENOTCONN == errno || EPIPE == errno) {
connection->error_code = errno;
return -1;
return IO_RW_ERROR;
} else {
return UintToInt(*recvLen);
return IO_RW_OK;
}
} else {
connection->error_code = errno;
return -1;
return IO_RW_ERROR;
}
}
return UintToInt(*recvLen);
return IO_RW_OK;
}

int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *recvMsg, uint32_t recvLen) {
ssize_t totalRecvLen = recvLen;

int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *recvMsg, size_t totalRecvLen,
size_t *recvLen) {
if (totalRecvLen == 0) {
return 0;
return IO_RW_OK;
}

while (totalRecvLen) {
while (*recvLen < totalRecvLen) {
auto retval = recvmsg(connection->socket_fd, recvMsg, 0);
if (retval > 0) {
totalRecvLen -= retval;
if (totalRecvLen == 0) {
*recvLen += retval;
if (*recvLen == totalRecvLen) {
recvMsg->msg_iovlen = 0;
break;
}
@@ -76,7 +75,7 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re
if (iovlen > 0) {
size_t tmpLen = 0;
for (unsigned int i = 0; i < iovlen; ++i) {
if (recvMsg->msg_iov[i].iov_len + tmpLen <= (size_t)retval) {
if (recvMsg->msg_iov[i].iov_len + tmpLen <= static_cast<size_t>(retval)) {
tmpLen += recvMsg->msg_iov[i].iov_len;
} else {
recvMsg->msg_iov[i].iov_len -= IntToSize(retval - tmpLen);
@@ -90,49 +89,47 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re
}
}
} else if (retval == 0) {
return -1;
return IO_RW_ERROR;
} else {
if (EAGAIN == errno) {
return UintToInt(recvLen - totalRecvLen);
return IO_RW_OK;
} else if (ECONNRESET == errno || ECONNABORTED == errno || ENOTCONN == errno || EPIPE == errno) {
connection->error_code = errno;
return -1;
return IO_RW_ERROR;
} else {
return UintToInt(recvLen - totalRecvLen);
return IO_RW_OK;
}
}
}
return UintToInt(recvLen);
return IO_RW_OK;
}

ssize_t TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *sendMsg, size_t *sendLen) {
int TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *sendMsg, size_t totalSendLen,
size_t *sendLen) {
int eagainCount = EAGAIN_RETRY;
size_t totalLen = *sendLen;
ssize_t unsendLen = static_cast<ssize_t>(*sendLen);

while (*sendLen != 0) {
while (*sendLen != totalSendLen) {
auto retval = sendmsg(connection->socket_fd, sendMsg, MSG_NOSIGNAL);
if (retval < 0) {
--eagainCount;
if (errno != EAGAIN) {
connection->error_code = errno;
unsendLen = -1;
break;
return IO_RW_ERROR;
} else if (eagainCount == 0) {
unsendLen = 0;
*sendLen = 0;
break;
}
} else {
*sendLen -= retval;
*sendLen += retval;

if (*sendLen == 0) {
if (*sendLen == totalSendLen) {
sendMsg->msg_iovlen = 0;
break;
}

size_t tmpBytes = 0;
for (unsigned int i = 0; i < sendMsg->msg_iovlen; ++i) {
if (sendMsg->msg_iov[i].iov_len + tmpBytes < IntToSize(retval)) {
if (sendMsg->msg_iov[i].iov_len + tmpBytes < static_cast<size_t>(retval)) {
tmpBytes += sendMsg->msg_iov[i].iov_len;
} else {
sendMsg->msg_iov[i].iov_len -= (retval - tmpBytes);
@@ -147,10 +144,7 @@ ssize_t TCPSocketOperation::SendMessage(Connection *connection, struct msghdr *s
eagainCount = EAGAIN_RETRY;
}
}
if (unsendLen > 0) {
unsendLen = totalLen - *sendLen;
}
return unsendLen;
return IO_RW_OK;
}

void TCPSocketOperation::Close(Connection *connection) {


+ 3
- 3
mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.h View File

@@ -26,10 +26,10 @@ namespace rpc {
class TCPSocketOperation : public SocketOperation {
public:
ssize_t ReceivePeek(Connection *connection, char *recvBuf, uint32_t recvLen) override;
int Receive(Connection *connection, char *recvBuf, uint32_t totRecvLen, uint32_t *recvLen) override;
int ReceiveMessage(Connection *connection, struct msghdr *recvMsg, uint32_t recvLen) override;
int Receive(Connection *connection, char *recvBuf, size_t totRecvLen, size_t *recvLen) override;
int ReceiveMessage(Connection *connection, struct msghdr *recvMsg, size_t totalRecvLen, size_t *recvLen) override;

ssize_t SendMessage(Connection *connection, struct msghdr *sendMsg, size_t *sendLen) override;
int SendMessage(Connection *connection, struct msghdr *sendMsg, size_t totalSendLen, size_t *sendLen) override;

void Close(Connection *connection) override;



Loading…
Cancel
Save