-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand service config support #1165
Changes from 7 commits
50d4175
13b5f12
a0b902a
ad16b94
a66f923
f02290b
f1bb70f
6f8b553
fa29686
c6a3937
8788b75
cb02ab4
bab6b61
983d837
9c5f260
eaa9ccb
ecbc34a
ea230c7
3ea2870
d926544
59426b3
bdf9a64
35d77ea
4d2b4b5
7505481
504db8e
d19bbe8
ed64d51
27ae147
cb64938
4a7b4d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -52,7 +52,7 @@ import ( | |
// | ||
// TODO(zhaoq): Check whether the received message sequence is valid. | ||
// TODO ctx is used for stats collection and processing. It is the context passed from the application. | ||
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { | ||
func recvResponse(ctx context.Context, dopts dialOptions, msgSizeLimit int, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { | ||
// Try to acquire header metadata from the server if there is any. | ||
defer func() { | ||
if err != nil { | ||
|
@@ -73,7 +73,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |
} | ||
} | ||
for { | ||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, dopts.maxMsgSize, inPayload); err != nil { | ||
if err = recv(p, dopts.codec, stream, dopts.dc, reply, msgSizeLimit, inPayload); err != nil { | ||
if err == io.EOF { | ||
break | ||
} | ||
|
@@ -93,7 +93,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran | |
} | ||
|
||
// sendRequest writes out various information of an RPC such as Context and Message. | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) { | ||
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, msgSizeLimit int, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) { | ||
stream, err := t.NewStream(ctx, callHdr) | ||
if err != nil { | ||
return nil, err | ||
|
@@ -122,6 +122,9 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, | |
if err != nil { | ||
return nil, Errorf(codes.Internal, "grpc: %v", err) | ||
} | ||
if len(outBuf) > msgSizeLimit { | ||
return nil, Errorf(codes.InvalidArgument, "Sent message larger than max (%d vs. %d)", len(outBuf), msgSizeLimit) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gRPC itself should not return code |
||
} | ||
err = t.Write(stream, outBuf, opts) | ||
if err == nil && outPayload != nil { | ||
outPayload.SentTime = time.Now() | ||
|
@@ -147,15 +150,49 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
return invoke(ctx, method, args, reply, cc, opts...) | ||
} | ||
|
||
const defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 | ||
const defaultClientMaxSendMessageSize = 1024 * 1024 * 4 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Move those to clientconn.go, around line 106? Since they are shared by Unary and Streaming. const (
defaultMaxReceiveMessageSize = 1024 * 1024 * 4
defaultMaxSendMessageSize = 1024 * 1024 * 4
) |
||
|
||
func min(a, b int) int { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to rpc_util.go |
||
if a < b { | ||
return a | ||
} | ||
return b | ||
} | ||
|
||
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { | ||
c := defaultCallInfo | ||
if mc, ok := cc.getMethodConfig(method); ok { | ||
c.failFast = !mc.WaitForReady | ||
if mc.Timeout > 0 { | ||
maxReceiveMessageSize := defaultClientMaxReceiveMessageSize | ||
maxSendMessageSize := defaultClientMaxSendMessageSize | ||
if mc, ok := cc.GetMethodConfig(method); ok { | ||
if mc.WaitForReady != nil { | ||
c.failFast = !*mc.WaitForReady | ||
} | ||
|
||
if mc.Timeout != nil && *mc.Timeout >= 0 { | ||
var cancel context.CancelFunc | ||
ctx, cancel = context.WithTimeout(ctx, mc.Timeout) | ||
ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) | ||
defer cancel() | ||
} | ||
|
||
if mc.MaxReqSize != nil && cc.dopts.maxSendMessageSize >= 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about something like this: func getMaxSize(mcMax *int, doptMax, default int) int {
if mcMax == nil && doptMax < 0 {
return default
}
if mcMax != nil && doptMax >= 0 {
return min(*mcMax, doptMax)
}
if mcMax != nil {
return *mcMax
}
return doptMax
} then call that to compute both send and receive limits? |
||
maxSendMessageSize = min(*mc.MaxReqSize, cc.dopts.maxSendMessageSize) | ||
} else if mc.MaxReqSize != nil { | ||
maxSendMessageSize = *mc.MaxReqSize | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What if |
||
|
||
if mc.MaxRespSize != nil && cc.dopts.maxReceiveMessageSize >= 0 { | ||
maxReceiveMessageSize = min(*mc.MaxRespSize, cc.dopts.maxReceiveMessageSize) | ||
} else if mc.MaxRespSize != nil { | ||
maxReceiveMessageSize = *mc.MaxRespSize | ||
} | ||
} else { | ||
if cc.dopts.maxSendMessageSize >= 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If cc.GetMethodConfig returned an empty MethodConfig (instead of ok=false), then this wouldn't need to be special-cased. There would be nils for these fields and the logic above would apply. Is that feasible? |
||
maxSendMessageSize = cc.dopts.maxSendMessageSize | ||
} | ||
if cc.dopts.maxReceiveMessageSize >= 0 { | ||
maxReceiveMessageSize = cc.dopts.maxReceiveMessageSize | ||
} | ||
} | ||
for _, o := range opts { | ||
if err := o.before(&c); err != nil { | ||
|
@@ -246,7 +283,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
if c.traceInfo.tr != nil { | ||
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) | ||
} | ||
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, callHdr, t, args, topts) | ||
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, maxSendMessageSize, callHdr, t, args, topts) | ||
if err != nil { | ||
if put != nil { | ||
put() | ||
|
@@ -263,7 +300,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli | |
} | ||
return toRPCErr(err) | ||
} | ||
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) | ||
err = recvResponse(ctx, cc.dopts, maxReceiveMessageSize, t, &c, stream, reply) | ||
if err != nil { | ||
if put != nil { | ||
put() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,7 @@ import ( | |
"fmt" | ||
"math" | ||
"net" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -86,30 +87,45 @@ var ( | |
// dialOptions configure a Dial call. dialOptions are set by the DialOption | ||
// values passed to Dial. | ||
type dialOptions struct { | ||
unaryInt UnaryClientInterceptor | ||
streamInt StreamClientInterceptor | ||
codec Codec | ||
cp Compressor | ||
dc Decompressor | ||
bs backoffStrategy | ||
balancer Balancer | ||
block bool | ||
insecure bool | ||
timeout time.Duration | ||
scChan <-chan ServiceConfig | ||
copts transport.ConnectOptions | ||
maxMsgSize int | ||
unaryInt UnaryClientInterceptor | ||
streamInt StreamClientInterceptor | ||
codec Codec | ||
cp Compressor | ||
dc Decompressor | ||
bs backoffStrategy | ||
balancer Balancer | ||
block bool | ||
insecure bool | ||
timeout time.Duration | ||
scChan <-chan ServiceConfig | ||
copts transport.ConnectOptions | ||
maxReceiveMessageSize int | ||
maxSendMessageSize int | ||
} | ||
|
||
const defaultClientMaxMsgSize = math.MaxInt32 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove this const if it's not used. |
||
|
||
// DialOption configures how we set up the connection. | ||
type DialOption func(*dialOptions) | ||
|
||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. | ||
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. This function is for backward API compatibility. It has essentially the same functionality as WithMaxReceiveMessageSize. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Deprecated: use WithMaxReceiveMessageSize instead. |
||
func WithMaxMsgSize(s int) DialOption { | ||
return func(o *dialOptions) { | ||
o.maxMsgSize = s | ||
o.maxReceiveMessageSize = s | ||
} | ||
} | ||
|
||
// WithMaxReceiveMessageSize returns a DialOption which sets the maximum message size the client can receive. Negative input is invalid and has the same effect as not setting the field. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mention the default value as the server side? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. server defaults has been moved from server.go to clientconn.go:109 |
||
func WithMaxReceiveMessageSize(s int) DialOption { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these be call options instead? I think the abstraction of restricting configuration that applies to calls to CallOptions and then providing a "default call option" as a dial option is nice. I.e. WithDefaultCallOptions(cos ...CallOption) DialOption {
return func (o *dialOptions) {
o.callOptions = append(o.callOptions, cos...)
}
} It makes it slightly more cumbersome to set it when dialing, but it also provides more flexibility. My recommendation is to add the above and then make WithMax{Send,Receive}MessageSize return CallOption instead of DialOption. The legacy WithMaxMsgSize could still be provided to avoid an API breaking change, but it could be implemented by |
||
return func(o *dialOptions) { | ||
o.maxReceiveMessageSize = s | ||
} | ||
} | ||
|
||
// WithMaxSendMessageSize returns a DialOption which sets the maximum message size the client can send. Negative input is invalid and has the same effect as not seeting the field. | ||
func WithMaxSendMessageSize(s int) DialOption { | ||
return func(o *dialOptions) { | ||
o.maxSendMessageSize = s | ||
} | ||
} | ||
|
||
|
@@ -305,7 +321,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | |
conns: make(map[Address]*addrConn), | ||
} | ||
cc.ctx, cc.cancel = context.WithCancel(context.Background()) | ||
cc.dopts.maxMsgSize = defaultClientMaxMsgSize | ||
|
||
// initialize maxReceiveMessageSize and maxSendMessageSize to -1 before applying DialOption functions to distinguish whether the user set the message limit or not. | ||
cc.dopts.maxReceiveMessageSize = -1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably make those pointers, too... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree; nil pointers are a better signal of not-set than magic values. |
||
cc.dopts.maxSendMessageSize = -1 | ||
|
||
for _, opt := range opts { | ||
opt(&cc.dopts) | ||
} | ||
|
@@ -336,14 +356,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * | |
}() | ||
|
||
if cc.dopts.scChan != nil { | ||
// Wait for the initial service config. | ||
// Try to get an initial service config. | ||
select { | ||
case sc, ok := <-cc.dopts.scChan: | ||
if ok { | ||
cc.sc = sc | ||
} | ||
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
default: | ||
} | ||
} | ||
// Set defaults. | ||
|
@@ -610,11 +629,16 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) | |
return nil | ||
} | ||
|
||
// GetMethodConfig gets the method config of the input method. If there's no exact match for the input method (i.e. /service/method), we will return the default config for all methods under the service (/service/). | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also add the comment about this behavior on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: please wrap comment lines to ~80 cols. |
||
// TODO: Avoid the locking here. | ||
func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { | ||
func (cc *ClientConn) GetMethodConfig(method string) (m MethodConfig, ok bool) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you can safely remove the ", ok" and document that this will return an empty MethodConfig if one cannot be found for the method OR the service. |
||
cc.mu.RLock() | ||
defer cc.mu.RUnlock() | ||
m, ok = cc.sc.Methods[method] | ||
if !ok { | ||
i := strings.LastIndex(method, "/") | ||
m, ok = cc.sc.Methods[method[:i+1]] | ||
} | ||
return | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -241,7 +241,7 @@ type parser struct { | |
// No other error values or types must be returned, which also means | ||
// that the underlying io.Reader must not return an incompatible | ||
// error. | ||
func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err error) { | ||
func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) { | ||
if _, err := io.ReadFull(p.r, p.header[:]); err != nil { | ||
return 0, nil, err | ||
} | ||
|
@@ -252,8 +252,8 @@ func (p *parser) recvMsg(maxMsgSize int) (pf payloadFormat, msg []byte, err erro | |
if length == 0 { | ||
return pf, nil, nil | ||
} | ||
if length > uint32(maxMsgSize) { | ||
return 0, nil, Errorf(codes.Internal, "grpc: received message length %d exceeding the max size %d", length, maxMsgSize) | ||
if length > uint32(maxReceiveMessageSize) { | ||
return 0, nil, Errorf(codes.InvalidArgument, "grpc: Received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) | ||
} | ||
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead | ||
// of making it for each message: | ||
|
@@ -337,8 +337,8 @@ func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) er | |
return nil | ||
} | ||
|
||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxMsgSize int, inPayload *stats.InPayload) error { | ||
pf, d, err := p.recvMsg(maxMsgSize) | ||
func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error { | ||
pf, d, err := p.recvMsg(maxReceiveMessageSize) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -354,10 +354,10 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{ | |
return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) | ||
} | ||
} | ||
if len(d) > maxMsgSize { | ||
if len(d) > maxReceiveMessageSize { | ||
// TODO: Revisit the error code. Currently keep it consistent with java | ||
// implementation. | ||
return Errorf(codes.Internal, "grpc: received a message of %d bytes exceeding %d limit", len(d), maxMsgSize) | ||
return Errorf(codes.InvalidArgument, "grpc: Received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this is |
||
} | ||
if err := c.Unmarshal(d, m); err != nil { | ||
return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) | ||
|
@@ -459,24 +459,22 @@ type MethodConfig struct { | |
// WaitForReady indicates whether RPCs sent to this method should wait until | ||
// the connection is ready by default (!failfast). The value specified via the | ||
// gRPC client API will override the value set here. | ||
WaitForReady bool | ||
WaitForReady *bool | ||
// Timeout is the default timeout for RPCs sent to this method. The actual | ||
// deadline used will be the minimum of the value specified here and the value | ||
// set by the application via the gRPC client API. If either one is not set, | ||
// then the other will be used. If neither is set, then the RPC has no deadline. | ||
Timeout time.Duration | ||
Timeout *time.Duration | ||
// MaxReqSize is the maximum allowed payload size for an individual request in a | ||
// stream (client->server) in bytes. The size which is measured is the serialized | ||
// payload after per-message compression (but before stream compression) in bytes. | ||
// The actual value used is the minumum of the value specified here and the value set | ||
// by the application via the gRPC client API. If either one is not set, then the other | ||
// will be used. If neither is set, then the built-in default is used. | ||
// TODO: support this. | ||
MaxReqSize uint32 | ||
MaxReqSize *int | ||
// MaxRespSize is the maximum allowed payload size for an individual response in a | ||
// stream (server->client) in bytes. | ||
// TODO: support this. | ||
MaxRespSize uint32 | ||
MaxRespSize *int | ||
} | ||
|
||
// ServiceConfig is provided by the service provider and contains parameters for how | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider passing the method config instead so that we don't need to continue extending the parameter list here.