Skip to content

Commit

Permalink
connection: Remember transport socket read resumption requests and re…
Browse files Browse the repository at this point in the history
…play them when re-enabling read. (#13772)

Fixes SslSocket read resumption after readDisable when processing the SSL record that contains the last bytes of the HTTP message

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente authored Oct 28, 2020
1 parent db756af commit a625c80
Show file tree
Hide file tree
Showing 9 changed files with 397 additions and 28 deletions.
1 change: 1 addition & 0 deletions docs/root/version_history/current.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Bug Fixes
*Changes expected to improve the state of the world and are unlikely to have negative effects*

* http: sending CONNECT_ERROR for HTTP/2 where appropriate during CONNECT requests.
* tls: fix read resumption after triggering buffer high-watermark and all remaining request/response bytes are stored in the SSL connection's internal buffers.

Removed Config or Runtime
-------------------------
Expand Down
32 changes: 23 additions & 9 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ ConnectionImpl::ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPt
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
write_buffer_above_high_watermark_(false), detect_early_close_(true),
enable_half_close_(false), read_end_stream_raised_(false), read_end_stream_(false),
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false) {
write_end_stream_(false), current_write_end_stream_(false), dispatch_buffered_data_(false),
transport_wants_read_(false) {

if (!connected) {
connecting_ = true;
Expand Down Expand Up @@ -193,7 +194,7 @@ Connection::State ConnectionImpl::state() const {

void ConnectionImpl::closeConnectionImmediately() { closeSocket(ConnectionEvent::LocalClose); }

bool ConnectionImpl::consumerWantsToRead() {
bool ConnectionImpl::filterChainWantsData() {
return read_disable_count_ == 0 ||
(read_disable_count_ == 1 && read_buffer_.highWatermarkTriggered());
}
Expand Down Expand Up @@ -271,7 +272,7 @@ void ConnectionImpl::noDelay(bool enable) {
}

void ConnectionImpl::onRead(uint64_t read_buffer_size) {
if (inDelayedClose() || !consumerWantsToRead()) {
if (inDelayedClose() || !filterChainWantsData()) {
return;
}
ASSERT(ioHandle().isOpen());
Expand Down Expand Up @@ -356,11 +357,17 @@ void ConnectionImpl::readDisable(bool disable) {
ioHandle().enableFileEvents(Event::FileReadyType::Read | Event::FileReadyType::Write);
}

if (consumerWantsToRead() && read_buffer_.length() > 0) {
// If the connection has data buffered there's no guarantee there's also data in the kernel
// which will kick off the filter chain. Alternately if the read buffer has data the fd could
// be read disabled. To handle these cases, fake an event to make sure the buffered data gets
// processed regardless and ensure that we dispatch it via onRead.
if (filterChainWantsData() && (read_buffer_.length() > 0 || transport_wants_read_)) {
// If the read_buffer_ is not empty or transport_wants_read_ is true, the connection may be
// able to process additional bytes even if there is no data in the kernel to kick off the
// filter chain. Alternately if the read buffer has data the fd could be read disabled. To
// handle these cases, fake an event to make sure the buffered data in the read buffer or in
// transport socket internal buffers gets processed regardless and ensure that we dispatch it
// via onRead.

// Sanity check: resumption with read_disable_count_ > 0 should only happen if the read
// buffer's high watermark has triggered.
ASSERT(read_buffer_.length() > 0 || read_disable_count_ == 0);
dispatch_buffered_data_ = true;
setReadBufferReady();
}
Expand Down Expand Up @@ -557,12 +564,19 @@ void ConnectionImpl::onReadReady() {
// 2) The consumer of connection data called readDisable(true), and instead of reading from the
// socket we simply need to dispatch already read data.
if (read_disable_count_ != 0) {
if (latched_dispatch_buffered_data && consumerWantsToRead()) {
// Do not clear transport_wants_read_ when returning early; the early return skips the transport
// socket doRead call.
if (latched_dispatch_buffered_data && filterChainWantsData()) {
onRead(read_buffer_.length());
}
return;
}

// Clear transport_wants_read_ just before the call to doRead. This is the only way to ensure that
// the transport socket read resumption happens as requested; onReadReady() returns early without
// reading from the transport if the read buffer is above high watermark at the start of the
// method.
transport_wants_read_ = false;
IoResult result = transport_socket_->doRead(read_buffer_);
uint64_t new_buffer_size = read_buffer_.length();
updateReadBufferStats(result.bytes_processed_, new_buffer_size);
Expand Down
19 changes: 13 additions & 6 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// TODO(htuch): While this is the basis for also yielding to other connections to provide some
// fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
// Reconsider how to make fairness happen.
void setReadBufferReady() override { ioHandle().activateFileEvents(Event::FileReadyType::Read); }
void setReadBufferReady() override {
transport_wants_read_ = true;
ioHandle().activateFileEvents(Event::FileReadyType::Read);
}
void flushWriteBuffer() override;

// Obtain global next connection ID. This should only be used in tests.
Expand All @@ -128,11 +131,10 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
// A convenience function which returns true if
// 1) The read disable count is zero or
// 2) The read disable count is one due to the read buffer being overrun.
// In either case the consumer of the data would like to read from the buffer.
// If the read count is greater than one, or equal to one when the buffer is
// not overrun, then the consumer of the data has called readDisable, and does
// not want to read.
bool consumerWantsToRead();
// In either case the filter chain would like to process data from the read buffer or transport
// socket. If the read count is greater than one, or equal to one when the buffer is not overrun,
// then the filter chain has called readDisable, and does not want additional data.
bool filterChainWantsData();

// Network::ConnectionImplBase
void closeConnectionImmediately() final;
Expand Down Expand Up @@ -197,6 +199,11 @@ class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallback
bool write_end_stream_ : 1;
bool current_write_end_stream_ : 1;
bool dispatch_buffered_data_ : 1;
// True if the most recent call to the transport socket's doRead method invoked setReadBufferReady
// to schedule read resumption after yielding due to shouldDrainReadBuffer(). When true,
// readDisable must schedule read resumption when read_disable_count_ == 0 to ensure that read
// resumption happens when remaining bytes are held in transport socket internal buffers.
bool transport_wants_read_ : 1;
};

class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
Expand Down
191 changes: 191 additions & 0 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,197 @@ TEST_F(MockTransportConnectionImplTest, ObjectDestructOrder) {
file_ready_cb_(Event::FileReadyType::Read);
}

// Verify that read resumptions requested via setReadBufferReady() are scheduled once read is
// re-enabled.
TEST_F(MockTransportConnectionImplTest, ReadBufferReadyResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// No calls to activate when re-enabling if there are no pending read requests.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);

// setReadBufferReady triggers an immediate call to activate.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->setReadBufferReady();

// When processing a sequence of read disable/read enable, changes to the enabled event mask
// happen only when the disable count transitions to/from 0.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
connection_->readDisable(false);
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
// Expect a read activation since there have been no transport doRead calls since the call to
// setReadBufferReady.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// No calls to doRead when file_ready_cb is invoked while read disabled.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
file_ready_cb_(Event::FileReadyType::Read);

// Expect a read activate when re-enabling since the file ready cb has not done a read.
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Do a read to clear the transport_wants_read_ flag, verify that no read activation is scheduled.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// No read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that read resumption is scheduled when read is re-enabled while the read buffer is
// non-empty.
TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer to trigger read activate calls when re-enabling read.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read
// activate but no call to setEnable to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 to partially drain the read buffer.
// Expect no transport reads.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length() - 1);
return FilterStatus::Continue;
}));
// Partial drain of the read buffer below low watermark triggers an update to the fd enabled mask
// and a read activate since the read buffer is not empty.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

// Drain the rest of the buffer and verify there are no spurious read activate calls.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(1, data.length());
data.drain(1);
return FilterStatus::Continue;
}));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
// read buffer is empty, no read activate call.
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Verify that transport_wants_read_ read resumption is not lost when processing read buffer
// high-watermark resumptions.
TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

// Add some data to the read buffer and also call setReadBufferReady to mimic what transport
// sockets are expected to do when the read buffer high watermark is hit.
EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Invoke([this](Buffer::Instance& buffer) -> IoResult {
buffer.add("0123456789");
connection_->setReadBufferReady();
return {PostIoAction::KeepOpen, 10, false};
}));
// Expect a change to the event mask when hitting the read buffer high-watermark.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Write));
// The setReadBufferReady call adds a spurious read activation.
// TODO(antoniovicente) Skip the read activate in setReadBufferReady when read_disable_count_ > 0.
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
EXPECT_CALL(*read_filter, onNewConnection()).WillOnce(Return(FilterStatus::Continue));
EXPECT_CALL(*read_filter, onData(_, false)).WillOnce(Return(FilterStatus::Continue));
file_ready_cb_(Event::FileReadyType::Read);

// Already read disabled, expect no changes to enabled events mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
connection_->readDisable(true);
connection_->readDisable(true);
connection_->readDisable(false);
// Read buffer is at the high watermark so read_disable_count should be == 1. Expect a read
// activate but no call to setEnable to change the registration mask.
EXPECT_CALL(*file_event_, setEnabled(_)).Times(0);
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
connection_->readDisable(false);

// Invoke the file event cb while read_disable_count_ == 1 and fully drain the read buffer.
// Expect no transport reads. Expect a read resumption due to transport_wants_read_ being true
// when read is re-enabled due to going under the low watermark.
EXPECT_CALL(*transport_socket_, doRead(_)).Times(0);
EXPECT_CALL(*read_filter, onData(_, _))
.WillRepeatedly(Invoke([&](Buffer::Instance& data, bool) -> FilterStatus {
EXPECT_EQ(10, data.length());
data.drain(data.length());
return FilterStatus::Continue;
}));
// The buffer is fully drained. Expect a read activation because setReadBufferReady set
// transport_wants_read_ and no transport doRead calls have happened.
EXPECT_CALL(*file_event_, setEnabled(Event::FileReadyType::Read | Event::FileReadyType::Write));
EXPECT_CALL(*file_event_, activate(Event::FileReadyType::Read));
file_ready_cb_(Event::FileReadyType::Read);

EXPECT_CALL(*transport_socket_, doRead(_))
.WillOnce(Return(IoResult{PostIoAction::KeepOpen, 0, false}));
file_ready_cb_(Event::FileReadyType::Read);

// Verify there are no read activate calls the event callback does a transport read and clears the
// transport_wants_read_ state.
EXPECT_CALL(*file_event_, setEnabled(_));
connection_->readDisable(true);
EXPECT_CALL(*file_event_, setEnabled(_));
EXPECT_CALL(*file_event_, activate(_)).Times(0);
connection_->readDisable(false);
}

// Test that BytesSentCb is invoked at the correct times
TEST_F(MockTransportConnectionImplTest, BytesSentCallback) {
uint64_t bytes_sent = 0;
Expand Down
Loading

0 comments on commit a625c80

Please sign in to comment.