Skip to content

Commit

Permalink
feat: unregister peer asap on shutdown (#1260)
Browse files Browse the repository at this point in the history
<!--
Thank you for contributing to the project! 💜
Please make sure to:
- Chat with us first if this is a big change
  - Open a new issue (or comment on an existing one)
- We want to make sure you don't spend time implementing something we
might have to say No to
- Add unit tests
- Mention any relevant issues in the PR description (e.g. "Fixes #123")

Please see our [OSS process
document](https://github.com/honeycombio/home/blob/main/honeycomb-oss-lifecycle-and-practices.md#)
to get an idea of how we operate.
-->

## Which problem is this PR solving?

In order to recalculate new destinations for traces as soon as possible
during shutdown, we need to announce peer unregistration first thing.

## Short description of the changes

- use the `done` channel created when catching the termination signal in
main.go for peer management

---------

Co-authored-by: Yingrong Zhao <yingrongzhao@yingrongzhao.attlocal.net>
  • Loading branch information
VinozzZ and Yingrong Zhao authored Aug 1, 2024
1 parent 086f6bd commit 32a662f
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 28 deletions.
9 changes: 6 additions & 3 deletions cmd/refinery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ func main() {
os.Exit(1)
}

// when refinery receives a shutdown signal, we need to
// immediately let its peers know so they can stop sending
// data to it.
done := make(chan struct{})
// set up the peer management and pubsub implementations
var peers peer.Peers
var pubsubber pubsub.PubSub
Expand All @@ -125,11 +129,11 @@ func main() {
// In the case of file peers, we do not use Redis for anything, including pubsub, so
// we use the local pubsub implementation. Even if we have multiple peers, these
// peers cannot communicate using pubsub.
peers = &peer.FilePeers{}
peers = &peer.FilePeers{Done: done}
pubsubber = &pubsub.LocalPubSub{}
case "redis":
// if we're using redis, we need to set it up for both peers and pubsub
peers = &peer.RedisPubsubPeers{}
peers = &peer.RedisPubsubPeers{Done: done}
pubsubber = &pubsub.GoRedisPubSub{}
default:
// this should have been caught by validation
Expand Down Expand Up @@ -195,7 +199,6 @@ func main() {
os.Exit(1)
}

done := make(chan struct{})
stressRelief := &collect.StressRelief{Done: done}
upstreamTransmission := transmit.NewDefaultTransmission(upstreamClient, upstreamMetricsRecorder, "upstream")
peerTransmission := transmit.NewDefaultTransmission(peerClient, peerMetricsRecorder, "peer")
Expand Down
4 changes: 3 additions & 1 deletion collect/stressRelief.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/dgryski/go-wyhash"
"github.com/facebookgo/startstop"
"github.com/honeycombio/refinery/config"
"github.com/honeycombio/refinery/internal/health"
"github.com/honeycombio/refinery/internal/peer"
Expand All @@ -22,12 +23,13 @@ import (
const stressReliefTopic = "refinery-stress-relief"

type StressReliever interface {
Start() error
UpdateFromConfig(cfg config.StressReliefConfig)
Recalc() uint
Stressed() bool
GetSampleRate(traceID string) (rate uint, keep bool, reason string)
ShouldSampleDeterministically(traceID string) bool

startstop.Starter
}

var _ StressReliever = &MockStressReliever{}
Expand Down
1 change: 0 additions & 1 deletion collect/stress_relief_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,6 @@ func newStressRelief(t *testing.T, clock clockwork.Clock, channel pubsub.PubSub)

return sr, func() {
require.NoError(t, healthReporter.Stop())
require.NoError(t, peer.Stop())
require.NoError(t, channel.Stop())
}
}
7 changes: 2 additions & 5 deletions internal/peer/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"github.com/honeycombio/refinery/metrics"
)

var _ Peers = &FilePeers{}
// Compile-time check that FilePeers satisfies the Peers interface.
var _ Peers = (*FilePeers)(nil)

// FilePeers implements Peers using a static peer list from configuration;
// it does not use pubsub, so membership never changes at runtime.
type FilePeers struct {
	Cfg     config.Config   `inject:""`
	Metrics metrics.Metrics `inject:"metrics"`
	// Done is closed when the service should stop (e.g. on a shutdown
	// signal); file-based peers have no unregistration to announce.
	Done chan struct{}

	// id is this peer's own public address — NOTE(review): presumably set
	// during Start; not visible in this chunk, confirm against full file.
	id string
}
Expand Down Expand Up @@ -55,10 +56,6 @@ func (p *FilePeers) Start() (err error) {
return nil
}

func (p *FilePeers) Stop() error {
return nil
}

func (p *FilePeers) publicAddr() (string, error) {
addr := p.Cfg.GetPeerListenAddr()
host, port, err := net.SplitHostPort(addr)
Expand Down
4 changes: 1 addition & 3 deletions internal/peer/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,4 @@ func (p *MockPeers) Start() error {
return nil
}

func (p *MockPeers) Stop() error {
return nil
}
func (p *MockPeers) stop() {}
1 change: 0 additions & 1 deletion internal/peer/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,4 @@ type Peers interface {
RegisterUpdatedPeersCallback(callback func())
// make it injectable
startstop.Starter
startstop.Stopper
}
23 changes: 16 additions & 7 deletions internal/peer/pubsub_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,26 @@ func (p *peerCommand) marshal() string {
return string(p.action) + p.peer
}

// Compile-time check that RedisPubsubPeers satisfies the Peers interface.
var _ Peers = (*RedisPubsubPeers)(nil)

// RedisPubsubPeers tracks cluster membership by exchanging peer
// register/unregister commands over a Redis-backed pubsub channel.
type RedisPubsubPeers struct {
	Config  config.Config   `inject:""`
	Metrics metrics.Metrics `inject:"metrics"`
	Logger  logger.Logger   `inject:""`
	PubSub  pubsub.PubSub   `inject:""`
	Clock   clockwork.Clock `inject:""`

	// Done is a channel that will be closed when the service should stop.
	// After it is closed, the peers service should signal the rest of the
	// cluster that it is no longer available.
	// However, any messages sent on the peers channel will still be processed
	// since the pubsub subscription is still active.
	Done chan struct{}

	peers     *generics.SetWithTTL[string] // live peer addresses, expiring on TTL
	hash      uint64                       // hash of the current peer list, used to detect changes
	callbacks []func()                     // invoked whenever the peer list changes
	sub       pubsub.Subscription
	done      chan struct{}
}

// checkHash checks the hash of the current list of peers and calls any registered callbacks
Expand Down Expand Up @@ -124,7 +132,6 @@ func (p *RedisPubsubPeers) Start() error {
p.Logger = &logger.NullLogger{}
}

p.done = make(chan struct{})
p.peers = generics.NewSetWithTTL[string](PeerEntryTimeout)
p.callbacks = make([]func(), 0)
p.sub = p.PubSub.Subscribe(context.Background(), "peers", p.listen)
Expand Down Expand Up @@ -153,7 +160,8 @@ func (p *RedisPubsubPeers) Start() error {
defer logTicker.Stop()
for {
select {
case <-p.done:
case <-p.Done:
p.stop()
return
case <-ticker.Chan():
// publish our presence periodically
Expand All @@ -174,15 +182,16 @@ func (p *RedisPubsubPeers) Start() error {
return nil
}

func (p *RedisPubsubPeers) Stop() error {
// stop send a message to the pubsub channel to unregister this peer
// but it does not close the subscription.
func (p *RedisPubsubPeers) stop() {
// unregister ourselves
myaddr, err := p.publicAddr()
if err != nil {
return err
p.Logger.Error().Logf("failed to get public address")
return
}
p.PubSub.Publish(context.Background(), "peers", newPeerCommand(Unregister, myaddr).marshal())
close(p.done)
return nil
}

func (p *RedisPubsubPeers) GetPeers() ([]string, error) {
Expand Down
7 changes: 0 additions & 7 deletions sharder/deterministic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func TestWhichShard(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -81,7 +80,6 @@ func TestWhichShardAtEdge(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -136,7 +134,6 @@ func BenchmarkShardBulk(b *testing.B) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(b, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -186,7 +183,6 @@ func TestShardBulk(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -262,7 +258,6 @@ func TestShardDrop(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -349,7 +344,6 @@ func TestShardAddHash(t *testing.T) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(t, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down Expand Up @@ -435,7 +429,6 @@ func BenchmarkDeterministicShard(b *testing.B) {

filePeers := &peer.FilePeers{Cfg: config, Metrics: &metrics.NullMetrics{}}
require.NoError(b, filePeers.Start())
defer filePeers.Stop()

sharder := DeterministicSharder{
Config: config,
Expand Down

0 comments on commit 32a662f

Please sign in to comment.