Skip to content

Commit

Permalink
Merge pull request #32 from bobrik/lazy-updates
Browse files Browse the repository at this point in the history
Implement lazy state updates
  • Loading branch information
bobrik committed Apr 3, 2016
2 parents 4e69714 + 0738c47 commit 701a0e5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 6 deletions.
7 changes: 4 additions & 3 deletions cmd/zoidberg/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"errors"
"flag"
"fmt"
"log"
Expand All @@ -9,8 +10,6 @@ import (
"strings"
"time"

"errors"

"github.com/bobrik/zoidberg"
"github.com/bobrik/zoidberg/application"
"github.com/bobrik/zoidberg/balancer"
Expand All @@ -24,6 +23,8 @@ func main() {
bff := flag.String("balancer-finder", os.Getenv("BALANCER_FINDER"), "balancer finder")
aff := flag.String("application-finder", os.Getenv("APPLICATION_FINDER"), "application finder")
z := flag.String("zk", os.Getenv("ZK"), "zk connection in host:port,host:port/path format")
i := flag.Duration("interval", time.Second, "discovery interval")
l := flag.Duration("laziness", time.Minute, "time to skip balancer updates if there are no changes")

application.RegisterFlags()
balancer.RegisterFlags()
Expand All @@ -50,7 +51,7 @@ func main() {
log.Fatal(err)
}

e, err := zoidberg.NewExplorer(*n, af, bf, zc, zp)
e, err := zoidberg.NewExplorer(*n, af, bf, zc, zp, *i, *l)
if err != nil {
log.Fatal(err)
}
Expand Down
33 changes: 30 additions & 3 deletions explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"net/http"
"path"
"reflect"
"strings"
"sync"
"time"
Expand All @@ -24,13 +25,16 @@ type Explorer struct {
zookeeper *zk.Conn
zp string
state state.State
updated map[string]update
interval time.Duration
laziness time.Duration
mutex sync.Mutex
}

// NewExplorer creates a new Explorer instance with a name,
// application and balancer finders and zookeeper connection
// to persist versioning information
func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.Conn, zp string) (*Explorer, error) {
func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.Conn, zp string, interval, laziness time.Duration) (*Explorer, error) {
s := state.State{}

ss, _, err := zc.Get(zp)
Expand All @@ -54,6 +58,9 @@ func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.
zookeeper: zc,
zp: zp,
state: s,
updated: map[string]update{},
interval: interval,
laziness: laziness,
mutex: sync.Mutex{},
}, nil
}
Expand All @@ -62,7 +69,7 @@ func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.
// and updates load balancers' state
func (e *Explorer) Run() error {
for {
time.Sleep(time.Second)
time.Sleep(e.interval)

d, err := e.discover()
if err != nil {
Expand Down Expand Up @@ -98,9 +105,19 @@ func (e *Explorer) updateBalancers(discovery *Discovery) {
state := e.getState()

wg := sync.WaitGroup{}
wg.Add(len(discovery.Balancers))

now := time.Now()

for _, b := range discovery.Balancers {
bs := b.String()
if reflect.DeepEqual(e.updated[bs].apps, discovery.Apps) {
if now.Sub(e.updated[bs].time) < e.laziness {
continue
}
}

wg.Add(1)

go func(b balancer.Balancer) {
defer wg.Done()

Expand All @@ -109,6 +126,11 @@ func (e *Explorer) updateBalancers(discovery *Discovery) {
log.Printf("error updating state on %s: %s\n", b, err)
return
}

e.updated[b.String()] = update{
time: now,
apps: discovery.Apps,
}
}(b)
}

Expand Down Expand Up @@ -238,3 +260,8 @@ func (e *Explorer) ServeMux() *http.ServeMux {

return mux
}

type update struct {
time time.Time
apps application.Apps
}

0 comments on commit 701a0e5

Please sign in to comment.