diff --git a/config.go b/config.go index 39c62f3..4856eb3 100644 --- a/config.go +++ b/config.go @@ -13,10 +13,13 @@ import ( ) const ( - DefaultHTTPTimeout = 10 * time.Second - DefaultUpdateEvery = 24 * time.Hour - DefaultRateLimitInterval = 100 * time.Millisecond - DefaultRateLimitBurst = 10 + DefaultHTTPTimeout = 10 * time.Second + DefaultUpdateEvery = 24 * time.Hour + DefaultRateLimitInterval = 100 * time.Millisecond + DefaultRateLimitBurst = 10 + DefaultCircuitBreakerOpenThreshold = 5 + DefaultCircuitBreakerHalfOpenTimeout = time.Minute + DefaultCircuitBreakerResetFailuresTimeout = 20 * time.Second ) type duration struct { @@ -71,15 +74,15 @@ func (c config) GetWorkerPoolSize() int { } func (c config) GetBasicAuthUser() []byte { - return []byte(c.BasicAuthUser) + return []byte(c.BasicAuthUser) } func (c config) GetBasicAuthPassword() []byte { - return []byte(c.BasicAuthPassword) + return []byte(c.BasicAuthPassword) } func (c config) HasBasicAuth() bool { - return c.BasicAuthUser != "" || c.BasicAuthPassword != "" + return c.BasicAuthUser != "" || c.BasicAuthPassword != "" } func (c config) GetProviders() []configProvider { @@ -87,13 +90,16 @@ func (c config) GetProviders() []configProvider { } type configProvider struct { - Name string `json:"name"` - Directory string `json:"directory"` - RateLimitInterval duration `json:"rate_limit_interval"` - RateLimitBurst uint `json:"rate_limit_burst"` - UpdateEvery duration `json:"update_every"` - HTTPTimeout duration `json:"http_timeout"` - SpecificParameters map[string]string `json:"specific_parameters"` + Name string `json:"name"` + Directory string `json:"directory"` + RateLimitInterval duration `json:"rate_limit_interval"` + RateLimitBurst uint `json:"rate_limit_burst"` + CircuitBreakerOpenThreshold uint32 `json:"circuit_breaker_open_threshold"` + CircuitBreakerHalfOpenTimeout duration `json:"circuit_breaker_half_open_timeout"` + CircuitBreakerResetFailuresTimeout duration 
`json:"circuit_breaker_reset_failures_timeout"` + UpdateEvery duration `json:"update_every"` + HTTPTimeout duration `json:"http_timeout"` + SpecificParameters map[string]string `json:"specific_parameters"` } func (c configProvider) GetName() string { @@ -124,6 +130,30 @@ func (c configProvider) GetRateLimitBurst() int { return int(c.RateLimitBurst) } +func (c configProvider) GetCircuitBreakerOpenThreshold() uint32 { + if c.CircuitBreakerOpenThreshold == 0 { + return DefaultCircuitBreakerOpenThreshold + } + + return c.CircuitBreakerOpenThreshold +} + +func (c configProvider) GetCircuitBreakerHalfOpenTimeout() time.Duration { + if c.CircuitBreakerHalfOpenTimeout.Duration == 0 { + return DefaultCircuitBreakerHalfOpenTimeout + } + + return c.CircuitBreakerHalfOpenTimeout.Duration +} + +func (c configProvider) GetCircuitBreakerResetFailuresTimeout() time.Duration { + if c.CircuitBreakerResetFailuresTimeout.Duration == 0 { + return DefaultCircuitBreakerResetFailuresTimeout + } + + return c.CircuitBreakerResetFailuresTimeout.Duration +} + func (c configProvider) GetUpdateEvery() time.Duration { if c.UpdateEvery.Duration == 0 { return DefaultUpdateEvery diff --git a/example.config.hjson b/example.config.hjson index 57f5ac8..d61ed70 100644 --- a/example.config.hjson +++ b/example.config.hjson @@ -53,6 +53,9 @@ // "directory": "${provider_name}", // "rate_limit_interval": "100ms", // "rate_limit_burst": 10, + // "circuit_breaker_open_threshold": 5, + // "circuit_breaker_half_open_timeout": "1m", + // "circuit_breaker_reset_failures_timeout": "20s", // "update_every": "24h", // "http_timeout": "10s", // "specific_parameters": {} @@ -67,6 +70,19 @@ // Please see rate_limit_interval and rate_limit_burst meanings // here: https://pkg.go.dev/golang.org/x/time/rate // + // circuit_breaker_open_threshold defines a number of errors + // required to transit into OPEN state. In open state circuit + // breaker does not allow real requests to the target netloc. 
+ // + // circuit_breaker_half_open_timeout defines a time period after + // which an opened circuit breaker lets a single request through to + // check if we can already proceed with our requests or not. If this + // request fails, it goes back into OPEN state. Succeeds - to CLOSED. + // + // circuit_breaker_reset_failures_timeout defines a time interval + // when failure counter resets. It is applicable only for closed + // state. + // // update_every is a periodicity that is used to update provider // database if this is applicable. // diff --git a/go.mod b/go.mod index e81c3ac..8fc617c 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/antchfx/htmlquery v1.2.3 github.com/antchfx/xpath v1.1.11 // indirect github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0 - github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/dgraph-io/ristretto v0.0.3 github.com/hjson/hjson-go v3.1.0+incompatible github.com/ip2location/ip2location-go v8.3.0+incompatible @@ -15,7 +14,6 @@ require ( github.com/kentik/patricia v0.0.0-20201202224819-f9447a6e25f1 github.com/leaanthony/clir v1.0.4 github.com/mccutchen/go-httpbin v1.1.1 - github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91 github.com/oschwald/maxminddb-golang v1.8.0 github.com/panjf2000/ants/v2 v2.4.3 github.com/pariz/gountries v0.0.0-20200430155801-1c6a393df9c7 diff --git a/go.sum b/go.sum index b8e163d..c50b499 100644 --- a/go.sum +++ b/go.sum @@ -9,9 +9,6 @@ github.com/antchfx/xpath v1.1.11 h1:WOFtK8TVAjLm3lbgqeP0arlHpvCEeTANeWZ/csPpJkQ= github.com/antchfx/xpath v1.1.11/go.mod h1:i54GszH55fYfBmoZXapTHN8T8tkcHfRgLyVwwqzXNcs= github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0 h1:R/qAiUxFT3mNgQaNqJe0IVznjKRNm23ohAIh9lgtlzc= github.com/antzucaro/matchr v0.0.0-20210222213004-b04723ef80f0/go.mod h1:v3ZDlfVAL1OrkKHbGSFFK60k0/7hruHPDq2XMs9Gu6U= -github.com/cenkalti/backoff/v3 v3.1.1/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= -github.com/cenkalti/backoff/v3 v3.2.2 
h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= -github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= @@ -22,8 +19,6 @@ github.com/dgraph-io/ristretto v0.0.3 h1:jh22xisGBjrEVnRZ1DVTpBVQm0Xndu8sMl0CWDz github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a h1:yDWHCSQ40h88yih2JAcL6Ls/kVkSE8GFACTGVnMPruw= -github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a/go.mod h1:7Ga40egUymuWXxAe151lTNnCv97MddSOVsjpPPkityA= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/hjson/hjson-go v3.1.0+incompatible h1:DY/9yE8ey8Zv22bY+mHV1uk2yRy0h8tKhZ77hEdi0Aw= @@ -38,8 +33,6 @@ github.com/leaanthony/clir v1.0.4 h1:Dov2y9zWJmZr7CjaCe86lKa4b5CSxskGAt2yBkoDyiU github.com/leaanthony/clir v1.0.4/go.mod h1:k/RBkdkFl18xkkACMCLt09bhiZnrGORoxmomeMvDpE0= github.com/mccutchen/go-httpbin v1.1.1 h1:aEws49HEJEyXHLDnshQVswfUlCVoS8g6h9YaDyaW7RE= github.com/mccutchen/go-httpbin v1.1.1/go.mod h1:fhpOYavp5g2K74XDl/ao2y4KvhqVtKlkg1e+0UaQv7I= -github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91 h1:mIBx9ZkszRfzGtqTR0Cl8RTTIP1dJ7RjEt8klMg+2gs= -github.com/mercari/go-circuitbreaker v0.0.0-20201130021310-aff740600e91/go.mod 
h1:C0UM01bzV6QJSeEcGQ5HtFmYtt7uExjYKTKwH7uhzPY= github.com/oschwald/maxminddb-golang v1.8.0 h1:Uh/DSnGoxsyp/KYbY1AuP0tYEwfs0sCph9p/UMXK/Hk= github.com/oschwald/maxminddb-golang v1.8.0/go.mod h1:RXZtst0N6+FY/3qCNmZMBApR19cdQj43/NM9VkrNAis= github.com/panjf2000/ants/v2 v2.4.3 h1:wHghL17YKFanB62QjPQ9o+DuM4q7WrQ7zAhoX8+eBXU= diff --git a/providers/init_test.go b/providers/init_test.go index 029806f..a1fce91 100644 --- a/providers/init_test.go +++ b/providers/init_test.go @@ -51,7 +51,10 @@ func (suite *ProviderTestSuite) SetupTest() { suite.http = topolib.NewHTTPClient(&http.Client{}, "test-agent", time.Millisecond, - 100) + 100, + 5, + time.Minute, + time.Minute) } type OnlineProviderTestSuite struct { diff --git a/topolib/circuit_breaker.go b/topolib/circuit_breaker.go new file mode 100644 index 0000000..deadee8 --- /dev/null +++ b/topolib/circuit_breaker.go @@ -0,0 +1,195 @@ +package topolib + +import ( + "context" + "errors" + "net/http" + "sync/atomic" + "time" +) + +type circuitBreakerCallback func(context.Context) (*http.Response, error) + +const ( + circuitBreakerStateClosed uint32 = iota + circuitBreakerStateHalfOpened + circuitBreakerStateOpened +) + +type circuitBreaker struct { + state uint32 + stateMutexChan chan bool + + halfOpenTimer *time.Timer + failuresCleanupTimer *time.Timer + + halfOpenAttempts uint32 + failuresCount uint32 + + openThreshold uint32 + halfOpenTimeout time.Duration + resetFailuresTimeout time.Duration +} + +func (c *circuitBreaker) Do(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) { + switch atomic.LoadUint32(&c.state) { + case circuitBreakerStateClosed: + return c.doClosed(ctx, callback) + case circuitBreakerStateHalfOpened: + return c.doHalfOpened(ctx, callback) + default: + return nil, ErrCircuitBreakerOpened + } +} + +func (c *circuitBreaker) doClosed(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) { + resp, err := callback(ctx) + + select { + case <-ctx.Done(): + 
return nil, ctx.Err() + default: + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.stateMutexChan <- true: + defer func() { + <-c.stateMutexChan + }() + } + } + + if c.isErrorOk(err) { + c.switchState(circuitBreakerStateClosed) + + return resp, err + } + + c.failuresCount++ + + if c.state == circuitBreakerStateClosed && c.failuresCount > c.openThreshold { + c.switchState(circuitBreakerStateOpened) + } + + return resp, err +} + +func (c *circuitBreaker) doHalfOpened(ctx context.Context, callback circuitBreakerCallback) (*http.Response, error) { + if !atomic.CompareAndSwapUint32(&c.halfOpenAttempts, 0, 1) { + return nil, ErrCircuitBreakerOpened + } + + resp, err := callback(ctx) + + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + select { + case <-ctx.Done(): + return nil, ctx.Err() + case c.stateMutexChan <- true: + defer func() { + <-c.stateMutexChan + }() + } + } + + if c.state != circuitBreakerStateHalfOpened { + return resp, err + } + + if c.isErrorOk(err) { + c.switchState(circuitBreakerStateClosed) + } else { + c.switchState(circuitBreakerStateOpened) + } + + return resp, err +} + +func (c *circuitBreaker) switchState(state uint32) { + switch state { + case circuitBreakerStateClosed: + c.stopTimer(&c.halfOpenTimer) + c.ensureTimer(&c.failuresCleanupTimer, c.resetFailuresTimeout, c.resetFailures) + case circuitBreakerStateHalfOpened: + c.stopTimer(&c.failuresCleanupTimer) + c.stopTimer(&c.halfOpenTimer) + case circuitBreakerStateOpened: + c.stopTimer(&c.failuresCleanupTimer) + c.ensureTimer(&c.halfOpenTimer, c.halfOpenTimeout, c.tryHalfOpen) + } + + c.failuresCount = 0 + + atomic.StoreUint32(&c.halfOpenAttempts, 0) + atomic.StoreUint32(&c.state, state) +} + +func (c *circuitBreaker) resetFailures() { + c.stateMutexChan <- true + + defer func() { + <-c.stateMutexChan + }() + + c.stopTimer(&c.failuresCleanupTimer) + + if c.state == circuitBreakerStateClosed { + c.switchState(circuitBreakerStateClosed) + } +} + +func (c 
*circuitBreaker) tryHalfOpen() { + c.stateMutexChan <- true + + defer func() { + <-c.stateMutexChan + }() + + if c.state == circuitBreakerStateOpened { + c.switchState(circuitBreakerStateHalfOpened) + } +} + +func (c *circuitBreaker) stopTimer(timerRef **time.Timer) { + timer := *timerRef + + if timer == nil { + return + } + + timer.Stop() + + select { + case <-timer.C: + default: + } + + *timerRef = nil +} + +func (c *circuitBreaker) ensureTimer(timerRef **time.Timer, timeout time.Duration, callback func()) { + if *timerRef == nil { + *timerRef = time.AfterFunc(timeout, callback) + } +} + +func (c *circuitBreaker) isErrorOk(err error) bool { + return err == nil || errors.Is(err, ErrCircuitBreakerIgnore) +} + +func newCircuitBreaker(openThreshold uint32, + halfOpenTimeout, resetFailuresTimeout time.Duration) *circuitBreaker { + cb := &circuitBreaker{ + stateMutexChan: make(chan bool, 1), + openThreshold: openThreshold, + halfOpenTimeout: halfOpenTimeout, + resetFailuresTimeout: resetFailuresTimeout, + } + + cb.switchState(circuitBreakerStateClosed) + + return cb +} diff --git a/topolib/circuit_breaker_internal_test.go b/topolib/circuit_breaker_internal_test.go new file mode 100644 index 0000000..195e05d --- /dev/null +++ b/topolib/circuit_breaker_internal_test.go @@ -0,0 +1,239 @@ +package topolib + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +type CircuitBreakerTestSuite struct { + suite.Suite + + cb *circuitBreaker + cbMutex sync.Mutex + ctx context.Context + ctxCancel context.CancelFunc +} + +func (suite *CircuitBreakerTestSuite) SetupTest() { + suite.cbMutex = sync.Mutex{} + suite.ctx, suite.ctxCancel = context.WithCancel(context.Background()) + suite.cb = newCircuitBreaker(2, 200*time.Millisecond, 500*time.Millisecond) +} + +func (suite *CircuitBreakerTestSuite) CallbackOk(_ context.Context) (*http.Response, error) { + rec := httptest.NewRecorder() + + 
rec.WriteHeader(http.StatusCreated) + + return rec.Result(), nil +} + +func (suite *CircuitBreakerTestSuite) CallbackErr(_ context.Context) (*http.Response, error) { + return nil, io.EOF +} + +func (suite *CircuitBreakerTestSuite) CallbackIgnore(_ context.Context) (*http.Response, error) { + return nil, ErrCircuitBreakerIgnore +} + +func (suite *CircuitBreakerTestSuite) AssertResponseOk(resp *http.Response) { + suite.NotNil(resp) + suite.Equal(http.StatusCreated, resp.StatusCode) +} + +func (suite *CircuitBreakerTestSuite) TearDownTest() { + suite.ctxCancel() + + suite.cb.stateMutexChan <- true + + suite.cb.stopTimer(&suite.cb.failuresCleanupTimer) + suite.cb.stopTimer(&suite.cb.halfOpenTimer) +} + +func (suite *CircuitBreakerTestSuite) TestManyExecuted() { + wg := &sync.WaitGroup{} + + wg.Add(5) + + go func() { + wg.Wait() + suite.ctxCancel() + }() + + for i := 0; i < 5; i++ { + go func() { + defer wg.Done() + + resp, err := suite.cb.Do(suite.ctx, suite.CallbackOk) + + suite.cbMutex.Lock() + defer suite.cbMutex.Unlock() + + suite.NoError(err) + suite.AssertResponseOk(resp) + }() + } + + suite.Eventually(func() bool { + _, ok := <-suite.ctx.Done() + + return !ok + }, 500*time.Second, 10*time.Millisecond) +} + +func (suite *CircuitBreakerTestSuite) TestSomeFailuresButStillWorks() { + wg := &sync.WaitGroup{} + + wg.Add(5) + + go func() { + wg.Wait() + suite.ctxCancel() + }() + + _, err := suite.cb.Do(suite.ctx, suite.CallbackErr) + + suite.Error(err) + + for i := 0; i < 5; i++ { + go func() { + defer wg.Done() + + resp, err := suite.cb.Do(suite.ctx, suite.CallbackOk) + + suite.cbMutex.Lock() + defer suite.cbMutex.Unlock() + + suite.NoError(err) + suite.AssertResponseOk(resp) + }() + } + + suite.Eventually(func() bool { + _, ok := <-suite.ctx.Done() + + return !ok + }, 500*time.Second, 10*time.Millisecond) + suite.EqualValues(0, suite.cb.failuresCount) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) 
TestSomeFailuresButStillClosed() { + _, err := suite.cb.Do(suite.ctx, suite.CallbackErr) + + suite.Error(err) + suite.EqualValues(1, suite.cb.failuresCount) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) + + _, err = suite.cb.Do(suite.ctx, suite.CallbackErr) + + suite.Error(err) + suite.EqualValues(2, suite.cb.failuresCount) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) + + _, err = suite.cb.Do(suite.ctx, suite.CallbackErr) + + suite.Error(err) + suite.EqualValues(0, suite.cb.failuresCount) + suite.EqualValues(circuitBreakerStateOpened, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestClosedFailureReset() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + time.Sleep(time.Second) + + suite.EqualValues(0, suite.cb.failuresCount) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestOpenedExecute() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + _, err := suite.cb.Do(suite.ctx, suite.CallbackOk) // nolint: errcheck + + suite.Error(err) + suite.EqualValues(circuitBreakerStateOpened, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestOpenedIgnore() { + suite.cb.Do(suite.ctx, suite.CallbackIgnore) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackIgnore) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackIgnore) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackIgnore) // nolint: errcheck + + _, err := suite.cb.Do(suite.ctx, suite.CallbackOk) // nolint: errcheck + + suite.NoError(err) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestHalfOpened() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, 
suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + time.Sleep(700 * time.Millisecond) + + suite.EqualValues(circuitBreakerStateHalfOpened, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestHalfOpenedErr() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + time.Sleep(700 * time.Millisecond) + + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + suite.EqualValues(circuitBreakerStateOpened, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestHalfOpenedOk() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + time.Sleep(700 * time.Millisecond) + + suite.cb.Do(suite.ctx, suite.CallbackOk) // nolint: errcheck + + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) +} + +func (suite *CircuitBreakerTestSuite) TestCheckConcurrentExecutionInHalfOpened() { + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + suite.cb.Do(suite.ctx, suite.CallbackErr) // nolint: errcheck + + time.Sleep(700 * time.Millisecond) + + go suite.cb.Do(suite.ctx, func(_ context.Context) (*http.Response, error) { // nolint: errcheck + time.Sleep(500 * time.Millisecond) + + return nil, nil + }) + + time.Sleep(10 * time.Millisecond) + + _, err := suite.cb.Do(suite.ctx, suite.CallbackOk) // nolint: errcheck + + suite.Error(err) + time.Sleep(time.Second) + suite.EqualValues(circuitBreakerStateClosed, suite.cb.state) +} + +func TestCircuitBreaker(t *testing.T) { + suite.Run(t, &CircuitBreakerTestSuite{}) +} diff --git a/topolib/errors.go b/topolib/errors.go index b269b65..1dd77b5 100644 --- a/topolib/errors.go +++ b/topolib/errors.go @@ -14,6 
+14,14 @@ var ( // ErrContextIsClosed returns if context is closed during execution // of the method. ErrContextIsClosed = errors.New("context is closed") + + // ErrCircuitBreakerOpened returns by http client if circuit breaker + // is opened. + ErrCircuitBreakerOpened = errors.New("circuit breaker is opened") + + // ErrCircuitBreakerIgnore should be returned if it is necessary to + // ignore circuit breaker error in http client. + ErrCircuitBreakerIgnore = errors.New("this error should be ignores by circuit breaker") ) type jsonHTTPError struct { diff --git a/topolib/http_client.go b/topolib/http_client.go index 81f053a..05accd8 100644 --- a/topolib/http_client.go +++ b/topolib/http_client.go @@ -8,7 +8,6 @@ import ( "net/http" "time" - "github.com/mercari/go-circuitbreaker" "golang.org/x/time/rate" ) @@ -16,7 +15,7 @@ type httpClient struct { userAgent string client *http.Client rateLimiter *rate.Limiter - circuitBreaker *circuitbreaker.CircuitBreaker + circuitBreaker *circuitBreaker } func (h httpClient) Do(req *http.Request) (*http.Response, error) { @@ -29,11 +28,11 @@ func (h httpClient) Do(req *http.Request) (*http.Response, error) { req.Header.Set("User-Agent", h.userAgent) - resp, err := h.circuitBreaker.Do(ctx, func() (interface{}, error) { + resp, err := h.circuitBreaker.Do(ctx, func(ctx context.Context) (*http.Response, error) { resp, err := h.client.Do(req.WithContext(ctx)) if err := h.rateLimiter.Wait(ctx); err != nil { - return nil, circuitbreaker.Ignore(fmt.Errorf("rate limited: %w", err)) + return nil, ErrCircuitBreakerIgnore } if err != nil { @@ -59,7 +58,7 @@ func (h httpClient) Do(req *http.Request) (*http.Response, error) { return nil, err } - return resp.(*http.Response), err + return resp, err } // NewHTTPClient prepares a new HTTP client, wraps it with rate limiter, @@ -67,14 +66,36 @@ func (h httpClient) Do(req *http.Request) (*http.Response, error) { // // Please see https://pkg.go.dev/golang.org/x/time/rate to get a meaning // of rate 
limiter parameters. +// +// A meaning of circuit breaker parameters: +// +// circuitBreakerOpenThreshold - this is a threshold of failures when +// circuit breaker becomes OPEN. So, if you pass 3 here, then once more +// than 3 failures are counted, circuit breaker switches into OPEN state +// and blocks access to a target. +// +// circuitBreakerResetFailuresTimeout - is tightly coupled with +// circuitBreakerOpenThreshold. Each time period when circuit breaker +// is closed, we try to reset a failure counter. So, if you pass 10 +// seconds here and make 2 errors, then after 10 seconds this counter is +// going to be reset. +// +// circuitBreakerHalfOpenTimeout - when circuit breaker is OPEN, it +// goes into HALF_OPEN state after this time period. +// Within this state we allow 1 attempt. If this attempt fails, then it +// goes into OPEN state again. If it succeeds - goes to CLOSED. func NewHTTPClient(client *http.Client, userAgent string, rateLimiterInterval time.Duration, - rateLimitBurst int) HTTPClient { + rateLimitBurst int, + circuitBreakerOpenThreshold uint32, + circuitBreakerHalfOpenTimeout, circuitBreakerResetFailuresTimeout time.Duration) HTTPClient { return httpClient{ - userAgent: userAgent, - client: client, - rateLimiter: rate.NewLimiter(rate.Every(rateLimiterInterval), rateLimitBurst), - circuitBreaker: circuitbreaker.New(nil), + userAgent: userAgent, + client: client, + rateLimiter: rate.NewLimiter(rate.Every(rateLimiterInterval), rateLimitBurst), + circuitBreaker: newCircuitBreaker(circuitBreakerOpenThreshold, + circuitBreakerHalfOpenTimeout, + circuitBreakerResetFailuresTimeout), } } diff --git a/topolib/http_client_test.go b/topolib/http_client_test.go index a2c19b7..e63d678 100644 --- a/topolib/http_client_test.go +++ b/topolib/http_client_test.go @@ -31,7 +31,10 @@ func (suite *HTTPClientTestSuite) SetupTest() { suite.c = topolib.NewHTTPClient(suite.httpbinEndpoint.Client(), "test", 100*time.Millisecond, - 1) + 1, + 5, + time.Minute, + time.Minute) } func (suite 
*HTTPClientTestSuite) TestRateLimiter() { diff --git a/utils.go b/utils.go index 3b86111..958c3c8 100644 --- a/utils.go +++ b/utils.go @@ -118,7 +118,10 @@ func makeNewHTTPClient(conf configProvider) topolib.HTTPClient { return topolib.NewHTTPClient(httpClient, "topographer/"+version, conf.GetRateLimitInterval(), - conf.GetRateLimitBurst()) + conf.GetRateLimitBurst(), + conf.GetCircuitBreakerOpenThreshold(), + conf.GetCircuitBreakerHalfOpenTimeout(), + conf.GetCircuitBreakerResetFailuresTimeout()) } func boolParam(param string) bool {