From e26457d0c976a5669aea35e3f97889c576324983 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 28 Aug 2023 13:26:31 -0700 Subject: [PATCH] stream: swallow Header errors as we used to; RecvMsg can still return it (#6591) --- stream.go | 7 ++++-- stream_test.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 2 deletions(-) create mode 100644 stream_test.go diff --git a/stream.go b/stream.go index 7e7f23445191..421a41f8854f 100644 --- a/stream.go +++ b/stream.go @@ -91,7 +91,9 @@ type Stream interface { // status package. type ClientStream interface { // Header returns the header metadata received from the server if there - // is any. It blocks if the metadata is not ready to read. + // is any. It blocks if the metadata is not ready to read. If the metadata + // is nil and the error is also nil, then the stream was terminated without + // headers, and the status can be discovered by calling RecvMsg. Header() (metadata.MD, error) // Trailer returns the trailer metadata from the server, if there is any. // It must only be called after stream.CloseAndRecv has returned, or @@ -802,7 +804,8 @@ func (cs *clientStream) Header() (metadata.MD, error) { if err != nil { cs.finish(err) - return nil, err + // Do not return the error. The user should get it by calling Recv(). + return nil, nil } if len(cs.binlogs) != 0 && !cs.serverHeaderBinlogged && m != nil { diff --git a/stream_test.go b/stream_test.go new file mode 100644 index 000000000000..7af066799c16 --- /dev/null +++ b/stream_test.go @@ -0,0 +1,67 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc_test + +import ( + "context" + "testing" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/interop/grpc_testing" + "google.golang.org/grpc/status" +) + +const defaultTestTimeout = 10 * time.Second + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +func (s) TestStream_Header_TrailersOnly(t *testing.T) { + ss := stubserver.StubServer{ + FullDuplexCallF: func(stream grpc_testing.TestService_FullDuplexCallServer) error { + return status.Errorf(codes.NotFound, "a test error") + }, + } + if err := ss.Start(nil); err != nil { + t.Fatal("Error starting server:", err) + } + defer ss.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + s, err := ss.Client.FullDuplexCall(ctx) + if err != nil { + t.Fatal("Error staring call", err) + } + if md, err := s.Header(); md != nil || err != nil { + t.Fatalf("s.Header() = %v, %v; want nil, nil", md, err) + } + if _, err := s.Recv(); status.Code(err) != codes.NotFound { + t.Fatalf("s.Recv() = _, %v; want _, err.Code()=codes.NotFound", err) + } +}