Skip to content

Commit

Permalink
Merge pull request #15 from 9seconds/circuitbreaker
Browse files Browse the repository at this point in the history
Custom circuit breaker implementation
  • Loading branch information
9seconds authored Mar 1, 2021
2 parents 2318113 + 94f2d22 commit 1455807
Show file tree
Hide file tree
Showing 11 changed files with 545 additions and 36 deletions.
58 changes: 44 additions & 14 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ import (
)

const (
DefaultHTTPTimeout = 10 * time.Second
DefaultUpdateEvery = 24 * time.Hour
DefaultRateLimitInterval = 100 * time.Millisecond
DefaultRateLimitBurst = 10
DefaultHTTPTimeout = 10 * time.Second
DefaultUpdateEvery = 24 * time.Hour
DefaultRateLimitInterval = 100 * time.Millisecond
DefaultRateLimitBurst = 10
DefaultCircuitBreakerOpenThreshold = 5
DefaultCircuitBreakerHalfOpenTimeout = time.Minute
DefaultCircuitBreakerResetFailuresTimeout = 20 * time.Second
)

type duration struct {
Expand Down Expand Up @@ -71,29 +74,32 @@ func (c config) GetWorkerPoolSize() int {
}

func (c config) GetBasicAuthUser() []byte {
return []byte(c.BasicAuthUser)
return []byte(c.BasicAuthUser)
}

func (c config) GetBasicAuthPassword() []byte {
return []byte(c.BasicAuthPassword)
return []byte(c.BasicAuthPassword)
}

func (c config) HasBasicAuth() bool {
return c.BasicAuthUser != "" || c.BasicAuthPassword != ""
return c.BasicAuthUser != "" || c.BasicAuthPassword != ""
}

func (c config) GetProviders() []configProvider {
return c.Providers
}

type configProvider struct {
Name string `json:"name"`
Directory string `json:"directory"`
RateLimitInterval duration `json:"rate_limit_interval"`
RateLimitBurst uint `json:"rate_limit_burst"`
UpdateEvery duration `json:"update_every"`
HTTPTimeout duration `json:"http_timeout"`
SpecificParameters map[string]string `json:"specific_parameters"`
Name string `json:"name"`
Directory string `json:"directory"`
RateLimitInterval duration `json:"rate_limit_interval"`
RateLimitBurst uint `json:"rate_limit_burst"`
CircuitBreakerOpenThreshold uint32 `json:"circuit_breaker_open_threshold"`
CircuitBreakerHalfOpenTimeout duration `json:"circuit_breaker_half_open_timeout"`
CircuitBreakerResetFailuresTimeout duration `json:"circuit_breaker_reset_failures_timeout"`
UpdateEvery duration `json:"update_every"`
HTTPTimeout duration `json:"http_timeout"`
SpecificParameters map[string]string `json:"specific_parameters"`
}

func (c configProvider) GetName() string {
Expand Down Expand Up @@ -124,6 +130,30 @@ func (c configProvider) GetRateLimitBurst() int {
return int(c.RateLimitBurst)
}

func (c configProvider) GetCircuitBreakerOpenThreshold() uint32 {
if c.CircuitBreakerOpenThreshold == 0 {
return DefaultCircuitBreakerOpenThreshold
}

return c.CircuitBreakerOpenThreshold
}

func (c configProvider) GetCircuitBreakerHalfOpenTimeout() time.Duration {
if c.CircuitBreakerHalfOpenTimeout.Duration == 0 {
return DefaultCircuitBreakerHalfOpenTimeout
}

return c.CircuitBreakerHalfOpenTimeout.Duration
}

func (c configProvider) GetCircuitBreakerResetFailuresTimeout() time.Duration {
if c.CircuitBreakerResetFailuresTimeout.Duration == 0 {
return DefaultCircuitBreakerResetFailuresTimeout
}

return c.CircuitBreakerResetFailuresTimeout.Duration
}

func (c configProvider) GetUpdateEvery() time.Duration {
if c.UpdateEvery.Duration == 0 {
return DefaultUpdateEvery
Expand Down
16 changes: 16 additions & 0 deletions example.config.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
// "directory": "${provider_name}",
// "rate_limit_interval": "100ms",
// "rate_limit_burst": 10,
// "circuit_breaker_open_threshold": 5,
// "circuit_breaker_half_open_timeout": "1m",
// "circuit_breaker_reset_failures_timeout": "20s",
// "update_every": "24h",
// "http_timeout": "10s",
// "specific_parameters": {}
Expand All @@ -67,6 +70,19 @@
// Please see rate_limit_interval and rate_limit_burst meanings
// here: https://pkg.go.dev/golang.org/x/time/rate
//
// circuit_breaker_open_threshold defines a number of errors
// required to transit into OPEN state. In open state circuit
// breaker does not allow real requests to the target netloc.
//
// circuit_breaker_half_open_timeout defines a time period when
// circuit breaker closes for a single request to check if we
// already can proceed with out requests or not. If request fails,
// it immediately goes into OPEN state. Succeeds - to CLOSED state.
//
// circuit_breaker_reset_failures_timeout defines a time interval
// when failure counter resets. It is applicable only for closed
// state.
//
// update_every is a periodicity that is used to update provider
// database if this is applicable.
//
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ require (
github.com/antchfx/htmlquery v1.2.3
github.com/antchfx/xpath v1.1.11 // indirect
github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/dgraph-io/ristretto v0.0.3
github.com/hjson/hjson-go v3.1.0+incompatible
github.com/ip2location/ip2location-go v8.3.0+incompatible
github.com/jarcoal/httpmock v1.0.7
github.com/kentik/patricia v0.0.0-20201202224819-f9447a6e25f1
github.com/leaanthony/clir v1.0.4
github.com/mccutchen/go-httpbin v1.1.1
github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91
github.com/oschwald/maxminddb-golang v1.8.0
github.com/panjf2000/ants/v2 v2.4.3
github.com/pariz/gountries v0.0.0-20200430155801-1c6a393df9c7
Expand Down
7 changes: 0 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@ github.com/antchfx/xpath v1.1.11 h1:WOFtK8TVAjLm3lbgqeP0arlHpvCEeTANeWZ/csPpJkQ=
github.com/antchfx/xpath v1.1.11/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs=
github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0 h1:R/qAiUxFT3mNgQaNqJe0IVznjKRNm23ohAIh9lgtlzc=
github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0/go.mod h1:v3ZDlfVAL1OrkKHbGSFFK60k0/7hruHPDq2XMs9Gu6U=
github.com/cenkalti/backoff/v3 v3.1.1/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand All @@ -22,8 +19,6 @@ github.com/dgraph-io/ristretto v0.0.3 h1:jh22xisGBjrEVnRZ1DVTpBVQm0Xndu8sMl0CWDz
github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw=
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw=
Expand All @@ -38,8 +33,6 @@ github.com/leaanthony/clir v1.0.4 h1:Dov2y9zWJmZr7CjaCe86lKa4b5CSxskGAt2yBkoDyiU
github.com/leaanthony/clir v1.0.4/go.mod h1:k/RBkdkFl18xkkACMCLt09bhiZnrGORoxmomeMvDpE0=
github.com/mccutchen/go-httpbin v1.1.1 h1:aEws49HEJEyXHLDnshQVswfUlCVoS8g6h9YaDyaW7RE=
github.com/mccutchen/go-httpbin v1.1.1/go.mod h1:fhpOYavp5g2K74XDl/ao2y4KvhqVtKlkg1e+0UaQv7I=
github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91 h1:mIBx9ZkszRfzGtqTR0Cl8RTTIP1dJ7RjEt8klMg+2gs=
github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91/go.mod h1:C0UM01bzV6QJSeEcGQ5HtFmYtt7uExjYKTKwH7uhzPY=
github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk=
github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis=
github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU=
Expand Down
5 changes: 4 additions & 1 deletion providers/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ func (suite *ProviderTestSuite) SetupTest() {
suite.http = topolib.NewHTTPClient(&http.Client{},
"test-agent",
time.Millisecond,
100)
100,
5,
time.Minute,
time.Minute)
}

type OnlineProviderTestSuite struct {
Expand Down
195 changes: 195 additions & 0 deletions topolib/circuit_breaker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package topolib

import (
"context"
"errors"
"net/http"
"sync/atomic"
"time"
)

type circuitBreakerCallback func(context.Context) (*http.Response, error)

const (
circuitBreakerStateClosed uint32 = iota
circuitBreakerStateHalfOpened
circuitBreakerStateOpened
)

type circuitBreaker struct {
state uint32
stateMutexChan chan bool

halfOpenTimer *time.Timer
failuresCleanupTimer *time.Timer

halfOpenAttempts uint32
failuresCount uint32

openThreshold uint32
halfOpenTimeout time.Duration
resetFailuresTimeout time.Duration
}

func (c *circuitBreaker) Do(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) {
switch atomic.LoadUint32(&c.state) {
case circuitBreakerStateClosed:
return c.doClosed(ctx, callback)
case circuitBreakerStateHalfOpened:
return c.doHalfOpened(ctx, callback)
default:
return nil, ErrCircuitBreakerOpened
}
}

func (c *circuitBreaker) doClosed(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) {
resp, err := callback(ctx)

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.stateMutexChan <- true:
defer func() {
<-c.stateMutexChan
}()
}
}

if c.isErrorOk(err) {
c.switchState(circuitBreakerStateClosed)

return resp, err
}

c.failuresCount++

if c.state == circuitBreakerStateClosed && c.failuresCount > c.openThreshold {
c.switchState(circuitBreakerStateOpened)
}

return resp, err
}

func (c *circuitBreaker) doHalfOpened(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) {
if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) {
return nil, ErrCircuitBreakerOpened
}

resp, err := callback(ctx)

select {
case <-ctx.Done():
return nil, ctx.Err()
default:
select {
case <-ctx.Done():
return nil, ctx.Err()
case c.stateMutexChan <- true:
defer func() {
<-c.stateMutexChan
}()
}
}

if c.state != circuitBreakerStateHalfOpened {
return resp, err
}

if c.isErrorOk(err) {
c.switchState(circuitBreakerStateClosed)
} else {
c.switchState(circuitBreakerStateOpened)
}

return resp, err
}

func (c *circuitBreaker) switchState(state uint32) {
switch state {
case circuitBreakerStateClosed:
c.stopTimer(&c.halfOpenTimer)
c.ensureTimer(&c.failuresCleanupTimer, c.resetFailuresTimeout, c.resetFailures)
case circuitBreakerStateHalfOpened:
c.stopTimer(&c.failuresCleanupTimer)
c.stopTimer(&c.halfOpenTimer)
case circuitBreakerStateOpened:
c.stopTimer(&c.failuresCleanupTimer)
c.ensureTimer(&c.halfOpenTimer, c.halfOpenTimeout, c.tryHalfOpen)
}

c.failuresCount = 0

atomic.StoreUint32(&c.halfOpenAttempts, 0)
atomic.StoreUint32(&c.state, state)
}

func (c *circuitBreaker) resetFailures() {
c.stateMutexChan <- true

defer func() {
<-c.stateMutexChan
}()

c.stopTimer(&c.failuresCleanupTimer)

if c.state == circuitBreakerStateClosed {
c.switchState(circuitBreakerStateClosed)
}
}

func (c *circuitBreaker) tryHalfOpen() {
c.stateMutexChan <- true

defer func() {
<-c.stateMutexChan
}()

if c.state == circuitBreakerStateOpened {
c.switchState(circuitBreakerStateHalfOpened)
}
}

func (c *circuitBreaker) stopTimer(timerRef **time.Timer) {
timer := *timerRef

if timer == nil {
return
}

timer.Stop()

select {
case <-timer.C:
default:
}

*timerRef = nil
}

func (c *circuitBreaker) ensureTimer(timerRef **time.Timer, timeout time.Duration, callback func()) {
if *timerRef == nil {
*timerRef = time.AfterFunc(timeout, callback)
}
}

func (c *circuitBreaker) isErrorOk(err error) bool {
return err == nil || errors.Is(err, ErrCircuitBreakerIgnore)
}

func newCircuitBreaker(openThreshold uint32,
halfOpenTimeout, resetFailuresTimeout time.Duration) *circuitBreaker {
cb := &circuitBreaker{
stateMutexChan: make(chan bool, 1),
openThreshold: openThreshold,
halfOpenTimeout: halfOpenTimeout,
resetFailuresTimeout: resetFailuresTimeout,
}

cb.switchState(circuitBreakerStateClosed)

return cb
}
Loading

0 comments on commit 1455807

Please sign in to comment.