lrs: handle multiple clusters in LRS stream #3935
Conversation
I haven't added many comments yet. Based on our offline discussion about having the lrsClient contain all LRS functionality, I will wait for that change to be made before adding other comments. Thanks.
 if err != nil {
 	// An error from a non-blocking dial indicates something serious.
-	c.logger.Infof("xds: failed to dial load report server {%s}: %v", server, err)
-	return func() {}
+	lrsC.parent.logger.Infof("xds: failed to dial load report server {%s}: %v", lrsC.server, err)
Can we do something better than just emitting a log when the Dial fails? This seems to be irrecoverable at this point, because even if there is another call to ReportLoad, we won't retry the Dial since the refCount will be non-zero.
This should just panic... If this non-blocking dial fails, there's no way the LRS will keep working...
Hmm... but we are only emitting an Info log here. Am I missing something?
I didn't want to add a panic... What I meant was, this should never happen. But we still log at Info level just in case.
Thanks for the review. All fixed. PTAL.
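The refcounting concern discussed in this thread can be illustrated with a minimal, self-contained sketch. The lrsClient type, reportLoad method, and string-valued conn below are illustrative stand-ins, not the actual grpc-go types: the point is only that once refCount is non-zero, a later call never retries the failed dial.

```go
package main

import (
	"errors"
	"fmt"
)

// lrsClient is a stand-in for a per-server LRS client: the connection
// is dialed only when the first caller asks for load reporting, and
// subsequent callers just bump the refcount.
type lrsClient struct {
	refCount int
	conn     string // stand-in for *grpc.ClientConn
	dialErr  error
}

// dial is a stub for the non-blocking grpc.Dial.
func dial(server string, fail bool) (string, error) {
	if fail {
		return "", errors.New("dial failed")
	}
	return "conn:" + server, nil
}

// reportLoad mirrors the problematic flow: if the first dial fails we
// only log, but refCount is already non-zero, so later calls skip the
// dial entirely and inherit the broken (empty) connection.
func (c *lrsClient) reportLoad(server string, failDial bool) {
	c.refCount++
	if c.refCount > 1 {
		return // dial is never retried
	}
	conn, err := dial(server, failDial)
	if err != nil {
		c.dialErr = err
		fmt.Printf("xds: failed to dial load report server {%s}: %v\n", server, err)
		return
	}
	c.conn = conn
}

func main() {
	c := &lrsClient{}
	c.reportLoad("lrs.example.com", true)  // first dial fails; only logged
	c.reportLoad("lrs.example.com", false) // no retry: refCount != 0
	fmt.Println(c.conn == "", c.refCount)  // connection stays broken
}
```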
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
startLoadReport is only called when the LRS server changes. We keep this around for cases where only the EDS service name changes (so we don't restart LRS, but we get a new PerClusterReport with the new service name).
loadWrapper *loadStoreWrapper
// loadOriginal is the load.Store for reporting loads to lrsServerName. It's
// returned by the client.
loadOriginal *load.Store
We could instead cache the load.Store returned by the call to xdsclient.ReportLoad() in the loadWrapper itself, and split the update() method into two: setStore() and setServiceName(), which will be called appropriately when handling the update. What do you think about this approach?
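A minimal sketch of this suggested split, with Store standing in for load.Store and the method names taken from the comment above (the perCluster recomputation and the hard-coded cluster name are illustrative, not the actual wrapper):

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a stand-in for the xds client's load.Store.
type Store struct{ name string }

// PerCluster is a stand-in for looking up a per-cluster reporter.
func (s *Store) PerCluster(cluster, service string) string {
	return fmt.Sprintf("%s/%s/%s", s.name, cluster, service)
}

// loadStoreWrapper sketches the split: instead of one update() taking
// both the store and the service name, cache the store and recompute
// the per-cluster view whenever either input changes.
type loadStoreWrapper struct {
	mu         sync.Mutex
	store      *Store
	service    string
	perCluster string
}

func (w *loadStoreWrapper) setStore(s *Store) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.store = s
	w.refresh()
}

func (w *loadStoreWrapper) setServiceName(name string) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.service = name
	w.refresh()
}

// refresh must be called with mu held.
func (w *loadStoreWrapper) refresh() {
	if w.store == nil {
		w.perCluster = ""
		return
	}
	w.perCluster = w.store.PerCluster("clusterA", w.service)
}

func main() {
	w := &loadStoreWrapper{}
	w.setStore(&Store{name: "store1"})   // LRS server changed
	w.setServiceName("eds-service")      // only the EDS service name changed
	fmt.Println(w.perCluster)
}
```

Either setter alone refreshes the cached per-cluster view, which matches the case where only the EDS service name changes and the LRS stream is not restarted.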
	}
}

if updateLoadStore {
Probably a copy-paste gotcha. You have the same conditional statement twice.
This whole if statement is deleted.
}

if attr == nil {
	return fmt.Errorf("failed to get xdsClient from attributes: attributes is nil")
Should these errors be prefixed with xds: or lrs:? I'm a little confused about the policy for these error messages. I guess we don't have to do that for log statements, since the prefix logger takes care of it.
Added lrs:.
The caller (which is the implementation of the balancer interface) could wrap the error, but that would only add the prefix and doesn't seem worth it, so I added it here.
// Report to the same address should not create new ClientConn.
store1, lrsCancel1 := xdsC.ReportLoad(fs.Address)
defer lrsCancel1()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
Could you please create a different context here, so that we can use the context with the defaultTestTimeout deadline for other things that need it:

sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if u, err := fs.NewConnChan.Receive(sCtx); err != context.DeadlineExceeded {
Done
// Report to a different address should create new ClientConn.
store2, lrsCancel2 := xdsC.ReportLoad(fs2.Address)
defer lrsCancel2()
ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
If we do the above, we won't need to reinitialize the context with the defaultTestTimeout here.
Done
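The pattern suggested in these two threads (a separate short-deadline context for events that are expected not to happen, leaving the main test context's deadline intact for later assertions) can be sketched as follows. The timeout values and the waitForNoEvent helper are illustrative, not part of the actual tests:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Stand-in values; the real tests define their own timeouts.
const (
	defaultTestTimeout      = 100 * time.Millisecond
	defaultTestShortTimeout = 10 * time.Millisecond
)

// waitForNoEvent uses its own short-deadline context for a receive
// that is *expected* to time out (e.g. verifying that no new
// ClientConn was created), so the caller's main context keeps its
// full deadline. It returns true if no event arrived in time.
func waitForNoEvent(ch <-chan struct{}) bool {
	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-ch:
		return false // unexpected event
	case <-sCtx.Done():
		return sCtx.Err() == context.DeadlineExceeded
	}
}

func main() {
	// Long-lived context for operations that are expected to succeed.
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	ch := make(chan struct{}) // nothing will ever be sent
	fmt.Println("no event before short deadline:", waitForNoEvent(ch))
	fmt.Println("main context still alive:", ctx.Err() == nil)
}
```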
ClusterServiceName: "eds",
TotalDroppedRequests: 1,
DroppedRequests: []*endpointpb.ClusterStats_DroppedRequests{{Category: "test", DroppedCount: 1}},
}); !proto.Equal(want, receivedLoad[0]) {
Would it be possible to use protocmp.Diff (or whatever the equivalent is in the protobuf package) and pass it to cmp.Diff, so that we can print a useful error message?
Done.
xds/internal/client/client_test.go (outdated)

@@ -115,7 +115,7 @@ func (c *testAPIClient) RemoveWatch(resourceType ResourceType, resourceName string) {
 	c.removeWatches[resourceType].Send(resourceName)
 }

-func (c *testAPIClient) ReportLoad(ctx context.Context, cc *grpc.ClientConn, opts LoadReportingOptions) {
+func (c *testAPIClient) reportLoad(_ context.Context, _ *grpc.ClientConn, _ loadReportingOptions) {
The underscores could be removed, I think.
Done
@@ -177,7 +177,9 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 		return
 	}

-	x.client.handleUpdate(cfg, u.ResolverState.Attributes)
+	if err := x.client.handleUpdate(cfg, u.ResolverState.Attributes); err != nil {
+		x.logger.Infof("failed to update xds clients: %v", err)
Errorf or Warningf instead?
Changed to warning
func (lsw *loadStoreWrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if lsw.perCluster == nil {
The perClusterStore type in package load already checks for nil. Why is this required?
Right, I forgot.
Deleted.
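The point behind this thread is a Go idiom worth spelling out: a method with a pointer receiver can be invoked on a nil receiver, so the store type can do the nil check itself and callers need none. The type and field names below are an illustrative sketch, not the actual load package:

```go
package main

import "fmt"

// perClusterStore sketches a nil-safe stats recorder: the nil check
// lives inside the method, not at every call site.
type perClusterStore struct {
	started int
}

// CallStarted is safe to call on a nil *perClusterStore; it simply
// does nothing in that case.
func (p *perClusterStore) CallStarted(locality string) {
	if p == nil {
		return
	}
	p.started++
}

func main() {
	var p *perClusterStore       // nil store
	p.CallStarted("locality-1")  // no panic: method handles nil

	q := &perClusterStore{}
	q.CallStarted("locality-1")
	fmt.Println(q.started)
}
```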
	}
}

var dopts []grpc.DialOption
if dialer := c.bbo.Dialer; dialer != nil {
	dopts = []grpc.DialOption{grpc.WithContextDialer(dialer)}
}

// TODO: there's no long a need to read bootstrap file and create a new xds
s/long/longer/
Done
func (lsw *loadStoreWrapper) CallStarted(locality string) {
	lsw.mu.RLock()
	defer lsw.mu.RUnlock()
	if lsw.perCluster == nil {
Same comment about the nil checks.
Removed
- xdsclient.ReportLoad
  - create one LRS stream and one load.Store for each server address
  - take just the server address, and return the load.Store
  - update EDS and LRS balancing policy to recreate the LRS stream only when the server address changes
- LRS stream (v2 and v3)
  - set feature `send_all_clusters` in `node`
  - handle `resp.Clusters` and only send load for those clusters
  - handle `resp.SendAllClusters` and send load for all clusters
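The cluster-filtering behavior described in the last two bullets can be sketched as follows. The lrsStream type, its fields, and the method names are illustrative stand-ins for the v2/v3 LRS proto handling, not the actual implementation:

```go
package main

import "fmt"

// lrsStream tracks which clusters the LRS server asked load for:
// either the response sets send_all_clusters, or it lists the
// clusters explicitly.
type lrsStream struct {
	sendAllClusters bool
	clusters        map[string]bool
}

// handleResponse records the server's instruction from an LRS
// response (sendAll mirrors resp.SendAllClusters, clusters mirrors
// resp.Clusters).
func (s *lrsStream) handleResponse(sendAll bool, clusters []string) {
	s.sendAllClusters = sendAll
	s.clusters = make(map[string]bool, len(clusters))
	for _, c := range clusters {
		s.clusters[c] = true
	}
}

// shouldReport returns whether load for the given cluster should be
// included in the next load report.
func (s *lrsStream) shouldReport(cluster string) bool {
	return s.sendAllClusters || s.clusters[cluster]
}

func main() {
	s := &lrsStream{}

	s.handleResponse(false, []string{"cluster-a"})
	fmt.Println(s.shouldReport("cluster-a"), s.shouldReport("cluster-b"))

	s.handleResponse(true, nil) // send_all_clusters set
	fmt.Println(s.shouldReport("cluster-b"))
}
```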