Skip to content

Commit

Permalink
[#1041]: Skip dumping of finished call
Browse files Browse the repository at this point in the history
Summary:
During network issues send queue could process slowly.
So calls in this queue could timeout w/o being sent.
In case of timeout we invoke call callback immediately,
so corresponding RpcController could be destroyed after that.

If we invoke DumpRunningRpcs while having such calls in queue it could hang.
But it also blocks whole reactor thread, and it cannot perform requests.

Test Plan: ybd --gtest_filter RpcTest.DumpTimedOutCall

Reviewers: amitanand, bogdan

Reviewed By: bogdan

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D6363
  • Loading branch information
spolitov committed Mar 21, 2019
1 parent a322d95 commit 3a61698
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 11 deletions.
27 changes: 17 additions & 10 deletions src/yb/rpc/rpc-test-base.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,32 +63,39 @@ std::unique_ptr<ServiceIf> CreateCalculatorService(

class CalculatorServiceMethods {
public:
static const constexpr auto kSendStringsMethodName = "SendStrings";
static const constexpr auto kSleepMethodName = "Sleep";
static const constexpr auto kAddMethodName = "Add";
static const constexpr auto kDisconnectMethodName = "Disconnect";
static const constexpr auto kEchoMethodName = "Echo";
static const constexpr auto kSendStringsMethodName = "SendStrings";
static const constexpr auto kSleepMethodName = "Sleep";

static RemoteMethod* SendStringsMethod() {
static RemoteMethod* AddMethod() {
static RemoteMethod method(
rpc_test::CalculatorServiceIf::static_service_name(), kSendStringsMethodName);
rpc_test::CalculatorServiceIf::static_service_name(), kAddMethodName);
return &method;
}

static RemoteMethod* SleepMethod() {
static RemoteMethod* DisconnectMethod() {
static RemoteMethod method(
rpc_test::CalculatorServiceIf::static_service_name(), kSleepMethodName);
rpc_test::CalculatorServiceIf::static_service_name(), kDisconnectMethodName);
return &method;
}

static RemoteMethod* AddMethod() {
static RemoteMethod* EchoMethod() {
static RemoteMethod method(
rpc_test::CalculatorServiceIf::static_service_name(), kAddMethodName);
rpc_test::CalculatorServiceIf::static_service_name(), kEchoMethodName);
return &method;
}

static RemoteMethod* DisconnectMethod() {
static RemoteMethod* SendStringsMethod() {
static RemoteMethod method(
rpc_test::CalculatorServiceIf::static_service_name(), kDisconnectMethodName);
rpc_test::CalculatorServiceIf::static_service_name(), kSendStringsMethodName);
return &method;
}

static RemoteMethod* SleepMethod() {
static RemoteMethod method(
rpc_test::CalculatorServiceIf::static_service_name(), kSleepMethodName);
return &method;
}
};
Expand Down
47 changes: 47 additions & 0 deletions src/yb/rpc/rpc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,5 +648,52 @@ TEST_F(TestRpc, TestDisconnect) {
ASSERT_EQ(kRequests, total);
}

// Check that we could perform DumpRunningRpcs while timed out calls are in queue.
//
// Start listenting socket, that will accept one connection and does not read it.
// Send big RPC request, that does not fit into socket buffer, so it will be sending forever.
// Wait until this call is timed out.
// Check that we could invoke DumpRunningRpcs after it.
TEST_F(TestRpc, DumpTimedOutCall) {
// Set up a simple socket server which accepts a connection.
HostPort server_addr;
Socket listen_sock;
ASSERT_OK(StartFakeServer(&listen_sock, &server_addr));

std::atomic<bool> stop(false);

std::thread thread([&listen_sock, &stop] {
Socket socket;
Endpoint remote;
ASSERT_OK(listen_sock.Accept(&socket, &remote, 0));
while (!stop.load(std::memory_order_acquire)) {
std::this_thread::sleep_for(100ms);
}
});

auto messenger = CreateMessenger("Client");
Proxy p(messenger, server_addr);

{
rpc_test::EchoRequestPB req;
req.set_data(std::string(1_MB, 'X'));
rpc_test::EchoResponsePB resp;
std::aligned_storage<sizeof(RpcController), alignof(RpcController)>::type storage;
auto controller = new (&storage) RpcController;
controller->set_timeout(100ms);
auto status = p.SyncRequest(CalculatorServiceMethods::EchoMethod(), req, &resp, controller);
ASSERT_TRUE(status.IsTimedOut()) << status;
controller->~RpcController();
memset(&storage, 0xff, sizeof(storage));
}

DumpRunningRpcsRequestPB dump_req;
DumpRunningRpcsResponsePB dump_resp;
ASSERT_OK(messenger->DumpRunningRpcs(dump_req, &dump_resp));

stop.store(true, std::memory_order_release);
thread.join();
}

} // namespace rpc
} // namespace yb
2 changes: 1 addition & 1 deletion src/yb/rpc/tcp_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ void TcpStream::Send(OutboundDataPtr data) {
void TcpStream::DumpPB(const DumpRunningRpcsRequestPB& req, RpcConnectionPB* resp) {
auto call_in_flight = resp->add_calls_in_flight();
for (auto& entry : sending_) {
if (entry.data && entry.data->DumpPB(req, call_in_flight)) {
if (entry.data && !entry.data->IsFinished() && entry.data->DumpPB(req, call_in_flight)) {
call_in_flight = resp->add_calls_in_flight();
}
}
Expand Down

0 comments on commit 3a61698

Please sign in to comment.