Skip to content

Commit

Permalink
Add discardResponseMessage option for gRPC streams
Browse files Browse the repository at this point in the history
  • Loading branch information
lzakharov committed Jul 24, 2024
1 parent e9c3c94 commit 64c38bd
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 19 deletions.
9 changes: 5 additions & 4 deletions js/modules/k6/grpc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ func defineStream(rt *sobek.Runtime, s *stream) {

func (s *stream) beginStream(p *callParams) error {
req := &grpcext.StreamRequest{
Method: s.method,
MethodDescriptor: s.methodDescriptor,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
Method: s.method,
MethodDescriptor: s.methodDescriptor,
DiscardResponseMessage: p.DiscardResponseMessage,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
}

ctx := s.vu.Context()
Expand Down
85 changes: 85 additions & 0 deletions js/modules/k6/grpc/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,91 @@ func TestStream_ReceiveAllServerResponsesAfterEnd(t *testing.T) {
)
}

func TestStream_ReceiveAllServerResponsesAfterEndWithDiscardedMessages(t *testing.T) {
t.Parallel()

ts := newTestState(t)

stub := &featureExplorerStub{}

savedFeatures := []*grpcservice.Feature{
{
Name: "foo",
Location: &grpcservice.Point{
Latitude: 1,
Longitude: 2,
},
},
{
Name: "bar",
Location: &grpcservice.Point{
Latitude: 3,
Longitude: 4,
},
},
}

stub.listFeatures = func(_ *grpcservice.Rectangle, stream grpcservice.FeatureExplorer_ListFeaturesServer) error {
for _, feature := range savedFeatures {
// adding a delay to make server response "slower"
time.Sleep(200 * time.Millisecond)

if err := stream.Send(feature); err != nil {
return err
}
}

return nil
}

grpcservice.RegisterFeatureExplorerServer(ts.httpBin.ServerGRPC, stub)

initString := codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/grpcservice/route_guide.proto");`,
}
vuString := codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
let stream = new grpc.Stream(client, "main.FeatureExplorer/ListFeatures", { discardResponseMessage: true })
stream.on('data', function (data) {
call('Data: ' + JSON.stringify(data));
});
stream.on('end', function () {
call('End called');
});
stream.write({
lo: {
latitude: 1,
longitude: 2,
},
hi: {
latitude: 1,
longitude: 2,
},
});
stream.end();
`,
}

val, err := ts.Run(initString.code)
assertResponse(t, initString, err, val, ts)

ts.ToVUContext()

val, err = ts.RunOnEventLoop(vuString.code)

assertResponse(t, vuString, err, val, ts)

assert.Equal(t, []string{
"Data: {}",
"Data: {}",
"End called",
}, ts.callRecorder.Recorded())
}

// featureExplorerStub is a stub for FeatureExplorerServer
// it has ability to override methods
type featureExplorerStub struct {
Expand Down
18 changes: 10 additions & 8 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,12 @@ type InvokeResponse struct {

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Metadata metadata.MD
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
DiscardResponseMessage bool
TagsAndMeta *metrics.TagsAndMeta
Metadata metadata.MD
}

type clientConnCloser interface {
Expand Down Expand Up @@ -204,9 +205,10 @@ func (c *Conn) NewStream(
}

return &Stream{
raw: stream,
method: req.Method,
methodDescriptor: req.MethodDescriptor,
raw: stream,
method: req.Method,
methodDescriptor: req.MethodDescriptor,
discardResponseMessage: req.DiscardResponseMessage,
}, nil
}

Expand Down
25 changes: 18 additions & 7 deletions lib/netext/grpcext/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/emptypb"
)

// Stream is the wrapper around the grpc.ClientStream
// with some handy methods.
type Stream struct {
method string
methodDescriptor protoreflect.MethodDescriptor
raw grpc.ClientStream
marshaler protojson.MarshalOptions
method string
methodDescriptor protoreflect.MethodDescriptor
discardResponseMessage bool
raw grpc.ClientStream
marshaler protojson.MarshalOptions
}

// ErrCanceled canceled by client (k6)
Expand All @@ -35,6 +37,10 @@ func (s *Stream) ReceiveConverted() (interface{}, error) {
return nil, err
}

if s.discardResponseMessage {
return struct{}{}, err
}

msg, errConv := convert(s.marshaler, raw)
if errConv != nil {
return nil, errConv
Expand All @@ -43,9 +49,14 @@ func (s *Stream) ReceiveConverted() (interface{}, error) {
return msg, err
}

func (s *Stream) receive() (*dynamicpb.Message, error) {
msg := dynamicpb.NewMessage(s.methodDescriptor.Output())
err := s.raw.RecvMsg(msg)
func (s *Stream) receive() (msg *dynamicpb.Message, err error) {
if s.discardResponseMessage {
msg = dynamicpb.NewMessage((&emptypb.Empty{}).ProtoReflect().Descriptor())
} else {
msg = dynamicpb.NewMessage(s.methodDescriptor.Output())
}

err = s.raw.RecvMsg(msg)

// io.EOF means that the stream has been closed successfully
if err == nil || errors.Is(err, io.EOF) {
Expand Down

0 comments on commit 64c38bd

Please sign in to comment.