Skip to content

Commit

Permalink
Fixing a deadlock bug in the integration tests (#1888)
Browse files Browse the repository at this point in the history
Signed-off-by: Alyssa Wilk <alyssar@chromium.org>
  • Loading branch information
alyssawilk authored and mattklein123 committed Oct 18, 2017
1 parent 9ae4fff commit 1567641
Show file tree
Hide file tree
Showing 11 changed files with 32 additions and 25 deletions.
2 changes: 1 addition & 1 deletion test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class AdsIntegrationTest : public HttpIntegrationTest,
void initialize() override {
BaseIntegrationTest::initialize();
ads_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
ads_stream_ = ads_connection_->waitForNewStream();
ads_stream_ = ads_connection_->waitForNewStream(*dispatcher_);
ads_stream_->startGrpcStream();
}

Expand Down
12 changes: 9 additions & 3 deletions test/integration/fake_upstream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,22 @@ void FakeConnectionBase::waitForDisconnect(bool ignore_spurious_events) {
ASSERT(disconnected_);
}

FakeStreamPtr FakeHttpConnection::waitForNewStream(bool ignore_spurious_events) {
FakeStreamPtr FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher,
bool ignore_spurious_events) {
std::unique_lock<std::mutex> lock(lock_);
while (new_streams_.empty()) {
connection_event_.wait(lock);
std::cv_status status = connection_event_.wait_until(lock, std::chrono::system_clock::now() +
std::chrono::milliseconds(5));
// As with waitForDisconnect, by default, waitForNewStream returns after the next event.
// If the caller explicitly notes other events should be ignored, it will instead actually
// wait for the next new stream, ignoring other events such as onData()
if (!ignore_spurious_events) {
if (status == std::cv_status::no_timeout && !ignore_spurious_events) {
break;
}
if (new_streams_.empty()) {
// Run the client dispatcher since we may need to process window updates, etc.
client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
}
}

ASSERT(!new_streams_.empty());
Expand Down
3 changes: 2 additions & 1 deletion test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeCo
// By default waitForNewStream assumes the next event is a new stream and
// fails an assert if an unexpected event occurs. If a caller truly wishes to
// wait for a new stream, set ignore_spurious_events = true.
FakeStreamPtr waitForNewStream(bool ignore_spurious_events = false);
FakeStreamPtr waitForNewStream(Event::Dispatcher& client_dispatcher,
bool ignore_spurious_events = false);

// Http::ServerConnectionCallbacks
Http::StreamDecoder& newStream(Http::StreamEncoder& response_encoder) override;
Expand Down
2 changes: 1 addition & 1 deletion test/integration/grpc_json_transcoder_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class GrpcJsonTranscoderIntegrationTest
}

fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
if (!grpc_request_messages.empty()) {
upstream_request_->waitForEndStream(*dispatcher_);

Expand Down
6 changes: 3 additions & 3 deletions test/integration/http2_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ void Http2IntegrationTest::simultaneousRequest(int32_t request1_bytes, int32_t r
*response1);

fake_upstream_connection1 = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request1 = fake_upstream_connection1->waitForNewStream();
upstream_request1 = fake_upstream_connection1->waitForNewStream(*dispatcher_);

// Start request 2
response2.reset(new IntegrationStreamDecoder(*dispatcher_));
Expand All @@ -210,7 +210,7 @@ void Http2IntegrationTest::simultaneousRequest(int32_t request1_bytes, int32_t r
{":authority", "host"}},
*response2);
fake_upstream_connection2 = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request2 = fake_upstream_connection2->waitForNewStream();
upstream_request2 = fake_upstream_connection2->waitForNewStream(*dispatcher_);

// Finish request 1
codec_client_->sendData(*encoder1, request1_bytes, true);
Expand Down Expand Up @@ -311,7 +311,7 @@ void Http2RingHashIntegrationTest::sendMultipleRequests(
FakeUpstream::waitForHttpConnection(*dispatcher_, fake_upstreams_);
// As data and streams are interwoven, make sure waitForNewStream()
// ignores incoming data and waits for actual stream establishment.
upstream_requests.push_back(fake_upstream_connection->waitForNewStream(true));
upstream_requests.push_back(fake_upstream_connection->waitForNewStream(*dispatcher_, true));
upstream_requests.back()->setAddServedByHeader(true);
fake_upstream_connections_.push_back(std::move(fake_upstream_connection));
}
Expand Down
12 changes: 6 additions & 6 deletions test/integration/http2_upstream_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ void Http2UpstreamIntegrationTest::bidirectionalStreaming(uint32_t bytes) {
{":authority", "host"}},
*response_);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);

// Send part of the request body and ensure it is received upstream.
codec_client_->sendData(*request_encoder_, bytes, false);
Expand Down Expand Up @@ -146,7 +146,7 @@ TEST_P(Http2UpstreamIntegrationTest, BidirectionalStreamingReset) {
{":authority", "host"}},
*response_);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);

// Send some request data.
codec_client_->sendData(*request_encoder_, 1024, false);
Expand Down Expand Up @@ -186,7 +186,7 @@ void Http2UpstreamIntegrationTest::simultaneousRequest(uint32_t request1_bytes,
{":authority", "host"}},
*response1);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request1 = fake_upstream_connection_->waitForNewStream();
upstream_request1 = fake_upstream_connection_->waitForNewStream(*dispatcher_);

// Start request 2
Http::StreamEncoder* encoder2 =
Expand All @@ -195,7 +195,7 @@ void Http2UpstreamIntegrationTest::simultaneousRequest(uint32_t request1_bytes,
{":scheme", "http"},
{":authority", "host"}},
*response2);
upstream_request2 = fake_upstream_connection_->waitForNewStream();
upstream_request2 = fake_upstream_connection_->waitForNewStream(*dispatcher_);

// Finish request 1
codec_client_->sendData(*encoder1, request1_bytes, true);
Expand Down Expand Up @@ -258,7 +258,7 @@ void Http2UpstreamIntegrationTest::manySimultaneousRequests(uint32_t request_byt
for (uint32_t i = 0; i < num_requests; ++i) {
// As data and streams are interwoven, make sure waitForNewStream()
// ignores incoming data and waits for actual stream establishment.
upstream_requests.push_back(fake_upstream_connection_->waitForNewStream(true));
upstream_requests.push_back(fake_upstream_connection_->waitForNewStream(*dispatcher_, true));
}
for (uint32_t i = 0; i < num_requests; ++i) {
upstream_requests[i]->waitForEndStream(*dispatcher_);
Expand Down Expand Up @@ -316,7 +316,7 @@ TEST_P(Http2UpstreamIntegrationTest, UpstreamConnectionCloseWithManyStreams) {
}
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
for (uint32_t i = 0; i < num_requests; ++i) {
upstream_requests.push_back(fake_upstream_connection_->waitForNewStream());
upstream_requests.push_back(fake_upstream_connection_->waitForNewStream(*dispatcher_));
}
for (uint32_t i = 0; i < num_requests; ++i) {
if (i % 15 != 0) {
Expand Down
8 changes: 4 additions & 4 deletions test/integration/http_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void HttpIntegrationTest::waitForNextUpstreamRequest() {
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
}
// Wait for the next stream on the upstream connection.
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
// Wait for the stream to be completely received.
upstream_request_->waitForEndStream(*dispatcher_);
}
Expand Down Expand Up @@ -370,7 +370,7 @@ void HttpIntegrationTest::testRouterUpstreamDisconnectBeforeRequestComplete() {

fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);

upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForHeadersComplete();
fake_upstream_connection_->close();
fake_upstream_connection_->waitForDisconnect();
Expand Down Expand Up @@ -432,7 +432,7 @@ void HttpIntegrationTest::testRouterDownstreamDisconnectBeforeRequestComplete(
{":authority", "host"}},
*response_);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForHeadersComplete();
codec_client_->close();

Expand Down Expand Up @@ -491,7 +491,7 @@ void HttpIntegrationTest::testRouterUpstreamResponseBeforeRequestComplete() {
{":authority", "host"}},
*response_);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForHeadersComplete();
upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
upstream_request_->encodeData(512, true);
Expand Down
4 changes: 2 additions & 2 deletions test/integration/load_stats_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class LoadStatsIntegrationTest : public HttpIntegrationTest,

void waitForLoadStatsStream() {
fake_loadstats_connection_ = load_report_upstream_->waitForHttpConnection(*dispatcher_);
loadstats_stream_ = fake_loadstats_connection_->waitForNewStream();
loadstats_stream_ = fake_loadstats_connection_->waitForNewStream(*dispatcher_);
}

void waitForLoadStatsRequest(
Expand Down Expand Up @@ -146,7 +146,7 @@ class LoadStatsIntegrationTest : public HttpIntegrationTest,
void waitForUpstreamResponse(uint32_t endpoint_index, uint32_t response_code = 200) {
fake_upstream_connection_ =
service_upstream_[endpoint_index]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForEndStream(*dispatcher_);

upstream_request_->encodeHeaders(
Expand Down
2 changes: 1 addition & 1 deletion test/integration/proto_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ TEST_P(ProtoIntegrationTest, TestBind) {
std::string address =
fake_upstream_connection_->connection().remoteAddress().ip()->addressAsString();
EXPECT_EQ(address, address_string);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForEndStream(*dispatcher_);
// Cleanup both downstream and upstream
codec_client_->close();
Expand Down
4 changes: 2 additions & 2 deletions test/integration/ratelimit_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class RatelimitIntegrationTest : public HttpIntegrationTest,

void waitForRatelimitRequest() {
fake_ratelimit_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_);
ratelimit_request_ = fake_ratelimit_connection_->waitForNewStream();
ratelimit_request_ = fake_ratelimit_connection_->waitForNewStream(*dispatcher_);
pb::lyft::ratelimit::RateLimitRequest request_msg;
ratelimit_request_->waitForGrpcMessage(*dispatcher_, request_msg);
ratelimit_request_->waitForEndStream(*dispatcher_);
Expand All @@ -79,7 +79,7 @@ class RatelimitIntegrationTest : public HttpIntegrationTest,

void waitForSuccessfulUpstreamResponse() {
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForEndStream(*dispatcher_);

upstream_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false);
Expand Down
2 changes: 1 addition & 1 deletion test/integration/xfcc_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void XfccIntegrationTest::testRequestAndResponseWithXfccHeader(Network::ClientCo
codec_client_ = makeHttpConnection(std::move(conn));
codec_client_->makeHeaderOnlyRequest(header_map, *response_);
fake_upstream_connection_ = fake_upstreams_[0]->waitForHttpConnection(*dispatcher_);
upstream_request_ = fake_upstream_connection_->waitForNewStream();
upstream_request_ = fake_upstream_connection_->waitForNewStream(*dispatcher_);
upstream_request_->waitForEndStream(*dispatcher_);
if (expected_xfcc.empty()) {
EXPECT_EQ(nullptr, upstream_request_->headers().ForwardedClientCert());
Expand Down

0 comments on commit 1567641

Please sign in to comment.