Skip to content

Commit

Permalink
Support push model for service discovery
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuri Shkuro committed May 17, 2017
1 parent d67bb4c commit 449bebb
Show file tree
Hide file tree
Showing 42 changed files with 820 additions and 552 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,8 @@ Session.vim
# auto-generated tag files
tags

# dependency management files
glide.lock
glide.yaml
vendor/

2 changes: 1 addition & 1 deletion coverage.bash
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
set -e

function go_files { find . -name '*_test.go' ; }
function filter { grep -v '/_' ; }
function filter { grep -v -e '/_' -e vendor ; }
function remove_relative_prefix { sed -e 's/^\.\///g' ; }

function directories {
Expand Down
18 changes: 10 additions & 8 deletions examples/apigateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,19 @@ func main() {
tags = []string{}
passingOnly = true
endpoints = addsvc.Endpoints{}
instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly)
)
{
factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger)
subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
endpoints.SumEndpoint = retry
}
{
factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger)
subscriber := consulsd.NewSubscriber(client, factory, logger, "addsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
endpoints.ConcatEndpoint = retry
}
Expand All @@ -120,18 +121,19 @@ func main() {
passingOnly = true
uppercase endpoint.Endpoint
count endpoint.Endpoint
instancer = consulsd.NewInstancer(client, logger, "stringsvc", tags, passingOnly)
)
{
factory := stringsvcFactory(ctx, "GET", "/uppercase")
subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
uppercase = retry
}
{
factory := stringsvcFactory(ctx, "GET", "/count")
subscriber := consulsd.NewSubscriber(client, factory, logger, "stringsvc", tags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(*retryMax, *retryTimeout, balancer)
count = retry
}
Expand Down
59 changes: 41 additions & 18 deletions examples/profilesvc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,68 +40,91 @@ func New(consulAddr string, logger log.Logger) (profilesvc.Service, error) {

var (
sdclient = consul.NewClient(apiclient)
instancer = consul.NewInstancer(sdclient, logger, consulService, consulTags, passingOnly)
endpoints profilesvc.Endpoints
)
// TODO: thought experiment
mapping := []struct {
factory func(s profilesvc.Service) endpoint.Endpoint
endpoint *endpoint.Endpoint
}{
{
factory: profilesvc.MakePostProfileEndpoint,
endpoint: &endpoints.PostProfileEndpoint,
},
{
factory: profilesvc.MakeGetProfileEndpoint,
endpoint: &endpoints.GetProfileEndpoint,
},
}
for _, m := range mapping {
factory := factoryFor(m.factory)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
*m.endpoint = retry
}
// TODO: why not 2 lines per endpoint registration above instead of 7 lines per endpoint below?
{
factory := factoryFor(profilesvc.MakePostProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePutProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PutProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePatchProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PatchProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteProfileEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteProfileEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressesEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressesEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeGetAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.GetAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakePostAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.PostAddressEndpoint = retry
}
{
factory := factoryFor(profilesvc.MakeDeleteAddressEndpoint)
subscriber := consul.NewSubscriber(sdclient, factory, logger, consulService, consulTags, passingOnly)
balancer := lb.NewRoundRobin(subscriber)
endpointer := sd.NewEndpointer(instancer, factory, logger)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(retryMax, retryTimeout, balancer)
endpoints.DeleteAddressEndpoint = retry
}
Expand Down
6 changes: 3 additions & 3 deletions examples/stringsvc3/proxying.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ func proxyingMiddleware(ctx context.Context, instances string, logger log.Logger
// discovery system.
var (
instanceList = split(instances)
subscriber sd.FixedSubscriber
endpointer sd.FixedEndpointer
)
logger.Log("proxy_to", fmt.Sprint(instanceList))
for _, instance := range instanceList {
var e endpoint.Endpoint
e = makeUppercaseProxy(ctx, instance)
e = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(e)
e = ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(float64(qps), int64(qps)))(e)
subscriber = append(subscriber, e)
endpointer = append(endpointer, e)
}

// Now, build a single, retrying, load-balancing endpoint out of all of
// those individual endpoints.
balancer := lb.NewRoundRobin(subscriber)
balancer := lb.NewRoundRobin(endpointer)
retry := lb.Retry(maxAttempts, maxTime, balancer)

// And finally, return the ServiceMiddleware, implemented by proxymw.
Expand Down
6 changes: 3 additions & 3 deletions sd/cache/benchmark_test.go → sd/benchmark_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cache
package sd

import (
"io"
Expand All @@ -14,12 +14,12 @@ func BenchmarkEndpoints(b *testing.B) {
cb = make(closer)
cmap = map[string]io.Closer{"a": ca, "b": cb}
factory = func(instance string) (endpoint.Endpoint, io.Closer, error) { return endpoint.Nop, cmap[instance], nil }
c = New(factory, log.NewNopLogger())
c = newEndpointCache(factory, log.NewNopLogger(), endpointerOptions{})
)

b.ReportAllocs()

c.Update([]string{"a", "b"})
c.Update(Event{Instances: []string{"a", "b"}})

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
Expand Down
136 changes: 136 additions & 0 deletions sd/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package sd

import (
"io"
"sort"
"sync"
"time"

"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/log"
)

// endpointCache collects the most recent set of instances from a service discovery
// system, creates endpoints for them using a factory function, and makes
// them available to consumers.
type endpointCache struct {
options endpointerOptions
mtx sync.RWMutex
factory Factory
cache map[string]endpointCloser
err error
endpoints []endpoint.Endpoint
logger log.Logger
invalidateDeadline time.Time
}

type endpointCloser struct {
endpoint.Endpoint
io.Closer
}

// newEndpointCache returns a new, empty endpointCache.
func newEndpointCache(factory Factory, logger log.Logger, options endpointerOptions) *endpointCache {
return &endpointCache{
options: options,
factory: factory,
cache: map[string]endpointCloser{},
logger: logger,
}
}

// Update should be invoked by clients with a complete set of current instance
// strings whenever that set changes. The cache manufactures new endpoints via
// the factory, closes old endpoints when they disappear, and persists existing
// endpoints if they survive through an update.
func (c *endpointCache) Update(event Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

if event.Err == nil {
c.updateCache(event.Instances)
c.invalidateDeadline = time.Time{}
c.err = nil
}

c.logger.Log("err", event.Err)

if c.options.invalidateOnErrorTimeout == nil {
// keep returning the last known endpoints on error
return
}

c.err = event.Err

if !c.invalidateDeadline.IsZero() {
// aleady in the error state, do nothing
return
}
// set new deadline to invalidate Endpoints unless non-error Event is received
c.invalidateDeadline = time.Now().Add(*c.options.invalidateOnErrorTimeout)
return
}

func (c *endpointCache) updateCache(instances []string) {
// Deterministic order (for later).
sort.Strings(instances)

// Produce the current set of services.
cache := make(map[string]endpointCloser, len(instances))
for _, instance := range instances {
// If it already exists, just copy it over.
if sc, ok := c.cache[instance]; ok {
cache[instance] = sc
delete(c.cache, instance)
continue
}

// If it doesn't exist, create it.
service, closer, err := c.factory(instance)
if err != nil {
c.logger.Log("instance", instance, "err", err)
continue
}
cache[instance] = endpointCloser{service, closer}
}

// Close any leftover endpoints.
for _, sc := range c.cache {
if sc.Closer != nil {
sc.Closer.Close()
}
}

// Populate the slice of endpoints.
endpoints := make([]endpoint.Endpoint, 0, len(cache))
for _, instance := range instances {
// A bad factory may mean an instance is not present.
if _, ok := cache[instance]; !ok {
continue
}
endpoints = append(endpoints, cache[instance].Endpoint)
}

// Swap and trigger GC for old copies.
c.endpoints = endpoints
c.cache = cache
}

// Endpoints yields the current set of (presumably identical) endpoints, ordered
// lexicographically by the corresponding instance string.
func (c *endpointCache) Endpoints() ([]endpoint.Endpoint, error) {
c.mtx.RLock()

if c.err == nil || time.Now().Before(c.invalidateDeadline) {
defer c.mtx.RUnlock()
return c.endpoints, nil
}

c.mtx.RUnlock()
c.mtx.Lock()
defer c.mtx.Unlock()

c.updateCache(nil) // close any remaining active endpoints

return nil, c.err
}
Loading

0 comments on commit 449bebb

Please sign in to comment.