Skip to content

Commit

Permalink
grpc: Refactor decode method (#32676)
Browse files Browse the repository at this point in the history
This PR is to improve the error status/code

In PR #32511, we introduce a max_frame_length feature (optional) . Now gRPC frame decoding can fail EITHER (1) due to decoding error OR (2) due to over-frame-limit error.

To better surface the error message, this PR refactor return type from bool to absl::status , so that the caller site can differentiate the error status. source/common/grpc/async_client_impl.cc in this PR can be an user example

Risk level: Low
Testing: Unit tests

Signed-off-by: tyxia <tyxia@google.com>
  • Loading branch information
tyxia authored Mar 29, 2024
1 parent 279e469 commit 77458ea
Show file tree
Hide file tree
Showing 14 changed files with 55 additions and 41 deletions.
12 changes: 11 additions & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,17 @@ void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_s

void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
decoded_frames_.clear();
if (!decoder_.decode(data, decoded_frames_)) {
auto status = decoder_.decode(data, decoded_frames_);

// decode() currently only returns two types of error:
// - decoding error is mapped to ResourceExhausted
// - over-limit error is mapped to Internal.
// Other potential errors in the future are mapped to internal for now.
if (status.code() == absl::StatusCode::kResourceExhausted) {
streamError(Status::WellKnownGrpcStatus::ResourceExhausted);
return;
}
if (status.code() != absl::StatusCode::kOk) {
streamError(Status::WellKnownGrpcStatus::Internal);
return;
}
Expand Down
12 changes: 8 additions & 4 deletions source/common/grpc/codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ void Encoder::prependFrameHeader(uint8_t flags, Buffer::Instance& buffer, uint32
buffer.prepend(frame_buffer);
}

bool Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
absl::Status Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
// Make sure those flags are set to initial state.
decoding_error_ = false;
is_frame_oversized_ = false;
output_ = &output;
inspect(input);
output_ = nullptr;

if (decoding_error_ || is_frame_oversized_) {
return false;
if (decoding_error_) {
return absl::InternalError("Grpc decoding error");
}

if (is_frame_oversized_) {
return absl::ResourceExhaustedError("Grpc frame length exceeds limit");
}

input.drain(input.length());
return true;
return absl::OkStatus();
}

bool Decoder::frameStart(uint8_t flags) {
Expand Down
4 changes: 2 additions & 2 deletions source/common/grpc/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class Decoder : public FrameInspector {
// error happened, the input buffer remains unchanged.
// @param input supplies the binary octets wrapped in a GRPC data frame.
// @param output supplies the buffer to store the decoded data.
// @return bool whether the decoding succeeded or not.
bool decode(Buffer::Instance& input, std::vector<Frame>& output);
// @return absl::status whether the decoding succeeded or not.
absl::Status decode(Buffer::Instance& input, std::vector<Frame>& output);

// Determine the length of the current frame being decoded. This is useful when supplying a
// partial frame to decode() and wanting to know how many more bytes need to be read to complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ void JsonTranscoderFilter::maybeSendHttpBodyRequestMessage(Buffer::Instance* dat
bool JsonTranscoderFilter::buildResponseFromHttpBodyOutput(
Http::ResponseHeaderMap& response_headers, Buffer::Instance& data) {
std::vector<Grpc::Frame> frames;
decoder_.decode(data, frames);
std::ignore = decoder_.decode(data, frames);
if (frames.empty()) {
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/grpc_web/grpc_web_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ Http::FilterDataStatus GrpcWebFilter::encodeData(Buffer::Instance& data, bool en
// The decoder always consumes and drains the given buffer. Incomplete data frame is buffered
// inside the decoder.
std::vector<Grpc::Frame> frames;
decoder_.decode(data, frames);
std::ignore = decoder_.decode(data, frames);
if (frames.empty()) {
// We don't have enough data to decode for one single frame, stop iteration until more data
// comes in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Ins
}
// We should end up with only one frame here.
std::vector<Grpc::Frame> decoded_frames;
if (!decoder_.decode(data, decoded_frames)) {
if (!decoder_.decode(data, decoded_frames).ok()) {
onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error",
false);
return;
Expand Down
4 changes: 2 additions & 2 deletions test/common/grpc/codec_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ DEFINE_FUZZER(const uint8_t* buf, size_t len) {
: provider.ConsumeIntegralInRange<uint64_t>(0, wire_buffer.length());
Buffer::OwnedImpl decode_buffer;
decode_buffer.move(wire_buffer, decode_length);
const bool decode_result = decoder.decode(decode_buffer, frames);
const absl::Status decode_result = decoder.decode(decode_buffer, frames);
// If we have recovered the original frames, we're decoding garbage. It
// might end up being a valid frame, but there is no predictability, so just
// drain and move on. If we haven't recovered the original frames, we
// shouldn't have any errors and should be consuming all of decode_buffer.
if (frames.size() >= num_encode_frames) {
decode_buffer.drain(decode_buffer.length());
} else {
FUZZ_ASSERT(decode_result);
FUZZ_ASSERT(decode_result.ok());
FUZZ_ASSERT(decode_buffer.length() == 0);
}
}
Expand Down
22 changes: 11 additions & 11 deletions test/common/grpc/codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ TEST(GrpcCodecTest, decodeIncompleteFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(0), frames.size());
EXPECT_EQ(static_cast<uint32_t>(request.ByteSize()), decoder.length());
EXPECT_EQ(true, decoder.hasBufferedData());

buffer.add(request_buffer.c_str() + 5);
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(1), frames.size());
EXPECT_EQ(static_cast<uint32_t>(0), decoder.length());
Expand All @@ -102,7 +102,7 @@ TEST(GrpcCodecTest, decodeInvalidFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal);
EXPECT_EQ(size, buffer.length());
}

Expand All @@ -117,7 +117,7 @@ TEST(GrpcCodecTest, DecodeMultipleFramesInvalid) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal);
// When the decoder doesn't successfully decode, it puts decoded frames up until
// an invalid frame into output frame vector.
EXPECT_EQ(1, frames.size());
Expand Down Expand Up @@ -146,7 +146,7 @@ TEST(GrpcCodecTest, DecodeValidFrameWithInvalidFrameAfterward) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kInternal);
// When the decoder doesn't successfully decode, it puts valid frames up until
// an invalid frame into output frame vector.
EXPECT_EQ(1, frames.size());
Expand All @@ -162,7 +162,7 @@ TEST(GrpcCodecTest, decodeEmptyFrame) {

Decoder decoder;
std::vector<Frame> frames;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());

EXPECT_EQ(1, frames.size());
EXPECT_EQ(0, frames[0].length_);
Expand All @@ -181,7 +181,7 @@ TEST(GrpcCodecTest, decodeSingleFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1));
EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags_);
Expand All @@ -208,7 +208,7 @@ TEST(GrpcCodecTest, decodeMultipleFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1009));
for (Frame& frame : frames) {
Expand Down Expand Up @@ -240,7 +240,7 @@ TEST(GrpcCodecTest, decodeSingleFrameOverLimit) {
decoder.setMaxFrameLength(32 * 1024);

// The decoder doesn't successfully decode due to oversized frame.
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted);
EXPECT_EQ(buffer.length(), size);
}

Expand Down Expand Up @@ -275,7 +275,7 @@ TEST(GrpcCodecTest, decodeSingleFrameWithMultiBuffersOverLimit) {

// Both decoding attempts failed due to the total frame size exceeding the limit.
for (uint32_t i = 0; i < buffers.size(); ++i) {
EXPECT_FALSE(decoder.decode(buffers[i], frames));
EXPECT_EQ(decoder.decode(buffers[i], frames).code(), absl::StatusCode::kResourceExhausted);
}

EXPECT_EQ(frames.size(), 0);
Expand Down Expand Up @@ -309,7 +309,7 @@ TEST(GrpcCodecTest, decodeMultipleFramesOverLimit) {
Decoder decoder;
decoder.setMaxFrameLength(32 * 1024);

EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted);
// When the decoder doesn't successfully decode, it puts valid frames up until
// an oversized frame into output frame vector.
ASSERT_EQ(frames.size(), 1);
Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5139,7 +5139,7 @@ class GrpcHealthCheckerImplTestBase : public Event::TestUsingSimulatedTime,
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
std::vector<Grpc::Frame> decoded_frames;
Grpc::Decoder decoder;
ASSERT_TRUE(decoder.decode(data, decoded_frames));
ASSERT_TRUE(decoder.decode(data, decoded_frames).ok());
ASSERT_EQ(1U, decoded_frames.size());
auto& frame = decoded_frames[0];
Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));
Expand Down Expand Up @@ -5323,7 +5323,7 @@ TEST_F(GrpcHealthCheckerImplTest, SuccessWithAdditionalHeaders) {
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
std::vector<Grpc::Frame> decoded_frames;
Grpc::Decoder decoder;
ASSERT_TRUE(decoder.decode(data, decoded_frames));
ASSERT_TRUE(decoder.decode(data, decoded_frames).ok());
ASSERT_EQ(1U, decoded_frames.size());
auto& frame = decoded_frames[0];
Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void checkSerializedData(Envoy::Buffer::Instance& data,
std::vector<MessageType> expected_requests) {
::Envoy::Grpc::Decoder grpc_decoder;
std::vector<::Envoy::Grpc::Frame> frames_after_processing;
ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing));
ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing).ok());

ASSERT_EQ(expected_requests.size(), frames_after_processing.size());
for (unsigned long i = 0; i < frames_after_processing.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ TEST_F(ReverseBridgeTest, GrpcRequest) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -376,7 +376,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestNoContentLength) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -509,7 +509,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestInternalError) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -722,7 +722,7 @@ TEST_F(ReverseBridgeTest, FilterConfigPerRouteEnabled) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -799,7 +799,7 @@ TEST_F(ReverseBridgeTest, RouteWithTrailers) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(4, trailers.size());
EXPECT_EQ(1, frames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ name: grpc_json_transcoder
if (!expected_grpc_request_messages.empty()) {
Grpc::Decoder grpc_decoder;
std::vector<Grpc::Frame> frames;
ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames)) << dump;
ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames).ok()) << dump;
EXPECT_EQ(expected_grpc_request_messages.size(), frames.size());

for (size_t i = 0; i < expected_grpc_request_messages.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPost) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -642,7 +642,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithPackageServiceMetho

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -704,7 +704,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(*request_data, frames);
std::ignore = decoder.decode(*request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -737,7 +737,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) {
EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*response_data, true));

frames.clear();
decoder.decode(*response_data, frames);
std::ignore = decoder.decode(*response_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -1089,7 +1089,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithHttpBody) {
// decodeData with EOS will output the grpc frame.
std::vector<Grpc::Frame> frames;
Grpc::Decoder decoder;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);
ASSERT_EQ(frames.size(), 1);

bookstore::EchoBodyRequest expected_request;
Expand Down Expand Up @@ -1197,7 +1197,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamPostWithHttpBody) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);
EXPECT_EQ(frames.size(), 1);

bookstore::EchoBodyRequest expected_request;
Expand Down Expand Up @@ -1828,7 +1828,7 @@ TEST_P(GrpcJsonTranscoderFilterUnescapeTest, UnescapeSpec) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down
4 changes: 2 additions & 2 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class FakeStream : public Http::RequestDecoder,
{
absl::MutexLock lock(&lock_);
last_body_size = body_.length();
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
return testing::AssertionFailure()
<< "Couldn't decode gRPC data frame: " << body_.toString();
}
Expand All @@ -210,7 +210,7 @@ class FakeStream : public Http::RequestDecoder,
}
{
absl::MutexLock lock(&lock_);
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
return testing::AssertionFailure()
<< "Couldn't decode gRPC data frame: " << body_.toString();
}
Expand Down

0 comments on commit 77458ea

Please sign in to comment.