From 9226ccde02c0d6dacbffe1b3076fd5c9fd41efe9 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Fri, 30 Oct 2020 19:22:00 -0400 Subject: [PATCH 1/4] http2: delay H2 frame serialization if the network connection's output buffer high-watermark is triggered. By avoiding eager serialization the H2 codec implementation is able to prioritize more important frames so they are sent sooner and provide stricter bounds on connection and stream buffers. This change is protected by the `envoy.reloadable_features.enable_h2_watermark_improvements` runtime feature which is enabled by default. Signed-off-by: Antonio Vicente --- docs/root/version_history/current.rst | 6 + source/common/buffer/watermark_buffer.h | 2 +- source/common/event/dispatcher_impl.cc | 3 +- source/common/http/conn_manager_impl.cc | 20 +- source/common/http/http2/codec_impl.cc | 102 ++- source/common/http/http2/codec_impl.h | 42 +- .../common/http/http2/protocol_constraints.cc | 7 +- .../common/http/http2/protocol_constraints.h | 16 +- source/common/runtime/runtime_features.cc | 2 + test/common/http/codec_impl_fuzz_test.cc | 2 + test/common/http/http2/codec_impl_test.cc | 14 +- test/common/http/http2/frame_replay_test.cc | 8 + test/common/http/http2/http2_frame.cc | 9 + test/common/http/http2/http2_frame.h | 2 + .../http/http2/protocol_constraints_test.cc | 69 +- .../http/http2/request_header_fuzz_test.cc | 1 + .../http/http2/response_header_fuzz_test.cc | 1 + test/common/http/mixed_conn_pool_test.cc | 3 + .../upstream/health_checker_impl_test.cc | 1 + .../common/fuzz/uber_per_readfilter.cc | 10 +- .../network/common/fuzz/uber_readfilter.cc | 5 +- .../network/common/fuzz/uber_readfilter.h | 2 +- test/integration/BUILD | 1 + .../filters/test_socket_interface.h | 18 + .../http2_flood_integration_test.cc | 693 ++++++++++++++++-- test/integration/http_integration.cc | 13 +- test/integration/http_integration.h | 10 +- test/integration/server.h | 6 + test/integration/server_stats.h | 10 + test/integration/tracked_watermark_buffer.cc | 40 +- test/integration/tracked_watermark_buffer.h | 29 +- .../tracked_watermark_buffer_test.cc | 30 + test/test_common/utility.cc | 13 + test/test_common/utility.h | 15 + tools/code_format/check_format.py | 2 +- tools/spelling/spelling_dictionary.txt | 1 + 36 files changed, 1083 insertions(+), 125 deletions(-) diff --git a/docs/root/version_history/current.rst b/docs/root/version_history/current.rst index 2f07e2da2e44..0a9e9b067fda 100644 --- a/docs/root/version_history/current.rst +++ b/docs/root/version_history/current.rst @@ -9,6 +9,12 @@ Minor Behavior Changes ---------------------- *Changes that may cause incompatibilities for some users, but should not for most* +* http2: delay serialization of control and data frames if the network connection's output buffer + has triggered its fullness high-watermark. By avoiding eager serialization the H2 codec + implementation is able to prioritize more important frames so they are sent sooner and provide + stricter bounds on connection and stream buffers. This change can be disabled by setting the + `envoy.reloadable_features.enable_h2_watermark_improvements` feature flag to false. If setting + this flag to false is required in a deployment please open an issue against the project. * upstream: host weight changes now cause a full load balancer rebuild as opposed to happening atomically inline. This change has been made to support load balancer pre-computation of data structures based on host weight, but may have performance implications if host weight changes diff --git a/source/common/buffer/watermark_buffer.h b/source/common/buffer/watermark_buffer.h index a84f20c21d5d..e2a12f66a9ce 100644 --- a/source/common/buffer/watermark_buffer.h +++ b/source/common/buffer/watermark_buffer.h @@ -49,7 +49,7 @@ class WatermarkBuffer : public OwnedImpl { protected: virtual void checkHighAndOverflowWatermarks(); - void checkLowWatermark(); + virtual void checkLowWatermark(); private: std::function below_low_watermark_; diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index 558d82b9230d..abd8472600ae 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -191,7 +191,8 @@ TimerPtr DispatcherImpl::createTimer(TimerCb cb) { } Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function cb) { - ASSERT(isThreadSafe()); + // TODO(antoniovicente) Merge in https://github.com/envoyproxy/envoy/pull/14526 and restore ASSERT + // ASSERT(isThreadSafe()); return base_scheduler_.createSchedulableCallback([this, cb]() { touchWatchdog(); cb(); diff --git a/source/common/http/conn_manager_impl.cc b/source/common/http/conn_manager_impl.cc index d6f501985486..729d4d5ebfe5 100644 --- a/source/common/http/conn_manager_impl.cc +++ b/source/common/http/conn_manager_impl.cc @@ -257,10 +257,22 @@ RequestDecoder& ConnectionManagerImpl::newStream(ResponseEncoder& response_encod new_stream->response_encoder_ = &response_encoder; new_stream->response_encoder_->getStream().addCallbacks(*new_stream); new_stream->response_encoder_->getStream().setFlushTimeout(new_stream->idle_timeout_ms_); - // If the network connection is backed up, the stream should be made aware of it on creation. - // Both HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper. - ASSERT(read_callbacks_->connection().aboveHighWatermark() == false || - new_stream->filter_manager_.aboveHighWatermark()); + + // If the network connection is backed up, the HTTP/1.x stream should be made aware of it on + // creation. In the case of HTTP/2 the stream should be allowed to read up to the configured + // stream limit even when the network connection is backed up, so the readDisable status is not + // propagated from the network connection if the + // envoy.reloadable_features.enable_h2_watermark_improvements runtime feature is enabled. Both + // HTTP/1.x and HTTP/2 codecs handle this in StreamCallbackHelper::addCallbacksHelper. + // TODO(antoniovicente) For full consistency we need to use the enable_h2_watermark_improvements + // latched by the H2 connection when it was created. Accessing the current value of the runtime + // feature may trigger spurious ASSERT failures. + ASSERT((Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.enable_h2_watermark_improvements") && + codec_->protocol() >= Protocol::Http2) + ? !new_stream->filter_manager_.aboveHighWatermark() + : read_callbacks_->connection().aboveHighWatermark() == false || + new_stream->filter_manager_.aboveHighWatermark()); LinkedList::moveIntoList(std::move(new_stream), streams_); return **streams_.begin(); } diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index f53aff523b69..829ec9e80560 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -28,6 +28,22 @@ namespace Envoy { namespace Http { namespace Http2 { +namespace { + +class Nghttp2Session : public Nghttp2SessionInterface { +public: + explicit Nghttp2Session(nghttp2_session& session) : session_(session) {} + + size_t getOutboundControlFrameQueueSize() const override { + return nghttp2_session_get_outbound_queue_size(&session_); + } + +private: + nghttp2_session& session_; +}; + +} // namespace + // Changes or additions to details should be reflected in // docs/root/configuration/http/http_conn_man/response_code_details_details.rst class Http2ResponseCodeDetailValues { @@ -116,13 +132,22 @@ template static T* removeConst(const void* object) { } ConnectionImpl::StreamImpl::StreamImpl(ConnectionImpl& parent, uint32_t buffer_limit) - : parent_(parent), local_end_stream_sent_(false), remote_end_stream_(false), - data_deferred_(false), received_noninformational_headers_(false), + : parent_(parent), + pending_recv_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingRecvBufferLowWatermark(); }, + [this]() -> void { this->pendingRecvBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + pending_send_data_(parent_.connection_.dispatcher().getWatermarkFactory().create( + [this]() -> void { this->pendingSendBufferLowWatermark(); }, + [this]() -> void { this->pendingSendBufferHighWatermark(); }, + []() -> void { /* TODO(adisuissa): Handle overflow watermark */ })), + local_end_stream_sent_(false), remote_end_stream_(false), data_deferred_(false), + received_noninformational_headers_(false), pending_receive_buffer_high_watermark_called_(false), pending_send_buffer_high_watermark_called_(false), reset_due_to_messaging_error_(false) { parent_.stats_.streams_active_.inc(); if (buffer_limit > 0) { - setWriteBufferWatermarks(buffer_limit / 2, buffer_limit); + setWriteBufferWatermarks(buffer_limit); } } @@ -131,7 +156,7 @@ ConnectionImpl::StreamImpl::~StreamImpl() { ASSERT(stream_idle_timer_ == nullptr void ConnectionImpl::StreamImpl::destroy() { disarmStreamIdleTimer(); parent_.stats_.streams_active_.dec(); - parent_.stats_.pending_send_bytes_.sub(pending_send_data_.length()); + parent_.stats_.pending_send_bytes_.sub(pending_send_data_->length()); } static void insertHeader(std::vector& headers, const HeaderEntry& header) { @@ -239,7 +264,7 @@ void ConnectionImpl::ServerStreamImpl::encodeHeaders(const ResponseHeaderMap& he void ConnectionImpl::StreamImpl::encodeTrailersBase(const HeaderMap& trailers) { ASSERT(!local_end_stream_); local_end_stream_ = true; - if (pending_send_data_.length() > 0) { + if (pending_send_data_->length() > 0) { // In this case we want trailers to come after we release all pending body data that is // waiting on window updates. We need to save the trailers so that we can emit them later. // However, for empty trailers, we don't need to to save the trailers. @@ -395,13 +420,13 @@ void ConnectionImpl::StreamImpl::submitMetadata(uint8_t flags) { } ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* data_flags) { - if (pending_send_data_.length() == 0 && !local_end_stream_) { + if (pending_send_data_->length() == 0 && !local_end_stream_) { ASSERT(!data_deferred_); data_deferred_ = true; return NGHTTP2_ERR_DEFERRED; } else { *data_flags |= NGHTTP2_DATA_FLAG_NO_COPY; - if (local_end_stream_ && pending_send_data_.length() <= length) { + if (local_end_stream_ && pending_send_data_->length() <= length) { *data_flags |= NGHTTP2_DATA_FLAG_EOF; if (pending_trailers_to_encode_) { // We need to tell the library to not set end stream so that we can emit the trailers. @@ -411,29 +436,38 @@ ssize_t ConnectionImpl::StreamImpl::onDataSourceRead(uint64_t length, uint32_t* } } - return std::min(length, pending_send_data_.length()); + return std::min(length, pending_send_data_->length()); } } -void ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t length) { +ssize_t ConnectionImpl::StreamImpl::onDataSourceSend(const uint8_t* framehd, size_t length) { // In this callback we are writing out a raw DATA frame without copying. nghttp2 assumes that we // "just know" that the frame header is 9 bytes. // https://nghttp2.org/documentation/types.html#c.nghttp2_send_data_callback static const uint64_t FRAME_HEADER_SIZE = 9; + if (parent_.h2_watermark_improvements_ && length > 0 && + parent_.connection_.aboveHighWatermark()) { + // The network connection's output buffer is full. Delay generation of the data frame until the + // buffer's size drops to its low-watermark. + return NGHTTP2_ERR_WOULDBLOCK; + } + parent_.protocol_constraints_.incrementOutboundDataFrameCount(); Buffer::OwnedImpl output; parent_.addOutboundFrameFragment(output, framehd, FRAME_HEADER_SIZE); - if (!parent_.protocol_constraints_.checkOutboundFrameLimits().ok()) { + if (!parent_.protocol_constraints_.checkOutboundFrameLimits(Nghttp2Session(*parent_.session_)) + .ok()) { ENVOY_CONN_LOG(debug, "error sending data frame: Too many frames in the outbound queue", parent_.connection_); setDetails(Http2ResponseCodeDetails::get().outbound_frame_flood); } parent_.stats_.pending_send_bytes_.sub(length); - output.move(pending_send_data_, length); + output.move(*pending_send_data_, length); parent_.connection_.write(output, false); + return 0; } void ConnectionImpl::ClientStreamImpl::submitHeaders(const std::vector& final_headers, @@ -488,7 +522,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e local_end_stream_ = end_stream; parent_.stats_.pending_send_bytes_.add(data.length()); - pending_send_data_.move(data); + pending_send_data_->move(data); if (data_deferred_) { int rc = nghttp2_session_resume_data(parent_.session_, stream_id_); ASSERT(rc == 0); @@ -500,7 +534,7 @@ void ConnectionImpl::StreamImpl::encodeDataHelper(Buffer::Instance& data, bool e // Intended to check through coverage that this error case is tested return; } - if (local_end_stream_ && pending_send_data_.length() > 0) { + if (local_end_stream_ && pending_send_data_->length() > 0) { createPendingFlushTimer(); } } @@ -568,7 +602,11 @@ ConnectionImpl::ConnectionImpl(Network::Connection& connection, CodecStats& stat protocol_constraints_(stats, http2_options), skip_encoding_empty_trailers_(Runtime::runtimeFeatureEnabled( "envoy.reloadable_features.http2_skip_encoding_empty_trailers")), + h2_watermark_improvements_(Runtime::runtimeFeatureEnabled( + "envoy.reloadable_features.enable_h2_watermark_improvements")), dispatching_(false), raised_goaway_(false), pending_deferred_reset_(false), + send_pending_frames_cb_(connection_.dispatcher().createSchedulableCallback( + [this]() -> void { sendPendingFramesAndHandleError(); })), random_(random_generator) { if (http2_options.has_connection_keepalive()) { keepalive_interval_ = std::chrono::milliseconds( @@ -681,9 +719,11 @@ ConnectionImpl::StreamImpl* ConnectionImpl::getStream(int32_t stream_id) { int ConnectionImpl::onData(int32_t stream_id, const uint8_t* data, size_t len) { StreamImpl* stream = getStream(stream_id); + // onData callback only triggers if the stream is still alive when the data arrives. + ASSERT(stream != nullptr); // If this results in buffering too much data, the watermark buffer will call // pendingRecvBufferHighWatermark, resulting in ++read_disable_count_ - stream->pending_recv_data_.add(data, len); + stream->pending_recv_data_->add(data, len); // Update the window to the peer unless some consumer of this stream's data has hit a flow control // limit and disabled reads on this stream if (!stream->buffersOverrun()) { @@ -838,10 +878,10 @@ Status ConnectionImpl::onFrameReceived(const nghttp2_frame* frame) { // It's possible that we are waiting to send a deferred reset, so only raise data if local // is not complete. if (!stream->deferred_reset_) { - stream->decoder().decodeData(stream->pending_recv_data_, stream->remote_end_stream_); + stream->decoder().decodeData(*stream->pending_recv_data_, stream->remote_end_stream_); } - stream->pending_recv_data_.drain(stream->pending_recv_data_.length()); + stream->pending_recv_data_->drain(stream->pending_recv_data_->length()); break; } case NGHTTP2_RST_STREAM: { @@ -887,7 +927,14 @@ int ConnectionImpl::onFrameSend(const nghttp2_frame* frame) { case NGHTTP2_HEADERS: case NGHTTP2_DATA: { StreamImpl* stream = getStream(frame->hd.stream_id); - stream->local_end_stream_sent_ = frame->hd.flags & NGHTTP2_FLAG_END_STREAM; + // Check if stream is still active before recording flag state. onFrameSend callbacks runs once + // the frame is serialized to the output buffer; queued frames can be sent after the associated + // stream has been terminated. + // TODO(antoniovicente) Is this delay recording local_end_stream_sent_ when writes + // NGHTTP2_ERR_WOULDBLOCK acceptable? + if (stream != nullptr) { + stream->local_end_stream_sent_ = frame->hd.flags & NGHTTP2_FLAG_END_STREAM; + } break; } } @@ -953,6 +1000,12 @@ void ConnectionImpl::addOutboundFrameFragment(Buffer::OwnedImpl& output, const u ssize_t ConnectionImpl::onSend(const uint8_t* data, size_t length) { ENVOY_CONN_LOG(trace, "send data: bytes={}", connection_, length); + if (h2_watermark_improvements_ && connection_.aboveHighWatermark()) { + // The network connection's output buffer is full. Return NGHTTP2_ERR_WOULDBLOCK so nghttp2 + // queues the control frame until the buffer's size drops to its low-watermark. + return NGHTTP2_ERR_WOULDBLOCK; + } + Buffer::OwnedImpl buffer; addOutboundFrameFragment(buffer, data, length); @@ -1119,7 +1172,7 @@ Status ConnectionImpl::sendPendingFrames() { // After all pending frames have been written into the outbound buffer check if any of // protocol constraints had been violated. - Status status = protocol_constraints_.checkOutboundFrameLimits(); + Status status = protocol_constraints_.checkOutboundFrameLimits(Nghttp2Session(*session_)); if (!status.ok()) { ENVOY_CONN_LOG(debug, "error sending frames: Too many frames in the outbound queue.", connection_); @@ -1232,8 +1285,7 @@ ConnectionImpl::Http2Callbacks::Http2Callbacks() { [](nghttp2_session*, nghttp2_frame* frame, const uint8_t* framehd, size_t length, nghttp2_data_source* source, void*) -> int { ASSERT(frame->data.padlen == 0); - static_cast(source->ptr)->onDataSourceSend(framehd, length); - return 0; + return static_cast(source->ptr)->onDataSourceSend(framehd, length); }); nghttp2_session_callbacks_set_on_begin_headers_callback( @@ -1407,7 +1459,7 @@ RequestEncoder& ClientConnectionImpl::newStream(ResponseDecoder& decoder) { // If the connection is currently above the high watermark, make sure to inform the new stream. // The connection can not pass this on automatically as it has no awareness that a new stream is // created. - if (connection_.aboveHighWatermark()) { + if (!h2_watermark_improvements_ && connection_.aboveHighWatermark()) { stream->runHighWatermarkCallbacks(); } ClientStreamImpl& stream_ref = *stream; @@ -1424,6 +1476,8 @@ Status ClientConnectionImpl::onBeginHeaders(const nghttp2_frame* frame) { RETURN_IF_ERROR(trackInboundFrames(&frame->hd, frame->headers.padlen)); if (frame->headers.cat == NGHTTP2_HCAT_HEADERS) { StreamImpl* stream = getStream(frame->hd.stream_id); + // Header frames are discarded if the stream is no longer around when the frame arrives. + ASSERT(stream != nullptr); stream->allocTrailers(); } @@ -1500,12 +1554,14 @@ Status ServerConnectionImpl::onBeginHeaders(const nghttp2_frame* frame) { ASSERT(frame->headers.cat == NGHTTP2_HCAT_HEADERS); StreamImpl* stream = getStream(frame->hd.stream_id); + // Header frames are discarded if the stream is no longer around when the frame arrives. + ASSERT(stream != nullptr); stream->allocTrailers(); return okStatus(); } ServerStreamImplPtr stream(new ServerStreamImpl(*this, per_stream_buffer_limit_)); - if (connection_.aboveHighWatermark()) { + if (!h2_watermark_improvements_ && connection_.aboveHighWatermark()) { stream->runHighWatermarkCallbacks(); } stream->request_decoder_ = &callbacks_.newStream(*stream); @@ -1560,7 +1616,7 @@ Http::Status ServerConnectionImpl::dispatch(Buffer::Instance& data) { Http::Status ServerConnectionImpl::innerDispatch(Buffer::Instance& data) { // Make sure downstream outbound queue was not flooded by the upstream frames. - RETURN_IF_ERROR(protocol_constraints_.checkOutboundFrameLimits()); + RETURN_IF_ERROR(protocol_constraints_.checkOutboundFrameLimits(Nghttp2Session(*session_))); return ConnectionImpl::innerDispatch(data); } diff --git a/source/common/http/http2/codec_impl.h b/source/common/http/http2/codec_impl.h index 3e51d57eba5a..17ddd4ebb188 100644 --- a/source/common/http/http2/codec_impl.h +++ b/source/common/http/http2/codec_impl.h @@ -109,13 +109,21 @@ class ConnectionImpl : public virtual Connection, protected Logger::LoggablerunHighWatermarkCallbacks(); + if (h2_watermark_improvements_) { + send_pending_frames_cb_->cancel(); + } else { + for (auto& stream : active_streams_) { + stream->runHighWatermarkCallbacks(); + } } } void onUnderlyingConnectionBelowWriteBufferLowWatermark() override { - for (auto& stream : active_streams_) { - stream->runLowWatermarkCallbacks(); + if (h2_watermark_improvements_) { + send_pending_frames_cb_->scheduleCallbackNextIteration(); + } else { + for (auto& stream : active_streams_) { + stream->runLowWatermarkCallbacks(); + } } } @@ -190,7 +198,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable& final_headers, const HeaderMap& headers); void saveHeader(HeaderString&& name, HeaderString&& value); @@ -218,7 +226,7 @@ class ConnectionImpl : public virtual Connection, protected Logger::LoggablehighWatermark(); } const Network::Address::InstanceConstSharedPtr& connectionLocalAddress() override { return parent_.connection_.addressProvider().localAddress(); } @@ -243,9 +251,9 @@ class ConnectionImpl : public virtual Connection, protected Logger::LoggablesetWatermarks(high_watermark); + pending_send_data_->setWatermarks(high_watermark); } // If the receive buffer encounters watermark callbacks, enable/disable reads on this stream. @@ -284,14 +292,8 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable void { this->pendingRecvBufferLowWatermark(); }, - [this]() -> void { this->pendingRecvBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; - Buffer::WatermarkBuffer pending_send_data_{ - [this]() -> void { this->pendingSendBufferLowWatermark(); }, - [this]() -> void { this->pendingSendBufferHighWatermark(); }, - []() -> void { /* TODO(adisuissa): Handle overflow watermark */ }}; + Buffer::InstancePtr pending_recv_data_; + Buffer::InstancePtr pending_send_data_; HeaderMapPtr pending_trailers_to_encode_; std::unique_ptr metadata_decoder_; std::unique_ptr metadata_encoder_; @@ -503,7 +505,10 @@ class ConnectionImpl : public virtual Connection, protected Logger::Loggable max_outbound_frames_) { + size_t nghttp2_outbound_queue = session.getOutboundControlFrameQueueSize(); + if ((outbound_frames_ + nghttp2_outbound_queue) > max_outbound_frames_) { stats_.outbound_flood_.inc(); return status_ = bufferFloodError("Too many frames in the outbound queue."); } - if (outbound_control_frames_ > max_outbound_control_frames_) { + if ((outbound_control_frames_ + nghttp2_outbound_queue) > max_outbound_control_frames_) { stats_.outbound_control_flood_.inc(); return status_ = bufferFloodError("Too many control frames in the outbound queue."); } diff --git a/source/common/http/http2/protocol_constraints.h b/source/common/http/http2/protocol_constraints.h index d9219830a5e2..6279e7549bed 100644 --- a/source/common/http/http2/protocol_constraints.h +++ b/source/common/http/http2/protocol_constraints.h @@ -15,6 +15,20 @@ namespace Envoy { namespace Http { namespace Http2 { +/** + * Provides limited access to the nghttp2_session and allows mocking in tests. + */ +class Nghttp2SessionInterface { +public: + virtual ~Nghttp2SessionInterface() = default; + + /** + * Returns the size of nghttp2 session's outbound control frame queue. See + * nghttp2_session_get_outbound_queue_size. + */ + virtual size_t getOutboundControlFrameQueueSize() const PURE; +}; + // Class for detecting abusive peers and validating additional constraints imposed by Envoy. // This class does not check protocol compliance with the H/2 standard, as this is checked by // protocol framer/codec. Currently implemented constraints: @@ -51,7 +65,7 @@ class ProtocolConstraints { // Increment the number of DATA frames sent to the peer. void incrementOutboundDataFrameCount() { ++outbound_data_frames_; } - Status checkOutboundFrameLimits(); + Status checkOutboundFrameLimits(const Nghttp2SessionInterface& session); private: void releaseOutboundFrame(); diff --git a/source/common/runtime/runtime_features.cc b/source/common/runtime/runtime_features.cc index cf5187240fe3..052611e5151a 100644 --- a/source/common/runtime/runtime_features.cc +++ b/source/common/runtime/runtime_features.cc @@ -102,6 +102,8 @@ constexpr const char* runtime_features[] = { constexpr const char* disabled_runtime_features[] = { // v2 is fatal-by-default. "envoy.reloadable_features.enable_deprecated_v2_api", + // Improvements to HTTP2 watermark handling and buffering. + "envoy.reloadable_features.enable_h2_watermark_improvements", // Allow Envoy to upgrade or downgrade version of type url, should be removed when support for // v2 url is removed from codebase. "envoy.reloadable_features.enable_type_url_downgrade_and_upgrade", diff --git a/test/common/http/codec_impl_fuzz_test.cc b/test/common/http/codec_impl_fuzz_test.cc index d0fbdca11a4f..c71713355a9f 100644 --- a/test/common/http/codec_impl_fuzz_test.cc +++ b/test/common/http/codec_impl_fuzz_test.cc @@ -487,6 +487,7 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi const bool http2 = http_version == HttpVersion::Http2; if (http2) { + new Event::MockSchedulableCallback(&client_connection.dispatcher_); client = std::make_unique( client_connection, client_callbacks, Http2::CodecStats::atomicGet(http2_stats, stats_store), random, client_http2_options, max_request_headers_kb, max_response_headers_count, @@ -498,6 +499,7 @@ void codecFuzz(const test::common::http::CodecImplFuzzTestCase& input, HttpVersi } if (http2) { + new Event::MockSchedulableCallback(&server_connection.dispatcher_); const envoy::config::core::v3::Http2ProtocolOptions server_http2_options{ fromHttp2Settings(input.h2_settings().server())}; server = std::make_unique( diff --git a/test/common/http/http2/codec_impl_test.cc b/test/common/http/http2/codec_impl_test.cc index 3daec9a7d43f..e7e2e006e3b7 100644 --- a/test/common/http/http2/codec_impl_test.cc +++ b/test/common/http/http2/codec_impl_test.cc @@ -128,9 +128,11 @@ class Http2CodecImplTestFixture { virtual void initialize() { http2OptionsFromTuple(client_http2_options_, client_settings_); http2OptionsFromTuple(server_http2_options_, server_settings_); + new Event::MockSchedulableCallback(&client_connection_.dispatcher_); client_ = std::make_unique( client_connection_, client_callbacks_, client_stats_store_, client_http2_options_, random_, max_request_headers_kb_, max_response_headers_count_, ProdNghttp2SessionFactory::get()); + new Event::MockSchedulableCallback(&server_connection_.dispatcher_); server_ = std::make_unique( server_connection_, server_callbacks_, server_stats_store_, server_http2_options_, random_, max_request_headers_kb_, max_request_headers_count_, headers_with_underscores_action_); @@ -1090,7 +1092,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { // Now that the flow control window is full, further data causes the send buffer to back up. Buffer::OwnedImpl more_long_data(std::string(initial_stream_window, 'a')); request_encoder_->encodeData(more_long_data, false); - EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); EXPECT_EQ(initial_stream_window, server_->getStream(1)->unconsumed_bytes_); @@ -1099,7 +1101,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks, onAboveWriteBufferHighWatermark()); Buffer::OwnedImpl last_byte("!"); request_encoder_->encodeData(last_byte, false); - EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(initial_stream_window + 1, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(initial_stream_window + 1, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); @@ -1144,7 +1146,7 @@ TEST_P(Http2CodecImplFlowControlTest, TestFlowControlInPendingSendData) { EXPECT_CALL(callbacks2, onBelowWriteBufferLowWatermark()).Times(0); EXPECT_CALL(callbacks3, onBelowWriteBufferLowWatermark()); server_->getStream(1)->readDisable(false); - EXPECT_EQ(0, client_->getStream(1)->pending_send_data_.length()); + EXPECT_EQ(0, client_->getStream(1)->pending_send_data_->length()); EXPECT_EQ(0, TestUtility::findGauge(client_stats_store_, "http2.pending_send_bytes")->value()); // The extra 1 byte sent won't trigger another window update, so the final window should be the // initial window minus the last 1 byte flush from the client to server. @@ -1231,7 +1233,7 @@ TEST_P(Http2CodecImplFlowControlTest, FlowControlPendingRecvData) { // the recv buffer can be overrun by a client which negotiates a larger // SETTINGS_MAX_FRAME_SIZE but there's no current easy way to tweak that in // envoy (without sending raw HTTP/2 frames) so we lower the buffer limit instead. - server_->getStream(1)->setWriteBufferWatermarks(10, 20); + server_->getStream(1)->setWriteBufferWatermarks(20); EXPECT_CALL(request_decoder_, decodeData(_, false)); Buffer::OwnedImpl data(std::string(40, 'a')); @@ -1548,9 +1550,11 @@ class Http2CodecImplStreamLimitTest : public Http2CodecImplTest {}; TEST_P(Http2CodecImplStreamLimitTest, MaxClientStreams) { http2OptionsFromTuple(client_http2_options_, ::testing::get<0>(GetParam())); http2OptionsFromTuple(server_http2_options_, ::testing::get<1>(GetParam())); + new Event::MockSchedulableCallback(&client_connection_.dispatcher_); client_ = std::make_unique( client_connection_, client_callbacks_, client_stats_store_, client_http2_options_, random_, max_request_headers_kb_, max_response_headers_count_, ProdNghttp2SessionFactory::get()); + new Event::MockSchedulableCallback(&server_connection_.dispatcher_); server_ = std::make_unique( server_connection_, server_callbacks_, server_stats_store_, server_http2_options_, random_, max_request_headers_kb_, max_request_headers_count_, headers_with_underscores_action_); @@ -2730,9 +2734,11 @@ class Http2CodecMetadataTest : public Http2CodecImplTestFixture, public ::testin allow_metadata_ = true; http2OptionsFromTuple(client_http2_options_, client_settings_); http2OptionsFromTuple(server_http2_options_, server_settings_); + new Event::MockSchedulableCallback(&client_connection_.dispatcher_); client_ = std::make_unique( client_connection_, client_callbacks_, client_stats_store_, client_http2_options_, random_, max_request_headers_kb_, max_response_headers_count_, http2_session_factory_); + new Event::MockSchedulableCallback(&server_connection_.dispatcher_); server_ = std::make_unique( server_connection_, server_callbacks_, server_stats_store_, server_http2_options_, random_, max_request_headers_kb_, max_request_headers_count_, headers_with_underscores_action_); diff --git a/test/common/http/http2/frame_replay_test.cc b/test/common/http/http2/frame_replay_test.cc index 91a2ba4894e1..19488c1e1b46 100644 --- a/test/common/http/http2/frame_replay_test.cc +++ b/test/common/http/http2/frame_replay_test.cc @@ -57,6 +57,7 @@ TEST_F(RequestFrameCommentTest, SimpleExampleHuffman) { // Validate HEADERS decode. ServerCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.server_connection_.dispatcher_); TestServerConnectionImpl connection( codec.server_connection_, codec.server_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -90,6 +91,7 @@ TEST_F(ResponseFrameCommentTest, SimpleExampleHuffman) { // Validate HEADERS decode. ClientCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.client_connection_.dispatcher_); TestClientConnectionImpl connection( codec.client_connection_, codec.client_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -135,6 +137,7 @@ TEST_F(RequestFrameCommentTest, SimpleExamplePlain) { // Validate HEADERS decode. ServerCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.server_connection_.dispatcher_); TestServerConnectionImpl connection( codec.server_connection_, codec.server_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -170,6 +173,7 @@ TEST_F(ResponseFrameCommentTest, SimpleExamplePlain) { // Validate HEADERS decode. ClientCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.client_connection_.dispatcher_); TestClientConnectionImpl connection( codec.client_connection_, codec.client_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -200,6 +204,7 @@ TEST_F(RequestFrameCommentTest, SingleByteNulCrLfInHeaderFrame) { header.frame()[offset] = c; // Play the frames back. ServerCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.server_connection_.dispatcher_); TestServerConnectionImpl connection( codec.server_connection_, codec.server_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -233,6 +238,7 @@ TEST_F(ResponseFrameCommentTest, SingleByteNulCrLfInHeaderFrame) { header.frame()[offset] = c; // Play the frames back. ClientCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.client_connection_.dispatcher_); TestClientConnectionImpl connection( codec.client_connection_, codec.client_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -268,6 +274,7 @@ TEST_F(RequestFrameCommentTest, SingleByteNulCrLfInHeaderField) { header.frame()[offset] = c; // Play the frames back. ServerCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.server_connection_.dispatcher_); TestServerConnectionImpl connection( codec.server_connection_, codec.server_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, @@ -306,6 +313,7 @@ TEST_F(ResponseFrameCommentTest, SingleByteNulCrLfInHeaderField) { header.frame()[offset] = c; // Play the frames back. ClientCodecFrameInjector codec; + new Event::MockSchedulableCallback(&codec.client_connection_.dispatcher_); TestClientConnectionImpl connection( codec.client_connection_, codec.client_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, diff --git a/test/common/http/http2/http2_frame.cc b/test/common/http/http2/http2_frame.cc index 6717ec49b3b8..f1a15a2626de 100644 --- a/test/common/http/http2/http2_frame.cc +++ b/test/common/http/http2/http2_frame.cc @@ -40,6 +40,15 @@ uint32_t Http2Frame::payloadSize() const { return (uint32_t(data_[0]) << 16) + (uint32_t(data_[1]) << 8) + uint32_t(data_[2]); } +uint32_t Http2Frame::streamId() const { + ASSERT(type() == Type::Headers || type() == Type::Data); + uint32_t stream_id; + memcpy(&stream_id, &data_[5], sizeof(stream_id)); + return ntohl(stream_id); +} + +uint8_t Http2Frame::flags() const { return data_[4]; } + Http2Frame::ResponseStatus Http2Frame::responseStatus() const { if (empty() || Type::Headers != type() || size() <= HeaderSize || ((data_[HeaderSize] & 0x80) == 0)) { diff --git a/test/common/http/http2/http2_frame.h b/test/common/http/http2/http2_frame.h index 53465a6f9248..325abc8b2a47 100644 --- a/test/common/http/http2/http2_frame.h +++ b/test/common/http/http2/http2_frame.h @@ -179,6 +179,8 @@ class Http2Frame { static Http2Frame makeGenericFrameFromHexDump(absl::string_view contents); Type type() const { return static_cast(data_[3]); } + uint32_t streamId() const; + uint8_t flags() const; ResponseStatus responseStatus() const; // Copy HTTP2 header. The `header` parameter must at least be HeaderSize long. diff --git a/test/common/http/http2/protocol_constraints_test.cc b/test/common/http/http2/protocol_constraints_test.cc index e3084146650a..42816cd024b3 100644 --- a/test/common/http/http2/protocol_constraints_test.cc +++ b/test/common/http/http2/protocol_constraints_test.cc @@ -5,16 +5,24 @@ #include "gmock/gmock.h" #include "gtest/gtest.h" +using testing::Return; + namespace Envoy { namespace Http { namespace Http2 { +class MockNghttp2Session : public Nghttp2SessionInterface { +public: + MOCK_METHOD(size_t, getOutboundControlFrameQueueSize, (), (const)); +}; + class ProtocolConstraintsTest : public ::testing::Test { protected: Http::Http2::CodecStats& http2CodecStats() { return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, stats_store_); } + MockNghttp2Session session_; Stats::TestUtil::TestStore stats_store_; Http::Http2::CodecStats::AtomicPtr http2_codec_stats_; envoy::config::core::v3::Http2ProtocolOptions options_; @@ -31,9 +39,28 @@ TEST_F(ProtocolConstraintsTest, OutboundControlFrameFlood) { ProtocolConstraints constraints(http2CodecStats(), options_); constraints.incrementOutboundFrameCount(true); constraints.incrementOutboundFrameCount(true); - EXPECT_TRUE(constraints.checkOutboundFrameLimits().ok()); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(0)); + EXPECT_TRUE(constraints.checkOutboundFrameLimits(session_).ok()); constraints.incrementOutboundFrameCount(true); - EXPECT_FALSE(constraints.checkOutboundFrameLimits().ok()); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(0)); + EXPECT_FALSE(constraints.checkOutboundFrameLimits(session_).ok()); + EXPECT_TRUE(isBufferFloodError(constraints.status())); + EXPECT_EQ("Too many control frames in the outbound queue.", constraints.status().message()); + EXPECT_EQ(1, stats_store_.counter("http2.outbound_control_flood").value()); +} + +TEST_F(ProtocolConstraintsTest, QueuedOutboundControlFrameFlood) { + options_.mutable_max_outbound_frames()->set_value(20); + options_.mutable_max_outbound_control_frames()->set_value(4); + ProtocolConstraints constraints(http2CodecStats(), options_); + constraints.incrementOutboundFrameCount(true); + constraints.incrementOutboundFrameCount(true); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(2)); + EXPECT_TRUE(constraints.checkOutboundFrameLimits(session_).ok()); + // The two control frames in the output buffer plus 3 queued control frames at the nghttp2 level + // push the past the control frame limit. + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(3)); + EXPECT_FALSE(constraints.checkOutboundFrameLimits(session_).ok()); EXPECT_TRUE(isBufferFloodError(constraints.status())); EXPECT_EQ("Too many control frames in the outbound queue.", constraints.status().message()); EXPECT_EQ(1, stats_store_.counter("http2.outbound_control_flood").value()); @@ -46,11 +73,36 @@ TEST_F(ProtocolConstraintsTest, OutboundFrameFlood) { constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); - EXPECT_TRUE(constraints.checkOutboundFrameLimits().ok()); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(0)); + EXPECT_TRUE(constraints.checkOutboundFrameLimits(session_).ok()); + constraints.incrementOutboundFrameCount(false); + constraints.incrementOutboundFrameCount(false); + constraints.incrementOutboundFrameCount(false); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(0)); + EXPECT_FALSE(constraints.checkOutboundFrameLimits(session_).ok()); + EXPECT_TRUE(isBufferFloodError(constraints.status())); + EXPECT_EQ("Too many frames in the outbound queue.", constraints.status().message()); + EXPECT_EQ(1, stats_store_.counter("http2.outbound_flood").value()); +} + +TEST_F(ProtocolConstraintsTest, QueuedOutboundFrameFlood) { + options_.mutable_max_outbound_frames()->set_value(10); + options_.mutable_max_outbound_control_frames()->set_value(5); + ProtocolConstraints constraints(http2CodecStats(), options_); + constraints.incrementOutboundFrameCount(false); + constraints.incrementOutboundFrameCount(false); + constraints.incrementOutboundFrameCount(false); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(4)); + EXPECT_TRUE(constraints.checkOutboundFrameLimits(session_).ok()); + constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); - EXPECT_FALSE(constraints.checkOutboundFrameLimits().ok()); + // The 4 control frames queued at the nghttp2 level count against the total outbound frame limit, + // so those combined with the 7 frames in the output buffer push the connection past the outbound + // frame limit. + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(4)); + EXPECT_FALSE(constraints.checkOutboundFrameLimits(session_).ok()); EXPECT_TRUE(isBufferFloodError(constraints.status())); EXPECT_EQ("Too many frames in the outbound queue.", constraints.status().message()); EXPECT_EQ(1, stats_store_.counter("http2.outbound_flood").value()); @@ -66,13 +118,15 @@ TEST_F(ProtocolConstraintsTest, OutboundFrameFloodStatusIsIdempotent) { constraints.incrementOutboundFrameCount(true); constraints.incrementOutboundFrameCount(true); constraints.incrementOutboundFrameCount(true); - EXPECT_TRUE(isBufferFloodError(constraints.checkOutboundFrameLimits())); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).WillOnce(Return(0)); + EXPECT_TRUE(isBufferFloodError(constraints.checkOutboundFrameLimits(session_))); EXPECT_EQ("Too many control frames in the outbound queue.", constraints.status().message()); // Then trigger flood check for all frame types constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); constraints.incrementOutboundFrameCount(false); - EXPECT_FALSE(constraints.checkOutboundFrameLimits().ok()); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).Times(0); + EXPECT_FALSE(constraints.checkOutboundFrameLimits(session_).ok()); EXPECT_TRUE(isBufferFloodError(constraints.status())); // The status should still reflect the first violation EXPECT_EQ("Too many control frames in the outbound queue.", constraints.status().message()); @@ -114,7 +168,8 @@ TEST_F(ProtocolConstraintsTest, OutboundAndInboundFrameFloodStatusIsIdempotent) constraints.incrementOutboundFrameCount(true); constraints.incrementOutboundFrameCount(true); constraints.incrementOutboundFrameCount(true); - EXPECT_TRUE(isInboundFramesWithEmptyPayloadError(constraints.checkOutboundFrameLimits())); + EXPECT_CALL(session_, getOutboundControlFrameQueueSize()).Times(0); + EXPECT_TRUE(isInboundFramesWithEmptyPayloadError(constraints.checkOutboundFrameLimits(session_))); EXPECT_EQ(1, stats_store_.counter("http2.inbound_empty_frames_flood").value()); EXPECT_EQ(0, stats_store_.counter("http2.outbound_control_flood").value()); } diff --git a/test/common/http/http2/request_header_fuzz_test.cc b/test/common/http/http2/request_header_fuzz_test.cc index 90b8cdd758e3..74fbc46d6254 100644 --- a/test/common/http/http2/request_header_fuzz_test.cc +++ b/test/common/http/http2/request_header_fuzz_test.cc @@ -15,6 +15,7 @@ namespace { void replay(const Frame& frame, ServerCodecFrameInjector& codec) { // Create the server connection containing the nghttp2 session. + new Event::MockSchedulableCallback(&codec.server_connection_.dispatcher_); TestServerConnectionImpl connection( codec.server_connection_, codec.server_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, diff --git a/test/common/http/http2/response_header_fuzz_test.cc b/test/common/http/http2/response_header_fuzz_test.cc index 93e1baff480b..5e75a704a23c 100644 --- a/test/common/http/http2/response_header_fuzz_test.cc +++ b/test/common/http/http2/response_header_fuzz_test.cc @@ -16,6 +16,7 @@ namespace { void replay(const Frame& frame, ClientCodecFrameInjector& codec) { // Create the client connection containing the nghttp2 session. + new Event::MockSchedulableCallback(&codec.client_connection_.dispatcher_); TestClientConnectionImpl connection( codec.client_connection_, codec.client_callbacks_, codec.stats_store_, codec.options_, codec.random_, Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT, diff --git a/test/common/http/mixed_conn_pool_test.cc b/test/common/http/mixed_conn_pool_test.cc index e5fa9c30887b..d3d7b0914e58 100644 --- a/test/common/http/mixed_conn_pool_test.cc +++ b/test/common/http/mixed_conn_pool_test.cc @@ -68,6 +68,9 @@ void MixedConnPoolImplTest::testAlpnHandshake(absl::optional protocol) NiceMock callbacks_; auto* connection = new NiceMock(); + if (protocol.has_value() && protocol.value() == Protocol::Http2) { + new Event::MockSchedulableCallback(&connection->dispatcher_); + } EXPECT_CALL(dispatcher_, createClientConnection_(_, _, _, _)).WillOnce(Return(connection)); NiceMock decoder; conn_pool_->newStream(decoder, callbacks_); diff --git a/test/common/upstream/health_checker_impl_test.cc b/test/common/upstream/health_checker_impl_test.cc index 32b673f63c37..16bbd7d6632c 100644 --- a/test/common/upstream/health_checker_impl_test.cc +++ b/test/common/upstream/health_checker_impl_test.cc @@ -3133,6 +3133,7 @@ TEST_F(HttpHealthCheckerImplTest, DEPRECATED_FEATURE_TEST(ServiceNameMismatch)) TEST_F(ProdHttpHealthCheckerTest, ProdHttpHealthCheckerH2HealthChecking) { setupNoServiceValidationHCWithHttp2(); + new Event::MockSchedulableCallback(&connection_->dispatcher_); EXPECT_EQ(Http::CodecClient::Type::HTTP2, health_checker_->createCodecClientForTest(std::move(connection_))->type()); } diff --git a/test/extensions/filters/network/common/fuzz/uber_per_readfilter.cc b/test/extensions/filters/network/common/fuzz/uber_per_readfilter.cc index f6976aedf6b9..e251690175ef 100644 --- a/test/extensions/filters/network/common/fuzz/uber_per_readfilter.cc +++ b/test/extensions/filters/network/common/fuzz/uber_per_readfilter.cc @@ -54,7 +54,8 @@ std::vector UberFilterFuzzer::filterNames() { return filter_names; } -void UberFilterFuzzer::perFilterSetup(const std::string& filter_name) { +void UberFilterFuzzer::perFilterSetup(const std::string& filter_name, + const Protobuf::Message& message) { // Set up response for ext_authz filter if (filter_name == NetworkFilterNames::get().ExtAuthorization) { @@ -94,6 +95,13 @@ void UberFilterFuzzer::perFilterSetup(const std::string& filter_name) { pipe_addr_); read_filter_callbacks_->connection_.stream_info_.downstream_address_provider_->setRemoteAddress( pipe_addr_); + + auto config = dynamic_cast(message); + if (config.codec_type() == envoy::extensions::filters::network::http_connection_manager::v3:: + HttpConnectionManager::HTTP2) { + new Event::MockSchedulableCallback(&read_filter_callbacks_->connection_.dispatcher_); + } } else if (filter_name == NetworkFilterNames::get().RateLimit) { async_client_factory_ = std::make_unique(); async_client_ = std::make_unique(); diff --git a/test/extensions/filters/network/common/fuzz/uber_readfilter.cc b/test/extensions/filters/network/common/fuzz/uber_readfilter.cc index 4d126b8d1cdc..8f7e6dd87c75 100644 --- a/test/extensions/filters/network/common/fuzz/uber_readfilter.cc +++ b/test/extensions/filters/network/common/fuzz/uber_readfilter.cc @@ -89,6 +89,7 @@ UberFilterFuzzer::UberFilterFuzzer() : time_source_(factory_context_.simulatedTi void UberFilterFuzzer::fuzz( const envoy::config::listener::v3::Filter& proto_config, const Protobuf::RepeatedPtrField<::test::extensions::filters::network::Action>& actions) { + ProtobufTypes::MessagePtr message; try { // Try to create the filter callback(cb_). Exit early if the config is invalid or violates PGV // constraints. @@ -96,7 +97,7 @@ void UberFilterFuzzer::fuzz( ENVOY_LOG_MISC(info, "filter name {}", filter_name); auto& factory = Config::Utility::getAndCheckFactoryByName< Server::Configuration::NamedNetworkFilterConfigFactory>(filter_name); - ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( + message = Config::Utility::translateToFactoryConfig( proto_config, factory_context_.messageValidationVisitor(), factory); // Make sure no invalid system calls are executed in fuzzer. checkInvalidInputForFuzzer(filter_name, message.get()); @@ -106,7 +107,7 @@ void UberFilterFuzzer::fuzz( ENVOY_LOG_MISC(debug, "Controlled exception in filter setup {}", e.what()); return; } - perFilterSetup(proto_config.name()); + perFilterSetup(proto_config.name(), *message); // Add filter to connection_. cb_(read_filter_callbacks_->connection_); for (const auto& action : actions) { diff --git a/test/extensions/filters/network/common/fuzz/uber_readfilter.h b/test/extensions/filters/network/common/fuzz/uber_readfilter.h index d055c5e4451a..bf5150822e74 100644 --- a/test/extensions/filters/network/common/fuzz/uber_readfilter.h +++ b/test/extensions/filters/network/common/fuzz/uber_readfilter.h @@ -29,7 +29,7 @@ class UberFilterFuzzer { // Reset the states of the mock objects. void reset(); // Mock behaviors for specific filters. - void perFilterSetup(const std::string& filter_name); + void perFilterSetup(const std::string& filter_name, const Protobuf::Message& message); private: Server::Configuration::FakeFactoryContext factory_context_; diff --git a/test/integration/BUILD b/test/integration/BUILD index e3e1549221ca..28fc8b304962 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -1273,6 +1273,7 @@ envoy_cc_test( deps = [ ":tracked_watermark_buffer_lib", "//test/test_common:test_runtime_lib", + "//test/test_common:thread_factory_for_test_lib", ], ) diff --git a/test/integration/filters/test_socket_interface.h b/test/integration/filters/test_socket_interface.h index e32f8fe4a773..a06cf531a6c8 100644 --- a/test/integration/filters/test_socket_interface.h +++ b/test/integration/filters/test_socket_interface.h @@ -27,12 +27,30 @@ class TestIoSocketHandle : public IoSocketHandleImpl { bool socket_v6only = false, absl::optional domain = absl::nullopt) : IoSocketHandleImpl(fd, socket_v6only, domain), writev_override_(writev_override_proc) {} + void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, + Event::FileTriggerType trigger, uint32_t events) override { + absl::MutexLock lock(&mutex_); + dispatcher_ = &dispatcher; + IoSocketHandleImpl::initializeFileEvent(dispatcher, cb, trigger, events); + } + + // Schedule resumption on the IoHandle by posting a callback to the IoHandle's dispatcher. Note + // that this operation is inherently racy, nothing guarantees that the TestIoSocketHandle is not + // deleted before the posted callback executes. + void activateInDispatcherThreadForTest(uint32_t events) { + absl::MutexLock lock(&mutex_); + RELEASE_ASSERT(dispatcher_ != nullptr, "null dispatcher"); + dispatcher_->post([this, events]() { activateFileEvents(events); }); + } + private: IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; IoHandlePtr duplicate() override; const WritevOverrideProc writev_override_; + absl::Mutex mutex_; + Event::Dispatcher* dispatcher_ ABSL_GUARDED_BY(mutex_) = nullptr; }; /** diff --git a/test/integration/http2_flood_integration_test.cc b/test/integration/http2_flood_integration_test.cc index 3863d39b291c..457222d2a4a2 100644 --- a/test/integration/http2_flood_integration_test.cc +++ b/test/integration/http2_flood_integration_test.cc @@ -10,6 +10,7 @@ #include "common/http/header_map_impl.h" #include "common/network/socket_option_impl.h" +#include "test/config/utility.h" #include "test/integration/autonomous_upstream.h" #include "test/integration/filters/test_socket_interface.h" #include "test/integration/http_integration.h" @@ -26,30 +27,34 @@ using ::testing::HasSubstr; namespace Envoy { -namespace { -const uint32_t ControlFrameFloodLimit = 100; -const uint32_t AllFrameFloodLimit = 1000; -} // namespace - class SocketInterfaceSwap { public: // Object of this class hold the state determining the IoHandle which // should return EAGAIN from the `writev` call. struct IoHandleMatcher { - bool shouldReturnEgain(uint32_t src_port, uint32_t dst_port) const { - absl::ReaderMutexLock lock(&mutex_); - return writev_returns_egain_ && (src_port == src_port_ || dst_port == dst_port_); + bool shouldReturnEgain(Envoy::Network::TestIoSocketHandle* io_handle) { + absl::MutexLock lock(&mutex_); + if (writev_returns_egain_ && (io_handle->localAddress()->ip()->port() == src_port_ || + io_handle->peerAddress()->ip()->port() == dst_port_)) { + ASSERT(matched_iohandle_ == nullptr || matched_iohandle_ == io_handle, + "Matched multiple io_handles, expected at most one to match."); + matched_iohandle_ = io_handle; + return true; + } + return false; } // Source port to match. The port specified should be associated with a listener. void setSourcePort(uint32_t port) { absl::WriterMutexLock lock(&mutex_); src_port_ = port; + dst_port_ = 0; } // Destination port to match. The port specified should be associated with a listener. void setDestinationPort(uint32_t port) { absl::WriterMutexLock lock(&mutex_); + src_port_ = 0; dst_port_ = port; } @@ -59,11 +64,23 @@ class SocketInterfaceSwap { writev_returns_egain_ = true; } + void setResumeWrites() { + absl::MutexLock lock(&mutex_); + mutex_.Await(absl::Condition( + +[](Network::TestIoSocketHandle** matched_iohandle) { + return *matched_iohandle != nullptr; + }, + &matched_iohandle_)); + writev_returns_egain_ = false; + matched_iohandle_->activateInDispatcherThreadForTest(Event::FileReadyType::Write); + } + private: mutable absl::Mutex mutex_; uint32_t src_port_ ABSL_GUARDED_BY(mutex_) = 0; uint32_t dst_port_ ABSL_GUARDED_BY(mutex_) = 0; bool writev_returns_egain_ ABSL_GUARDED_BY(mutex_) = false; + Network::TestIoSocketHandle* matched_iohandle_{}; }; SocketInterfaceSwap() { @@ -73,8 +90,7 @@ class SocketInterfaceSwap { [writev_matcher = writev_matcher_]( Envoy::Network::TestIoSocketHandle* io_handle, const Buffer::RawSlice*, uint64_t) -> absl::optional { - if (writev_matcher->shouldReturnEgain(io_handle->localAddress()->ip()->port(), - io_handle->peerAddress()->ip()->port())) { + if (writev_matcher->shouldReturnEgain(io_handle)) { return Api::IoCallUint64Result( 0, Api::IoErrorPtr(Network::IoSocketError::getIoSocketEagainInstance(), Network::IoSocketError::deleteIoError)); @@ -100,16 +116,24 @@ class SocketInterfaceSwap { // destructor stops Envoy the SocketInterfaceSwap destructor needs to run after it. This order of // multiple inheritance ensures that SocketInterfaceSwap destructor runs after // Http2FrameIntegrationTest destructor completes. -class Http2FloodMitigationTest : public SocketInterfaceSwap, - public testing::TestWithParam, - public Http2RawFrameIntegrationTest { +class Http2FloodMitigationTest + : public SocketInterfaceSwap, + public testing::TestWithParam>, + public Http2RawFrameIntegrationTest { public: - Http2FloodMitigationTest() : Http2RawFrameIntegrationTest(GetParam()) { + Http2FloodMitigationTest() : Http2RawFrameIntegrationTest(std::get<0>(GetParam())) { + config_helper_.addRuntimeOverride("envoy.reloadable_features.enable_h2_watermark_improvements", + http2FlowControlV2() ? "true" : "false"); + config_helper_.addConfigModifier( [](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& hcm) { hcm.mutable_delayed_close_timeout()->set_seconds(1); }); + + setServerBufferFactory(buffer_factory_); } + bool http2FlowControlV2() const { return std::get<1>(GetParam()); } + protected: bool initializeUpstreamFloodTest(); std::vector serializeFrames(const Http2Frame& frame, uint32_t num_frames); @@ -120,15 +144,29 @@ class Http2FloodMitigationTest : public SocketInterfaceSwap, void floodClient(const Http2Frame& frame, uint32_t num_frames, const std::string& flood_stat); void setNetworkConnectionBufferSize(); - void beginSession() override; + void initialize() override { + Http2RawFrameIntegrationTest::initialize(); + writev_matcher_->setSourcePort(lookupPort("http")); + } void prefillOutboundDownstreamQueue(uint32_t data_frame_count, uint32_t data_frame_size = 10); IntegrationStreamDecoderPtr prefillOutboundUpstreamQueue(uint32_t frame_count); void triggerListenerDrain(); + + std::shared_ptr buffer_factory_ = + std::make_shared(); }; -INSTANTIATE_TEST_SUITE_P(IpVersions, Http2FloodMitigationTest, - testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), - TestUtility::ipTestParamsToString); +static std::string http2FloodMitigationTestParamsToString( + const ::testing::TestParamInfo>& params) { + return absl::StrCat(std::get<0>(params.param) == Network::Address::IpVersion::v4 ? "IPv4" + : "IPv6", + "_", std::get<1>(params.param) ? "Http2FlowControlV2" : "Http2FlowControlV1"); +} + +INSTANTIATE_TEST_SUITE_P( + IpVersions, Http2FloodMitigationTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), + http2FloodMitigationTestParamsToString); bool Http2FloodMitigationTest::initializeUpstreamFloodTest() { config_helper_.addRuntimeOverride("envoy.reloadable_features.upstream_http2_flood_checks", @@ -159,23 +197,6 @@ void Http2FloodMitigationTest::setNetworkConnectionBufferSize() { }); } -void Http2FloodMitigationTest::beginSession() { - setDownstreamProtocol(Http::CodecClient::Type::HTTP2); - setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); - // set lower outbound frame limits to make tests run faster - config_helper_.setDownstreamOutboundFramesLimits(AllFrameFloodLimit, ControlFrameFloodLimit); - initialize(); - // Set up a raw connection to easily send requests without reading responses. Also, set a small - // TCP receive buffer to speed up connection backup. - auto options = std::make_shared(); - options->emplace_back(std::make_shared( - envoy::config::core::v3::SocketOption::STATE_PREBIND, - ENVOY_MAKE_SOCKET_OPTION_NAME(SOL_SOCKET, SO_RCVBUF), 1024)); - writev_matcher_->setSourcePort(lookupPort("http")); - tcp_client_ = makeTcpConnection(lookupPort("http"), options); - startHttp2Session(); -} - std::vector Http2FloodMitigationTest::serializeFrames(const Http2Frame& frame, uint32_t num_frames) { // make sure all frames can fit into 16k buffer @@ -198,7 +219,8 @@ void Http2FloodMitigationTest::floodServer(const Http2Frame& frame, const std::s tcp_client_->waitForDisconnect(); EXPECT_EQ(1, test_server_->counter(flood_stat)->value()); - test_server_->waitForCounterGe("http.config_test.downstream_cx_delayed_close_timeout", 1); + test_server_->waitForCounterGe("http.config_test.downstream_cx_delayed_close_timeout", 1, + TestUtility::DefaultTimeout); } // Send header only request, flood client, and verify that the upstream is disconnected and client @@ -364,9 +386,6 @@ TEST_P(Http2FloodMitigationTest, 404) { // Verify that the server can detect flood of response DATA frames TEST_P(Http2FloodMitigationTest, Data) { - auto buffer_factory = std::make_shared(); - setServerBufferFactory(buffer_factory); - // Set large buffer limits so the test is not affected by the flow control. config_helper_.setBufferLimits(1024 * 1024 * 1024, 1024 * 1024 * 1024); autonomous_upstream_ = true; @@ -400,22 +419,22 @@ TEST_P(Http2FloodMitigationTest, Data) { // The factory will be used to create 4 buffers: the input and output buffers for request and // response pipelines. - EXPECT_EQ(4, buffer_factory->numBuffersCreated()); + EXPECT_EQ(8, buffer_factory_->numBuffersCreated()); // Expect at least 1000 1 byte data frames in the output buffer. Each data frame comes with a // 9-byte frame header; 10 bytes per data frame, 10000 bytes total. The output buffer should also // contain response headers, which should be less than 100 bytes. - EXPECT_LE(10000, buffer_factory->maxBufferSize()); - EXPECT_GE(10100, buffer_factory->maxBufferSize()); + EXPECT_LE(10000, buffer_factory_->maxBufferSize()); + EXPECT_GE(10100, buffer_factory_->maxBufferSize()); // The response pipeline input buffer could end up with the full upstream response in 1 go, but // there are no guarantees of that being the case. - EXPECT_LE(10000, buffer_factory->sumMaxBufferSizes()); + EXPECT_LE(10000, buffer_factory_->sumMaxBufferSizes()); // The max size of the input and output buffers used in the response pipeline is around 10kb each. - EXPECT_GE(22000, buffer_factory->sumMaxBufferSizes()); + EXPECT_GE(22000, buffer_factory_->sumMaxBufferSizes()); // Verify that all buffers have watermarks set. - EXPECT_THAT(buffer_factory->highWatermarkRange(), - testing::Pair(1024 * 1024 * 1024, 1024 * 1024 * 1024)); + EXPECT_THAT(buffer_factory_->highWatermarkRange(), + testing::Pair(256 * 1024 * 1024, 1024 * 1024 * 1024)); } // Verify that the server can detect flood triggered by a DATA frame from a decoder filter call @@ -1545,6 +1564,10 @@ TEST_P(Http2FloodMitigationTest, RequestMetadata) { {"header_key2", "header_value2"}, }; for (uint32_t frame = 0; frame < AllFrameFloodLimit + 1; ++frame) { + if (response->reset()) { + // Stream was reset, codec_client_ no longer valid. + break; + } codec_client_->sendMetadata(*request_encoder_, metadata_map); } @@ -1557,4 +1580,582 @@ TEST_P(Http2FloodMitigationTest, RequestMetadata) { EXPECT_EQ(1, test_server_->counter("cluster.cluster_0.http2.outbound_flood")->value()); } +class Http2BufferWatermarksTest : public Http2FloodMitigationTest { +public: + struct BufferParams { + uint32_t connection_watermark; + uint32_t downstream_h2_stream_window; + uint32_t downstream_h2_conn_window; + uint32_t upstream_h2_stream_window; + uint32_t upstream_h2_conn_window; + }; + + void initializeWithBufferConfig( + const BufferParams& buffer_params, uint32_t num_responses, + bool enable_downstream_frame_limits, + FakeHttpConnection::Type upstream_protocol = FakeHttpConnection::Type::HTTP2) { + config_helper_.setBufferLimits(buffer_params.connection_watermark, + buffer_params.connection_watermark); + + config_helper_.addConfigModifier( + [&](envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager& + hcm) -> void { + auto* h2_options = hcm.mutable_http2_protocol_options(); + h2_options->mutable_max_concurrent_streams()->set_value(num_responses); + h2_options->mutable_initial_stream_window_size()->set_value( + buffer_params.downstream_h2_stream_window); + h2_options->mutable_initial_connection_window_size()->set_value( + buffer_params.downstream_h2_conn_window); + }); + + if (upstream_protocol == FakeHttpConnection::Type::HTTP2) { + config_helper_.addConfigModifier( + [&](envoy::config::bootstrap::v3::Bootstrap& bootstrap) -> void { + ConfigHelper::HttpProtocolOptions protocol_options; + auto* upstream_h2_options = + protocol_options.mutable_explicit_http_config()->mutable_http2_protocol_options(); + upstream_h2_options->mutable_max_concurrent_streams()->set_value(100); + upstream_h2_options->mutable_initial_stream_window_size()->set_value( + buffer_params.upstream_h2_stream_window); + upstream_h2_options->mutable_initial_connection_window_size()->set_value( + buffer_params.upstream_h2_conn_window); + for (auto& cluster_config : *bootstrap.mutable_static_resources()->mutable_clusters()) { + ConfigHelper::setProtocolOptions(cluster_config, protocol_options); + } + }); + } + + autonomous_upstream_ = true; + autonomous_allow_incomplete_streams_ = true; + if (enable_downstream_frame_limits) { + beginSession(upstream_protocol); + } else { + beginSession(upstream_protocol, std::numeric_limits::max(), + std::numeric_limits::max()); + } + } + + std::vector sendRequests(uint32_t num_responses, uint32_t chunks_per_response, + uint32_t chunk_size, + uint32_t min_buffered_bytes_per_stream = 0) { + uint32_t expected_response_size = chunks_per_response * chunk_size; + std::vector stream_ids; + + auto connection_window_update = Http2Frame::makeWindowUpdateFrame(0, 1024 * 1024 * 1024); + sendFrame(connection_window_update); + + for (uint32_t idx = 0; idx < num_responses; ++idx) { + const uint32_t request_stream_id = Http2Frame::makeClientStreamId(idx); + stream_ids.push_back(request_stream_id); + const auto request = Http2Frame::makeRequest( + request_stream_id, "host", "/test/long/url", + {Http2Frame::Header("response_data_blocks", absl::StrCat(chunks_per_response)), + Http2Frame::Header("response_size_bytes", absl::StrCat(chunk_size)), + Http2Frame::Header("no_trailers", "0")}); + EXPECT_EQ(request_stream_id, request.streamId()); + sendFrame(request); + + auto stream_window_update = + Http2Frame::makeWindowUpdateFrame(request_stream_id, expected_response_size); + sendFrame(stream_window_update); + + if (min_buffered_bytes_per_stream > 0) { + EXPECT_TRUE(buffer_factory_->waitUntilTotalBufferedExceeds( + (idx + 1) * min_buffered_bytes_per_stream, TestUtility::DefaultTimeout)) + << "idx: " << idx << " buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize(); + } + } + + return stream_ids; + } + + void runTestWithBufferConfig( + const BufferParams& buffer_params, uint32_t num_responses, uint32_t chunks_per_response, + uint32_t chunk_size, uint32_t min_buffered_bytes, uint32_t min_buffered_bytes_per_stream = 0, + FakeHttpConnection::Type upstream_protocol = FakeHttpConnection::Type::HTTP2) { + uint32_t expected_response_size = chunks_per_response * chunk_size; + + initializeWithBufferConfig(buffer_params, num_responses, false, upstream_protocol); + + // Simulate TCP push back on the Envoy's downstream network socket, so that outbound frames + // start to accumulate in the transport socket buffer. + writev_matcher_->setWritevReturnsEgain(); + + std::vector stream_ids = + sendRequests(num_responses, chunks_per_response, chunk_size, min_buffered_bytes_per_stream); + + ASSERT_TRUE(buffer_factory_->waitUntilTotalBufferedExceeds(min_buffered_bytes, + TestUtility::DefaultTimeout)) + << "buffer total: " << buffer_factory_->totalBufferSize() + << " buffer max: " << buffer_factory_->maxBufferSize(); + writev_matcher_->setResumeWrites(); + + std::map response_info; + parseResponse(&response_info, num_responses); + ASSERT_EQ(stream_ids.size(), response_info.size()); + for (uint32_t stream_id : stream_ids) { + EXPECT_EQ(expected_response_size, response_info[stream_id].response_size_) << stream_id; + } + + tcp_client_->close(); + } + + struct ResponseInfo { + uint32_t response_size_ = 0; + bool reset_stream_ = false; + bool end_stream_ = false; + }; + + void parseResponse(std::map* response_info, uint32_t expect_completed) { + parseResponseUntil(response_info, [&]() -> bool { + uint32_t completed = 0; + for (const auto& item : *response_info) { + if (item.second.end_stream_ || item.second.reset_stream_) { + ++completed; + } + } + return completed >= expect_completed; + }); + } + + void parseResponseUntil(std::map* response_info, + const std::function& stop_predicate) { + while (!stop_predicate()) { + auto frame = readFrame(); + switch (frame.type()) { + case Http2Frame::Type::Headers: { + uint32_t stream_id = frame.streamId(); + auto result = response_info->emplace(stream_id, ResponseInfo()); + ASSERT(result.second); + } break; + case Http2Frame::Type::Data: { + uint32_t stream_id = frame.streamId(); + auto it = response_info->find(stream_id); + ASSERT(it != response_info->end()); + it->second.response_size_ += frame.payloadSize(); + if (frame.flags() > 0) { + it->second.end_stream_ = true; + } + } break; + case Http2Frame::Type::RstStream: { + uint32_t stream_id = frame.streamId(); + ENVOY_LOG_MISC(critical, "rst {}", stream_id); + auto it = response_info->find(stream_id); + ASSERT(it != response_info->end()); + it->second.reset_stream_ = true; + } break; + default: + RELEASE_ASSERT(false, absl::StrCat("Unknown frame type: ", frame.type())); + } + } + } +}; + +INSTANTIATE_TEST_SUITE_P( + IpVersions, Http2BufferWatermarksTest, + testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), testing::Bool()), + http2FloodMitigationTestParamsToString); + +// Verify buffering behavior when the downstream and upstream H2 stream high watermarks are +// configured to the same value. +TEST_P(Http2BufferWatermarksTest, DataFlowControlSymmetricStreamConfig) { + int num_requests = 5; + uint32_t connection_watermark = 32768; + uint32_t h2_stream_window = 128 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint64_t kBlockSize = 1024; + uint64_t kNumBlocks = 5 * h2_stream_window / kBlockSize; + + runTestWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, h2_stream_window, h2_conn_window}, + num_requests, kNumBlocks, kBlockSize, h2_stream_window * num_requests, h2_stream_window); + + // Verify that the flood check was not triggered + EXPECT_EQ(0, test_server_->counter("http2.outbound_flood")->value()); + + EXPECT_EQ(24, buffer_factory_->numBuffersCreated()); + if (http2FlowControlV2()) { + EXPECT_LE(h2_stream_window - connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(3 * h2_stream_window, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LE(1.0, std::get<0>(overflow_info)); + EXPECT_GE(3.0, std::get<0>(overflow_info)); + // Max overflow happens on the stream buffer. + EXPECT_EQ(h2_stream_window, std::get<1>(overflow_info)); + } else { + EXPECT_LE(h2_stream_window * num_requests / 2, buffer_factory_->maxBufferSize()); + EXPECT_GE(h2_stream_window * num_requests + 2 * connection_watermark, + buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LT(18.0, std::get<0>(overflow_info)); + // Max overflow happens on the connection buffer. + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + } + EXPECT_LT(h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + EXPECT_THAT(buffer_factory_->highWatermarkRange(), testing::Pair(32768, 128 * 1024)); +} + +// Verify buffering behavior when the upstream buffer high watermarks are configured to a larger +// value than downstream. +TEST_P(Http2BufferWatermarksTest, DataFlowControlLargeUpstreamStreamWindow) { + int num_requests = 5; + uint32_t connection_watermark = 32768; + uint32_t downstream_h2_stream_window = 64 * 1024; + uint32_t downstream_h2_conn_window = 256 * 1024; + uint32_t upstream_h2_stream_window = 512 * 1024; + uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint64_t kBlockSize = 1024; + uint64_t kNumBlocks = 5 * upstream_h2_stream_window / kBlockSize; + + runTestWithBufferConfig( + {connection_watermark, downstream_h2_stream_window, downstream_h2_conn_window, + upstream_h2_stream_window, upstream_h2_conn_window}, + num_requests, kNumBlocks, kBlockSize, upstream_h2_stream_window * num_requests); + + // Verify that the flood check was not triggered + EXPECT_EQ(0, test_server_->counter("http2.outbound_flood")->value()); + + EXPECT_EQ(24, buffer_factory_->numBuffersCreated()); + if (http2FlowControlV2()) { + EXPECT_LE(upstream_h2_stream_window - connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(2 * upstream_h2_stream_window, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LE(7.5, std::get<0>(overflow_info)); + EXPECT_GT(10.0, std::get<0>(overflow_info)); + // Max overflow happens on the downstream H2 stream buffer. + EXPECT_EQ(downstream_h2_stream_window, std::get<1>(overflow_info)); + } else { + EXPECT_LE(upstream_h2_stream_window * num_requests - connection_watermark, + buffer_factory_->maxBufferSize()); + EXPECT_GE(upstream_h2_stream_window * num_requests + 2 * connection_watermark, + buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LT(30.0, std::get<0>(overflow_info)); + // Max overflow happens on the connection buffer. + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + } + EXPECT_LT(upstream_h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + EXPECT_THAT(buffer_factory_->highWatermarkRange(), testing::Pair(32768, 512 * 1024)); +} + +// Verify buffering behavior when the downstream buffer high watermarks are configured to a larger +// value than upstream. +TEST_P(Http2BufferWatermarksTest, DataFlowControlLargeDownstreamStreamWindow) { + int num_requests = 5; + uint32_t connection_watermark = 32768; + uint32_t downstream_h2_stream_window = 512 * 1024; + uint32_t downstream_h2_conn_window = 64 * 1024; + uint32_t upstream_h2_stream_window = 64 * 1024; + uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint64_t kBlockSize = 1024; + uint64_t kNumBlocks = 5 * downstream_h2_stream_window / kBlockSize; + + runTestWithBufferConfig({connection_watermark, downstream_h2_stream_window, + downstream_h2_conn_window, upstream_h2_stream_window, + upstream_h2_conn_window}, + num_requests, kNumBlocks, kBlockSize, + http2FlowControlV2() ? downstream_h2_stream_window * num_requests + : upstream_h2_stream_window * num_requests); + + // Verify that the flood check was not triggered + EXPECT_EQ(0, test_server_->counter("http2.outbound_flood")->value()); + + EXPECT_EQ(24, buffer_factory_->numBuffersCreated()); + if (http2FlowControlV2()) { + EXPECT_LE(downstream_h2_stream_window - connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(2 * downstream_h2_stream_window, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LE(1.0, std::get<0>(overflow_info)); + EXPECT_GT(2.0, std::get<0>(overflow_info)); + + EXPECT_LT(downstream_h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + EXPECT_GT(2 * downstream_h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + } else { + EXPECT_LE(num_requests * upstream_h2_stream_window - connection_watermark, + buffer_factory_->maxBufferSize()); + EXPECT_GE(2 * num_requests * upstream_h2_stream_window, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LT(9.0, std::get<0>(overflow_info)); + // Max overflow happens on the connection buffer. + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + + EXPECT_LT(upstream_h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + EXPECT_GT(2 * upstream_h2_stream_window * num_requests, buffer_factory_->sumMaxBufferSizes()); + } + EXPECT_THAT(buffer_factory_->highWatermarkRange(), testing::Pair(32768, 512 * 1024)); +} + +// Verify that buffering is limited when using HTTP1 upstreams. +TEST_P(Http2BufferWatermarksTest, DataFlowControlHttp1Upstream) { + int num_requests = 25; + uint32_t connection_watermark = 32768; + uint32_t h2_stream_window = 128 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint64_t kBlockSize = 1024; + uint64_t kNumBlocks = 5 * h2_stream_window / kBlockSize; + + runTestWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, 0, 0}, num_requests, kNumBlocks, + kBlockSize, http2FlowControlV2() ? h2_stream_window * num_requests : connection_watermark, + http2FlowControlV2() ? h2_stream_window : 0, FakeHttpConnection::Type::HTTP1); + + // Verify that the flood check was not triggered + EXPECT_EQ(0, test_server_->counter("http2.outbound_flood")->value()); + + EXPECT_EQ(127, buffer_factory_->numBuffersCreated()); + if (http2FlowControlV2()) { + EXPECT_LE(h2_stream_window - connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(h2_stream_window + 2 * connection_watermark, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LE(1.0, std::get<0>(overflow_info)); + EXPECT_GE(2.0, std::get<0>(overflow_info)); + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + } else { + EXPECT_LE(connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(3 * connection_watermark, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LT(1.0, std::get<0>(overflow_info)); + EXPECT_GE(3.0, std::get<0>(overflow_info)); + // Max overflow happens on the connection buffer. + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + } + EXPECT_THAT(buffer_factory_->highWatermarkRange(), testing::Pair(32768, 128 * 1024)); +} + +// Verify that buffering is limited when handling small responses. +TEST_P(Http2BufferWatermarksTest, SmallResponseBuffering) { + int num_requests = 200; + uint64_t kNumBlocks = 1; + uint64_t kBlockSize = 10240; + uint32_t connection_watermark = 32768; + uint32_t downstream_h2_stream_window = 64 * 1024; + uint32_t downstream_h2_conn_window = 256 * 1024; + uint32_t upstream_h2_stream_window = 512 * 1024; + uint32_t upstream_h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + + runTestWithBufferConfig( + {connection_watermark, downstream_h2_stream_window, downstream_h2_conn_window, + upstream_h2_stream_window, upstream_h2_conn_window}, + num_requests, kNumBlocks, kBlockSize, num_requests * kNumBlocks * kBlockSize, kBlockSize); + + EXPECT_LT(800, buffer_factory_->numBuffersCreated()); + if (http2FlowControlV2()) { + EXPECT_LE(connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(2 * connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GT(2.0, std::get<0>(buffer_factory_->maxOverflowRatio())); + } else { + EXPECT_LE(50 * connection_watermark, buffer_factory_->maxBufferSize()); + EXPECT_GE(70 * connection_watermark, buffer_factory_->maxBufferSize()); + auto overflow_info = buffer_factory_->maxOverflowRatio(); + EXPECT_LT(50.0, std::get<0>(overflow_info)); + // Max overflow happens on the connection buffer. + EXPECT_EQ(connection_watermark, std::get<1>(overflow_info)); + } + EXPECT_THAT(buffer_factory_->highWatermarkRange(), testing::Pair(32768, 512 * 1024)); +} + +// Verify that control frame protections take effect even if control frames end up queued internally +// by nghttp2. +TEST_P(Http2BufferWatermarksTest, PingFloodAfterHighWatermark) { + int num_requests = 1; + uint64_t kNumBlocks = 10; + uint64_t kBlockSize = 16 * 1024; + uint32_t connection_watermark = 32768; + uint32_t h2_stream_window = 64 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + + initializeWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, h2_stream_window, h2_conn_window}, + num_requests, true); + + writev_matcher_->setWritevReturnsEgain(); + + sendRequests(num_requests, kNumBlocks, kBlockSize, connection_watermark + h2_stream_window); + + floodServer(Http2Frame::makePingFrame(), "http2.outbound_control_flood", + ControlFrameFloodLimit + 1); +} + +TEST_P(Http2BufferWatermarksTest, RstStreamWhileBlockedProxyingDataFrame) { + int num_requests = 5; + uint64_t kNumBlocks = 20; + uint64_t kBlockSize = 16 * 1024; + uint32_t connection_watermark = 32768; + uint32_t h2_stream_window = 128 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint32_t expected_response_size = kNumBlocks * kBlockSize; + + initializeWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, h2_stream_window, h2_conn_window}, + num_requests, true); + + writev_matcher_->setWritevReturnsEgain(); + + sendRequests(num_requests, kNumBlocks, kBlockSize, h2_stream_window); + if (http2FlowControlV2()) { + test_server_->waitForGaugeEq("http2.streams_active", 5); + } + + // Reset streams in reverse order except for the most recently created stream. Verify that the rst + // streams are properly cancelled. + for (int32_t idx = num_requests - 2; idx >= 0; --idx) { + const uint32_t request_stream_id = Http2Frame::makeClientStreamId(idx); + auto rst_frame = + Http2Frame::makeResetStreamFrame(request_stream_id, Http2Frame::ErrorCode::Cancel); + sendFrame(rst_frame); + } + + if (http2FlowControlV2()) { + test_server_->waitForGaugeEq("http2.streams_active", 1); + } + writev_matcher_->setResumeWrites(); + + uint32_t first_stream = Http2Frame::makeClientStreamId(0); + uint32_t last_stream = Http2Frame::makeClientStreamId(num_requests - 1); + + std::map response_info; + parseResponse(&response_info, 1); + if (http2FlowControlV2()) { + ASSERT_EQ(2, response_info.size()); + // Full response on last_stream. + EXPECT_EQ(expected_response_size, response_info[last_stream].response_size_); + // Partial response on first_stream. + EXPECT_LT(0, response_info[first_stream].response_size_); + EXPECT_GT(expected_response_size, response_info[first_stream].response_size_); + EXPECT_GT(2 * h2_stream_window, response_info[first_stream].response_size_); + } else { + ASSERT_EQ(num_requests, response_info.size()); + for (auto& [stream_id, info] : response_info) { + if (stream_id == last_stream) { + EXPECT_EQ(expected_response_size, response_info[stream_id].response_size_) << stream_id; + } else { + EXPECT_LT(0, response_info[stream_id].response_size_) << stream_id; + EXPECT_GT(expected_response_size, response_info[stream_id].response_size_) << stream_id; + EXPECT_GT(2 * h2_stream_window, response_info[stream_id].response_size_) << stream_id; + } + } + } + + tcp_client_->close(); +} + +// Create some requests and reset them immediately. Do not wait for the request to reach a specific +// point in the state machine. Only expect a full response on the one stream that we didn't reset. +TEST_P(Http2BufferWatermarksTest, RstStreamQuickly) { + int num_requests = 5; + uint64_t kNumBlocks = 20; + uint64_t kBlockSize = 16 * 1024; + uint32_t connection_watermark = 32768; + uint32_t h2_stream_window = 128 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint32_t expected_response_size = kNumBlocks * kBlockSize; + + initializeWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, h2_stream_window, h2_conn_window}, + num_requests, true); + + writev_matcher_->setWritevReturnsEgain(); + + sendRequests(num_requests, kNumBlocks, kBlockSize); + // Reset streams in reverse order except for the most recently created stream. Verify that the rst + // streams are properly cancelled. + for (int32_t idx = num_requests - 2; idx >= 0; --idx) { + const uint32_t request_stream_id = Http2Frame::makeClientStreamId(idx); + auto rst_frame = + Http2Frame::makeResetStreamFrame(request_stream_id, Http2Frame::ErrorCode::Cancel); + sendFrame(rst_frame); + } + + if (http2FlowControlV2()) { + test_server_->waitForGaugeEq("http2.streams_active", 1); + } + writev_matcher_->setResumeWrites(); + + uint32_t last_stream = Http2Frame::makeClientStreamId(num_requests - 1); + + // Wait for a full response on |last_stream| + std::map response_info; + parseResponseUntil(&response_info, [&]() -> bool { + auto it = response_info.find(last_stream); + return it != response_info.end() && it->second.end_stream_; + }); + + // The test does not wait for replies to be generated on all streams before sending out the resets + // so we are only guaranteed that there will be a response on last_stream. + EXPECT_LE(1, response_info.size()); + for (auto& [stream_id, info] : response_info) { + if (stream_id == last_stream) { + EXPECT_EQ(expected_response_size, response_info[stream_id].response_size_) << stream_id; + } else { + // Other streams may get a partial response. + EXPECT_GT(expected_response_size, response_info[stream_id].response_size_) << stream_id; + EXPECT_GT(2 * h2_stream_window, response_info[stream_id].response_size_) << stream_id; + } + } + + tcp_client_->close(); +} + +// Cover the case where the frame that triggered blocking was a header frame, and a RST_STREAM +// arrives for that stream. +TEST_P(Http2BufferWatermarksTest, RstStreamWhileBlockedProxyingHeaderFrame) { + int num_requests = 5; + uint64_t kNumBlocks = 4; + uint64_t kBlockSize = 16 * 1024; + uint32_t connection_watermark = 64 * 1024; + uint32_t h2_stream_window = 64 * 1024; + uint32_t h2_conn_window = 1024 * 1024 * 1024; // Effectively unlimited + uint32_t expected_response_size = kNumBlocks * kBlockSize; + + initializeWithBufferConfig( + {connection_watermark, h2_stream_window, h2_conn_window, h2_stream_window, h2_conn_window}, + num_requests, true); + + writev_matcher_->setWritevReturnsEgain(); + + sendRequests(num_requests, kNumBlocks, kBlockSize, h2_stream_window); + if (http2FlowControlV2()) { + test_server_->waitForGaugeGe("http2.streams_active", 4); + } else { + test_server_->waitForGaugeEq("http2.streams_active", 0); + } + + for (int32_t idx = 1; idx < num_requests - 1; ++idx) { + const uint32_t request_stream_id = Http2Frame::makeClientStreamId(idx); + auto rst_frame = + Http2Frame::makeResetStreamFrame(request_stream_id, Http2Frame::ErrorCode::Cancel); + sendFrame(rst_frame); + } + if (http2FlowControlV2()) { + test_server_->waitForGaugeLe("http2.streams_active", 2); + } + + writev_matcher_->setResumeWrites(); + + uint32_t first_stream = Http2Frame::makeClientStreamId(0); + uint32_t last_stream = Http2Frame::makeClientStreamId(num_requests - 1); + + std::map response_info; + parseResponse(&response_info, 2); + if (http2FlowControlV2()) { + EXPECT_GE(3, response_info.size()); + EXPECT_LE(2, response_info.size()); + for (auto& [stream_id, info] : response_info) { + if (stream_id == first_stream || stream_id == last_stream) { + EXPECT_EQ(expected_response_size, response_info[stream_id].response_size_) << stream_id; + } else { + EXPECT_EQ(0, response_info[stream_id].response_size_) << stream_id; + } + } + } else { + EXPECT_EQ(2, response_info.size()); + for (auto& [stream_id, info] : response_info) { + EXPECT_EQ(expected_response_size, response_info[stream_id].response_size_) << stream_id; + } + } + + tcp_client_->close(); +} + } // namespace Envoy diff --git a/test/integration/http_integration.cc b/test/integration/http_integration.cc index f3044030a16b..37203cc2b0c6 100644 --- a/test/integration/http_integration.cc +++ b/test/integration/http_integration.cc @@ -1417,11 +1417,12 @@ void Http2RawFrameIntegrationTest::startHttp2Session() { readFrame(); } -void Http2RawFrameIntegrationTest::beginSession() { +void Http2RawFrameIntegrationTest::beginSession(FakeHttpConnection::Type upstream_protocol, + uint32_t max_all_frames, + uint32_t max_control_frames) { setDownstreamProtocol(Http::CodecClient::Type::HTTP2); - setUpstreamProtocol(FakeHttpConnection::Type::HTTP2); - // set lower outbound frame limits to make tests run faster - config_helper_.setDownstreamOutboundFramesLimits(1000, 100); + setUpstreamProtocol(upstream_protocol); + config_helper_.setDownstreamOutboundFramesLimits(max_all_frames, max_control_frames); initialize(); // Set up a raw connection to easily send requests without reading responses. auto options = std::make_shared(); @@ -1435,12 +1436,12 @@ void Http2RawFrameIntegrationTest::beginSession() { Http2Frame Http2RawFrameIntegrationTest::readFrame() { Http2Frame frame; EXPECT_TRUE(tcp_client_->waitForData(frame.HeaderSize)); - frame.setHeader(tcp_client_->data()); + frame.setHeader(absl::string_view(tcp_client_->data()).substr(0, frame.HeaderSize)); tcp_client_->clearData(frame.HeaderSize); auto len = frame.payloadSize(); if (len) { EXPECT_TRUE(tcp_client_->waitForData(len)); - frame.setPayload(tcp_client_->data()); + frame.setPayload(absl::string_view(tcp_client_->data()).substr(0, len)); tcp_client_->clearData(len); } return frame; diff --git a/test/integration/http_integration.h b/test/integration/http_integration.h index 5ee7de657997..8c13dcabb488 100644 --- a/test/integration/http_integration.h +++ b/test/integration/http_integration.h @@ -269,10 +269,18 @@ class Http2RawFrameIntegrationTest : public HttpIntegrationTest { : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, version) {} protected: + static constexpr uint32_t ControlFrameFloodLimit = 100; + static constexpr uint32_t AllFrameFloodLimit = 1000; + void startHttp2Session(); Http2Frame readFrame(); void sendFrame(const Http2Frame& frame); - virtual void beginSession(); + + // Use lower default outbound frame limits to make overflow tests run faster. + virtual void + beginSession(FakeHttpConnection::Type upstream_protocol = FakeHttpConnection::Type::HTTP2, + uint32_t max_all_frames = AllFrameFloodLimit, + uint32_t max_control_frames = ControlFrameFloodLimit); IntegrationTcpClientPtr tcp_client_; }; diff --git a/test/integration/server.h b/test/integration/server.h index d9ca03978889..e50b3fedd043 100644 --- a/test/integration/server.h +++ b/test/integration/server.h @@ -459,6 +459,12 @@ class IntegrationTestServer : public Logger::Loggable, ASSERT_TRUE(TestUtility::waitForGaugeGe(statStore(), name, value, time_system_, timeout)); } + void + waitForGaugeLe(const std::string& name, uint64_t value, + std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) override { + ASSERT_TRUE(TestUtility::waitForGaugeLe(statStore(), name, value, time_system_, timeout)); + } + void waitForCounterExists(const std::string& name) override { notifyingStatsAllocator().waitForCounterExists(name); } diff --git a/test/integration/server_stats.h b/test/integration/server_stats.h index 6c169ed68dd4..12f7cccd75a0 100644 --- a/test/integration/server_stats.h +++ b/test/integration/server_stats.h @@ -48,6 +48,16 @@ class IntegrationTestServerStats { waitForGaugeGe(const std::string& name, uint64_t value, std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) PURE; + /** + * Wait for a gauge to <= a given value. + * @param name gauge name. + * @param value target value. + * @param timeout amount of time to wait before asserting false, or 0 for no timeout. + */ + virtual void + waitForGaugeLe(const std::string& name, uint64_t value, + std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) PURE; + /** * Wait for a gauge to == a given value. * @param name gauge name. diff --git a/test/integration/tracked_watermark_buffer.cc b/test/integration/tracked_watermark_buffer.cc index e67acf088f3a..22cb4f0047b1 100644 --- a/test/integration/tracked_watermark_buffer.cc +++ b/test/integration/tracked_watermark_buffer.cc @@ -18,18 +18,22 @@ TrackedWatermarkBufferFactory::create(std::function below_low_watermark, return std::make_unique( [this, &buffer_info](uint64_t current_size) { absl::MutexLock lock(&mutex_); + total_buffer_size_ = total_buffer_size_ + current_size - buffer_info.current_size_; if (buffer_info.max_size_ < current_size) { buffer_info.max_size_ = current_size; } + buffer_info.current_size_ = current_size; }, [this, &buffer_info](uint32_t watermark) { absl::MutexLock lock(&mutex_); buffer_info.watermark_ = watermark; }, - [this]() { + [this, &buffer_info]() { absl::MutexLock lock(&mutex_); ASSERT(active_buffer_count_ > 0); --active_buffer_count_; + total_buffer_size_ -= buffer_info.current_size_; + buffer_info.current_size_ = 0; }, below_low_watermark, above_high_watermark, above_overflow_watermark); } @@ -44,6 +48,11 @@ uint64_t TrackedWatermarkBufferFactory::numBuffersActive() const { return active_buffer_count_; } +uint64_t TrackedWatermarkBufferFactory::totalBufferSize() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; +} + uint64_t TrackedWatermarkBufferFactory::maxBufferSize() const { absl::MutexLock lock(&mutex_); uint64_t val = 0; @@ -94,5 +103,34 @@ std::pair TrackedWatermarkBufferFactory::highWatermarkRange( return std::make_pair(min_watermark, max_watermark); } +std::tuple TrackedWatermarkBufferFactory::maxOverflowRatio() const { + absl::MutexLock lock(&mutex_); + double max_overflow_ratio = 0.0; + BufferInfo info; + for (auto& item : buffer_infos_) { + if (item.second.watermark_ == 0) { + // Ignore, watermarks are disabled when watermark is 0. + continue; + } + + double overflow_ratio = static_cast(item.second.max_size_) / item.second.watermark_; + if (overflow_ratio >= max_overflow_ratio) { + max_overflow_ratio = overflow_ratio; + info = item.second; + } + } + return std::make_tuple(max_overflow_ratio, info.watermark_, info.max_size_); +} + +bool TrackedWatermarkBufferFactory::waitUntilTotalBufferedExceeds( + uint64_t byte_size, std::chrono::milliseconds timeout) { + absl::MutexLock lock(&mutex_); + auto predicate = [this, byte_size]() ABSL_SHARED_LOCKS_REQUIRED(mutex_) { + mutex_.AssertHeld(); + return total_buffer_size_ >= byte_size; + }; + return mutex_.AwaitWithTimeout(absl::Condition(&predicate), absl::Milliseconds(timeout.count())); +} + } // namespace Buffer } // namespace Envoy diff --git a/test/integration/tracked_watermark_buffer.h b/test/integration/tracked_watermark_buffer.h index 64d8473dbf10..6dba0f512c8a 100644 --- a/test/integration/tracked_watermark_buffer.h +++ b/test/integration/tracked_watermark_buffer.h @@ -11,13 +11,13 @@ namespace Buffer { // WatermarkBuffer subclass that hooks into updates to buffer size and buffer high watermark config. class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { public: - TrackedWatermarkBuffer(std::function update_max_size, + TrackedWatermarkBuffer(std::function update_size, std::function update_high_watermark, std::function on_delete, std::function below_low_watermark, std::function above_high_watermark, std::function above_overflow_watermark) : WatermarkBuffer(below_low_watermark, above_high_watermark, above_overflow_watermark), - update_max_size_(update_max_size), update_high_watermark_(update_high_watermark), + update_size_(update_size), update_high_watermark_(update_high_watermark), on_delete_(on_delete) {} ~TrackedWatermarkBuffer() override { on_delete_(); } @@ -28,12 +28,17 @@ class TrackedWatermarkBuffer : public Buffer::WatermarkBuffer { protected: void checkHighAndOverflowWatermarks() override { - update_max_size_(length()); + update_size_(length()); WatermarkBuffer::checkHighAndOverflowWatermarks(); } + void checkLowWatermark() override { + update_size_(length()); + WatermarkBuffer::checkLowWatermark(); + } + private: - std::function update_max_size_; + std::function update_size_; std::function update_high_watermark_; std::function on_delete_; }; @@ -52,6 +57,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t numBuffersCreated() const; // Number of buffers still in use. uint64_t numBuffersActive() const; + // Total bytes buffered. + uint64_t totalBufferSize() const; // Size of the largest buffer. uint64_t maxBufferSize() const; // Sum of the max size of all known buffers. @@ -59,10 +66,22 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { // Get lower and upper bound on buffer high watermarks. A watermark of 0 indicates that watermark // functionality is disabled. std::pair highWatermarkRange() const; + // Compute the overflow ratio of the buffer that overflowed the most. + std::tuple maxOverflowRatio() const; + + // Total bytes currently buffered across all known buffers. + uint64_t totalBytesBuffered() const { + absl::MutexLock lock(&mutex_); + return total_buffer_size_; + } + + // Wait until total bytes buffered exceeds the a given size. + bool waitUntilTotalBufferedExceeds(uint64_t byte_size, std::chrono::milliseconds timeout); private: struct BufferInfo { uint32_t watermark_ = 0; + uint64_t current_size_ = 0; uint64_t max_size_ = 0; }; @@ -71,6 +90,8 @@ class TrackedWatermarkBufferFactory : public Buffer::WatermarkFactory { uint64_t next_idx_ ABSL_GUARDED_BY(mutex_) = 0; // Number of buffers currently in existence. uint64_t active_buffer_count_ ABSL_GUARDED_BY(mutex_) = 0; + // total bytes buffered across all buffers. + uint64_t total_buffer_size_ ABSL_GUARDED_BY(mutex_) = 0; // Info about the buffer, by buffer idx. absl::node_hash_map buffer_infos_ ABSL_GUARDED_BY(mutex_); }; diff --git a/test/integration/tracked_watermark_buffer_test.cc b/test/integration/tracked_watermark_buffer_test.cc index 9d4eb6110d9e..c2aa6cc1ae07 100644 --- a/test/integration/tracked_watermark_buffer_test.cc +++ b/test/integration/tracked_watermark_buffer_test.cc @@ -1,6 +1,7 @@ #include "test/integration/tracked_watermark_buffer.h" #include "test/mocks/common.h" #include "test/test_common/test_runtime.h" +#include "test/test_common/thread_factory_for_test.h" #include "gtest/gtest.h" @@ -53,6 +54,11 @@ TEST_F(TrackedWatermarkBufferTest, WatermarkFunctions) { buffer->drain(250); EXPECT_CALL(low_watermark, ready()); buffer->drain(1); + + // Verify overflow stats. + EXPECT_EQ(3.01, std::get<0>(factory_.maxOverflowRatio())); + EXPECT_EQ(100, std::get<1>(factory_.maxOverflowRatio())); + EXPECT_EQ(301, std::get<2>(factory_.maxOverflowRatio())); } TEST_F(TrackedWatermarkBufferTest, BufferSizes) { @@ -69,19 +75,23 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { EXPECT_EQ(5, factory_.maxBufferSize()); EXPECT_EQ(6, factory_.sumMaxBufferSizes()); + EXPECT_EQ(6, factory_.totalBytesBuffered()); // Add more bytes and drain the buffer. Verify that max is latched. buffer->add(std::string(1000, 'a')); EXPECT_TRUE(buffer->highWatermarkTriggered()); + EXPECT_EQ(1006, factory_.totalBytesBuffered()); buffer->drain(1005); EXPECT_EQ(0, buffer->length()); EXPECT_FALSE(buffer->highWatermarkTriggered()); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1006, factory_.sumMaxBufferSizes()); + EXPECT_EQ(1, factory_.totalBytesBuffered()); buffer2->add("a"); EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); + EXPECT_EQ(2, factory_.totalBytesBuffered()); // Verify cleanup tracking. buffer.reset(); @@ -90,12 +100,32 @@ TEST_F(TrackedWatermarkBufferTest, BufferSizes) { buffer2.reset(); EXPECT_EQ(2, factory_.numBuffersCreated()); EXPECT_EQ(0, factory_.numBuffersActive()); + // Bytes in deleted buffers are removed from the total. + EXPECT_EQ(0, factory_.totalBytesBuffered()); // Max sizes are remembered even after buffers are deleted. EXPECT_EQ(1005, factory_.maxBufferSize()); EXPECT_EQ(1007, factory_.sumMaxBufferSizes()); } +TEST_F(TrackedWatermarkBufferTest, WaitUntilTotalBufferedExceeds) { + auto buffer1 = factory_.create([]() {}, []() {}, []() {}); + auto buffer2 = factory_.create([]() {}, []() {}, []() {}); + auto buffer3 = factory_.create([]() {}, []() {}, []() {}); + + auto thread1 = Thread::threadFactoryForTest().createThread([&]() { buffer1->add("a"); }); + auto thread2 = Thread::threadFactoryForTest().createThread([&]() { buffer2->add("b"); }); + auto thread3 = Thread::threadFactoryForTest().createThread([&]() { buffer3->add("c"); }); + + factory_.waitUntilTotalBufferedExceeds(2, std::chrono::milliseconds(10000)); + thread1->join(); + thread2->join(); + thread3->join(); + + EXPECT_EQ(3, factory_.totalBytesBuffered()); + EXPECT_EQ(1, factory_.maxBufferSize()); +} + } // namespace } // namespace Buffer } // namespace Envoy diff --git a/test/test_common/utility.cc b/test/test_common/utility.cc index 08c04e889c74..2c325603bc42 100644 --- a/test/test_common/utility.cc +++ b/test/test_common/utility.cc @@ -197,6 +197,19 @@ AssertionResult TestUtility::waitForCounterGe(Stats::Store& store, const std::st return AssertionSuccess(); } +AssertionResult TestUtility::waitForGaugeLe(Stats::Store& store, const std::string& name, + uint64_t value, Event::TestTimeSystem& time_system, + std::chrono::milliseconds timeout) { + Event::TestTimeSystem::RealTimeBound bound(timeout); + while (findGauge(store, name) == nullptr || findGauge(store, name)->value() > value) { + time_system.advanceTimeWait(std::chrono::milliseconds(10)); + if (timeout != std::chrono::milliseconds::zero() && !bound.withinBound()) { + return AssertionFailure() << fmt::format("timed out waiting for {} to be {}", name, value); + } + } + return AssertionSuccess(); +} + AssertionResult TestUtility::waitForGaugeGe(Stats::Store& store, const std::string& name, uint64_t value, Event::TestTimeSystem& time_system, std::chrono::milliseconds timeout) { diff --git a/test/test_common/utility.h b/test/test_common/utility.h index 7580a576e56d..13463313a9d3 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -226,6 +226,21 @@ class TestUtility { Event::TestTimeSystem& time_system, std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()); + /** + * Wait for a gauge to <= a given value. + * @param store supplies the stats store. + * @param name gauge name. + * @param value target value. + * @param time_system the time system to use for waiting. + * @param timeout the maximum time to wait before timing out, or 0 for no timeout. + * @return AssertionSuccess() if the counter gauge <= to the value within the timeout, else + * AssertionFailure(). + */ + static AssertionResult + waitForGaugeLe(Stats::Store& store, const std::string& name, uint64_t value, + Event::TestTimeSystem& time_system, + std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()); + /** * Wait for a gauge to >= a given value. * @param store supplies the stats store. diff --git a/tools/code_format/check_format.py b/tools/code_format/check_format.py index 71645f4cbfd0..07e303a6a7bc 100755 --- a/tools/code_format/check_format.py +++ b/tools/code_format/check_format.py @@ -150,7 +150,7 @@ COMMENT_REGEX = re.compile(r"//|\*") DURATION_VALUE_REGEX = re.compile(r'\b[Dd]uration\(([0-9.]+)') PROTO_VALIDATION_STRING = re.compile(r'\bmin_bytes\b') -VERSION_HISTORY_NEW_LINE_REGEX = re.compile("\* ([a-z \-_]+): ([a-z:`]+)") +VERSION_HISTORY_NEW_LINE_REGEX = re.compile("\* ([a-z0-9 \-_]+): ([a-z:`]+)") VERSION_HISTORY_SECTION_NAME = re.compile("^[A-Z][A-Za-z ]*$") RELOADABLE_FLAG_REGEX = re.compile(".*(..)(envoy.reloadable_features.[^ ]*)\s.*") INVALID_REFLINK = re.compile(".* ref:.*") diff --git a/tools/spelling/spelling_dictionary.txt b/tools/spelling/spelling_dictionary.txt index ecf2e82a8c2a..398a5ca4d0cd 100644 --- a/tools/spelling/spelling_dictionary.txt +++ b/tools/spelling/spelling_dictionary.txt @@ -1228,6 +1228,7 @@ wildcard wildcards winsock workspace +WOULDBLOCK writev xDS xDSes From 6442136fb490557e4b33a698e0e4c83b84e26cd1 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Thu, 21 Jan 2021 03:30:33 -0500 Subject: [PATCH 2/4] address review comments Signed-off-by: Antonio Vicente --- test/integration/filters/test_socket_interface.h | 2 +- test/integration/http2_flood_integration_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/integration/filters/test_socket_interface.h b/test/integration/filters/test_socket_interface.h index a06cf531a6c8..07980b5a9e59 100644 --- a/test/integration/filters/test_socket_interface.h +++ b/test/integration/filters/test_socket_interface.h @@ -37,7 +37,7 @@ class TestIoSocketHandle : public IoSocketHandleImpl { // Schedule resumption on the IoHandle by posting a callback to the IoHandle's dispatcher. Note // that this operation is inherently racy, nothing guarantees that the TestIoSocketHandle is not // deleted before the posted callback executes. - void activateInDispatcherThreadForTest(uint32_t events) { + void activateInDispatcherThread(uint32_t events) { absl::MutexLock lock(&mutex_); RELEASE_ASSERT(dispatcher_ != nullptr, "null dispatcher"); dispatcher_->post([this, events]() { activateFileEvents(events); }); diff --git a/test/integration/http2_flood_integration_test.cc b/test/integration/http2_flood_integration_test.cc index adcdc2cb22a1..cfd61025f2a3 100644 --- a/test/integration/http2_flood_integration_test.cc +++ b/test/integration/http2_flood_integration_test.cc @@ -72,7 +72,7 @@ class SocketInterfaceSwap { }, &matched_iohandle_)); writev_returns_egain_ = false; - matched_iohandle_->activateInDispatcherThreadForTest(Event::FileReadyType::Write); + matched_iohandle_->activateInDispatcherThread(Event::FileReadyType::Write); } private: From 84897e3af2ce67db1b0fe4d0bd8b5d05a496e877 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Thu, 21 Jan 2021 23:39:25 -0500 Subject: [PATCH 3/4] revert changes to dispatcher_impl since they are no longer needed after merging main Signed-off-by: Antonio Vicente --- source/common/event/dispatcher_impl.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/common/event/dispatcher_impl.cc b/source/common/event/dispatcher_impl.cc index e21cfa05db96..f67d7787abc9 100644 --- a/source/common/event/dispatcher_impl.cc +++ b/source/common/event/dispatcher_impl.cc @@ -220,8 +220,7 @@ TimerPtr DispatcherImpl::createScaledTimer(ScaledTimerMinimum minimum, TimerCb c } Event::SchedulableCallbackPtr DispatcherImpl::createSchedulableCallback(std::function cb) { - // TODO(antoniovicente) Merge in https://github.com/envoyproxy/envoy/pull/14526 and restore ASSERT - // ASSERT(isThreadSafe()); + ASSERT(isThreadSafe()); return base_scheduler_.createSchedulableCallback([this, cb]() { touchWatchdog(); cb(); From d17f568814b5ea065f8e5468bfc203a0ba8c1806 Mon Sep 17 00:00:00 2001 From: Antonio Vicente Date: Mon, 25 Jan 2021 14:20:03 -0500 Subject: [PATCH 4/4] disable nghttp2 output flood protections Signed-off-by: Antonio Vicente --- source/common/http/http2/codec_impl.cc | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/source/common/http/http2/codec_impl.cc b/source/common/http/http2/codec_impl.cc index 829ec9e80560..8b02c2c9cda2 100644 --- a/source/common/http/http2/codec_impl.cc +++ b/source/common/http/http2/codec_impl.cc @@ -1416,13 +1416,9 @@ ConnectionImpl::Http2Options::Http2Options( ENVOY_LOG(trace, "Codec does not have Metadata frame support."); } - // nghttp2 v1.39.2 lowered the internal flood protection limit from 10K to 1K of ACK frames. - // This new limit may cause the internal nghttp2 mitigation to trigger more often (as it - // requires just 9K of incoming bytes for smallest 9 byte SETTINGS frame), bypassing the same - // mitigation and its associated behavior in the envoy HTTP/2 codec. Since envoy does not rely - // on this mitigation, set back to the old 10K number to avoid any changes in the HTTP/2 codec - // behavior. - nghttp2_option_set_max_outbound_ack(options_, 10000); + // Envoy implements flood protection on its own. Effectively disable nghttp2 outbound queue flood + // protections by setting the limit to a large number. + nghttp2_option_set_max_outbound_ack(options_, std::numeric_limits::max()); } ConnectionImpl::Http2Options::~Http2Options() { nghttp2_option_del(options_); }