Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: Refactor decode method #32676

Merged
merged 19 commits into from
Mar 29, 2024
7 changes: 6 additions & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,16 @@ void AsyncStreamImpl::onHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_s

void AsyncStreamImpl::onData(Buffer::Instance& data, bool end_stream) {
decoded_frames_.clear();
if (!decoder_.decode(data, decoded_frames_)) {
if (decoder_.decode(data, decoded_frames_).code() == absl::StatusCode::kInternal) {
streamError(Status::WellKnownGrpcStatus::Internal);
return;
}

if (decoder_.decode(data, decoded_frames_).code() == absl::StatusCode::kResourceExhausted) {
tyxia marked this conversation as resolved.
Show resolved Hide resolved
streamError(Status::WellKnownGrpcStatus::ResourceExhausted);
tyxia marked this conversation as resolved.
Show resolved Hide resolved
return;
}

for (auto& frame : decoded_frames_) {
if (frame.length_ > 0 && frame.flags_ != GRPC_FH_DEFAULT) {
streamError(Status::WellKnownGrpcStatus::Internal);
Expand Down
12 changes: 8 additions & 4 deletions source/common/grpc/codec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ void Encoder::prependFrameHeader(uint8_t flags, Buffer::Instance& buffer, uint32
buffer.prepend(frame_buffer);
}

bool Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
absl::Status Decoder::decode(Buffer::Instance& input, std::vector<Frame>& output) {
// Make sure those flags are set to initial state.
decoding_error_ = false;
is_frame_oversized_ = false;
output_ = &output;
inspect(input);
output_ = nullptr;

if (decoding_error_ || is_frame_oversized_) {
return false;
if (decoding_error_) {
return absl::InternalError("Grpc decoding error");
tyxia marked this conversation as resolved.
Show resolved Hide resolved
}

if (is_frame_oversized_) {
return absl::ResourceExhaustedError("Grpc frame length exceeds limit");
}

input.drain(input.length());
return true;
return absl::OkStatus();
}

bool Decoder::frameStart(uint8_t flags) {
Expand Down
4 changes: 2 additions & 2 deletions source/common/grpc/codec.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ class Decoder : public FrameInspector {
// error happened, the input buffer remains unchanged.
// @param input supplies the binary octets wrapped in a GRPC data frame.
// @param output supplies the buffer to store the decoded data.
// @return bool whether the decoding succeeded or not.
bool decode(Buffer::Instance& input, std::vector<Frame>& output);
// @return absl::status whether the decoding succeeded or not.
absl::Status decode(Buffer::Instance& input, std::vector<Frame>& output);

// Determine the length of the current frame being decoded. This is useful when supplying a
// partial frame to decode() and wanting to know how many more bytes need to be read to complete
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ void JsonTranscoderFilter::maybeSendHttpBodyRequestMessage(Buffer::Instance* dat
bool JsonTranscoderFilter::buildResponseFromHttpBodyOutput(
Http::ResponseHeaderMap& response_headers, Buffer::Instance& data) {
std::vector<Grpc::Frame> frames;
decoder_.decode(data, frames);
std::ignore = decoder_.decode(data, frames);
if (frames.empty()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ Http::FilterDataStatus GrpcWebFilter::encodeData(Buffer::Instance& data, bool en
// The decoder always consumes and drains the given buffer. Incomplete data frame is buffered
// inside the decoder.
std::vector<Grpc::Frame> frames;
decoder_.decode(data, frames);
std::ignore = decoder_.decode(data, frames);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these turn into TODOs or bugs?

Copy link
Member Author

@tyxia tyxia Mar 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODOs is probably better?

I am hoping to leave it to filter owner to make the call and do the change accordingly @TAOXUY @nareddyt @lizan

if (frames.empty()) {
// We don't have enough data to decode for one single frame, stop iteration until more data
// comes in.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void GrpcHealthCheckerImpl::GrpcActiveHealthCheckSession::decodeData(Buffer::Ins
}
// We should end up with only one frame here.
std::vector<Grpc::Frame> decoded_frames;
if (!decoder_.decode(data, decoded_frames)) {
if (!decoder_.decode(data, decoded_frames).ok()) {
onRpcComplete(Grpc::Status::WellKnownGrpcStatus::Internal, "gRPC wire protocol decode error",
false);
return;
Expand Down
4 changes: 2 additions & 2 deletions test/common/grpc/codec_fuzz_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,15 @@ DEFINE_FUZZER(const uint8_t* buf, size_t len) {
: provider.ConsumeIntegralInRange<uint64_t>(0, wire_buffer.length());
Buffer::OwnedImpl decode_buffer;
decode_buffer.move(wire_buffer, decode_length);
const bool decode_result = decoder.decode(decode_buffer, frames);
const absl::Status decode_result = decoder.decode(decode_buffer, frames);
// If we have recovered the original frames, we're decoding garbage. It
// might end up being a valid frame, but there is no predictability, so just
// drain and move on. If we haven't recovered the original frames, we
// shouldn't have any errors and should be consuming all of decode_buffer.
if (frames.size() >= num_encode_frames) {
decode_buffer.drain(decode_buffer.length());
} else {
FUZZ_ASSERT(decode_result);
FUZZ_ASSERT(decode_result.ok());
FUZZ_ASSERT(decode_buffer.length() == 0);
}
}
Expand Down
22 changes: 11 additions & 11 deletions test/common/grpc/codec_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ TEST(GrpcCodecTest, decodeIncompleteFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(0), frames.size());
EXPECT_EQ(static_cast<uint32_t>(request.ByteSize()), decoder.length());
EXPECT_EQ(true, decoder.hasBufferedData());

buffer.add(request_buffer.c_str() + 5);
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(static_cast<size_t>(1), frames.size());
EXPECT_EQ(static_cast<uint32_t>(0), decoder.length());
Expand All @@ -102,7 +102,7 @@ TEST(GrpcCodecTest, decodeInvalidFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_FALSE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(size, buffer.length());
}

Expand All @@ -117,7 +117,7 @@ TEST(GrpcCodecTest, DecodeMultipleFramesInvalid) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_FALSE(decoder.decode(buffer, frames).ok());
// When the decoder doesn't successfully decode, it puts decoded frames up until
// an invalid frame into output frame vector.
EXPECT_EQ(1, frames.size());
Expand Down Expand Up @@ -146,7 +146,7 @@ TEST(GrpcCodecTest, DecodeValidFrameWithInvalidFrameAfterward) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_FALSE(decoder.decode(buffer, frames).ok());
// When the decoder doesn't successfully decode, it puts valid frames up until
// an invalid frame into output frame vector.
EXPECT_EQ(1, frames.size());
Expand All @@ -162,7 +162,7 @@ TEST(GrpcCodecTest, decodeEmptyFrame) {

Decoder decoder;
std::vector<Frame> frames;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());

EXPECT_EQ(1, frames.size());
EXPECT_EQ(0, frames[0].length_);
Expand All @@ -181,7 +181,7 @@ TEST(GrpcCodecTest, decodeSingleFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1));
EXPECT_EQ(GRPC_FH_DEFAULT, frames[0].flags_);
Expand All @@ -208,7 +208,7 @@ TEST(GrpcCodecTest, decodeMultipleFrame) {

std::vector<Frame> frames;
Decoder decoder;
EXPECT_TRUE(decoder.decode(buffer, frames));
EXPECT_TRUE(decoder.decode(buffer, frames).ok());
EXPECT_EQ(static_cast<size_t>(0), buffer.length());
EXPECT_EQ(frames.size(), static_cast<uint64_t>(1009));
for (Frame& frame : frames) {
Expand Down Expand Up @@ -240,7 +240,7 @@ TEST(GrpcCodecTest, decodeSingleFrameOverLimit) {
decoder.setMaxFrameLength(32 * 1024);

// The decoder doesn't successfully decode due to oversized frame.
EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted);
EXPECT_EQ(buffer.length(), size);
}

Expand Down Expand Up @@ -275,7 +275,7 @@ TEST(GrpcCodecTest, decodeSingleFrameWithMultiBuffersOverLimit) {

// Both decoding attempts failed due to the total frame size exceeding the limit.
for (uint32_t i = 0; i < buffers.size(); ++i) {
EXPECT_FALSE(decoder.decode(buffers[i], frames));
EXPECT_EQ(decoder.decode(buffers[i], frames).code(), absl::StatusCode::kResourceExhausted);
}

EXPECT_EQ(frames.size(), 0);
Expand Down Expand Up @@ -309,7 +309,7 @@ TEST(GrpcCodecTest, decodeMultipleFramesOverLimit) {
Decoder decoder;
decoder.setMaxFrameLength(32 * 1024);

EXPECT_FALSE(decoder.decode(buffer, frames));
EXPECT_EQ(decoder.decode(buffer, frames).code(), absl::StatusCode::kResourceExhausted);
// When the decoder doesn't successfully decode, it puts valid frames up until
// an oversized frame into output frame vector.
ASSERT_EQ(frames.size(), 1);
Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/health_checker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5041,7 +5041,7 @@ class GrpcHealthCheckerImplTestBase : public Event::TestUsingSimulatedTime,
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
std::vector<Grpc::Frame> decoded_frames;
Grpc::Decoder decoder;
ASSERT_TRUE(decoder.decode(data, decoded_frames));
ASSERT_TRUE(decoder.decode(data, decoded_frames).ok());
ASSERT_EQ(1U, decoded_frames.size());
auto& frame = decoded_frames[0];
Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));
Expand Down Expand Up @@ -5225,7 +5225,7 @@ TEST_F(GrpcHealthCheckerImplTest, SuccessWithAdditionalHeaders) {
.WillOnce(Invoke([&](Buffer::Instance& data, bool) {
std::vector<Grpc::Frame> decoded_frames;
Grpc::Decoder decoder;
ASSERT_TRUE(decoder.decode(data, decoded_frames));
ASSERT_TRUE(decoder.decode(data, decoded_frames).ok());
ASSERT_EQ(1U, decoded_frames.size());
auto& frame = decoded_frames[0];
Buffer::ZeroCopyInputStreamImpl stream(std::move(frame.data_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void checkSerializedData(Envoy::Buffer::Instance& data,
std::vector<MessageType> expected_requests) {
::Envoy::Grpc::Decoder grpc_decoder;
std::vector<::Envoy::Grpc::Frame> frames_after_processing;
ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing));
ASSERT_TRUE(grpc_decoder.decode(data, frames_after_processing).ok());

ASSERT_EQ(expected_requests.size(), frames_after_processing.size());
for (unsigned long i = 0; i < frames_after_processing.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ TEST_F(ReverseBridgeTest, GrpcRequest) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -376,7 +376,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestNoContentLength) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -509,7 +509,7 @@ TEST_F(ReverseBridgeTest, GrpcRequestInternalError) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -722,7 +722,7 @@ TEST_F(ReverseBridgeTest, FilterConfigPerRouteEnabled) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(1, frames.size());
EXPECT_EQ(12, frames[0].length_);
Expand Down Expand Up @@ -799,7 +799,7 @@ TEST_F(ReverseBridgeTest, RouteWithTrailers) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);

EXPECT_EQ(4, trailers.size());
EXPECT_EQ(1, frames.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ name: grpc_json_transcoder
if (!expected_grpc_request_messages.empty()) {
Grpc::Decoder grpc_decoder;
std::vector<Grpc::Frame> frames;
ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames)) << dump;
ASSERT_TRUE(grpc_decoder.decode(upstream_request_->body(), frames).ok()) << dump;
EXPECT_EQ(expected_grpc_request_messages.size(), frames.size());

for (size_t i = 0; i < expected_grpc_request_messages.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPost) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -642,7 +642,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithPackageServiceMetho

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -704,7 +704,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(*request_data, frames);
std::ignore = decoder.decode(*request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -737,7 +737,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, ForwardUnaryPostGrpc) {
EXPECT_EQ(Http::FilterDataStatus::Continue, filter_.encodeData(*response_data, true));

frames.clear();
decoder.decode(*response_data, frames);
std::ignore = decoder.decode(*response_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down Expand Up @@ -1089,7 +1089,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingUnaryPostWithHttpBody) {
// decodeData with EOS will output the grpc frame.
std::vector<Grpc::Frame> frames;
Grpc::Decoder decoder;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);
ASSERT_EQ(frames.size(), 1);

bookstore::EchoBodyRequest expected_request;
Expand Down Expand Up @@ -1197,7 +1197,7 @@ TEST_F(GrpcJsonTranscoderFilterTest, TranscodingStreamPostWithHttpBody) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(buffer, frames);
std::ignore = decoder.decode(buffer, frames);
EXPECT_EQ(frames.size(), 1);

bookstore::EchoBodyRequest expected_request;
Expand Down Expand Up @@ -1828,7 +1828,7 @@ TEST_P(GrpcJsonTranscoderFilterUnescapeTest, UnescapeSpec) {

Grpc::Decoder decoder;
std::vector<Grpc::Frame> frames;
decoder.decode(request_data, frames);
std::ignore = decoder.decode(request_data, frames);

EXPECT_EQ(1, frames.size());

Expand Down
4 changes: 2 additions & 2 deletions test/integration/fake_upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ class FakeStream : public Http::RequestDecoder,
{
absl::MutexLock lock(&lock_);
last_body_size = body_.length();
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
return testing::AssertionFailure()
<< "Couldn't decode gRPC data frame: " << body_.toString();
}
Expand All @@ -210,7 +210,7 @@ class FakeStream : public Http::RequestDecoder,
}
{
absl::MutexLock lock(&lock_);
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
return testing::AssertionFailure()
<< "Couldn't decode gRPC data frame: " << body_.toString();
}
Expand Down
Loading