-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
only async trigger changes #8778
Changes from 9 commits
ef66dd0
704ddee
84560ab
0a1f586
ac177c6
a970ebd
eb6b8cc
b7e3d4c
770139f
f6d637f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -148,10 +148,6 @@ type State struct { | |
// Config is the agent config | ||
config Config | ||
|
||
// nodeInfoInSync tracks whether the server has our correct top-level | ||
// node information in sync | ||
nodeInfoInSync bool | ||
|
||
// Services tracks the local services | ||
services map[structs.ServiceID]*ServiceState | ||
|
||
|
@@ -900,11 +896,12 @@ func (l *State) updateSyncState() error { | |
l.Lock() | ||
defer l.Unlock() | ||
|
||
// Check if node info needs syncing | ||
// Sync node info if needed | ||
if svcNode == nil || svcNode.ID != l.config.NodeID || | ||
!reflect.DeepEqual(svcNode.TaggedAddresses, l.config.TaggedAddresses) || | ||
!reflect.DeepEqual(svcNode.Meta, l.metadata) { | ||
l.nodeInfoInSync = false | ||
|
||
l.syncNodeInfo() | ||
} | ||
|
||
// Check which services need syncing | ||
|
@@ -1039,19 +1036,6 @@ func (l *State) SyncChanges() error { | |
l.Lock() | ||
defer l.Unlock() | ||
|
||
// Sync the node level info if we need to. | ||
if l.nodeInfoInSync { | ||
l.logger.Debug("Node info in sync") | ||
} else { | ||
if err := l.syncNodeInfo(); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// We will do node-level info syncing at the end, since it will get | ||
// updated by a service or check sync anyway, given how the register | ||
// API works. | ||
|
||
// Sync the services | ||
// (logging happens in the helper methods) | ||
for id, s := range l.services { | ||
|
@@ -1225,7 +1209,6 @@ func (l *State) syncService(key structs.ServiceID) error { | |
Service: l.services[key].Service, | ||
EnterpriseMeta: key.EnterpriseMeta, | ||
WriteRequest: structs.WriteRequest{Token: st}, | ||
SkipNodeUpdate: l.nodeInfoInSync, | ||
} | ||
|
||
// Backwards-compatibility for Consul < 0.5 | ||
|
@@ -1240,9 +1223,6 @@ func (l *State) syncService(key structs.ServiceID) error { | |
switch { | ||
case err == nil: | ||
l.services[key].InSync = true | ||
// Given how the register API works, this info is also updated | ||
// every time we sync a service. | ||
l.nodeInfoInSync = true | ||
for _, check := range checks { | ||
checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta) | ||
l.checks[checkKey].InSync = true | ||
|
@@ -1286,7 +1266,6 @@ func (l *State) syncCheck(key structs.CheckID) error { | |
Check: c.Check, | ||
EnterpriseMeta: c.Check.EnterpriseMeta, | ||
WriteRequest: structs.WriteRequest{Token: ct}, | ||
SkipNodeUpdate: l.nodeInfoInSync, | ||
} | ||
|
||
serviceKey := structs.NewServiceID(c.Check.ServiceID, &key.EnterpriseMeta) | ||
|
@@ -1302,9 +1281,6 @@ func (l *State) syncCheck(key structs.CheckID) error { | |
switch { | ||
case err == nil: | ||
l.checks[key].InSync = true | ||
// Given how the register API works, this info is also updated | ||
// every time we sync a check. | ||
l.nodeInfoInSync = true | ||
l.logger.Info("Synced check", "check", key.String()) | ||
return nil | ||
|
||
|
@@ -1326,7 +1302,7 @@ func (l *State) syncCheck(key structs.CheckID) error { | |
} | ||
} | ||
|
||
func (l *State) syncNodeInfo() error { | ||
func (l *State) syncNodeInfo() { | ||
at := l.tokens.AgentToken() | ||
req := structs.RegisterRequest{ | ||
Datacenter: l.config.Datacenter, | ||
|
@@ -1341,22 +1317,17 @@ func (l *State) syncNodeInfo() error { | |
err := l.Delegate.RPC("Catalog.Register", &req, &out) | ||
switch { | ||
case err == nil: | ||
l.nodeInfoInSync = true | ||
l.logger.Info("Synced node info") | ||
return nil | ||
|
||
case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err): | ||
// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync | ||
// todo(fs): some backoff strategy might be a better solution | ||
l.nodeInfoInSync = true | ||
accessorID := l.aclAccessorID(at) | ||
l.logger.Warn("Node info update blocked by ACLs", "node", l.config.NodeID, "accessorID", accessorID) | ||
metrics.IncrCounter([]string{"acl", "blocked", "node", "registration"}, 1) | ||
return nil | ||
|
||
default: | ||
l.logger.Warn("Syncing node info failed.", "error", err) | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What are the consequences of this function no longer returning an error? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on your comment I revisited this part and reverted to returning and handling any errors from |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -212,6 +212,16 @@ func TestAPI_HealthChecks(t *testing.T) { | |
t.Fatalf("err: %v", err) | ||
} | ||
|
||
retry.Run(t, func(r *retry.R) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wait for service to sync to catalog. |
||
services, _, err := c.Catalog().Services(nil) | ||
if err != nil { | ||
r.Fatal(err) | ||
} | ||
if _, ok := services["foo"]; !ok { | ||
r.Fatal("service foo not synced") | ||
} | ||
}) | ||
|
||
retry.Run(t, func(r *retry.R) { | ||
checks := HealthChecks{ | ||
&HealthCheck{ | ||
|
@@ -234,6 +244,7 @@ func TestAPI_HealthChecks(t *testing.T) { | |
if meta.LastIndex == 0 { | ||
r.Fatalf("bad: %v", meta) | ||
} | ||
require.NotEmpty(r, out) | ||
checks[0].CreateIndex = out[0].CreateIndex | ||
checks[0].ModifyIndex = out[0].ModifyIndex | ||
require.Equal(r, checks, out) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -250,7 +250,6 @@ func NewTestServerConfigT(t TestingTB, cb ServerConfigCallback) (*TestServer, er | |
return nil, errors.Wrap(err, "failed marshaling json") | ||
} | ||
|
||
t.Logf("CONFIG JSON: %s", string(b)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. remove debug. |
||
configFile := filepath.Join(tmpdir, "config.json") | ||
if err := ioutil.WriteFile(configFile, b, 0644); err != nil { | ||
cfg.ReturnPorts() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption | |
type waitOption struct { | ||
Token string | ||
WaitForAntiEntropySync bool | ||
WaitForService string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add ability to wait for a service because services are no longer synced immediately. |
||
} | ||
|
||
func WithToken(token string) waitOption { | ||
|
@@ -74,6 +75,10 @@ func WaitForAntiEntropySync() waitOption { | |
return waitOption{WaitForAntiEntropySync: true} | ||
} | ||
|
||
func WaitForService(service string) waitOption { | ||
return waitOption{WaitForService: service} | ||
} | ||
|
||
func flattenOptions(options []waitOption) waitOption { | ||
var flat waitOption | ||
for _, opt := range options { | ||
|
@@ -83,6 +88,9 @@ func flattenOptions(options []waitOption) waitOption { | |
if opt.WaitForAntiEntropySync { | ||
flat.WaitForAntiEntropySync = true | ||
} | ||
if opt.WaitForService != "" { | ||
flat.WaitForService = opt.WaitForService | ||
} | ||
} | ||
return flat | ||
} | ||
|
@@ -95,6 +103,7 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) | |
|
||
var nodes structs.IndexedNodes | ||
var checks structs.IndexedHealthChecks | ||
var services structs.IndexedServices | ||
|
||
retry.Run(t, func(r *retry.R) { | ||
dcReq := &structs.DCSpecificRequest{ | ||
|
@@ -134,6 +143,23 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) | |
if !found { | ||
r.Fatalf("serfHealth check not found") | ||
} | ||
|
||
if flat.WaitForService != "" { | ||
servicesReq := &structs.DCSpecificRequest{ | ||
Datacenter: dc, | ||
QueryOptions: structs.QueryOptions{Token: flat.Token}, | ||
} | ||
if err := rpc("Catalog.ListServices", servicesReq, &services); err != nil { | ||
r.Fatalf("Catalog.ListServices failed: %v", err) | ||
} | ||
if len(services.Services) == 0 { | ||
r.Fatalf("No registered services") | ||
} | ||
if _, ok := services.Services[flat.WaitForService]; !ok { | ||
r.Fatalf("service not found") | ||
} | ||
} | ||
|
||
}) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hopefully fixing a flaky test with this.