Skip to content

Commit

Permalink
make ResolveNow non block (#5)
Browse files Browse the repository at this point in the history
* make resolve now non block

* Update resolver.go

Co-authored-by: Geon Kim <geon0250@gmail.com>

* rename lookupCloudmap

---------

Co-authored-by: Geon Kim <geon0250@gmail.com>
  • Loading branch information
glenn-kim and KimMachineGun authored Jul 18, 2023
1 parent 01012f2 commit fb354c5
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 37 deletions.
17 changes: 10 additions & 7 deletions builder.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cloudmap

import (
"sync"
"context"
"time"

"google.golang.org/grpc/grpclog"
Expand Down Expand Up @@ -37,11 +37,11 @@ type builder struct {
// so you don't need to call this function to register the default builder.
//
// Default Options:
//
// Session: session.NewSession()
// HealthStatusFilter: HealthStatusFilterHealthy
// MaxResults: 100
// RefreshInterval: 30s
//
func Register(opts ...Opt) {
b := &builder{
healthStatusFilter: HealthStatusFilterHealthy,
Expand Down Expand Up @@ -72,23 +72,26 @@ func (b *builder) Build(t grpcresolver.Target, cc grpcresolver.ClientConn, _ grp
}
}

ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
mu: &sync.RWMutex{},

logger: grpclog.Component(b.Scheme()),

cc: cc,

ticker: time.NewTicker(b.refreshInterval),

sd: servicediscovery.New(sess),
namespace: cmT.namespace,
service: cmT.service,
healthStatusFilter: b.healthStatusFilter,
maxResults: b.maxResults,

ctx: ctx,
cancel: cancel,
ticker: time.NewTicker(b.refreshInterval),
resolveCmd: make(chan struct{}, 1),
}

go r.watch()
r.wg.Add(1)
go r.watcher()

return r, nil
}
73 changes: 43 additions & 30 deletions resolver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cloudmap

import (
"context"
"fmt"
"sync"
"time"
Expand All @@ -14,34 +15,35 @@ import (
"github.com/aws/aws-sdk-go/service/servicediscovery"
)

type resolver struct {
mu *sync.RWMutex
isClosed bool
type serviceDiscovery interface {
DiscoverInstances(input *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error)
}

type resolver struct {
logger grpclog.LoggerV2
cc grpcresolver.ClientConn

cc grpcresolver.ClientConn

ticker *time.Ticker

sd *servicediscovery.ServiceDiscovery
sd serviceDiscovery
namespace string
service string
healthStatusFilter string
maxResults int64

ctx context.Context
cancel context.CancelFunc
ticker *time.Ticker
resolveCmd chan struct{}
wg sync.WaitGroup
}

func (c *resolver) ResolveNow(grpcresolver.ResolveNowOptions) {
locked := c.mu.TryLock()
if !locked { // already resolving
return
}
defer c.mu.Unlock()

if c.isClosed {
return
select {
case c.resolveCmd <- struct{}{}:
default:
}
}

func (c *resolver) lookupCloudmap() (*grpcresolver.State, error) {
output, err := c.sd.DiscoverInstances(&servicediscovery.DiscoverInstancesInput{
NamespaceName: aws.String(c.namespace),
ServiceName: aws.String(c.service),
Expand All @@ -65,34 +67,45 @@ func (c *resolver) ResolveNow(grpcresolver.ResolveNowOptions) {
} else {
c.logger.Errorln(err.Error())
}
c.cc.ReportError(err)
return
return nil, err
}

addrs := make([]grpcresolver.Address, len(output.Instances))
for i, instance := range output.Instances {
addrs[i] = httpInstanceSummaryToAddr(instance)
}

c.cc.UpdateState(grpcresolver.State{Addresses: addrs})
return &grpcresolver.State{Addresses: addrs}, nil
}

func (c *resolver) Close() {
c.mu.Lock()
defer c.mu.Unlock()

if c.isClosed {
return
}

c.isClosed = true
c.cancel()
c.ticker.Stop()
c.wg.Wait()
}

func (c *resolver) watch() {
func (c *resolver) watcher() {
defer c.wg.Done()

for {
c.ResolveNow(grpcresolver.ResolveNowOptions{})
<-c.ticker.C
state, err := c.lookupCloudmap()
if err != nil {
c.cc.ReportError(err)
} else {
err = c.cc.UpdateState(*state)
}

if err != nil {
c.logger.Errorln(err)
// wait for next iteration
}

select {
case <-c.ctx.Done():
return
case <-c.ticker.C:
case <-c.resolveCmd:
}
}
}

Expand Down
70 changes: 70 additions & 0 deletions resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package cloudmap

import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/service/servicediscovery"
"google.golang.org/grpc/grpclog"
grpcresolver "google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"testing"
"time"
)

type mockCC struct{}

func (m mockCC) UpdateState(state grpcresolver.State) error { return nil }

func (m mockCC) ReportError(err error) {}

func (m mockCC) NewAddress(addresses []grpcresolver.Address) {}

func (m mockCC) NewServiceConfig(serviceConfig string) {}

func (m mockCC) ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult {
return nil
}

type mockDiscovery struct{}

func (m mockDiscovery) DiscoverInstances(input *servicediscovery.DiscoverInstancesInput) (*servicediscovery.DiscoverInstancesOutput, error) {
time.Sleep(1 * time.Second)
fmt.Println("DiscoverInstances called")
return &servicediscovery.DiscoverInstancesOutput{
Instances: make([]*servicediscovery.HttpInstanceSummary, 0),
}, nil
}

func Test_resolver(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
r := &resolver{
logger: grpclog.Component("test"),

cc: mockCC{},
sd: mockDiscovery{},

ctx: ctx,
cancel: cancel,
ticker: time.NewTicker(10 * time.Second),
resolveCmd: make(chan struct{}, 1),
}

r.wg.Add(1)
go r.watcher()

timeout := time.After(100 * time.Millisecond)
done := make(chan bool)
go func() {
for i := 0; i < 10; i++ {
r.ResolveNow(grpcresolver.ResolveNowOptions{})
}
done <- true
}()
select {
case <-timeout:
t.Error("timeout")
case <-done:
t.Log("done")
}
r.Close()
}

0 comments on commit fb354c5

Please sign in to comment.