Skip to content

Commit

Permalink
chore: Remove ring client pool from JumpHashClientPool (#14367)
Browse files Browse the repository at this point in the history
The ring client pool provides functionality, such as periodic health checking of servers and removal of stale clients, that the JumpHashClientPool does not need.

The removal of stale clients was problematic because it compared the original server list (a list of CNAME records from the DNS lookup) with the client cache, which is keyed by server IP. As a result, it removed all cached clients on every check interval.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored Oct 3, 2024
1 parent 0c39e15 commit 46f61fd
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 70 deletions.
13 changes: 2 additions & 11 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1298,18 +1298,9 @@ Experimental: The `bloom_gateway` block configures the Loki bloom gateway server
client:
# Configures the behavior of the connection pool.
pool_config:
# How frequently to clean up clients for servers that have gone away or are
# unhealthy.
# How frequently to update the list of servers.
# CLI flag: -bloom-gateway-client.pool.check-interval
[check_interval: <duration> | default = 10s]
# Run a health check on each server during periodic cleanup.
# CLI flag: -bloom-gateway-client.pool.enable-health-check
[enable_health_check: <boolean> | default = true]
# Timeout for the health check if health check is enabled.
# CLI flag: -bloom-gateway-client.pool.health-check-timeout
[health_check_timeout: <duration> | default = 1s]
[check_interval: <duration> | default = 15s]
# The grpc_client block configures the gRPC client used to communicate between
# a client and server component in Loki.
Expand Down
17 changes: 5 additions & 12 deletions pkg/bloomgateway/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func NewClient(
}
}

poolFactory := func(addr string) (ringclient.PoolClient, error) {
clientFactory := func(addr string) (ringclient.PoolClient, error) {
pool, err := NewBloomGatewayGRPCPool(addr, dialOpts)
if err != nil {
return nil, errors.Wrap(err, "new bloom gateway grpc pool")
Expand All @@ -185,17 +185,10 @@ func NewClient(
// Make an attempt to do one DNS lookup so we can start with addresses
dnsProvider.RunOnce()

clientPool := ringclient.NewPool(
"bloom-gateway",
ringclient.PoolConfig(cfg.PoolConfig),
func() ([]string, error) { return dnsProvider.Addresses(), nil },
ringclient.PoolAddrFunc(poolFactory),
metrics.clients,
logger,
)

pool := NewJumpHashClientPool(clientPool, dnsProvider, cfg.PoolConfig.CheckInterval, logger)
pool.Start()
pool, err := NewJumpHashClientPool(clientFactory, dnsProvider, cfg.PoolConfig.CheckInterval, logger)
if err != nil {
return nil, err
}

return &GatewayClient{
cfg: cfg,
Expand Down
111 changes: 65 additions & 46 deletions pkg/bloomgateway/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package bloomgateway
import (
"context"
"flag"
"sort"
"sync"
"time"

"github.com/go-kit/log"
Expand All @@ -15,53 +15,65 @@ import (
)

// PoolConfig is config for creating a Pool.
// It has the same fields as "github.com/grafana/dskit/ring/client.PoolConfig" so it can be cast.
type PoolConfig struct {
CheckInterval time.Duration `yaml:"check_interval"`
HealthCheckEnabled bool `yaml:"enable_health_check"`
HealthCheckTimeout time.Duration `yaml:"health_check_timeout"`
MaxConcurrentHealthChecks int `yaml:"-"`
CheckInterval time.Duration `yaml:"check_interval"`
}

// RegisterFlagsWithPrefix adds the flags required to configure the pool to the given FlagSet, prepending prefix to each flag name.
func (cfg *PoolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 10*time.Second, "How frequently to clean up clients for servers that have gone away or are unhealthy.")
f.BoolVar(&cfg.HealthCheckEnabled, prefix+"enable-health-check", true, "Run a health check on each server during periodic cleanup.")
f.DurationVar(&cfg.HealthCheckTimeout, prefix+"health-check-timeout", 1*time.Second, "Timeout for the health check if health check is enabled.")
f.DurationVar(&cfg.CheckInterval, prefix+"check-interval", 15*time.Second, "How frequently to update the list of servers.")
}

// Validate checks the pool configuration. No constraints are enforced at
// present, so it always returns nil.
func (cfg *PoolConfig) Validate() error {
	return nil
}

// Compile-time check that *JumpHashClientPool implements clientPool.
var _ clientPool = &JumpHashClientPool{}

// ClientFactory is a function that creates a pool client for the server at
// the given address.
type ClientFactory func(addr string) (client.PoolClient, error)

// New calls the factory function with addr and returns its result unchanged.
func (f ClientFactory) New(addr string) (client.PoolClient, error) {
	return f(addr)
}

type JumpHashClientPool struct {
*client.Pool
services.Service
*jumphash.Selector
sync.RWMutex

provider AddressProvider
logger log.Logger

done chan struct{}
logger log.Logger
clients map[string]client.PoolClient
clientFactory ClientFactory
}

// AddressProvider is the source of server addresses for the pool.
type AddressProvider interface {
	// Addresses returns the list of server addresses.
	Addresses() []string
}

func NewJumpHashClientPool(pool *client.Pool, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) *JumpHashClientPool {
func NewJumpHashClientPool(clientFactory ClientFactory, dnsProvider AddressProvider, updateInterval time.Duration, logger log.Logger) (*JumpHashClientPool, error) {
selector := jumphash.DefaultSelector()
err := selector.SetServers(dnsProvider.Addresses()...)
if err != nil {
level.Warn(logger).Log("msg", "error updating servers", "err", err)
}

p := &JumpHashClientPool{
Pool: pool,
Selector: selector,
done: make(chan struct{}),
logger: logger,
Selector: selector,
clientFactory: clientFactory,
provider: dnsProvider,
logger: logger,
clients: make(map[string]client.PoolClient, len(dnsProvider.Addresses())),
}
go p.updateLoop(dnsProvider, updateInterval)

return p
p.Service = services.NewTimerService(updateInterval, nil, p.updateLoop, nil)
return p, services.StartAndAwaitRunning(context.Background(), p.Service)
}

// Stop asks the pool's embedded service to stop and waits for it to
// terminate. Stop has no error result, so a failure reported while stopping
// is logged as a warning instead of being silently discarded.
func (p *JumpHashClientPool) Stop() {
	if err := services.StopAndAwaitTerminated(context.Background(), p.Service); err != nil {
		level.Warn(p.logger).Log("msg", "error stopping client pool", "err", err)
	}
}

func (p *JumpHashClientPool) AddrForFingerprint(fp uint64) (string, error) {
Expand All @@ -80,35 +92,42 @@ func (p *JumpHashClientPool) Addr(key string) (string, error) {
return addr.String(), nil
}

func (p *JumpHashClientPool) Start() {
ctx := context.Background()
_ = services.StartAndAwaitRunning(ctx, p.Pool)
// updateLoop performs one refresh of the server list: it reads the current
// addresses from the provider and passes them to SetServers. A failed update
// is logged as a warning and otherwise ignored; the function always returns
// nil. The context argument is unused.
func (p *JumpHashClientPool) updateLoop(_ context.Context) error {
	addrs := p.provider.Addresses()
	if err := p.SetServers(addrs...); err != nil {
		level.Warn(p.logger).Log("msg", "error updating servers", "err", err)
	}
	// NOTE(review): presumably nil is returned so that a failed refresh does
	// not stop the timer service that invokes this function — confirm against
	// the dskit services documentation.
	return nil
}

func (p *JumpHashClientPool) Stop() {
ctx := context.Background()
_ = services.StopAndAwaitTerminated(ctx, p.Pool)
close(p.done)
}
// GetClientFor implements clientPool.
func (p *JumpHashClientPool) GetClientFor(addr string) (client.PoolClient, error) {
client, ok := p.fromCache(addr)
if ok {
return client, nil
}

// No client in cache so create one
p.Lock()
defer p.Unlock()

func (p *JumpHashClientPool) updateLoop(provider AddressProvider, updateInterval time.Duration) {
ticker := time.NewTicker(updateInterval)
defer ticker.Stop()

for {
select {
case <-p.done:
return
case <-ticker.C:
servers := provider.Addresses()
// ServerList deterministically maps keys to _index_ of the server list.
// Since DNS returns records in different order each time, we sort to
// guarantee best possible match between nodes.
sort.Strings(servers)
err := p.SetServers(servers...)
if err != nil {
level.Warn(p.logger).Log("msg", "error updating servers", "err", err)
}
}
// Check if a client has been created just after checking the cache and before acquiring the lock.
client, ok = p.clients[addr]
if ok {
return client, nil
}

client, err := p.clientFactory.New(addr)
if err != nil {
return nil, err
}
p.clients[addr] = client
return client, nil
}

// fromCache looks up the cached client for addr while holding the read lock.
// The boolean result reports whether an entry for addr was present.
func (p *JumpHashClientPool) fromCache(addr string) (client.PoolClient, bool) {
	p.RLock()
	defer p.RUnlock()

	// The local is named c so it does not shadow the imported client package.
	c, found := p.clients[addr]
	return c, found
}
3 changes: 2 additions & 1 deletion pkg/bloomgateway/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ func TestJumpHashClientPool_UpdateLoop(t *testing.T) {

provider := &provider{}
provider.UpdateAddresses([]string{"localhost:9095"})
pool := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger())
pool, err := NewJumpHashClientPool(nil, provider, interval, log.NewNopLogger())
require.NoError(t, err)
require.Len(t, pool.Addrs(), 1)
require.Equal(t, "127.0.0.1:9095", pool.Addrs()[0].String())

Expand Down

0 comments on commit 46f61fd

Please sign in to comment.