Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open circuit breakers on timeouts and per-instance limit errors only #7310

Merged
merged 2 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [CHANGE] Query-frontend: the default value of the CLI flag `-query-frontend.max-cache-freshness` (and its respective YAML configuration parameter) has been changed from `1m` to `10m`. #7161
* [CHANGE] Distributor: default the optimization `-distributor.write-requests-buffer-pooling-enabled` to `true`. #7165
* [CHANGE] Tracing: Move query information to span attributes instead of span logs. #7046
* [CHANGE] Distributor: the default value of the circuit breaker's CLI flag `-ingester.client.circuit-breaker.cooldown-period` has been changed from `1m` to `10s`. #7310
* [FEATURE] Introduce `-server.log-source-ips-full` option to log all IPs from `Forwarded`, `X-Real-IP`, `X-Forwarded-For` headers. #7250
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [FEATURE] Cardinality API: added a new `count_method` parameter which enables counting active label values. #7085
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -2267,7 +2267,7 @@
"required": false,
"desc": "How long the circuit breaker will stay in the open state before allowing some requests",
"fieldValue": null,
"fieldDefaultValue": 60000000000,
"fieldDefaultValue": 10000000000,
"fieldFlag": "ingester.client.circuit-breaker.cooldown-period",
"fieldType": "duration",
"fieldCategory": "experimental"
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1274,7 +1274,7 @@ Usage of ./cmd/mimir/mimir:
-ingester.client.backoff-retries int
Number of times to backoff and retry before failing. (default 10)
-ingester.client.circuit-breaker.cooldown-period duration
[experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 1m0s)
[experimental] How long the circuit breaker will stay in the open state before allowing some requests (default 10s)
-ingester.client.circuit-breaker.enabled
[experimental] Enable circuit breaking when making requests to ingesters
-ingester.client.circuit-breaker.failure-execution-threshold uint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2356,7 +2356,7 @@ circuit_breaker:
# (experimental) How long the circuit breaker will stay in the open state
# before allowing some requests
# CLI flag: -ingester.client.circuit-breaker.cooldown-period
[cooldown_period: <duration> | default = 1m]
[cooldown_period: <duration> | default = 10s]

# (deprecated) If set to true, gRPC status codes will be reported in
# "status_code" label of "cortex_ingester_client_request_duration_seconds"
Expand Down
24 changes: 19 additions & 5 deletions pkg/ingester/client/circuitbreaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/ring"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
)

const (
Expand Down Expand Up @@ -97,9 +99,21 @@ func isFailure(err error) bool {
return false
}

// We only consider timeouts or the ingester being unavailable (returned when hitting
// per-instance limits) to be errors worthy of tripping the circuit breaker since these
// We only consider timeouts or ingester hitting a per-instance limit
// to be errors worthy of tripping the circuit breaker since these
// are specific to a particular ingester, not a user or request.
code := status.Code(err)
return code == codes.Unavailable || code == codes.DeadlineExceeded
if stat, ok := grpcutil.ErrorToStatus(err); ok {
if stat.Code() == codes.DeadlineExceeded {
return true
}

details := stat.Details()
if len(details) != 1 {
return false
}
if errDetails, ok := details[0].(*mimirpb.ErrorDetails); ok {
return errDetails.GetCause() == mimirpb.INSTANCE_LIMIT
}
}
return false
}
31 changes: 27 additions & 4 deletions pkg/ingester/client/circuitbreaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import (
"time"

"github.com/failsafe-go/failsafe-go/circuitbreaker"
"github.com/gogo/status"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/ring"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/test"
)

Expand All @@ -44,11 +45,33 @@ func TestIsFailure(t *testing.T) {
require.True(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable", func(t *testing.T) {
err := status.Error(codes.Unavailable, "broken!")
t.Run("gRPC unavailable with INSTANCE_LIMIT details", func(t *testing.T) {
err := perInstanceLimitError(t)
require.True(t, isFailure(err))
require.True(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable with SERVICE_UNAVAILABLE details is not a failure", func(t *testing.T) {
stat := status.New(codes.Unavailable, "broken!")
stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.SERVICE_UNAVAILABLE})
require.NoError(t, err)
err = stat.Err()
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
})

t.Run("gRPC unavailable without details is not a failure", func(t *testing.T) {
err := status.Error(codes.Unavailable, "broken!")
require.False(t, isFailure(err))
require.False(t, isFailure(fmt.Errorf("%w", err)))
})
}

func perInstanceLimitError(t *testing.T) error {
stat := status.New(codes.Unavailable, "broken!")
stat, err := stat.WithDetails(&mimirpb.ErrorDetails{Cause: mimirpb.INSTANCE_LIMIT})
require.NoError(t, err)
return stat.Err()
}

func TestNewCircuitBreaker(t *testing.T) {
Expand All @@ -59,7 +82,7 @@ func TestNewCircuitBreaker(t *testing.T) {

// gRPC invoker that returns an error that will be treated as an error by the circuit breaker
failure := func(currentCtx context.Context, currentMethod string, currentReq, currentRepl interface{}, currentConn *grpc.ClientConn, currentOpts ...grpc.CallOption) error {
return status.Error(codes.Unavailable, "failed")
return perInstanceLimitError(t)
}

conn := grpc.ClientConn{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (cfg *CircuitBreakerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.
f.UintVar(&cfg.FailureThreshold, prefix+".circuit-breaker.failure-threshold", 10, "Max percentage of requests that can fail over period before the circuit breaker opens")
f.UintVar(&cfg.FailureExecutionThreshold, prefix+".circuit-breaker.failure-execution-threshold", 100, "How many requests must have been executed in period for the circuit breaker to be eligible to open for the rate of failures")
f.DurationVar(&cfg.ThresholdingPeriod, prefix+".circuit-breaker.thresholding-period", time.Minute, "Moving window of time that the percentage of failed requests is computed over")
f.DurationVar(&cfg.CooldownPeriod, prefix+".circuit-breaker.cooldown-period", time.Minute, "How long the circuit breaker will stay in the open state before allowing some requests")
f.DurationVar(&cfg.CooldownPeriod, prefix+".circuit-breaker.cooldown-period", 10*time.Second, "How long the circuit breaker will stay in the open state before allowing some requests")
}

func (cfg *CircuitBreakerConfig) Validate() error {
Expand Down
Loading