Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Roy Chiang <roychi@amazon.com>
  • Loading branch information
roystchiang committed Jul 28, 2021
1 parent f3b2dfb commit e80b1f3
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 42 deletions.
7 changes: 6 additions & 1 deletion pkg/cacheutil/memcached_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,12 @@ type memcachedClient struct {
dataSize *prometheus.HistogramVec
}

// AddressProvider performs node address resolution given a list of clusters.
type AddressProvider interface {
// Resolves the provided list of memcached cluster to the actual nodes
Resolve(context.Context, []string) error

// Returns the nodes
Addresses() []string
}

Expand Down Expand Up @@ -237,7 +241,8 @@ func newMemcachedClient(
addressProvider = memcacheDiscovery.NewProvider(
logger,
promRegisterer,
2*time.Second)
config.Timeout,
)
} else {
addressProvider = dns.NewProvider(
logger,
Expand Down
10 changes: 7 additions & 3 deletions pkg/discovery/memcache/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
)

// Provider is a stateful cache for asynchronous memcached auto-discovery resolution. It provides a way to resolve
// addresses and obtain them.
type Provider struct {
sync.RWMutex
resolver Resolver
clusterConfigs map[string]*ClusterConfig
clusterConfigs map[string]*clusterConfig
logger log.Logger

configVersion *extprom.TxGaugeVec
Expand All @@ -32,7 +34,7 @@ type Provider struct {
func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.Duration) *Provider {
p := &Provider{
resolver: &memcachedAutoDiscovery{dialTimeout: dialTimeout},
clusterConfigs: map[string]*ClusterConfig{},
clusterConfigs: map[string]*clusterConfig{},
configVersion: extprom.NewTxGaugeVec(reg, prometheus.GaugeOpts{
Name: "auto_discovery_config_version",
Help: "The current auto discovery config version",
Expand All @@ -54,8 +56,9 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, dialTimeout time.
return p
}

// Resolve stores a list of nodes auto-discovered from the provided addresses.
func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
clusterConfigs := map[string]*ClusterConfig{}
clusterConfigs := map[string]*clusterConfig{}
errs := errutil.MultiError{}

for _, address := range addresses {
Expand Down Expand Up @@ -96,6 +99,7 @@ func (p *Provider) Resolve(ctx context.Context, addresses []string) error {
return errs.Err()
}

// Addresses returns the latest addresses present in the Provider.
func (p *Provider) Addresses() []string {
var result []string
for _, config := range p.clusterConfigs {
Expand Down
41 changes: 17 additions & 24 deletions pkg/discovery/memcache/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,25 @@ func TestProviderUpdatesAddresses(t *testing.T) {
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
resolver.configs = map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-3", ip: "ip-3", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
}
err = provider.Resolve(ctx, clusters)

testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080", "dns-3:11211"}, addresses)
}

Expand All @@ -51,36 +47,33 @@ func TestProviderDoesNotUpdateAddressIfFailed(t *testing.T) {
clusters := []string{"memcached-cluster-1", "memcached-cluster-2"}
provider := NewProvider(log.NewNopLogger(), nil, 5*time.Second)
resolver := mockResolver{
configs: map[string]*ClusterConfig{
"memcached-cluster-1": {nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []Node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
configs: map[string]*clusterConfig{
"memcached-cluster-1": {nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}},
"memcached-cluster-2": {nodes: []node{{dns: "dns-2", ip: "ip-2", port: 8080}}},
},
}
provider.resolver = &resolver

err := provider.Resolve(ctx, clusters)
testutil.Ok(t, provider.Resolve(ctx, clusters))
addresses := provider.Addresses()
sort.Strings(addresses)

testutil.Ok(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)

resolver.configs = nil
resolver.err = errors.New("oops")
err = provider.Resolve(ctx, clusters)

testutil.NotOk(t, provider.Resolve(ctx, clusters))
addresses = provider.Addresses()
sort.Strings(addresses)

testutil.NotOk(t, err)
testutil.Equals(t, []string{"dns-1:11211", "dns-2:8080"}, addresses)
}

type mockResolver struct {
configs map[string]*ClusterConfig
configs map[string]*clusterConfig
err error
}

func (r *mockResolver) Resolve(_ context.Context, address string) (*ClusterConfig, error) {
func (r *mockResolver) Resolve(_ context.Context, address string) (*clusterConfig, error) {
if r.err != nil {
return nil, r.err
}
Expand Down
22 changes: 11 additions & 11 deletions pkg/discovery/memcache/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,35 @@ import (
"strconv"
"strings"
"time"

"github.com/thanos-io/thanos/pkg/runutil"
)

type ClusterConfig struct {
type clusterConfig struct {
version int
nodes []Node
nodes []node
}

type Node struct {
type node struct {
dns string
ip string
port int
}

type Resolver interface {
Resolve(ctx context.Context, address string) (*ClusterConfig, error)
Resolve(ctx context.Context, address string) (*clusterConfig, error)
}

type memcachedAutoDiscovery struct {
dialTimeout time.Duration
}

func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *ClusterConfig, err error) {
func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (config *clusterConfig, err error) {
conn, err := net.DialTimeout("tcp", address, s.dialTimeout)
if err != nil {
return nil, err
}
defer func() {
err = conn.Close()
}()
defer runutil.CloseWithErrCapture(&err, conn, "closing connection")

rw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
if _, err := fmt.Fprintf(rw, "config get cluster\n"); err != nil {
Expand All @@ -57,8 +57,8 @@ func (s *memcachedAutoDiscovery) Resolve(ctx context.Context, address string) (c
return config, err
}

func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConfig, error) {
clusterConfig := new(ClusterConfig)
func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*clusterConfig, error) {
clusterConfig := new(clusterConfig)

configMeta, err := reader.ReadString('\n')
if err != nil {
Expand Down Expand Up @@ -104,7 +104,7 @@ func (s *memcachedAutoDiscovery) parseConfig(reader *bufio.Reader) (*ClusterConf
if err != nil {
return nil, fmt.Errorf("failed to parse port: %s, err: %s", dnsIpPort, err)
}
clusterConfig.nodes = append(clusterConfig.nodes, Node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port})
clusterConfig.nodes = append(clusterConfig.nodes, node{dns: dnsIpPort[0], ip: dnsIpPort[1], port: port})
}

return clusterConfig, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/discovery/memcache/resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ func TestGoodClusterConfigs(t *testing.T) {
resolver := memcachedAutoDiscovery{}
testCases := []struct {
content string
config ClusterConfig
config clusterConfig
}{
{"CONFIG cluster 0 23\r\n100\r\ndns-1|ip-1|11211\r\nEND\r\n",
ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100},
clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}}, version: 100},
},
{"CONFIG cluster 0 37\r\n0\r\ndns-1|ip-1|11211 dns-2|ip-2|8080\r\nEND\r\n",
ClusterConfig{nodes: []Node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0},
clusterConfig{nodes: []node{{dns: "dns-1", ip: "ip-1", port: 11211}, {dns: "dns-2", ip: "ip-2", port: 8080}}, version: 0},
},
}

Expand Down

0 comments on commit e80b1f3

Please sign in to comment.