From 0738c47887921f5ac225899994050a14c372ecb4 Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Sun, 3 Apr 2016 00:03:14 +0100 Subject: [PATCH] Implement lazy state updates --- cmd/zoidberg/main.go | 7 ++++--- explorer.go | 33 ++++++++++++++++++++++++++++++--- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/cmd/zoidberg/main.go b/cmd/zoidberg/main.go index 8c99eb7..04edcac 100644 --- a/cmd/zoidberg/main.go +++ b/cmd/zoidberg/main.go @@ -1,6 +1,7 @@ package main import ( + "errors" "flag" "fmt" "log" @@ -9,8 +10,6 @@ import ( "strings" "time" - "errors" - "github.com/bobrik/zoidberg" "github.com/bobrik/zoidberg/application" "github.com/bobrik/zoidberg/balancer" @@ -24,6 +23,8 @@ func main() { bff := flag.String("balancer-finder", os.Getenv("BALANCER_FINDER"), "balancer finder") aff := flag.String("application-finder", os.Getenv("APPLICATION_FINDER"), "application finder") z := flag.String("zk", os.Getenv("ZK"), "zk connection in host:port,host:port/path format") + i := flag.Duration("interval", time.Second, "discovery interval") + l := flag.Duration("laziness", time.Minute, "time to skip balancer updates if there are no changes") application.RegisterFlags() balancer.RegisterFlags() @@ -50,7 +51,7 @@ func main() { log.Fatal(err) } - e, err := zoidberg.NewExplorer(*n, af, bf, zc, zp) + e, err := zoidberg.NewExplorer(*n, af, bf, zc, zp, *i, *l) if err != nil { log.Fatal(err) } diff --git a/explorer.go b/explorer.go index bd3fadf..590ea46 100644 --- a/explorer.go +++ b/explorer.go @@ -6,6 +6,7 @@ import ( "log" "net/http" "path" + "reflect" "strings" "sync" "time" @@ -24,13 +25,16 @@ type Explorer struct { zookeeper *zk.Conn zp string state state.State + updated map[string]update + interval time.Duration + laziness time.Duration mutex sync.Mutex } // NewExplorer creates a new Explorer instance with a name, // application and balancer finders and zookeeper connection // to persist versioning information -func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.Conn, zp string) (*Explorer, error) { +func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk.Conn, zp string, interval, laziness time.Duration) (*Explorer, error) { s := state.State{} ss, _, err := zc.Get(zp) @@ -54,6 +58,9 @@ func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk. zookeeper: zc, zp: zp, state: s, + updated: map[string]update{}, + interval: interval, + laziness: laziness, mutex: sync.Mutex{}, }, nil } @@ -62,7 +69,7 @@ func NewExplorer(name string, af application.Finder, bf balancer.Finder, zc *zk. // and updates load balancers' state func (e *Explorer) Run() error { for { - time.Sleep(time.Second) + time.Sleep(e.interval) d, err := e.discover() if err != nil { @@ -98,9 +105,19 @@ func (e *Explorer) updateBalancers(discovery *Discovery) { state := e.getState() wg := sync.WaitGroup{} - wg.Add(len(discovery.Balancers)) + + now := time.Now() for _, b := range discovery.Balancers { + bs := b.String() + if reflect.DeepEqual(e.updated[bs].apps, discovery.Apps) { + if now.Sub(e.updated[bs].time) < e.laziness { + continue + } + } + + wg.Add(1) + go func(b balancer.Balancer) { defer wg.Done() @@ -109,6 +126,11 @@ func (e *Explorer) updateBalancers(discovery *Discovery) { log.Printf("error updating state on %s: %s\n", b, err) return } + + e.updated[b.String()] = update{ + time: now, + apps: discovery.Apps, + } }(b) } @@ -238,3 +260,8 @@ func (e *Explorer) ServeMux() *http.ServeMux { return mux } + +type update struct { + time time.Time + apps application.Apps +}