Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only async trigger changes #8778

Closed
wants to merge 10 commits into from
29 changes: 13 additions & 16 deletions agent/agent_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,15 +507,6 @@ func (s *HTTPHandlers) AgentForceLeave(resp http.ResponseWriter, req *http.Reque
return nil, s.agent.ForceLeave(addr, prune)
}

// syncChanges pushes local services and checks to the server through a
// blocking sync call. A failure is only logged, not returned, because the
// local write did succeed and anti-entropy will sync later.
func (s *HTTPHandlers) syncChanges() {
	err := s.agent.State.SyncChanges()
	if err == nil {
		return
	}
	s.agent.logger.Error("failed to sync changes", "error", err)
}

func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var token string
s.parseToken(req, &token)
Expand Down Expand Up @@ -581,7 +572,8 @@ func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Re
if err := s.agent.AddCheck(health, chkType, true, token, ConfigSourceRemote); err != nil {
return nil, err
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -610,7 +602,8 @@ func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.
if err := s.agent.RemoveCheck(checkID, true); err != nil {
return nil, err
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -698,7 +691,8 @@ func (s *HTTPHandlers) agentCheckUpdate(_resp http.ResponseWriter, req *http.Req
if err := s.agent.updateTTLCheck(cid, status, output); err != nil {
return nil, err
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -1003,7 +997,8 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
}
}
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -1033,7 +1028,7 @@ func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *htt
return nil, err
}

s.syncChanges()
s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -1096,7 +1091,8 @@ func (s *HTTPHandlers) AgentServiceMaintenance(resp http.ResponseWriter, req *ht
return nil, nil
}
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down Expand Up @@ -1133,7 +1129,8 @@ func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.
} else {
s.agent.DisableNodeMaintenance()
}
s.syncChanges()

s.agent.State.TriggerSyncChanges()
return nil, nil
}

Expand Down
37 changes: 4 additions & 33 deletions agent/local/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ type State struct {
// Config is the agent config
config Config

// nodeInfoInSync tracks whether the server has our correct top-level
// node information in sync
nodeInfoInSync bool

// Services tracks the local services
services map[structs.ServiceID]*ServiceState

Expand Down Expand Up @@ -900,11 +896,12 @@ func (l *State) updateSyncState() error {
l.Lock()
defer l.Unlock()

// Check if node info needs syncing
// Sync node info if needed
if svcNode == nil || svcNode.ID != l.config.NodeID ||
!reflect.DeepEqual(svcNode.TaggedAddresses, l.config.TaggedAddresses) ||
!reflect.DeepEqual(svcNode.Meta, l.metadata) {
l.nodeInfoInSync = false

l.syncNodeInfo()
}

// Check which services need syncing
Expand Down Expand Up @@ -1039,19 +1036,6 @@ func (l *State) SyncChanges() error {
l.Lock()
defer l.Unlock()

// Sync the node level info if we need to.
if l.nodeInfoInSync {
l.logger.Debug("Node info in sync")
} else {
if err := l.syncNodeInfo(); err != nil {
return err
}
}

// We will do node-level info syncing at the end, since it will get
// updated by a service or check sync anyway, given how the register
// API works.

// Sync the services
// (logging happens in the helper methods)
for id, s := range l.services {
Expand Down Expand Up @@ -1225,7 +1209,6 @@ func (l *State) syncService(key structs.ServiceID) error {
Service: l.services[key].Service,
EnterpriseMeta: key.EnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: st},
SkipNodeUpdate: l.nodeInfoInSync,
}

// Backwards-compatibility for Consul < 0.5
Expand All @@ -1240,9 +1223,6 @@ func (l *State) syncService(key structs.ServiceID) error {
switch {
case err == nil:
l.services[key].InSync = true
// Given how the register API works, this info is also updated
// every time we sync a service.
l.nodeInfoInSync = true
for _, check := range checks {
checkKey := structs.NewCheckID(check.CheckID, &check.EnterpriseMeta)
l.checks[checkKey].InSync = true
Expand Down Expand Up @@ -1286,7 +1266,6 @@ func (l *State) syncCheck(key structs.CheckID) error {
Check: c.Check,
EnterpriseMeta: c.Check.EnterpriseMeta,
WriteRequest: structs.WriteRequest{Token: ct},
SkipNodeUpdate: l.nodeInfoInSync,
}

serviceKey := structs.NewServiceID(c.Check.ServiceID, &key.EnterpriseMeta)
Expand All @@ -1302,9 +1281,6 @@ func (l *State) syncCheck(key structs.CheckID) error {
switch {
case err == nil:
l.checks[key].InSync = true
// Given how the register API works, this info is also updated
// every time we sync a check.
l.nodeInfoInSync = true
l.logger.Info("Synced check", "check", key.String())
return nil

Expand All @@ -1326,7 +1302,7 @@ func (l *State) syncCheck(key structs.CheckID) error {
}
}

func (l *State) syncNodeInfo() error {
func (l *State) syncNodeInfo() {
at := l.tokens.AgentToken()
req := structs.RegisterRequest{
Datacenter: l.config.Datacenter,
Expand All @@ -1341,22 +1317,17 @@ func (l *State) syncNodeInfo() error {
err := l.Delegate.RPC("Catalog.Register", &req, &out)
switch {
case err == nil:
l.nodeInfoInSync = true
l.logger.Info("Synced node info")
return nil

case acl.IsErrPermissionDenied(err), acl.IsErrNotFound(err):
// todo(fs): mark the node info to be in sync to prevent excessive retrying before next full sync
// todo(fs): some backoff strategy might be a better solution
l.nodeInfoInSync = true
accessorID := l.aclAccessorID(at)
l.logger.Warn("Node info update blocked by ACLs", "node", l.config.NodeID, "accessorID", accessorID)
metrics.IncrCounter([]string{"acl", "blocked", "node", "registration"}, 1)
return nil

default:
l.logger.Warn("Syncing node info failed.", "error", err)
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the consequences of this function no longer returning an error?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on your comment I revisited this part and reverted to returning and handling any errors from syncNodeInfo. I also added a longer explanation for these changes to the PR description.

}
}

Expand Down
11 changes: 11 additions & 0 deletions api/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ func TestAPI_HealthChecks(t *testing.T) {
t.Fatalf("err: %v", err)
}

retry.Run(t, func(r *retry.R) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for service to sync to catalog. testrpc cannot be imported here.

services, _, err := c.Catalog().Services(nil)
if err != nil {
r.Fatal(err)
}
if _, ok := services["foo"]; !ok {
r.Fatal("service foo not synced")
}
})

retry.Run(t, func(r *retry.R) {
checks := HealthChecks{
&HealthCheck{
Expand All @@ -234,6 +244,7 @@ func TestAPI_HealthChecks(t *testing.T) {
if meta.LastIndex == 0 {
r.Fatalf("bad: %v", meta)
}
require.NotEmpty(r, out)
checks[0].CreateIndex = out[0].CreateIndex
checks[0].ModifyIndex = out[0].ModifyIndex
require.Equal(r, checks, out)
Expand Down
2 changes: 2 additions & 0 deletions command/catalog/list/services/catalog_list_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func TestCatalogListServicesCommand(t *testing.T) {
t.Fatal(err)
}

testrpc.WaitForTestAgent(t, a.RPC, "dc1", testrpc.WaitForService("testing"))

t.Run("simple", func(t *testing.T) {
ui := cli.NewMockUi()
c := New(ui)
Expand Down
3 changes: 3 additions & 0 deletions connect/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -102,6 +103,8 @@ func TestConsulResolver_Resolve(t *testing.T) {
agent.Config.AdvertiseAddrLAN.String() + ":9091",
}

testrpc.WaitForTestAgent(t, agent.RPC, "dc1", testrpc.WaitForService("db"))

type fields struct {
Namespace string
Name string
Expand Down
1 change: 0 additions & 1 deletion sdk/testutil/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func NewTestServerConfigT(t TestingTB, cb ServerConfigCallback) (*TestServer, er
return nil, errors.Wrap(err, "failed marshaling json")
}

t.Logf("CONFIG JSON: %s", string(b))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove the debug logging of the config JSON.

configFile := filepath.Join(tmpdir, "config.json")
if err := ioutil.WriteFile(configFile, b, 0644); err != nil {
cfg.ReturnPorts()
Expand Down
30 changes: 30 additions & 0 deletions testrpc/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func WaitUntilNoLeader(t *testing.T, rpc rpcFn, dc string, options ...waitOption
type waitOption struct {
Token string
WaitForAntiEntropySync bool
WaitForService string
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add ability to wait for a service because services are no longer synced immediately.

}

func WithToken(token string) waitOption {
Expand All @@ -74,6 +75,10 @@ func WaitForAntiEntropySync() waitOption {
return waitOption{WaitForAntiEntropySync: true}
}

// WaitForService returns a waitOption whose WaitForService field is set to
// the given service name; every other field keeps its zero value.
func WaitForService(service string) waitOption {
	var opt waitOption
	opt.WaitForService = service
	return opt
}

func flattenOptions(options []waitOption) waitOption {
var flat waitOption
for _, opt := range options {
Expand All @@ -83,10 +88,17 @@ func flattenOptions(options []waitOption) waitOption {
if opt.WaitForAntiEntropySync {
flat.WaitForAntiEntropySync = true
}
if opt.WaitForService != "" {
flat.WaitForService = opt.WaitForService
}
}
return flat
}

// WaitForSyncedTestAgent marks itself as a test helper and returns
// immediately; rpc, dc and options are not used by the body.
// NOTE(review): this looks like an unfinished stub — confirm whether it was
// meant to wait for the agent (e.g. by delegating to WaitForTestAgent).
func WaitForSyncedTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
t.Helper()
}

// WaitForTestAgent ensures we have a node with serfHealth check registered
func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption) {
t.Helper()
Expand All @@ -95,6 +107,7 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption)

var nodes structs.IndexedNodes
var checks structs.IndexedHealthChecks
var services structs.IndexedServices

retry.Run(t, func(r *retry.R) {
dcReq := &structs.DCSpecificRequest{
Expand Down Expand Up @@ -134,6 +147,23 @@ func WaitForTestAgent(t *testing.T, rpc rpcFn, dc string, options ...waitOption)
if !found {
r.Fatalf("serfHealth check not found")
}

if flat.WaitForService != "" {
servicesReq := &structs.DCSpecificRequest{
Datacenter: dc,
QueryOptions: structs.QueryOptions{Token: flat.Token},
}
if err := rpc("Catalog.ListServices", servicesReq, &services); err != nil {
r.Fatalf("Catalog.ListServices failed: %v", err)
}
if len(services.Services) == 0 {
r.Fatalf("No registered services")
}
if _, ok := services.Services[flat.WaitForService]; !ok {
r.Fatalf("service not found")
}
}

})
}

Expand Down