Skip to content

Commit

Permalink
Add mimirpb.CIRCUIT_BREAKER_OPEN error cause (grafana#7330)
Browse files Browse the repository at this point in the history
* Add mimirpb.CIRCUIT_BREAKER_OPEN error cause

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

* Fixing review findings

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>

---------

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic authored and beatkind committed Feb 13, 2024
1 parent 28e09c5 commit bbcb640
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 157 deletions.
4 changes: 1 addition & 3 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/grafana/dskit/flagext"
Expand Down Expand Up @@ -1056,8 +1055,7 @@ func TestDistributor_PushWithCircuitBreakers(t *testing.T) {
ctx := user.InjectOrgID(context.Background(), "user")
err := ds[0].push(ctx, NewParsedRequest(makeWriteRequest(123456789000, 10, 0, false, true, "foo")))
require.Error(t, err)
require.ErrorIs(t, err, circuitbreaker.ErrOpen)
require.ErrorAs(t, err, &client.ErrCircuitBreakerOpen{})
require.ErrorAs(t, err, &circuitBreakerOpenError{})
}

func TestDistributor_PushQuery(t *testing.T) {
Expand Down
36 changes: 33 additions & 3 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package distributor

import (
"fmt"
"time"

"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
Expand Down Expand Up @@ -196,6 +197,30 @@ func (e ingesterPushError) errorCause() mimirpb.ErrorCause {
// Ensure that ingesterPushError implements distributorError.
var _ distributorError = ingesterPushError{}

type circuitBreakerOpenError struct {
err client.ErrCircuitBreakerOpen
}

// newCircuitBreakerOpenError creates a circuitBreakerOpenError wrapping the passed client.ErrCircuitBreakerOpen.
func newCircuitBreakerOpenError(err client.ErrCircuitBreakerOpen) circuitBreakerOpenError {
return circuitBreakerOpenError{err: err}
}

func (e circuitBreakerOpenError) Error() string {
return e.err.Error()
}

func (e circuitBreakerOpenError) RemainingDelay() time.Duration {
return e.err.RemainingDelay()
}

func (e circuitBreakerOpenError) errorCause() mimirpb.ErrorCause {
return mimirpb.CIRCUIT_BREAKER_OPEN
}

// Ensure that circuitBreakerOpenError implements distributorError.
var _ distributorError = circuitBreakerOpenError{}

// toGRPCError converts the given error into an appropriate gRPC error.
func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error {
var (
Expand All @@ -220,9 +245,9 @@ func toGRPCError(pushErr error, serviceOverloadErrorEnabled bool) error {
errCode = codes.AlreadyExists
case mimirpb.TOO_MANY_CLUSTERS:
errCode = codes.FailedPrecondition
case mimirpb.CIRCUIT_BREAKER_OPEN:
errCode = codes.Unavailable
}
} else if errors.As(pushErr, &client.ErrCircuitBreakerOpen{}) {
errCode = codes.Unavailable
}
stat := status.New(errCode, pushErr.Error())
if errDetails != nil {
Expand All @@ -241,7 +266,12 @@ func wrapIngesterPushError(err error, ingesterID string) error {

stat, ok := grpcutil.ErrorToStatus(err)
if !ok {
return errors.Wrap(err, fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID))
pushErr := err
var errCircuitBreakerOpen client.ErrCircuitBreakerOpen
if errors.As(pushErr, &errCircuitBreakerOpen) {
pushErr = newCircuitBreakerOpenError(errCircuitBreakerOpen)
}
return errors.Wrap(pushErr, fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID))
}
statusCode := stat.Code()
if util.IsHTTPStatusCode(statusCode) {
Expand Down
44 changes: 33 additions & 11 deletions pkg/distributor/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,27 @@ func TestNewIngesterPushError(t *testing.T) {
}
}

func TestNewCircuitBreakerOpenError(t *testing.T) {
errCircuitBreakerOpen := client.ErrCircuitBreakerOpen{}

err := newCircuitBreakerOpenError(errCircuitBreakerOpen)
expectedErrorMsg := errCircuitBreakerOpen.Error()
assert.Error(t, err)
assert.EqualError(t, err, expectedErrorMsg)
checkDistributorError(t, err, mimirpb.CIRCUIT_BREAKER_OPEN)

assert.NotErrorIs(t, err, errCircuitBreakerOpen)
assert.Equal(t, errCircuitBreakerOpen.RemainingDelay(), err.RemainingDelay())

assert.True(t, errors.As(err, &circuitBreakerOpenError{}))
assert.False(t, errors.As(err, &replicasDidNotMatchError{}))

wrappedErr := fmt.Errorf("wrapped %w", err)
assert.ErrorIs(t, wrappedErr, err)
assert.True(t, errors.As(wrappedErr, &circuitBreakerOpenError{}))
checkDistributorError(t, wrappedErr, mimirpb.CIRCUIT_BREAKER_OPEN)
}

func TestToGRPCError(t *testing.T) {
const (
ingesterID = "ingester-25"
Expand Down Expand Up @@ -331,15 +352,17 @@ func TestToGRPCError(t *testing.T) {
expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, originalMsg),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.UNKNOWN_CAUSE},
},
"a client.ErrCircuitBreakerOpen error gets translated into an Unavailable error without details": {
err: client.ErrCircuitBreakerOpen{},
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: circuitbreaker.ErrOpen.Error(),
"a circuitBreakerOpenError gets translated into an Unavailable error with CIRCUIT_BREAKER_OPEN cause": {
err: newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}),
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: circuitbreaker.ErrOpen.Error(),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.CIRCUIT_BREAKER_OPEN},
},
"a wrapped client.ErrCircuitBreakerOpen error gets translated into an Unavailable error without details": {
err: errors.Wrap(client.ErrCircuitBreakerOpen{}, fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)),
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, circuitbreaker.ErrOpen),
"a wrapped circuitBreakerOpenError gets translated into an Unavailable error witch CIRCUIT_BREAKER_OPEN cause": {
err: errors.Wrap(newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)),
expectedGRPCCode: codes.Unavailable,
expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, circuitbreaker.ErrOpen),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.CIRCUIT_BREAKER_OPEN},
},
}
for name, tc := range testCases {
Expand Down Expand Up @@ -377,12 +400,11 @@ func TestWrapIngesterPushError(t *testing.T) {
})

// Ensure that client.ErrCircuitBreakerOpen error gets correctly wrapped.
t.Run("an ErrCircuitBreakerOpen error gives an ErrCircuitBreakerOpen error", func(t *testing.T) {
t.Run("an ErrCircuitBreakerOpen error gives an circuitBreakerOpenErr error", func(t *testing.T) {
ingesterPushErr := client.ErrCircuitBreakerOpen{}
err := wrapIngesterPushError(ingesterPushErr, ingesterID)
require.Error(t, err)
require.ErrorIs(t, err, circuitbreaker.ErrOpen)
require.ErrorAs(t, err, &client.ErrCircuitBreakerOpen{})
require.ErrorAs(t, err, &circuitBreakerOpenError{})
})

// Ensure that the errors created by httpgrpc get translated into
Expand Down
5 changes: 2 additions & 3 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/opentracing/opentracing-go"

"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/globalerror"
Expand Down Expand Up @@ -227,9 +226,9 @@ func toHTTPStatus(ctx context.Context, pushErr error, limits *validation.Overrid
return http.StatusBadRequest
case mimirpb.TSDB_UNAVAILABLE:
return http.StatusServiceUnavailable
case mimirpb.CIRCUIT_BREAKER_OPEN:
return http.StatusServiceUnavailable
}
} else if errors.As(pushErr, &client.ErrCircuitBreakerOpen{}) {
return http.StatusServiceUnavailable
}

return http.StatusInternalServerError
Expand Down
8 changes: 4 additions & 4 deletions pkg/distributor/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1136,13 +1136,13 @@ func TestHandler_ToHTTPStatus(t *testing.T) {
expectedHTTPStatus: http.StatusInternalServerError,
expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, context.DeadlineExceeded),
},
"a client.ErrCircuitBreakerOpen error gets translated into an HTTP 503": {
err: client.ErrCircuitBreakerOpen{},
"a circuitBreakerOpenError gets translated into an HTTP 503": {
err: newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}),
expectedHTTPStatus: http.StatusServiceUnavailable,
expectedErrorMsg: circuitbreaker.ErrOpen.Error(),
},
"a wrapped client.ErrCircuitBreakerOpen error gets translated into an HTTP 503": {
err: errors.Wrap(client.ErrCircuitBreakerOpen{}, fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)),
"a wrapped circuitBreakerOpenError gets translated into an HTTP 503": {
err: errors.Wrap(newCircuitBreakerOpenError(client.ErrCircuitBreakerOpen{}), fmt.Sprintf("%s %s", failedPushingToIngesterMessage, ingesterID)),
expectedHTTPStatus: http.StatusServiceUnavailable,
expectedErrorMsg: fmt.Sprintf("%s %s: %s", failedPushingToIngesterMessage, ingesterID, circuitbreaker.ErrOpen),
},
Expand Down
Loading

0 comments on commit bbcb640

Please sign in to comment.