Skip to content

Commit

Permalink
Fix issue where agentless endpoints would fail to populate after snap…
Browse files Browse the repository at this point in the history
…shot restore.

Fixes an issue that was introduced in #17775. This issue happens because
a long-lived pointer to the state store is held, which is unsafe to do.
Snapshot restorations will swap out this state store, meaning that the
proxycfg watches would break for agentless.
  • Loading branch information
hashi-derek committed Aug 31, 2023
1 parent 699aa47 commit 029a517
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4617,7 +4617,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
// for more details.
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg-glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Store interface {
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
}

// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
Expand Down
16 changes: 7 additions & 9 deletions agent/proxycfg-glue/health_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -39,14 +38,13 @@ import (
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
// This means that while agents should technically have this same issue, they don't experience it with mesh health
// watches.
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, 5 * time.Minute}
}

type serverHealthBlocking struct {
deps ServerDataSourceDeps
remoteSource proxycfg.Health
state *state.Store
watchTimeout time.Duration
}

Expand All @@ -66,7 +64,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
}

// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
var f func(memdb.WatchSet, Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
switch {
case args.Connect:
f = serviceNodesConnect
Expand Down Expand Up @@ -115,7 +113,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
}

var thisReply structs.IndexedCheckServiceNodes
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
thisReply.Index, thisReply.Nodes, err = f(ws, store, args)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -151,14 +149,14 @@ func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token str
return nil
}

func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesConnect(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}

func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesIngress(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}

func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesDefault(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
5 changes: 2 additions & 3 deletions agent/proxycfg-glue/health_blocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func TestServerHealthBlocking(t *testing.T) {
remoteSource := newMockHealth(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)

store := state.NewStateStore(nil)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})
Expand All @@ -52,7 +51,7 @@ func TestServerHealthBlocking(t *testing.T) {
Datacenter: datacenter,
ACLResolver: aclResolver,
Logger: testutil.Logger(t),
}, nil, store)
}, nil)
dataSource.watchTimeout = 1 * time.Second

// Watch for all events
Expand Down

0 comments on commit 029a517

Please sign in to comment.