Skip to content

Commit

Permalink
Remove pause/resume on session level buffer
Browse files Browse the repository at this point in the history
Summary:
We currently pause all producers if the sum of the egress buffers of all transactions exceeds the write buffer limit.  This turns out to be deterimental to prioritization.

Now, we pass the underlying transport pause state or connection flow control state back to the handlers.  The previous diff in this stack introduces a per-stream buffer limit (64kb default).  To limit total session buffer size, limit the number of concurrent streams or lower the per-stream limit.

Reviewed By: lnicco

Differential Revision: D17097138

fbshipit-source-id: 9025c5be8b318963311c3aaad9ee9a03c0e2265e
  • Loading branch information
afrind authored and facebook-github-bot committed Oct 28, 2019
1 parent 10ebc59 commit 95d4b19
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 84 deletions.
55 changes: 54 additions & 1 deletion proxygen/lib/http/session/HQSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1199,9 +1199,18 @@ void HQSession::runLoopCallback() noexcept {

inLoopCallback_ = true;
HQSession::DestructorGuard dg(this);
auto scopeg = folly::makeGuard([this] {
auto scopeg = folly::makeGuard([this, toSend=maxToSend_] {
// This ScopeGuard needs to be under the above DestructorGuard
updatePendingWrites();
if (toSend > 0) {
if (txnEgressQueue_.empty()) {
// We wrote out everything
resumeTransactions();
} else {
// We couldn't write everything, the socket is backpressuring
pauseTransactions();
}
}
checkForShutdown();
inLoopCallback_ = false;
});
Expand Down Expand Up @@ -1926,10 +1935,54 @@ void HQSession::onGoaway(uint64_t lastGoodStreamID,
}

void HQSession::pauseTransactions() {
writesPaused_ = true;
invokeOnEgressStreams(
[](HQStreamTransportBase* stream) { stream->txn_.pauseEgress(); });
}

void HQSession::resumeTransactions() {
DestructorGuard g(this);
auto resumeFn = [this](HTTP2PriorityQueue&,
HTTPCodec::StreamID id,
HTTPTransaction* txn,
double) {
if (txn && !txn->isEgressComplete() && sock_) {
auto flowControl = sock_->getStreamFlowControl(id);
if (!flowControl.hasError() && flowControl->sendWindowAvailable > 0) {
txn->resumeEgress();
}
}
return false;
};
auto stopFn = [this] {
return !hasActiveTransactions();
};

txnEgressQueue_.iterateBFS(resumeFn, stopFn, true /* all */);
writesPaused_ = false;
}

void HQSession::setNewTransactionPauseState(HTTPTransaction* txn) {
bool pauseNew = writesPaused_;
if (!pauseNew && sock_) {
if (sock_->getConnectionBufferAvailable() == 0) {
pauseNew = true;
} else {
auto flowControl = sock_->getConnectionFlowControl();
if (!flowControl.hasError() && flowControl->sendWindowAvailable == 0) {
pauseNew = true;
}
}
}
if (pauseNew) {
CHECK(txn);
// If writes are paused, start this txn off in the egress paused state
VLOG(4) << *this << " starting streamID=" << txn->getID()
<< " egress paused";
txn->pauseEgress();
}
}

void HQSession::notifyEgressBodyBuffered(int64_t bytes) {
if (HTTPSessionBase::notifyEgressBodyBuffered(bytes, true) &&
!inLoopCallback_ && !isLoopCallbackScheduled() && sock_) {
Expand Down
5 changes: 5 additions & 0 deletions proxygen/lib/http/session/HQSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ class HQSession
quic::StreamId /* id */,
const HQUnidirStreamDispatcher::Callback::ReadError& /* err */) override;

void setNewTransactionPauseState(HTTPTransaction* txn) override;

/**
* Attempt to bind an ingress push stream object (which has the txn)
* to a nascent stream (which has the transport/codec).
Expand Down Expand Up @@ -746,6 +748,8 @@ class HQSession

void pauseTransactions() override;

void resumeTransactions() override;

void notifyEgressBodyBuffered(int64_t bytes);

// Schedule the loop callback.
Expand Down Expand Up @@ -2219,6 +2223,7 @@ class HQSession
bool scheduledWrite_{false};

bool forceUpstream1_1_{true};
bool writesPaused_{false};

/** Reads in the current loop iteration */
uint16_t readsPerLoop_{0};
Expand Down
14 changes: 14 additions & 0 deletions proxygen/lib/http/session/HTTPSession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,16 @@ HTTPTransaction* HTTPSession::newPushedTransaction(
return txn;
}

void HTTPSession::setNewTransactionPauseState(HTTPTransaction* txn) {
if (!writesPaused()) {
return;
}
CHECK(txn);
// If writes are paused, start this txn off in the egress paused state
VLOG(4) << *this << " starting streamID=" << txn->getID() << " egress paused";
txn->pauseEgress();
}

HTTPTransaction* FOLLY_NULLABLE
HTTPSession::newExTransaction(HTTPTransaction::Handler* handler,
HTTPCodec::StreamID controlStream,
Expand Down Expand Up @@ -2227,10 +2237,12 @@ void HTTPSession::updateWriteCount() {
// Exceeded limit. Pause reading on the incoming stream.
VLOG(3) << "Pausing egress for " << *this;
writes_ = SocketState::PAUSED;
pauseTransactions();
} else if (numActiveWrites_ == 0 && writesPaused()) {
// Dropped below limit. Resume reading on the incoming stream if needed.
VLOG(3) << "Resuming egress for " << *this;
writes_ = SocketState::UNPAUSED;
resumeTransactions();
}
}

Expand Down Expand Up @@ -2868,11 +2880,13 @@ void HTTPSession::errorOnTransactionId(HTTPCodec::StreamID id,

void HTTPSession::onConnectionSendWindowOpen() {
flowControlTimeout_.cancelTimeout();
resumeTransactions();
// We can write more now. Schedule a write.
scheduleWrite();
}

void HTTPSession::onConnectionSendWindowClosed() {
pauseTransactions();
if (!txnEgressQueue_.empty()) {
VLOG(4) << *this << " session stalled by flow control";
if (sessionStats_) {
Expand Down
2 changes: 2 additions & 0 deletions proxygen/lib/http/session/HTTPSession.h
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,8 @@ class HTTPSession
return folly::none;
}

void setNewTransactionPauseState(HTTPTransaction* txn) override;

void readTimeoutExpired() noexcept;
void writeTimeoutExpired() noexcept;
void flowControlTimeoutExpired() noexcept;
Expand Down
50 changes: 2 additions & 48 deletions proxygen/lib/http/session/HTTPSessionBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ HTTPSessionBase::HTTPSessionBase(const SocketAddress& localAddr,
peerAddr_(peerAddr),
prioritySample_(false),
h2PrioritiesEnabled_(true),
inResume_(false),
pendingPause_(false),
exHeadersEnabled_(false) {

// If we receive IPv4-mapped IPv6 addresses, convert them to IPv4.
Expand Down Expand Up @@ -135,33 +133,8 @@ void HTTPSessionBase::updateWriteBufSize(int64_t delta) {
// the sock_'s write buffer.
delta += pendingWriteSizeDelta_;
pendingWriteSizeDelta_ = 0;
bool wasExceeded = egressLimitExceeded();
DCHECK(delta >= 0 || uint64_t(-delta) <= pendingWriteSize_);
pendingWriteSize_ += delta;

if (egressLimitExceeded() && !wasExceeded) {
// Exceeded limit. Pause reading on the incoming stream.
if (inResume_) {
VLOG(3) << "Pausing txn egress for " << *this << " deferred";
pendingPause_ = true;
} else {
VLOG(3) << "Pausing txn egress for " << *this;
pauseTransactions();
}
} else if (!egressLimitExceeded() && wasExceeded) {
// Dropped below limit. Resume reading on the incoming stream if needed.
if (inResume_) {
if (pendingPause_) {
VLOG(3) << "Cancel deferred txn egress pause for " << *this;
pendingPause_ = false;
} else {
VLOG(3) << "Ignoring redundant resume for " << *this;
}
} else {
VLOG(3) << "Resuming txn egress for " << *this;
resumeTransactions();
}
}
}

void HTTPSessionBase::updatePendingWrites() {
Expand All @@ -171,40 +144,21 @@ void HTTPSessionBase::updatePendingWrites() {
}

void HTTPSessionBase::resumeTransactions() {
CHECK(!inResume_);
inResume_ = true;
DestructorGuard g(this);
auto resumeFn = [](HTTP2PriorityQueue&,
HTTPCodec::StreamID,
HTTPTransaction* txn,
double) {
if (txn) {
if (txn && !txn->isEgressComplete()) {
txn->resumeEgress();
}
return false;
};
auto stopFn = [this] {
return (!hasActiveTransactions() || egressLimitExceeded());
return (!hasActiveTransactions());
};

txnEgressQueue_.iterateBFS(resumeFn, stopFn, true /* all */);
inResume_ = false;
if (pendingPause_) {
VLOG(3) << "Pausing txn egress for " << *this;
pendingPause_ = false;
pauseTransactions();
}
}

void HTTPSessionBase::setNewTransactionPauseState(HTTPTransaction* txn) {
if (!egressLimitExceeded()) {
return;
}

CHECK(txn);
// If writes are paused, start this txn off in the egress paused state
VLOG(4) << *this << " starting streamID=" << txn->getID() << " egress paused";
txn->pauseEgress();
}

void HTTPSessionBase::handleErrorDirectly(HTTPTransaction* txn,
Expand Down
14 changes: 2 additions & 12 deletions proxygen/lib/http/session/HTTPSessionBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,9 @@ class HTTPSessionBase : public wangle::ManagedConnection {

virtual void pauseTransactions() = 0;

void resumeTransactions();
virtual void resumeTransactions();

void setNewTransactionPauseState(HTTPTransaction* txn);
virtual void setNewTransactionPauseState(HTTPTransaction* txn) = 0;

/**
* Install a direct response handler for the transaction based on the
Expand Down Expand Up @@ -614,14 +614,6 @@ class HTTPSessionBase : public wangle::ManagedConnection {
}
}

/**
* Returns true iff egress should stop on this session.
*/
bool egressLimitExceeded() const {
// Changed to >
return pendingWriteSize_ > writeBufLimit_;
}

/**
* The latest time when this session became idle status
*/
Expand Down Expand Up @@ -681,8 +673,6 @@ class HTTPSessionBase : public wangle::ManagedConnection {

bool prioritySample_ : 1;
bool h2PrioritiesEnabled_ : 1;
bool inResume_ : 1;
bool pendingPause_ : 1;

/**
* Indicates whether Ex Headers is supported in HTTPSession
Expand Down
90 changes: 87 additions & 3 deletions proxygen/lib/http/session/test/HQDownstreamSessionTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,15 @@ TEST_P(HQDownstreamSessionTest, OnConnectionWindowPartialHeaders) {
auto id = sendRequest();
auto handler = addSimpleStrictHandler();
handler->expectHeaders();
handler->expectEOM([&handler] { handler->sendReplyWithBody(200, 100); });
handler->expectEOM([&handler] {
handler->sendHeaders(200, 100);
handler->txn_->sendBody(makeBuf(100));
});
// TODO: we should probably pause egress on conn limited.
// handler->expectEgressPaused();
// handler->expectEgressResumed();
handler->expectEgressPaused();
handler->expectEgressResumed([&handler] {
handler->txn_->sendEOM();
});
handler->expectDetachTransaction();

// Initialize the flow control window to less than the response body
Expand Down Expand Up @@ -2328,6 +2333,85 @@ TEST_P(HQDownstreamSessionTestHQ, TooManyControlStreams) {
HTTP3::ErrorCode::HTTP_WRONG_STREAM_COUNT);
}

TEST_P(HQDownstreamSessionTest, TestUniformPauseState) {
sendRequest("/", 1);
sendRequest("/", 1);

InSequence handlerSequence;
auto handler1 = addSimpleStrictHandler();
handler1->expectHeaders();
handler1->expectEOM();
auto handler2 = addSimpleStrictHandler();
handler2->expectHeaders();
handler2->expectEOM([&] {
handler1->sendHeaders(200, 24002);
// triggers pause of all txns
// If I set to 0, then I never get onWriteReady.
// HQSession needs to runInLoop and pauseTransactions if onWriteReady
// never comes?
socketDriver_->setConnectionFlowControlWindow(1);
handler1->txn_->sendBody(makeBuf(12001));
});
// HQ streams invocations are unordered set
handler2->expectEgressPaused();
handler1->expectEgressPaused();

flushRequestsAndLoopN(3);
sendRequest("/", 2);

auto handler3 = addSimpleStrictHandler();
handler3->expectEgressPaused();
handler3->expectHeaders();
handler3->expectEOM([this] {
eventBase_.runAfterDelay([this] {
socketDriver_->setConnectionFlowControlWindow(65536);
}, 50);
});

handler2->expectEgressResumed();
handler1->expectEgressResumed([&] {
// resume does not trigger another pause,
handler1->txn_->sendBody(makeBuf(12001));
socketDriver_->setConnectionFlowControlWindow(1);
eventBase_.runAfterDelay([this] {
socketDriver_->setConnectionFlowControlWindow(65536);
}, 50);
});
handler3->expectEgressResumed();
handler1->expectEgressPaused();
handler2->expectEgressPaused();
handler3->expectEgressPaused();

handler2->expectEgressResumed();
handler1->expectEgressResumed([&] {
handler2->sendHeaders(200, 12001);
handler2->txn_->sendBody(makeBuf(12001));
socketDriver_->setConnectionFlowControlWindow(1);
eventBase_.runAfterDelay([this] {
socketDriver_->setConnectionFlowControlWindow(65536);
}, 50);
});
handler3->expectEgressResumed();

handler1->expectEgressPaused();
handler2->expectEgressPaused();
handler3->expectEgressPaused();

handler2->expectEgressResumed();
handler1->expectEgressResumed([&] {
handler1->txn_->sendEOM();
handler2->txn_->sendEOM();
});
handler3->expectEgressResumed([&] { handler3->txn_->sendAbort(); });

handler3->expectDetachTransaction();
handler2->expectDetachTransaction();
handler1->expectDetachTransaction();

flushRequestsAndLoop();
hqSession_->closeWhenIdle();
}

/**
* Instantiate the Parametrized test cases
*/
Expand Down
Loading

0 comments on commit 95d4b19

Please sign in to comment.