Skip to content

Commit

Permalink
Add discardResponseMessage option for gRPC client (#3820)
Browse files Browse the repository at this point in the history
Now gRPC unary RPCs respects `discardResponseMessage` option. When the option is used the client still fetches the response body but it doesn't serialize the expected content as JSON. It removes a step that would take significant CPU cycles.

Note that unfortunately, gRPC doesn't support discarding completely the response body so it is resolved using emptypb.Empty. This is the reason why the response body is still loaded in memory.
  • Loading branch information
lzakharov authored Aug 21, 2024
1 parent 441f4d6 commit eaa5419
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 17 deletions.
13 changes: 7 additions & 6 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,13 @@ func (c *Client) buildInvokeRequest(
p.SetSystemTags(state, c.addr, method)

return grpcext.InvokeRequest{
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
DiscardResponseMessage: p.DiscardResponseMessage,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
Metadata: p.Metadata,
}, nil
}

Expand Down
69 changes: 69 additions & 0 deletions js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,19 @@ func TestClient(t *testing.T) {
client.invoke("grpc.testing.TestService/EmptyCall", {}, { timeout: 2000 })`,
},
},
{
name: "InvokeDiscardResponseMessage",
initString: codeBlock{
code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`,
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true })`,
},
},
{
name: "Invoke",
initString: codeBlock{code: `
Expand All @@ -333,6 +346,32 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "InvokeDiscardResponseMessage",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
var resp = client.invoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true })
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
if (resp.message !== null) {
throw new Error("unexpected message: " + JSON.stringify(resp.message))
}`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "AsyncInvoke",
initString: codeBlock{code: `
Expand Down Expand Up @@ -360,6 +399,36 @@ func TestClient(t *testing.T) {
},
},
},
{
name: "AsyncInvokeDiscardResponseMessage",
initString: codeBlock{code: `
var client = new grpc.Client();
client.load([], "../../../../lib/testutils/httpmultibin/grpc_testing/test.proto");`},
setup: func(tb *httpmultibin.HTTPMultiBin) {
tb.GRPCStub.EmptyCallFunc = func(context.Context, *grpc_testing.Empty) (*grpc_testing.Empty, error) {
return &grpc_testing.Empty{}, nil
}
},
vuString: codeBlock{
code: `
client.connect("GRPCBIN_ADDR");
client.asyncInvoke("grpc.testing.TestService/EmptyCall", {}, { discardResponseMessage: true }).then(function(resp) {
if (resp.status !== grpc.StatusOK) {
throw new Error("unexpected error: " + JSON.stringify(resp.error) + "or status: " + resp.status)
}
if (resp.message !== null) {
throw new Error("unexpected message: " + JSON.stringify(resp.message))
}
}, (err) => {
throw new Error("unexpected error: " + err)
})
`,
asserts: func(t *testing.T, rb *httpmultibin.HTTPMultiBin, samples chan metrics.SampleContainer, _ error) {
samplesBuf := metrics.GetBufferedSamples(samples)
assertMetricEmitted(t, metrics.GRPCReqDurationName, samplesBuf, rb.Replacer.Replace("GRPCBIN_ADDR/grpc.testing.TestService/EmptyCall"))
},
},
},
{
name: "InvokeAnyProto",
initString: codeBlock{code: `
Expand Down
9 changes: 6 additions & 3 deletions js/modules/k6/grpc/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import (
// callParams is the parameters that can be passed to a gRPC calls
// like invoke or newStream.
type callParams struct {
Metadata metadata.MD
TagsAndMeta metrics.TagsAndMeta
Timeout time.Duration
Metadata metadata.MD
TagsAndMeta metrics.TagsAndMeta
Timeout time.Duration
DiscardResponseMessage bool
}

// newCallParams constructs the call parameters from the input value.
Expand Down Expand Up @@ -58,6 +59,8 @@ func newCallParams(vu modules.VU, input sobek.Value) (*callParams, error) {
if err != nil {
return result, fmt.Errorf("invalid timeout value: %w", err)
}
case "discardResponseMessage":
result.DiscardResponseMessage = params.Get(k).ToBoolean()
default:
return result, fmt.Errorf("unknown param: %q", k)
}
Expand Down
41 changes: 41 additions & 0 deletions js/modules/k6/grpc/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,47 @@ func TestCallParamsTimeOutParse(t *testing.T) {
}
}

func TestCallParamsDiscardResponseMessageParse(t *testing.T) {
t.Parallel()

testCases := []struct {
Name string
JSON string
DiscardResponseMessage bool
}{
{
Name: "Empty",
JSON: `{}`,
DiscardResponseMessage: false,
},
{
Name: "DiscardResponseMessageFalse",
JSON: `{ discardResponseMessage: false }`,
DiscardResponseMessage: false,
},
{
Name: "DiscardResponseMessageTrue",
JSON: `{ discardResponseMessage: true }`,
DiscardResponseMessage: true,
},
}

for _, tc := range testCases {
tc := tc

t.Run(tc.Name, func(t *testing.T) {
t.Parallel()

testRuntime, params := newParamsTestRuntime(t, tc.JSON)

p, err := newCallParams(testRuntime.VU, params)
require.NoError(t, err)

assert.Equal(t, tc.DiscardResponseMessage, p.DiscardResponseMessage)
})
}
}

// newParamsTestRuntime creates a new test runtime
// that could be used to test the params
// it also moves to the VU context and creates the params
Expand Down
24 changes: 16 additions & 8 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
"google.golang.org/protobuf/types/known/emptypb"
)

// InvokeRequest represents a unary gRPC request.
type InvokeRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Message []byte
Metadata metadata.MD
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
DiscardResponseMessage bool
Message []byte
Metadata metadata.MD
}

// InvokeResponse represents a gRPC response.
Expand Down Expand Up @@ -133,7 +135,13 @@ func (c *Conn) Invoke(

ctx = withRPCState(ctx, &rpcState{tagsAndMeta: req.TagsAndMeta})

resp := dynamicpb.NewMessage(req.MethodDescriptor.Output())
var resp *dynamicpb.Message
if req.DiscardResponseMessage {
resp = dynamicpb.NewMessage((&emptypb.Empty{}).ProtoReflect().Descriptor())
} else {
resp = dynamicpb.NewMessage(req.MethodDescriptor.Output())
}

header, trailer := metadata.New(nil), metadata.New(nil)

copts := make([]grpc.CallOption, 0, len(opts)+2)
Expand Down Expand Up @@ -165,7 +173,7 @@ func (c *Conn) Invoke(
response.Error = errMsg
}

if resp != nil {
if resp != nil && !req.DiscardResponseMessage {
msg, err := convert(marshaler, resp)
if err != nil {
return nil, fmt.Errorf("unable to convert response object to JSON: %w", err)
Expand Down
22 changes: 22 additions & 0 deletions lib/netext/grpcext/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,28 @@ func TestInvokeWithCallOptions(t *testing.T) {
assert.NotNil(t, res)
}

func TestInvokeWithDiscardResponseMessage(t *testing.T) {
t.Parallel()

reply := func(_, _ *dynamicpb.Message, opts ...grpc.CallOption) error {
assert.Len(t, opts, 3) // two by default plus one injected
return nil
}

c := Conn{raw: invokemock(reply)}
r := InvokeRequest{
Method: "/hello.HelloService/NoOp",
MethodDescriptor: methodFromProto("NoOp"),
DiscardResponseMessage: true,
Message: []byte(`{}`),
Metadata: metadata.New(nil),
}
res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone"))
require.NoError(t, err)
assert.NotNil(t, res)
assert.Nil(t, res.Message)
}

func TestInvokeReturnError(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit eaa5419

Please sign in to comment.