diff --git a/agent/proxycfg-sources/local/sync.go b/agent/proxycfg-sources/local/sync.go
index c6cee8c61d159..ccaf6403918ca 100644
--- a/agent/proxycfg-sources/local/sync.go
+++ b/agent/proxycfg-sources/local/sync.go
@@ -2,6 +2,7 @@ package local
 
 import (
 	"context"
+	"time"
 
 	"github.com/hashicorp/go-hclog"
 
@@ -50,12 +51,15 @@ func Sync(ctx context.Context, cfg SyncConfig) {
 	cfg.State.Notify(stateCh)
 	defer cfg.State.StopNotify(stateCh)
 
+	const resyncFrequency = 30 * time.Second
+
 	for {
 		sync(cfg)
 
 		select {
 		case <-stateCh:
 			// Wait for a state change.
+		case <-time.After(resyncFrequency):
 		case <-ctx.Done():
 			return
 		}
diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go
index c58268e7e039b..d21ff4f1ea5bd 100644
--- a/agent/proxycfg/manager.go
+++ b/agent/proxycfg/manager.go
@@ -158,7 +158,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour
 
 func (m *Manager) register(id ProxyID, ns *structs.NodeService, source ProxySource, token string, overwrite bool) error {
 	state, ok := m.proxies[id]
-	if ok {
+	if ok && !state.stoppedRunning() {
 		if state.source != source && !overwrite {
 			// Registered by a different source, leave as-is.
 			return nil
diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go
index d312c3b4c10cb..2347b04cc53b8 100644
--- a/agent/proxycfg/state.go
+++ b/agent/proxycfg/state.go
@@ -83,10 +83,20 @@ type state struct {
 	ch     chan UpdateEvent
 	snapCh chan ConfigSnapshot
 	reqCh  chan chan *ConfigSnapshot
+	doneCh chan struct{}
 
 	rateLimiter *rate.Limiter
 }
 
+func (s *state) stoppedRunning() bool {
+	select {
+	case <-s.doneCh:
+		return true
+	default:
+		return false
+	}
+}
+
 // failed returns whether run exited because a data source is in an
 // irrecoverable state.
 func (s *state) failed() bool {
@@ -182,6 +192,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str
 		ch:          ch,
 		snapCh:      make(chan ConfigSnapshot, 1),
 		reqCh:       make(chan chan *ConfigSnapshot, 1),
+		doneCh:      make(chan struct{}),
 		rateLimiter: rateLimiter,
 	}, nil
 }
@@ -265,6 +276,9 @@ func (s *state) Watch() (<-chan ConfigSnapshot, error) {
 
 // Close discards the state and stops any long-running watches.
 func (s *state) Close(failed bool) error {
+	if s.stoppedRunning() {
+		return nil
+	}
 	if s.cancel != nil {
 		s.cancel()
 	}
@@ -314,6 +328,9 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
 }
 
 func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
+	// Closing the done channel signals that this entire state is no longer
+	// going to be updated.
+	defer close(s.doneCh)
 	// Close the channel we return from Watch when we stop so consumers can stop
 	// watching and clean up their goroutines. It's important we do this here and
 	// not in Close since this routine sends on this chan and so might panic if it
@@ -429,9 +446,20 @@ func (s *state) unsafeRun(ctx context.Context, snap *ConfigSnapshot) {
 func (s *state) CurrentSnapshot() *ConfigSnapshot {
 	// Make a chan for the response to be sent on
 	ch := make(chan *ConfigSnapshot, 1)
-	s.reqCh <- ch
+
+	select {
+	case <-s.doneCh:
+		return nil
+	case s.reqCh <- ch:
+	}
+
 	// Wait for the response
-	return <-ch
+	select {
+	case <-s.doneCh:
+		return nil
+	case resp := <-ch:
+		return resp
+	}
 }
 
 // Changed returns whether or not the passed NodeService has had any of the
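
Note on the pattern used above: the fix hinges on a closed `doneCh` acting as a broadcast that the state's run loop has exited, so `CurrentSnapshot` selects against it instead of blocking forever on `reqCh`, and `Manager.register` treats a stopped state as re-registrable. Below is a minimal, self-contained sketch of the same done-channel idiom, not part of this diff; the `worker` type, `request` method, and channel names are hypothetical stand-ins rather than Consul APIs.

```go
package main

import (
	"fmt"
	"time"
)

// worker owns a request channel and a done channel. Closing doneCh is the
// single, idempotent broadcast that the run loop has exited and no further
// requests will ever be served.
type worker struct {
	reqCh  chan chan string
	doneCh chan struct{}
}

// stoppedRunning reports whether the run loop has exited, via a
// non-blocking receive: a closed channel is always ready.
func (w *worker) stoppedRunning() bool {
	select {
	case <-w.doneCh:
		return true
	default:
		return false
	}
}

// run serves requests until stop is closed, then closes doneCh so that any
// caller blocked in request unblocks instead of hanging forever.
func (w *worker) run(stop <-chan struct{}) {
	defer close(w.doneCh)
	for {
		select {
		case respCh := <-w.reqCh:
			respCh <- "snapshot"
		case <-stop:
			return
		}
	}
}

// request mirrors the CurrentSnapshot change: both the send of the request
// and the wait for the response race against doneCh, so a stopped worker
// yields the zero value rather than a deadlock.
func (w *worker) request() string {
	respCh := make(chan string, 1)
	select {
	case <-w.doneCh:
		return ""
	case w.reqCh <- respCh:
	}
	select {
	case <-w.doneCh:
		return ""
	case resp := <-respCh:
		return resp
	}
}

func main() {
	stop := make(chan struct{})
	w := &worker{
		reqCh:  make(chan chan string, 1),
		doneCh: make(chan struct{}),
	}
	go w.run(stop)

	fmt.Printf("%q\n", w.request()) // "snapshot" while the loop is running

	close(stop)
	time.Sleep(10 * time.Millisecond) // let run exit and close doneCh
	fmt.Println(w.stoppedRunning())   // true
	fmt.Printf("%q\n", w.request())   // "" instead of blocking forever
}
```

Read alongside the diff, this suggests why the other two hunks exist: a state whose `doneCh` is already closed is effectively dead, so `register` replaces it instead of keeping the stale entry, and the 30-second resync in sync.go gives the local source a periodic chance to perform that re-registration even when no state-change notification fires.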