-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand service config support #1165
Changes from 29 commits
50d4175
13b5f12
a0b902a
ad16b94
a66f923
f02290b
f1bb70f
6f8b553
fa29686
c6a3937
8788b75
cb02ab4
bab6b61
983d837
9c5f260
eaa9ccb
ecbc34a
ea230c7
3ea2870
d926544
59426b3
bdf9a64
35d77ea
4d2b4b5
7505481
504db8e
d19bbe8
ed64d51
27ae147
cb64938
4a7b4d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,7 +73,10 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |
} | ||
} | ||
for { | ||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { | ||
if c.maxReceiveMessageSize == nil { | ||
return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | ||
} | ||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { | ||
if err == io.EOF { | ||
break | ||
} | ||
|
@@ -93,7 +96,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |
} | ||
|
||
// sendRequest writes out various information of an RPC such as Context and Message. | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { | ||
defer func() { | ||
if err != nil { | ||
// If err is connection error, t will be closed, no need to close stream here. | ||
|
@@ -118,6 +121,12 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |
if err != nil { | ||
return Errorf(codes.Internal, "grpc: %v", err) | ||
} | ||
if c.maxSendMessageSize == nil { | ||
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
} | ||
if len(outBuf) > *c.maxSendMessageSize { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if c.maxSendMessageSize != nil && ... ? Or a getter that handles nil and returns the default? Otherwise this could be a nil pointer dereference. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This ideally should be pushed into encode() instead. Otherwise, we will still be allocating more memory than desired. encode() gets the size before it allocates the buffer, so we can safe the work of encoding the object if we check it there after we get the size. |
||
return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize) | ||
} | ||
err = t.Write(stream, outBuf, opts) | ||
if err == nil && outPayload != nil { | ||
outPayload.SentTime = time.Now() | ||
|
@@ -145,14 +154,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
|
||
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { | ||
c := defaultCallInfo | ||
if mc, ok := cc.getMethodConfig(method); ok { | ||
c.failFast = !mc.WaitForReady | ||
if mc.Timeout > 0 { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, mc.Timeout) | ||
defer cancel() | ||
} | ||
mc := cc.GetMethodConfig(method) | ||
if mc.WaitForReady != nil { | ||
c.failFast = !*mc.WaitForReady | ||
} | ||
|
||
if mc.Timeout != nil && *mc.Timeout >= 0 { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) | ||
defer cancel() | ||
} | ||
|
||
opts = append(cc.dopts.callOptions, opts...) | ||
for _, o := range opts { | ||
if err := o.before(&c); err != nil { | ||
return toRPCErr(err) | ||
|
@@ -163,6 +176,10 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
o.after(&c) | ||
} | ||
}() | ||
|
||
c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) | ||
c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) | ||
|
||
if EnableTracing { | ||
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) | ||
defer c.traceInfo.tr.Finish() | ||
|
@@ -260,7 +277,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
} | ||
return toRPCErr(err) | ||
} | ||
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, stream, t, args, topts) | ||
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) | ||
if err != nil { | ||
if put != nil { | ||
updateRPCInfoInContext(ctx, rpcInfo{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,12 +137,14 @@ func (d *gzipDecompressor) Type() string { | |
|
||
// callInfo contains all related configuration and information about an RPC. | ||
type callInfo struct { | ||
failFast bool | ||
headerMD metadata.MD | ||
trailerMD metadata.MD | ||
peer *peer.Peer | ||
traceInfo traceInfo // in trace.go | ||
creds credentials.PerRPCCredentials | ||
failFast bool | ||
headerMD metadata.MD | ||
trailerMD metadata.MD | ||
peer *peer.Peer | ||
traceInfo traceInfo // in trace.go | ||
maxReceiveMessageSize *int | ||
maxSendMessageSize *int | ||
creds credentials.PerRPCCredentials | ||
} | ||
|
||
var defaultCallInfo = callInfo{failFast: true} | ||
|
@@ -217,6 +219,22 @@ func FailFast(failFast bool) CallOption { | |
}) | ||
} | ||
|
||
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive. | ||
func MaxCallRecvMsgSize(s int) CallOption { | ||
return beforeCall(func(o *callInfo) error { | ||
o.maxReceiveMessageSize = &s | ||
return nil | ||
}) | ||
} | ||
|
||
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send. | ||
func MaxCallSendMsgSize(s int) CallOption { | ||
return beforeCall(func(o *callInfo) error { | ||
o.maxSendMessageSize = &s | ||
return nil | ||
}) | ||
} | ||
|
||
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials | ||
// for a call. | ||
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { | ||
|
@@ -259,7 +277,7 @@ type parser struct { | |
// No other error values or types must be returned, which also means | ||
// that the underlying io.Reader must not return an incompatible | ||
// error. | ||
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) { | ||
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) { | ||
if _, err := io.ReadFull(p.r, p.header[:]); err != nil { | ||
return 0, nil, err | ||
} | ||
|
@@ -270,8 +288,8 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro | |
if length == 0 { | ||
return pf, nil, nil | ||
} | ||
if length > uint32(maxMsgSize) { | ||
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize) | ||
if length > uint32(maxReceiveMessageSize) { | ||
return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) | ||
} | ||
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead | ||
// of making it for each message: | ||
|
@@ -314,7 +332,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl | |
length = uint(len(b)) | ||
} | ||
if length > math.MaxUint32 { | ||
return nil, Errorf(codes.InvalidArgument, "grpc: message too large (%d bytes)", length) | ||
return nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length) | ||
} | ||
|
||
const ( | ||
|
@@ -355,8 +373,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er | |
return nil | ||
} | ||
|
||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { | ||
pf, d, err := p.recvMsg(maxMsgSize) | ||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error { | ||
pf, d, err := p.recvMsg(maxReceiveMessageSize) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -372,10 +390,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ | |
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) | ||
} | ||
} | ||
if len(d) > maxMsgSize { | ||
if len(d) > maxReceiveMessageSize { | ||
// TODO: Revisit the error code. Currently keep it consistent with java | ||
// implementation. | ||
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize) | ||
return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) | ||
} | ||
if err := c.Unmarshal(d, m); err != nil { | ||
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) | ||
|
@@ -501,24 +519,22 @@ type MethodConfig struct { | |
// WaitForReady indicates whether RPCs sent to this method should wait until | ||
// the connection is ready by default (!failfast). The value specified via the | ||
// gRPC client API will override the value set here. | ||
WaitForReady bool | ||
WaitForReady *bool | ||
// Timeout is the default timeout for RPCs sent to this method. The actual | ||
// deadline used will be the minimum of the value specified here and the value | ||
// set by the application via the gRPC client API. If either one is not set, | ||
// then the other will be used. If neither is set, then the RPC has no deadline. | ||
Timeout time.Duration | ||
Timeout *time.Duration | ||
// MaxReqSize is the maximum allowed payload size for an individual request in a | ||
// stream (client->server) in bytes. The size which is measured is the serialized | ||
// payload after per-message compression (but before stream compression) in bytes. | ||
// The actual value used is the minumum of the value specified here and the value set | ||
// by the application via the gRPC client API. If either one is not set, then the other | ||
// will be used. If neither is set, then the built-in default is used. | ||
// TODO: support this. | ||
MaxReqSize uint32 | ||
MaxReqSize *int | ||
// MaxRespSize is the maximum allowed payload size for an individual response in a | ||
// stream (server->client) in bytes. | ||
// TODO: support this. | ||
MaxRespSize uint32 | ||
MaxRespSize *int | ||
} | ||
|
||
// ServiceConfig is provided by the service provider and contains parameters for how | ||
|
@@ -529,9 +545,32 @@ type ServiceConfig struct { | |
// via grpc.WithBalancer will override this. | ||
LB Balancer | ||
// Methods contains a map for the methods in this service. | ||
// If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig. | ||
// If there's no exact match, look for the default config for all methods under the service (/service/) and use the corresponding MethodConfig. | ||
// Otherwise, the method has no MethodConfig to use. | ||
Methods map[string]MethodConfig | ||
} | ||
|
||
func min(a, b *int) *int { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: move these two new functions before the |
||
if *a < *b { | ||
return a | ||
} | ||
return b | ||
} | ||
|
||
func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { | ||
if mcMax == nil && doptMax == nil { | ||
return &defaultVal | ||
} | ||
if mcMax != nil && doptMax != nil { | ||
return min(mcMax, doptMax) | ||
} | ||
if mcMax != nil { | ||
return mcMax | ||
} | ||
return doptMax | ||
} | ||
|
||
// SupportPackageIsVersion4 is referenced from generated protocol buffer files | ||
// to assert that that code is compatible with this version of the grpc package. | ||
// | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass "c" instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
recv() is also used by serverStream