From 7ed1d57bcf37fc0c32e0621b5aec3d0174de42ef Mon Sep 17 00:00:00 2001 From: Joanna Jo Date: Fri, 13 Dec 2024 17:48:11 -0800 Subject: [PATCH] Add an absolute cork (min) threshold for WINDOW_UPDATE Summary: Changed HTTPTransaction logic to check whether the minimum of 128KB and half the recv FC window has been delivered to the application/handler, effectively resulting in more frequent WINDOW_UPDATE frames. Reviewed By: afrind Differential Revision: D66999563 fbshipit-source-id: e5dac20bdfa8c4a6ae3e9879348bfba765d7169e --- proxygen/lib/http/session/HTTPTransaction.cpp | 5 +- .../lib/http/session/test/HTTPSessionMocks.h | 10 +++ .../session/test/HTTPUpstreamSessionTest.cpp | 68 +++++++++++++++++++ 3 files changed, 82 insertions(+), 1 deletion(-) diff --git a/proxygen/lib/http/session/HTTPTransaction.cpp b/proxygen/lib/http/session/HTTPTransaction.cpp index 930baa68ee..7bbc5d690b 100644 --- a/proxygen/lib/http/session/HTTPTransaction.cpp +++ b/proxygen/lib/http/session/HTTPTransaction.cpp @@ -27,6 +27,7 @@ namespace { const int64_t kApproximateMTU = 1400; const std::chrono::seconds kRateLimitMaxDelay(10); const uint64_t kMaxBufferPerTxn = 65536; +constexpr uint32_t kMinThreshold = 128 * 1024; using namespace proxygen; HTTPException stateMachineError(HTTPException::Direction dir, std::string msg) { @@ -298,6 +299,7 @@ bool HTTPTransaction::updateContentLengthRemaining(size_t len) { void HTTPTransaction::onIngressBody(unique_ptr chain, uint16_t padding) { FOLLY_SCOPED_TRACE_SECTION("HTTPTransaction - onIngressBody"); DestructorGuard g(this); + VLOG(6) << __func__ << " chain_length=" << chain->computeChainDataLength(); if (isIngressEOMSeen()) { std::stringstream ss; // Use stringstream to invoke operator << for this @@ -369,7 +371,8 @@ void HTTPTransaction::processIngressBody(unique_ptr chain, size_t len) { // closed divisor = 1; } - if (uint32_t(recvToAck_) >= (recvWindow_.getCapacity() / divisor)) { + if (uint32_t(recvToAck_) >= kMinThreshold || + uint32_t(recvToAck_) >= (recvWindow_.getCapacity() / divisor)) { flushWindowUpdate(); } } diff --git a/proxygen/lib/http/session/test/HTTPSessionMocks.h b/proxygen/lib/http/session/test/HTTPSessionMocks.h index e896418f42..9d138ad6df 100644 --- a/proxygen/lib/http/session/test/HTTPSessionMocks.h +++ b/proxygen/lib/http/session/test/HTTPSessionMocks.h @@ -355,6 +355,16 @@ class MockHTTPHandler } } + void expectBodyRepeatedly( + const std::function& callback = std::function()) { + if (callback) { + EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_)) + .WillRepeatedly(testing::InvokeWithoutArgs(callback)); + } else { + EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_)); + } + } + void expectBody( std::function)> callback) { EXPECT_CALL(*this, _onBodyWithOffset(testing::_, testing::_)) diff --git a/proxygen/lib/http/session/test/HTTPUpstreamSessionTest.cpp b/proxygen/lib/http/session/test/HTTPUpstreamSessionTest.cpp index fcf1cf7630..d8c0830a7b 100644 --- a/proxygen/lib/http/session/test/HTTPUpstreamSessionTest.cpp +++ b/proxygen/lib/http/session/test/HTTPUpstreamSessionTest.cpp @@ -3163,6 +3163,74 @@ TEST_F(HTTP2UpstreamSessionTest, TestPingPreserveData) { httpSession_->destroy(); } +class H2LargeFlowControl : public HTTP2UpstreamSessionTest { + void SetUp() override { + constexpr uint32_t kCapacity = 1024 * 1024; + flowControl_ = {kCapacity, kCapacity, kCapacity}; + HTTP2UpstreamSessionTest::SetUp(); + } +}; + +/* + * Verifies that WINDOW_UPDATE is sent when at least kMinThreshold bytes are + * read. + */ +TEST_F(H2LargeFlowControl, WindowUpdateThresholdTest) { + auto handler = openTransaction(); + handler->txn_->pauseIngress(); // buffer incoming bytes + handler->sendRequest(); // getRequest, sends request and eom + eventBase_.loopOnce(); + eventBase_.loopOnce(); + + auto serverCodec = makeServerCodec(); + folly::IOBufQueue output(folly::IOBufQueue::cacheChainLength()); + serverCodec->generateConnectionPreface(output); + serverCodec->generateSettings(output); + + // enqueue 1MB of data into txn + auto resp = makeResponse(200); + serverCodec->generateHeader(output, handler->txn_->getID(), *resp, false); + serverCodec->generateBody(output, + handler->txn_->getID(), + makeBuf(1024 * 1024), + folly::none /* padding */, + false /* eom */); + + auto input = output.move(); + input->coalesce(); + readAndLoop(input->data(), + input->length()); // buffers server's headers and body to txn + + uint32_t bytesReadSoFar = 0; + constexpr uint32_t kMinThreshold = 128 * 1024; + + handler->expectHeaders(); + // setting up expectation, pause ingress after reading 128KB in callback + handler->expectBodyRepeatedly([&] { + bytesReadSoFar += input->computeChainDataLength(); + if (bytesReadSoFar >= kMinThreshold) { + handler->txn_->pauseIngress(); + } + }); + + handler->txn_->resumeIngress(); // only sent to the handler after this point + handler->expectBodyRepeatedly([&] {}); + handler->expectDetachTransaction(); + + // expect window update here + NiceMock callbacks; + serverCodec->setCallback(&callbacks); + EXPECT_CALL(callbacks, onWindowUpdate(0, _)).Times(AnyNumber()); + EXPECT_CALL(callbacks, onWindowUpdate(handler->txn_->getID(), kMinThreshold)) + .Times(AtLeast(1)); + + handler->txn_->resumeIngress(); + eventBase_.loop(); + handler->txn_->sendAbort(); + parseOutput(*serverCodec); // client's buffer -> serverCodec + httpSession_->destroy(); +} + TEST_F(HTTP2UpstreamSessionTest, TestConnectionToken) { auto handler = openTransaction(); handler->expectError();