Browse Source

Fix sending large message using rpc.

r1.7
Parallels 4 years ago
parent
commit
d3bcc66d15
2 changed files with 48 additions and 5 deletions
  1. +1
    -1
      mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.cc
  2. +47
    -4
      tests/ut/cpp/distributed/rpc/tcp/tcp_test.cc

+ 1
- 1
mindspore/ccsrc/distributed/rpc/tcp/tcp_socket_operation.cc View File

@@ -84,7 +84,7 @@ int TCPSocketOperation::ReceiveMessage(Connection *connection, struct msghdr *re
reinterpret_cast<char *>(recvMsg->msg_iov[i].iov_base) + static_cast<unsigned int>(retval) - tmpLen;

recvMsg->msg_iov = &recvMsg->msg_iov[i];
recvMsg->msg_iovlen -= (i + 1);
recvMsg->msg_iovlen -= i;
break;
}
}


+ 47
- 4
tests/ut/cpp/distributed/rpc/tcp/tcp_test.cc View File

@@ -75,16 +75,17 @@ class TCPTest : public UT::Common {
void SetUp() {}
void TearDown() {}

std::unique_ptr<MessageBase> CreateMessage(const std::string &serverUrl, const std::string &client_url);
std::unique_ptr<MessageBase> CreateMessage(const std::string &serverUrl, const std::string &client_url,
size_t msg_size = 100);

bool CheckRecvNum(int expectedRecvNum, int _timeout);
bool CheckExitNum(int expectedExitNum, int _timeout);
};

std::unique_ptr<MessageBase> TCPTest::CreateMessage(const std::string &serverUrl, const std::string &clientUrl) {
std::unique_ptr<MessageBase> TCPTest::CreateMessage(const std::string &serverUrl, const std::string &clientUrl,
size_t msg_size) {
std::unique_ptr<MessageBase> message = std::make_unique<MessageBase>();
size_t len = 100;
std::string data(len, 'A');
std::string data(msg_size, 'A');
message->name = "testname";
message->from = AID("client", clientUrl);
message->to = AID("server", serverUrl);
@@ -270,6 +271,48 @@ TEST_F(TCPTest, SendSyncMessage) {
client->Finalize();
server->Finalize();
}

/// Feature: test sending large messages.
/// Description: start a socket server and send several large messages to it.
/// Expectation: the server received these large messages sented from client.
TEST_F(TCPTest, SendLargeMessages) {
Init();

// Start the tcp server.
auto server_url = "127.0.0.1:8081";
std::unique_ptr<TCPServer> server = std::make_unique<TCPServer>();
bool ret = server->Initialize(server_url);
ASSERT_TRUE(ret);

server->SetMessageHandler([](const std::shared_ptr<MessageBase> &message) -> void { IncrDataMsgNum(1); });

// Start the tcp client.
auto client_url = "127.0.0.1:1234";
std::unique_ptr<TCPClient> client = std::make_unique<TCPClient>();
ret = client->Initialize();
ASSERT_TRUE(ret);

// Send the message.
client->Connect(server_url);

size_t msg_cnt = 100;
size_t large_msg_size = 102400;
for (int i = 0; i < msg_cnt; ++i) {
auto message = CreateMessage(server_url, client_url, large_msg_size);
client->SendAsync(std::move(message));
}

// Wait timeout: 15s
WaitForDataMsg(msg_cnt, 15);

// Check result
EXPECT_EQ(msg_cnt, GetDataMsgNum());

// Destroy
client->Disconnect(server_url);
client->Finalize();
server->Finalize();
}
} // namespace rpc
} // namespace distributed
} // namespace mindspore

Loading…
Cancel
Save