Skip to content

Commit

Permalink
test: Wait for flush to happen in IntegrationCodecClient::flushWrite
Browse files Browse the repository at this point in the history
Unfortunately implementing this by tracking output buffer watermarks requires changes to the connection interface so that the read and write buffer watermarks can be controlled independently. This will be specially true after envoyproxy#14054 which adjusts read size based on the read buffer high-watermark. This change also depends on envoyproxy#14333 since without it is not possible to get a true 1 byte output buffer highwatermark.

Signed-off-by: Antonio Vicente <avd@google.com>
  • Loading branch information
antoniovicente committed Jan 12, 2021
1 parent c2522c6 commit ee41a9c
Show file tree
Hide file tree
Showing 27 changed files with 148 additions and 96 deletions.
11 changes: 8 additions & 3 deletions include/envoy/network/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,17 @@ class Connection : public Event::DeferredDeletable, public FilterManager {
* write buffer, onBelowWriteBufferHighWatermark is called which similarly allows subscribers
* resuming reading.
*/
virtual void setBufferLimits(uint32_t limit) PURE;
virtual void setBufferLimits(uint32_t read_buffer_limit, uint32_t write_buffer_limit) PURE;

/**
* Get the value set with setBufferLimits.
* Get the read buffer limit value set with setBufferLimits.
*/
virtual uint32_t bufferLimit() const PURE;
virtual uint32_t readBufferLimit() const PURE;

/**
* Get the write buffer limit value set with setBufferLimits.
*/
virtual uint32_t writeBufferLimit() const PURE;

/**
* @return boolean telling if the connection is currently above the high watermark.
Expand Down
1 change: 1 addition & 0 deletions source/common/event/dispatcher_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ void DispatcherImpl::post(std::function<void()> callback) {
}

void DispatcherImpl::run(RunType type) {
ASSERT(isThreadSafe(), "DispatcherImpl::run should be called from a single thread");
run_tid_ = api_.threadFactory().currentThreadId();

// Flush all post callbacks before we run the event loop. We do this because there are post
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/codec_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat
[&]() -> void { this->onAboveHighWatermark(); },
[]() -> void { /* TODO(adisuissa): Handle overflow watermark */ })),
max_headers_kb_(max_headers_kb), max_headers_count_(max_headers_count) {
output_buffer_->setWatermarks(connection.bufferLimit());
output_buffer_->setWatermarks(connection.writeBufferLimit());
http_parser_init(&parser_, type);
parser_.allow_chunked_length = 1;
parser_.data = this;
Expand Down
2 changes: 1 addition & 1 deletion source/common/http/http1/codec_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable<Log
connection_.readDisable(disable);
}
}
uint32_t bufferLimit() { return connection_.bufferLimit(); }
uint32_t bufferLimit() { return connection_.readBufferLimit(); }
virtual bool supportsHttp10() { return false; }
bool maybeDirectDispatch(Buffer::Instance& data);
virtual void maybeAddSentinelBufferFragment(Buffer::Instance&) {}
Expand Down
13 changes: 8 additions & 5 deletions source/common/network/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,9 @@ void ConnectionImpl::write(Buffer::Instance& data, bool end_stream, bool through
}
}

void ConnectionImpl::setBufferLimits(uint32_t limit) {
read_buffer_limit_ = limit;
void ConnectionImpl::setBufferLimits(uint32_t read_buffer_limit, uint32_t write_buffer_limit) {
read_buffer_limit_ = read_buffer_limit;
write_buffer_limit_ = write_buffer_limit;

// Due to the fact that writes to the connection and flushing data from the connection are done
// asynchronously, we have the option of either setting the watermarks aggressively, and regularly
Expand All @@ -501,9 +502,11 @@ void ConnectionImpl::setBufferLimits(uint32_t limit) {
// watermark from |limit + 1| to |limit| as the common case (move |limit| bytes, flush |limit|
// bytes) would not trigger watermarks but a blocked socket (move |limit| bytes, flush 0 bytes)
// would result in respecting the exact buffer limit.
if (limit > 0) {
write_buffer_->setWatermarks(limit + 1);
read_buffer_->setWatermarks(limit + 1);
if (write_buffer_limit > 0) {
write_buffer_->setWatermarks(write_buffer_limit + 1);
}
if (read_buffer_limit > 0) {
read_buffer_->setWatermarks(read_buffer_limit + 1);
}
}

Expand Down
6 changes: 4 additions & 2 deletions source/common/network/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ class ConnectionImpl : public ConnectionImplBase,
State state() const override;
bool connecting() const override { return connecting_; }
void write(Buffer::Instance& data, bool end_stream) override;
void setBufferLimits(uint32_t limit) override;
uint32_t bufferLimit() const override { return read_buffer_limit_; }
void setBufferLimits(uint32_t read_buffer_limit, uint32_t write_buffer_limit) override;
uint32_t readBufferLimit() const override { return read_buffer_limit_; }
uint32_t writeBufferLimit() const override { return write_buffer_limit_; }
bool aboveHighWatermark() const override { return write_buffer_above_high_watermark_; }
const ConnectionSocket::OptionsSharedPtr& socketOptions() const override {
return socket_->options();
Expand Down Expand Up @@ -164,6 +165,7 @@ class ConnectionImpl : public ConnectionImplBase,
// This buffer is always allocated, never nullptr.
Buffer::InstancePtr read_buffer_;
uint32_t read_buffer_limit_ = 0;
uint32_t write_buffer_limit_ = 0;
bool connecting_{false};
ConnectionEvent immediate_error_event_{ConnectionEvent::Connected};
bool bind_error_{false};
Expand Down
3 changes: 2 additions & 1 deletion source/common/upstream/upstream_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ HostImpl::createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& clu
address, cluster.sourceAddress(),
socket_factory.createTransportSocket(std::move(transport_socket_options)),
connection_options);
connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
auto buffer_limit = cluster.perConnectionBufferLimitBytes();
connection->setBufferLimits(buffer_limit, buffer_limit);
cluster.createNetworkFilterChain(*connection);
return connection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ namespace Extensions {
namespace NetworkFilters {
namespace DubboProxy {

constexpr uint32_t BufferLimit = UINT32_MAX;

ConnectionManager::ConnectionManager(Config& config, Random::RandomGenerator& random_generator,
TimeSource& time_system)
: config_(config), time_system_(time_system), stats_(config_.stats()),
Expand Down Expand Up @@ -61,7 +59,7 @@ void ConnectionManager::initializeReadFilterCallbacks(Network::ReadFilterCallbac
read_callbacks_ = &callbacks;
read_callbacks_->connection().addConnectionCallbacks(*this);
read_callbacks_->connection().enableHalfClose(true);
read_callbacks_->connection().setBufferLimits(BufferLimit);
read_callbacks_->connection().setBufferLimits(0, 0);
}

void ConnectionManager::onEvent(Network::ConnectionEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ bool QuicFilterManagerConnectionImpl::isHalfCloseEnabled() {
return false;
}

void QuicFilterManagerConnectionImpl::setBufferLimits(uint32_t /*limit*/) {
void QuicFilterManagerConnectionImpl::setBufferLimits(uint32_t /*read_buffer_limit*/,
uint32_t /*write_buffer_limit*/) {
// Currently read buffer is capped by connection level flow control. And write buffer limit is set
// during construction. Changing the buffer limit during the life time of the connection is not
// supported.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ class QuicFilterManagerConnectionImpl : public Network::ConnectionImplBase {
// All writes should be handled by Quic internally.
NOT_REACHED_GCOVR_EXCL_LINE;
}
void setBufferLimits(uint32_t limit) override;
uint32_t bufferLimit() const override {
void setBufferLimits(uint32_t read_buffer_limit, uint32_t write_buffer_limit) override;
uint32_t readBufferLimit() const override {
// As quic connection is not HTTP1.1, this method shouldn't be called by HCM.
NOT_REACHED_GCOVR_EXCL_LINE;
}
uint32_t writeBufferLimit() const override {
// As quic connection is not HTTP1.1, this method shouldn't be called by HCM.
NOT_REACHED_GCOVR_EXCL_LINE;
}
Expand Down
5 changes: 3 additions & 2 deletions source/server/api_listener_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ class ApiListenerImplBase : public ApiListener,
State state() const override { return Network::Connection::State::Open; }
bool connecting() const override { return false; }
void write(Buffer::Instance&, bool) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
void setBufferLimits(uint32_t) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
uint32_t bufferLimit() const override { return 65000; }
void setBufferLimits(uint32_t, uint32_t) override { NOT_IMPLEMENTED_GCOVR_EXCL_LINE; }
uint32_t readBufferLimit() const override { return 65000; }
uint32_t writeBufferLimit() const override { return 65000; }
bool aboveHighWatermark() const override { return false; }
const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() const override {
return options_;
Expand Down
3 changes: 2 additions & 1 deletion source/server/connection_handler_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ void ConnectionHandlerImpl::ActiveTcpListener::newConnection(
ActiveTcpConnectionPtr active_connection(
new ActiveTcpConnection(active_connections, std::move(server_conn_ptr),
parent_.dispatcher_.timeSource(), std::move(stream_info)));
active_connection->connection_->setBufferLimits(config_->perConnectionBufferLimitBytes());
auto buffer_limit = config_->perConnectionBufferLimitBytes();
active_connection->connection_->setBufferLimits(buffer_limit, buffer_limit);

const bool empty_filter_chain = !config_->filterChainFactory().createNetworkFilterChain(
*active_connection->connection_, filter_chain->networkFilterFactories());
Expand Down
4 changes: 2 additions & 2 deletions test/common/http/http1/codec_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1874,7 +1874,7 @@ TEST_F(Http1ServerConnectionImplTest, ConnectRequestWithZeroContentLength) {
}

TEST_F(Http1ServerConnectionImplTest, WatermarkTest) {
EXPECT_CALL(connection_, bufferLimit()).WillOnce(Return(10));
EXPECT_CALL(connection_, writeBufferLimit()).WillOnce(Return(10));
initialize();

NiceMock<MockRequestDecoder> decoder;
Expand Down Expand Up @@ -2565,7 +2565,7 @@ TEST_F(Http1ClientConnectionImplTest, ConnectRejected) {
}

TEST_F(Http1ClientConnectionImplTest, WatermarkTest) {
EXPECT_CALL(connection_, bufferLimit()).WillOnce(Return(10));
EXPECT_CALL(connection_, writeBufferLimit()).WillOnce(Return(10));
initialize();

InSequence s;
Expand Down
2 changes: 1 addition & 1 deletion test/common/http/http1/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ TEST_F(Http1ConnPoolImplTest, VerifyBufferLimits) {
ConnPoolCallbacks callbacks;
conn_pool_->expectClientCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*conn_pool_->test_clients_.back().connection_, setBufferLimits(8192));
EXPECT_CALL(*conn_pool_->test_clients_.back().connection_, setBufferLimits(8192, 8192));
Http::ConnectionPool::Cancellable* handle = conn_pool_->newStream(outer_decoder, callbacks);
EXPECT_NE(nullptr, handle);

Expand Down
2 changes: 1 addition & 1 deletion test/common/http/http2/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class Http2ConnPoolImplTest : public Event::TestUsingSimulatedTime, public testi
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes())
.Times(num_clients)
.WillRepeatedly(Return(*buffer_limits));
EXPECT_CALL(*test_client.connection_, setBufferLimits(*buffer_limits));
EXPECT_CALL(*test_client.connection_, setBufferLimits(*buffer_limits, *buffer_limits));
}
}
// Finally (for InSequence tests) set up createCodecClient and make sure the
Expand Down
23 changes: 12 additions & 11 deletions test/common/network/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -938,31 +938,31 @@ TEST_P(ConnectionImplTest, WriteWatermarks) {
// Go from watermarks being off to being above the high watermark.
EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark());
EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0);
client_connection_->setBufferLimits(buffer_len - 3);
client_connection_->setBufferLimits(0, buffer_len - 3);
EXPECT_TRUE(client_connection_->aboveHighWatermark());
}

{
// Go from above the high watermark to in between both.
EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0);
EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0);
client_connection_->setBufferLimits(buffer_len + 1);
client_connection_->setBufferLimits(0, buffer_len + 1);
EXPECT_TRUE(client_connection_->aboveHighWatermark());
}

{
// Go from above the high watermark to below the low watermark.
EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0);
EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark());
client_connection_->setBufferLimits(buffer_len * 3);
client_connection_->setBufferLimits(0, buffer_len * 3);
EXPECT_FALSE(client_connection_->aboveHighWatermark());
}

{
// Go back in between and verify neither callback is called.
EXPECT_CALL(client_callbacks_, onAboveWriteBufferHighWatermark()).Times(0);
EXPECT_CALL(client_callbacks_, onBelowWriteBufferLowWatermark()).Times(0);
client_connection_->setBufferLimits(buffer_len * 2);
client_connection_->setBufferLimits(0, buffer_len * 2);
EXPECT_FALSE(client_connection_->aboveHighWatermark());
}

Expand All @@ -973,7 +973,7 @@ TEST_P(ConnectionImplTest, WriteWatermarks) {
TEST_P(ConnectionImplTest, ReadWatermarks) {

setUpBasicConnection();
client_connection_->setBufferLimits(2);
client_connection_->setBufferLimits(2, 0);
std::shared_ptr<MockReadFilter> client_read_filter(new NiceMock<MockReadFilter>());
client_connection_->addReadFilter(client_read_filter);
connect();
Expand Down Expand Up @@ -1117,7 +1117,7 @@ TEST_P(ConnectionImplTest, WriteWithWatermarks) {

connect();

client_connection_->setBufferLimits(2);
client_connection_->setBufferLimits(0, 2);

std::string data_to_write = "hello world";
Buffer::OwnedImpl first_buffer_to_write(data_to_write);
Expand Down Expand Up @@ -1180,7 +1180,7 @@ TEST_P(ConnectionImplTest, WatermarkFuzzing) {
setUpBasicConnection();

connect();
client_connection_->setBufferLimits(10);
client_connection_->setBufferLimits(0, 10);

TestRandomGenerator rand;
int bytes_buffered = 0;
Expand Down Expand Up @@ -2079,7 +2079,7 @@ TEST_F(MockTransportConnectionImplTest, ReadBufferResumeAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->setBufferLimits(5, 0);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

Expand Down Expand Up @@ -2146,7 +2146,7 @@ TEST_F(MockTransportConnectionImplTest, ResumeWhileAndAfterReadDisable) {
InSequence s;

std::shared_ptr<MockReadFilter> read_filter(new StrictMock<MockReadFilter>());
connection_->setBufferLimits(5);
connection_->setBufferLimits(5, 0);
connection_->enableHalfClose(true);
connection_->addReadFilter(read_filter);

Expand Down Expand Up @@ -2774,10 +2774,11 @@ class ReadBufferLimitTest : public ConnectionImplTest {
.WillOnce(Invoke([&](Network::ConnectionSocketPtr& socket) -> void {
server_connection_ = dispatcher_->createServerConnection(
std::move(socket), Network::Test::createRawBufferSocket(), stream_info_);
server_connection_->setBufferLimits(read_buffer_limit);
server_connection_->setBufferLimits(read_buffer_limit, 0);
server_connection_->addReadFilter(read_filter_);
EXPECT_EQ("", server_connection_->nextProtocol());
EXPECT_EQ(read_buffer_limit, server_connection_->bufferLimit());
EXPECT_EQ(read_buffer_limit, server_connection_->readBufferLimit());
EXPECT_EQ(0, server_connection_->writeBufferLimit());
}));

uint32_t filter_seen = 0;
Expand Down
4 changes: 2 additions & 2 deletions test/common/tcp/conn_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ class TcpConnPoolImplDestructorTest : public Event::TestUsingSimulatedTime,

void prepareConn() {
connection_ = new StrictMock<Network::MockClientConnection>();
EXPECT_CALL(*connection_, setBufferLimits(0));
EXPECT_CALL(*connection_, setBufferLimits(0, 0));
EXPECT_CALL(*connection_, detectEarlyCloseWhenReadDisabled(false));
EXPECT_CALL(*connection_, addConnectionCallbacks(_));
EXPECT_CALL(*connection_, addReadFilter(_));
Expand Down Expand Up @@ -488,7 +488,7 @@ TEST_P(TcpConnPoolImplTest, VerifyBufferLimitsAndOptions) {
ConnPoolCallbacks callbacks;
conn_pool_->expectConnCreate();
EXPECT_CALL(*cluster_, perConnectionBufferLimitBytes()).WillOnce(Return(8192));
EXPECT_CALL(*conn_pool_->test_conns_.back().connection_, setBufferLimits(8192));
EXPECT_CALL(*conn_pool_->test_conns_.back().connection_, setBufferLimits(8192, 8192));

EXPECT_CALL(callbacks.pool_failure_, ready());
Tcp::ConnectionPool::Cancellable* handle = conn_pool_->newConnection(callbacks);
Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,7 @@ TEST_F(ClusterManagerImplTest, VerifyBufferLimits) {

create(parseBootstrapFromV3Yaml(yaml));
Network::MockClientConnection* connection = new NiceMock<Network::MockClientConnection>();
EXPECT_CALL(*connection, setBufferLimits(8192));
EXPECT_CALL(*connection, setBufferLimits(8192, 8192));
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_, _, _, _))
.WillOnce(Return(connection));
auto conn_data = cluster_manager_->getThreadLocalCluster("cluster_1")->tcpConn(nullptr);
Expand Down Expand Up @@ -1528,7 +1528,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
.WillByDefault(Return(ClusterInfo::Features::CLOSE_CONNECTIONS_ON_HOST_HEALTH_FAILURE));
EXPECT_CALL(factory_.tls_.dispatcher_, createClientConnection_(_, _, _, _))
.WillOnce(Return(connection));
EXPECT_CALL(*connection, setBufferLimits(_));
EXPECT_CALL(*connection, setBufferLimits(_, _));
EXPECT_CALL(*connection, addConnectionCallbacks(_));
auto conn_info = cluster_manager_->getThreadLocalCluster("fake_cluster")->tcpConn(nullptr);
EXPECT_EQ(conn_info.connection_.get(), connection);
Expand Down
6 changes: 3 additions & 3 deletions test/common/upstream/hds_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ TEST_F(HdsTest, TestProcessMessageMissingFieldsWithFallback) {
EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(2);
EXPECT_CALL(async_stream_, sendMessageRaw_(_, false));
EXPECT_CALL(test_factory_, createClusterInfo(_)).WillOnce(Return(cluster_info_));
EXPECT_CALL(*connection, setBufferLimits(_));
EXPECT_CALL(*connection, setBufferLimits(_, _));
EXPECT_CALL(dispatcher_, deferredDelete_(_));
// Process message
hds_delegate_->onReceiveMessage(std::move(message));
Expand Down Expand Up @@ -568,7 +568,7 @@ TEST_F(HdsTest, TestSocketContext) {
return cluster_info_;
}));

EXPECT_CALL(*connection, setBufferLimits(_));
EXPECT_CALL(*connection, setBufferLimits(_, _));
EXPECT_CALL(dispatcher_, deferredDelete_(_));

// Process message.
Expand Down Expand Up @@ -671,7 +671,7 @@ TEST_F(HdsTest, TestSendResponseOneEndpointTimeout) {
EXPECT_CALL(*server_response_timer_, enableTimer(_, _)).Times(2);
EXPECT_CALL(async_stream_, sendMessageRaw_(_, false));
EXPECT_CALL(test_factory_, createClusterInfo(_)).WillOnce(Return(cluster_info_));
EXPECT_CALL(*connection_, setBufferLimits(_));
EXPECT_CALL(*connection_, setBufferLimits(_, _));
EXPECT_CALL(dispatcher_, deferredDelete_(_));
// Process message
hds_delegate_->onReceiveMessage(std::move(message));
Expand Down
Loading

0 comments on commit ee41a9c

Please sign in to comment.