diff --git a/api/envoy/config/core/v3/grpc_service.proto b/api/envoy/config/core/v3/grpc_service.proto index e42ffc2622e8..471f9165b9d1 100644 --- a/api/envoy/config/core/v3/grpc_service.proto +++ b/api/envoy/config/core/v3/grpc_service.proto @@ -49,6 +49,12 @@ message GrpcService { // Currently only supported for xDS gRPC streams. // If not set, xDS gRPC streams default base interval:500ms, maximum interval:30s will be applied. RetryPolicy retry_policy = 3; + + // Maximum gRPC message size that is allowed to be received. + // If a message over this limit is received, the gRPC stream is terminated with the RESOURCE_EXHAUSTED error. + // This limit is applied to individual messages in the streaming response and not the total size of streaming response. + // Defaults to 0, which means unlimited. + google.protobuf.UInt32Value max_receive_message_length = 4; } // [#next-free-field: 9] diff --git a/changelogs/current.yaml b/changelogs/current.yaml index e735d1cb2d1b..9b52192e6fc5 100644 --- a/changelogs/current.yaml +++ b/changelogs/current.yaml @@ -425,3 +425,8 @@ deprecated: deprecated :ref:`split_spans_for_request ` in favor of :ref:`spawn_upstream_span `. +- area: grpc + change: | + Added maximum gRPC message size that is allowed to be received in Envoy gRPC. If a message over this limit is received, + the gRPC stream is terminated with the RESOURCE_EXHAUSTED error. This limit is applied to individual messages in the + streaming response and not the total size of streaming response. Defaults to 0, which means unlimited. diff --git a/source/common/grpc/async_client_impl.cc b/source/common/grpc/async_client_impl.cc index 79698905729b..d3b50108bff5 100644 --- a/source/common/grpc/async_client_impl.cc +++ b/source/common/grpc/async_client_impl.cc @@ -17,7 +17,9 @@ namespace Grpc { AsyncClientImpl::AsyncClientImpl(Upstream::ClusterManager& cm, const envoy::config::core::v3::GrpcService& config, TimeSource& time_source) - : cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()), + : max_recv_message_length_( + PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.envoy_grpc(), max_receive_message_length, 0)), + cm_(cm), remote_cluster_name_(config.envoy_grpc().cluster_name()), host_name_(config.envoy_grpc().authority()), time_source_(time_source), metadata_parser_(Router::HeaderParser::configure( config.initial_metadata(), @@ -81,6 +83,8 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv if (!options.retry_policy.has_value() && parent_.retryPolicy().has_value()) { options_.setRetryPolicy(*parent_.retryPolicy()); } + // Configure the maximum frame length + decoder_.setMaxFrameLength(parent_.max_recv_message_length_); } void AsyncStreamImpl::initialize(bool buffer_body_for_retry) { diff --git a/source/common/grpc/async_client_impl.h b/source/common/grpc/async_client_impl.h index 226118f4085a..72b6ca767c2d 100644 --- a/source/common/grpc/async_client_impl.h +++ b/source/common/grpc/async_client_impl.h @@ -43,6 +43,7 @@ class AsyncClientImpl final : public RawAsyncClient { } private: + const uint32_t max_recv_message_length_; Upstream::ClusterManager& cm_; const std::string remote_cluster_name_; // The host header value in the http transport. diff --git a/test/common/grpc/grpc_client_integration_test.cc b/test/common/grpc/grpc_client_integration_test.cc index f3fe4bea09ac..ae32643aa11e 100644 --- a/test/common/grpc/grpc_client_integration_test.cc +++ b/test/common/grpc/grpc_client_integration_test.cc @@ -225,6 +225,28 @@ TEST_P(GrpcClientIntegrationTest, BadReplyGrpcFraming) { dispatcher_helper_.runDispatcher(); } +// Validate that a reply that exceeds gRPC maximum frame size is handled as an RESOURCE_EXHAUSTED +// gRPC error. +TEST_P(GrpcClientIntegrationTest, BadReplyOverGrpcFrameLimit) { + // Only testing behavior of Envoy client, since `max_receive_message_length` configuration is + // added to Envoy-gRPC only. + SKIP_IF_GRPC_CLIENT(ClientType::GoogleGrpc); + + helloworld::HelloReply reply; + reply.set_message("HelloWorld"); + + initialize(/*envoy_grpc_max_recv_msg_length=*/2); + + auto stream = createStream(empty_metadata_); + stream->sendRequest(); + stream->sendServerInitialMetadata(empty_metadata_); + stream->expectTrailingMetadata(empty_metadata_); + stream->expectGrpcStatus(Status::WellKnownGrpcStatus::ResourceExhausted); + auto serialized_response = Grpc::Common::serializeToGrpcFrame(reply); + stream->fake_stream_->encodeData(*serialized_response, true); + dispatcher_helper_.runDispatcher(); +} + // Validate that custom channel args can be set on the Google gRPC client. // TEST_P(GrpcClientIntegrationTest, CustomChannelArgs) { diff --git a/test/common/grpc/grpc_client_integration_test_harness.h b/test/common/grpc/grpc_client_integration_test_harness.h index d12a6b22d199..352cac598059 100644 --- a/test/common/grpc/grpc_client_integration_test_harness.h +++ b/test/common/grpc/grpc_client_integration_test_harness.h @@ -278,14 +278,14 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest { dispatcher_(api_->allocateDispatcher("test_thread")), http_context_(stats_store_.symbolTable()), router_context_(stats_store_.symbolTable()) {} - virtual void initialize() { + virtual void initialize(uint32_t envoy_grpc_max_recv_msg_length = 0) { if (fake_upstream_ == nullptr) { fake_upstream_config_.upstream_protocol_ = Http::CodecType::HTTP2; fake_upstream_ = std::make_unique(0, ipVersion(), fake_upstream_config_); } switch (clientType()) { case ClientType::EnvoyGrpc: - grpc_client_ = createAsyncClientImpl(); + grpc_client_ = createAsyncClientImpl(envoy_grpc_max_recv_msg_length); break; case ClientType::GoogleGrpc: { grpc_client_ = createGoogleAsyncClientImpl(); @@ -321,7 +321,7 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest { // Create a Grpc::AsyncClientImpl instance backed by enough fake/mock // infrastructure to initiate a loopback TCP connection to fake_upstream_. - RawAsyncClientPtr createAsyncClientImpl() { + RawAsyncClientPtr createAsyncClientImpl(uint32_t envoy_grpc_max_recv_msg_length = 0) { client_connection_ = std::make_unique( *dispatcher_, fake_upstream_->localAddress(), nullptr, std::move(async_client_transport_socket_), nullptr, nullptr); @@ -347,6 +347,11 @@ class GrpcClientIntegrationTest : public GrpcClientIntegrationParamTest { .WillRepeatedly(ReturnRef(*http_async_client_)); envoy::config::core::v3::GrpcService config; config.mutable_envoy_grpc()->set_cluster_name("fake_cluster"); + if (envoy_grpc_max_recv_msg_length != 0) { + config.mutable_envoy_grpc()->mutable_max_receive_message_length()->set_value( + envoy_grpc_max_recv_msg_length); + } + fillServiceWideInitialMetadata(config); return std::make_unique(cm_, config, dispatcher_->timeSource()); } @@ -548,7 +553,7 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest { return config; } - void initialize() override { + void initialize(uint32_t envoy_grpc_max_recv_msg_length = 0) override { envoy::extensions::transport_sockets::tls::v3::UpstreamTlsContext tls_context; auto* common_tls_context = tls_context.mutable_common_tls_context(); auto* validation_context = common_tls_context->mutable_validation_context(); @@ -575,7 +580,7 @@ class GrpcSslClientIntegrationTest : public GrpcClientIntegrationTest { fake_upstream_ = std::make_unique(createUpstreamSslContext(), 0, ipVersion(), config); - GrpcClientIntegrationTest::initialize(); + GrpcClientIntegrationTest::initialize(envoy_grpc_max_recv_msg_length); } Network::DownstreamTransportSocketFactoryPtr createUpstreamSslContext() {