From 32a662f07925060d40e374bafcee001a00ce749c Mon Sep 17 00:00:00 2001
From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com>
Date: Thu, 1 Aug 2024 16:46:29 -0400
Subject: [PATCH] feat: unregister peer asap on shutdown (#1260)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Which problem is this PR solving?

To recalculate new destinations for traces as soon as possible during
shutdown, we need to announce peer unregistration before anything else.

## Short description of the changes

- use the `done` channel created when catching the termination signal in
  main.go for peer management

---------

Co-authored-by: Yingrong Zhao
---
 cmd/refinery/main.go          |  9 ++++++---
 collect/stressRelief.go       |  4 +++-
 collect/stress_relief_test.go |  1 -
 internal/peer/file.go         |  7 ++-----
 internal/peer/mock.go         |  4 +---
 internal/peer/peers.go        |  1 -
 internal/peer/pubsub_redis.go | 23 ++++++++++++++++-------
 sharder/deterministic_test.go |  7 -------
 8 files changed, 28 insertions(+), 28 deletions(-)

diff --git a/cmd/refinery/main.go b/cmd/refinery/main.go
index d970304579..1923eed034 100644
--- a/cmd/refinery/main.go
+++ b/cmd/refinery/main.go
@@ -116,6 +116,10 @@ func main() {
 		os.Exit(1)
 	}
 
+	// when refinery receives a shutdown signal, we need to
+	// immediately let its peers know so they can stop sending
+	// data to it.
+	done := make(chan struct{})
 	// set up the peer management and pubsub implementations
 	var peers peer.Peers
 	var pubsubber pubsub.PubSub
@@ -125,11 +129,11 @@ func main() {
 		// In the case of file peers, we do not use Redis for anything, including pubsub, so
 		// we use the local pubsub implementation. Even if we have multiple peers, these
 		// peers cannot communicate using pubsub.
-		peers = &peer.FilePeers{}
+		peers = &peer.FilePeers{Done: done}
 		pubsubber = &pubsub.LocalPubSub{}
 	case "redis":
 		// if we're using redis, we need to set it up for both peers and pubsub
-		peers = &peer.RedisPubsubPeers{}
+		peers = &peer.RedisPubsubPeers{Done: done}
 		pubsubber = &pubsub.GoRedisPubSub{}
 	default:
 		// this should have been caught by validation
@@ -195,7 +199,6 @@ func main() {
 		os.Exit(1)
 	}
 
-	done := make(chan struct{})
 	stressRelief := &collect.StressRelief{Done: done}
 	upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
 	peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
diff --git a/collect/stressRelief.go b/collect/stressRelief.go
index cc5fb30c60..4e77caa265 100644
--- a/collect/stressRelief.go
+++ b/collect/stressRelief.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/dgryski/go-wyhash"
+	"github.com/facebookgo/startstop"
 	"github.com/honeycombio/refinery/config"
 	"github.com/honeycombio/refinery/internal/health"
 	"github.com/honeycombio/refinery/internal/peer"
@@ -22,12 +23,13 @@ import (
 const stressReliefTopic = "refinery-stress-relief"
 
 type StressReliever interface {
-	Start() error
 	UpdateFromConfig(cfg config.StressReliefConfig)
 	Recalc() uint
 	Stressed() bool
 	GetSampleRate(traceID string) (rate uint, keep bool, reason string)
 	ShouldSampleDeterministically(traceID string) bool
+
+	startstop.Starter
 }
 
 var _ StressReliever = &MockStressReliever{}
diff --git a/collect/stress_relief_test.go b/collect/stress_relief_test.go
index 73874a3380..45b57a9958 100644
--- a/collect/stress_relief_test.go
+++ b/collect/stress_relief_test.go
@@ -288,7 +288,6 @@ func newStressRelief(t *testing.T, clock clockwork.Clock, channel pubsub.PubSub)
 
 	return sr, func() {
 		require.NoError(t, healthReporter.Stop())
-		require.NoError(t, peer.Stop())
 		require.NoError(t, channel.Stop())
 	}
 }
diff --git a/internal/peer/file.go b/internal/peer/file.go
index bac3f52ef4..06452bd3c2 100644
--- a/internal/peer/file.go
+++ b/internal/peer/file.go
@@ -8,11 +8,12 @@ import (
 	"github.com/honeycombio/refinery/metrics"
 )
 
-var _ Peers = &FilePeers{}
+var _ Peers = (*FilePeers)(nil)
 
 type FilePeers struct {
 	Cfg     config.Config   `inject:""`
 	Metrics metrics.Metrics `inject:"metrics"`
+	Done    chan struct{}
 
 	id string
 }
@@ -55,10 +56,6 @@ func (p *FilePeers) Start() (err error) {
 	return nil
 }
 
-func (p *FilePeers) Stop() error {
-	return nil
-}
-
 func (p *FilePeers) publicAddr() (string, error) {
 	addr := p.Cfg.GetPeerListenAddr()
 	host, port, err := net.SplitHostPort(addr)
diff --git a/internal/peer/mock.go b/internal/peer/mock.go
index dcab8e374c..9cf42abdd1 100644
--- a/internal/peer/mock.go
+++ b/internal/peer/mock.go
@@ -26,6 +26,4 @@ func (p *MockPeers) Start() error {
 	return nil
 }
 
-func (p *MockPeers) Stop() error {
-	return nil
-}
+func (p *MockPeers) stop() {}
diff --git a/internal/peer/peers.go b/internal/peer/peers.go
index 0d9188cf8a..0eadee5293 100644
--- a/internal/peer/peers.go
+++ b/internal/peer/peers.go
@@ -11,5 +11,4 @@ type Peers interface {
 	RegisterUpdatedPeersCallback(callback func())
 	// make it injectable
 	startstop.Starter
-	startstop.Stopper
 }
diff --git a/internal/peer/pubsub_redis.go b/internal/peer/pubsub_redis.go
index 3e78615732..f2cca50005 100644
--- a/internal/peer/pubsub_redis.go
+++ b/internal/peer/pubsub_redis.go
@@ -69,6 +69,8 @@ func (p *peerCommand) marshal() string {
 	return string(p.action) + p.peer
 }
 
+var _ Peers = (*RedisPubsubPeers)(nil)
+
 type RedisPubsubPeers struct {
 	Config  config.Config   `inject:""`
 	Metrics metrics.Metrics `inject:"metrics"`
@@ -76,11 +78,17 @@ type RedisPubsubPeers struct {
 	PubSub  pubsub.PubSub   `inject:""`
 	Clock   clockwork.Clock `inject:""`
 
+	// Done is a channel that will be closed when the service should stop.
+	// After it is closed, the peers service should signal the rest of the cluster
+	// that it is no longer available.
+	// However, any messages sent on the peers channel will still be processed,
+	// since the pubsub subscription is still active.
+	Done chan struct{}
+
 	peers     *generics.SetWithTTL[string]
 	hash      uint64
 	callbacks []func()
 	sub       pubsub.Subscription
-	done      chan struct{}
 }
 
 // checkHash checks the hash of the current list of peers and calls any registered callbacks
@@ -124,7 +132,6 @@ func (p *RedisPubsubPeers) Start() error {
 		p.Logger = &logger.NullLogger{}
 	}
 
-	p.done = make(chan struct{})
 	p.peers = generics.NewSetWithTTL[string](PeerEntryTimeout)
 	p.callbacks = make([]func(), 0)
 	p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)
@@ -153,7 +160,8 @@ func (p *RedisPubsubPeers) Start() error {
 		defer logTicker.Stop()
 		for {
 			select {
-			case <-p.done:
+			case <-p.Done:
+				p.stop()
 				return
 			case <-ticker.Chan():
 				// publish our presence periodically
@@ -174,15 +182,16 @@ func (p *RedisPubsubPeers) Start() error {
 	return nil
 }
 
-func (p *RedisPubsubPeers) Stop() error {
+// stop sends a message on the pubsub channel to unregister this peer,
+// but it does not close the subscription.
+func (p *RedisPubsubPeers) stop() {
 	// unregister ourselves
 	myaddr, err := p.publicAddr()
 	if err != nil {
-		return err
+		p.Logger.Error().Logf("failed to get public address")
+		return
 	}
 	p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr).marshal())
-	close(p.done)
-	return nil
 }
 
 func (p *RedisPubsubPeers) GetPeers() ([]string, error) {
diff --git a/sharder/deterministic_test.go b/sharder/deterministic_test.go
index 30c3a67a9a..d0ec33abff 100644
--- a/sharder/deterministic_test.go
+++ b/sharder/deterministic_test.go
@@ -34,7 +34,6 @@ func TestWhichShard(t *testing.T) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(t, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -81,7 +80,6 @@ func TestWhichShardAtEdge(t *testing.T) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(t, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -136,7 +134,6 @@ func BenchmarkShardBulk(b *testing.B) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(b, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -186,7 +183,6 @@ func TestShardBulk(t *testing.T) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(t, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -262,7 +258,6 @@ func TestShardDrop(t *testing.T) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(t, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -349,7 +344,6 @@ func TestShardAddHash(t *testing.T) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(t, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
@@ -435,7 +429,6 @@ func BenchmarkDeterministicShard(b *testing.B) {
 
 	filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
 	require.NoError(b, filePeers.Start())
-	defer filePeers.Stop()
 
 	sharder := DeterministicSharder{
 		Config: config,
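The patch threads a single `done` channel from main.go into the peer implementations: main closes the channel as soon as a termination signal arrives, and the peer loop reacts by publishing its unregistration before the rest of the process winds down. The sketch below is a minimal, self-contained illustration of that pattern, not Refinery's actual code; `fakePeers`, `announceUnregister`, and the timings are hypothetical stand-ins for `RedisPubsubPeers`, its `stop` method, and the real pubsub API.

```go
// Minimal sketch of the "close a shared done channel on shutdown" pattern.
// All names here are illustrative, not Refinery's real types.
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// fakePeers stands in for a peer-management component that is handed the
// Done channel, as RedisPubsubPeers is in the patch.
type fakePeers struct {
	Done chan struct{}
}

// Start launches the background loop. When Done is closed, the loop
// announces unregistration first and then returns; in the real patch the
// pubsub subscription stays open so in-flight peer messages are still handled.
func (p *fakePeers) Start() {
	ticker := time.NewTicker(1 * time.Second)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-p.Done:
				p.announceUnregister()
				return
			case <-ticker.C:
				fmt.Println("publishing peer registration")
			}
		}
	}()
}

func (p *fakePeers) announceUnregister() {
	fmt.Println("publishing peer unregistration")
}

func main() {
	// The same channel is shared with every component that must react to
	// shutdown, mirroring how main.go passes `done` to both the peers
	// implementation and StressRelief.
	done := make(chan struct{})

	peers := &fakePeers{Done: done}
	peers.Start()

	// Wait for a termination signal.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
	<-sig

	// Closing done is the first act of shutdown, so peers learn about the
	// departure as early as possible and can redirect traffic.
	close(done)

	// Give the goroutine a moment to publish; the real service goes on to
	// stop its remaining components in order here.
	time.Sleep(100 * time.Millisecond)
}
```

Sharing one channel, rather than relying on each component's ordered Stop call, is what lets the unregistration go out immediately: closing `done` is visible to every listener at once, so the peer announcement does not wait for the rest of the shutdown sequence.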