Skip to content

Commit

Permalink
Merge pull request #176 from msiegemund/optional-timelimiter
Browse files Browse the repository at this point in the history
introduce disable-time-limiter property
  • Loading branch information
ryanjbaxter authored Nov 30, 2023
2 parents 8527f3a + 08e4d44 commit 5c7026a
Show file tree
Hide file tree
Showing 12 changed files with 310 additions and 43 deletions.
18 changes: 18 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,24 @@ resilience4j.timelimiter:

For more information on Resilience4j property configuration, see https://resilience4j.readme.io/docs/getting-started-3#configuration[Resilience4J Spring Boot 2 Configuration].

===== Disabling the TimeLimiter

By default, the `TimeLimiter` is enabled and every execution is backed by a time limit. This time limit is either defined explicitly or the default time limit (provided by `io.github.resilience4j.timelimiter.TimeLimiterConfig#ofDefaults`) is used.

The `TimeLimiter` can be globally disabled by setting the property `spring.cloud.circuitbreaker.resilience4j.disable-time-limiter` to `true`.

[source,yaml]
----
spring:
cloud:
circuitbreaker:
resilience4j:
disable-time-limiter: true
----

This type of option is only provided on a global scope within the `spring-cloud-circuitbreaker` and applies to the
basic and to the reactive circuitbreaker implementation.

==== Bulkhead pattern supporting
If `resilience4j-bulkhead` is on the classpath, Spring Cloud CircuitBreaker will wrap all methods with a Resilience4j Bulkhead.
You can disable the Resilience4j Bulkhead by setting `spring.cloud.circuitbreaker.bulkhead.resilience4j.enabled` to `false`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,10 @@ public class ReactiveResilience4JAutoConfiguration {
@Bean
@ConditionalOnMissingBean(ReactiveCircuitBreakerFactory.class)
public ReactiveResilience4JCircuitBreakerFactory reactiveResilience4JCircuitBreakerFactory(
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry) {
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
ReactiveResilience4JCircuitBreakerFactory factory = new ReactiveResilience4JCircuitBreakerFactory(
circuitBreakerRegistry, timeLimiterRegistry);
circuitBreakerRegistry, timeLimiterRegistry, resilience4JConfigurationProperties);
customizers.forEach(customizer -> customizer.customize(factory));
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.cloud.circuitbreaker.resilience4j;

import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -59,29 +60,46 @@ public class ReactiveResilience4JCircuitBreaker implements ReactiveCircuitBreake

private final Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer;

private final boolean disableTimeLimiter;

@Deprecated
public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer) {
this(id, groupName, config, circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerCustomizer, false);
}

public ReactiveResilience4JCircuitBreaker(String id, String groupName,
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config,
CircuitBreakerRegistry circuitBreakerRegistry, TimeLimiterRegistry timeLimiterRegistry,
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer) {
Optional<Customizer<CircuitBreaker>> circuitBreakerCustomizer,
boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = config.getCircuitBreakerConfig();
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.circuitBreakerCustomizer = circuitBreakerCustomizer;
this.timeLimiterConfig = config.getTimeLimiterConfig();
this.timeLimiterRegistry = timeLimiterRegistry;
this.disableTimeLimiter = disableTimeLimiter;
}

@Override
public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {
Tuple2<CircuitBreaker, TimeLimiter> tuple = buildCircuitBreakerAndTimeLimiter();
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()))
.timeout(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration())
// Since we are using the Mono timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1().onError(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS, t));
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Mono<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn
.timeout(timeoutDuration)
// Since we are using the Mono timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
}
Expand All @@ -90,28 +108,37 @@ public <T> Mono<T> run(Mono<T> toRun, Function<Throwable, Mono<T>> fallback) {

@Override
public <T> Flux<T> run(Flux<T> toRun, Function<Throwable, Flux<T>> fallback) {
Tuple2<CircuitBreaker, TimeLimiter> tuple = buildCircuitBreakerAndTimeLimiter();
Flux<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()))
.timeout(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration())
// Since we are using the Flux timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1().onError(tuple.getT2().getTimeLimiterConfig().getTimeoutDuration().toMillis(),
TimeUnit.MILLISECONDS, t));
Tuple2<CircuitBreaker, Optional<TimeLimiter>> tuple = buildCircuitBreakerAndTimeLimiter();
Flux<T> toReturn = toRun.transform(CircuitBreakerOperator.of(tuple.getT1()));
if (tuple.getT2().isPresent()) {
final Duration timeoutDuration = tuple.getT2().get().getTimeLimiterConfig().getTimeoutDuration();
toReturn = toReturn.timeout(timeoutDuration)
// Since we are using the Flux timeout we need to tell the circuit breaker
// about the error
.doOnError(TimeoutException.class,
t -> tuple.getT1()
.onError(timeoutDuration.toMillis(),
TimeUnit.MILLISECONDS, t));
}
if (fallback != null) {
toReturn = toReturn.onErrorResume(fallback);
}
return toReturn;
}

private Tuple2<CircuitBreaker, TimeLimiter> buildCircuitBreakerAndTimeLimiter() {
private Tuple2<CircuitBreaker, Optional<TimeLimiter>> buildCircuitBreakerAndTimeLimiter() {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
CircuitBreaker circuitBreaker = circuitBreakerRegistry.circuitBreaker(id, circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(circuitBreaker));
if (disableTimeLimiter) {
/* do not provide/load time-limiter */
return Tuples.of(circuitBreaker, Optional.empty());
}
/* provide time-limiter */
TimeLimiter timeLimiter = this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, tags)));
return Tuples.of(circuitBreaker, timeLimiter);
return Tuples.of(circuitBreaker, Optional.of(timeLimiter));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,25 @@ public class ReactiveResilience4JCircuitBreakerFactory extends

private TimeLimiterRegistry timeLimiterRegistry = TimeLimiterRegistry.ofDefaults();

private Map<String, Customizer<CircuitBreaker>> circuitBreakerCustomizers = new HashMap<>();
private final Map<String, Customizer<CircuitBreaker>> circuitBreakerCustomizers = new HashMap<>();

private final Resilience4JConfigurationProperties resilience4JConfigurationProperties;

@Deprecated
public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
this(circuitBreakerRegistry, timeLimiterRegistry, null);
}

public ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry) {
TimeLimiterRegistry timeLimiterRegistry,
Resilience4JConfigurationProperties resilience4JConfigurationProperties) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
this.timeLimiterRegistry = timeLimiterRegistry;
this.defaultConfiguration = id -> new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(this.circuitBreakerRegistry.getDefaultConfig())
.timeLimiterConfig(this.timeLimiterRegistry.getDefaultConfig()).build();
this.resilience4JConfigurationProperties = resilience4JConfigurationProperties;
}

@Override
Expand Down Expand Up @@ -91,7 +101,14 @@ public ReactiveCircuitBreaker create(String id, String groupName) {
Resilience4JConfigBuilder.Resilience4JCircuitBreakerConfiguration config = new Resilience4JConfigBuilder(id)
.circuitBreakerConfig(circuitBreakerConfig).timeLimiterConfig(timeLimiterConfig).build();
return new ReactiveResilience4JCircuitBreaker(id, groupName, config, circuitBreakerRegistry,
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)));
timeLimiterRegistry, Optional.ofNullable(circuitBreakerCustomizers.get(id)), isDisableTimeLimiter());
}

private boolean isDisableTimeLimiter() {
if (resilience4JConfigurationProperties != null) {
return resilience4JConfigurationProperties.isDisableTimeLimiter();
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {
private final String id;

private final String groupName;
private final Map<String, String> tags;

private Resilience4jBulkheadProvider bulkheadProvider;

Expand All @@ -60,12 +61,26 @@ public class Resilience4JCircuitBreaker implements CircuitBreaker {

private final Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer;

private final boolean disableTimeLimiter;

@Deprecated
public Resilience4JCircuitBreaker(String id, String groupName,
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
this(id, groupName, circuitBreakerConfig, timeLimiterConfig, circuitBreakerRegistry, timeLimiterRegistry,
executorService, circuitBreakerCustomizer, bulkheadProvider, false);
}

public Resilience4JCircuitBreaker(String id, String groupName,
io.github.resilience4j.circuitbreaker.CircuitBreakerConfig circuitBreakerConfig,
TimeLimiterConfig timeLimiterConfig, CircuitBreakerRegistry circuitBreakerRegistry,
TimeLimiterRegistry timeLimiterRegistry, ExecutorService executorService,
Optional<Customizer<io.github.resilience4j.circuitbreaker.CircuitBreaker>> circuitBreakerCustomizer,
Resilience4jBulkheadProvider bulkheadProvider) {
Resilience4jBulkheadProvider bulkheadProvider,
boolean disableTimeLimiter) {
this.id = id;
this.groupName = groupName;
this.circuitBreakerConfig = circuitBreakerConfig;
Expand All @@ -75,6 +90,8 @@ public Resilience4JCircuitBreaker(String id, String groupName,
this.executorService = executorService;
this.circuitBreakerCustomizer = circuitBreakerCustomizer;
this.bulkheadProvider = bulkheadProvider;
this.disableTimeLimiter = disableTimeLimiter;
this.tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
}

public Resilience4JCircuitBreaker(String id, String groupName,
Expand All @@ -90,40 +107,42 @@ public Resilience4JCircuitBreaker(String id, String groupName,
@Override
public <T> T run(Supplier<T> toRun, Function<Throwable, T> fallback) {
final Map<String, String> tags = Map.of(CIRCUIT_BREAKER_GROUP_TAG, this.groupName);
TimeLimiter timeLimiter = this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, tags)));
Optional<TimeLimiter> timeLimiter = loadTimeLimiter();
io.github.resilience4j.circuitbreaker.CircuitBreaker defaultCircuitBreaker = registry.circuitBreaker(this.id,
this.circuitBreakerConfig, tags);
this.circuitBreakerConfig, tags);
circuitBreakerCustomizer.ifPresent(customizer -> customizer.customize(defaultCircuitBreaker));
if (bulkheadProvider != null) {

if (executorService != null) {
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
Callable<T> timeLimitedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
/* conditionally wrap in time-limiter */
Callable<T> timeLimitedCall = timeLimiter.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> bulkheadCall = bulkheadProvider.decorateCallable(this.groupName, tags, timeLimitedCall);
Callable<T> circuitBreakerCall = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
return getAndApplyFallback(circuitBreakerCall, fallback);
}
else {
Callable<T> bulkheadCall = bulkheadProvider.decorateCallable(this.groupName, tags, toRun::get);
Callable<T> circuitBreakerCall = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
.decorateCallable(defaultCircuitBreaker, bulkheadCall);
return getAndApplyFallback(circuitBreakerCall, fallback);
}
}
else {
if (executorService != null) {
Supplier<Future<T>> futureSupplier = () -> executorService.submit(toRun::get);
Callable<T> restrictedCall = TimeLimiter.decorateFutureSupplier(timeLimiter, futureSupplier);
/* conditionally wrap in time-limiter */
Callable<T> restrictedCall = timeLimiter.map(tl -> TimeLimiter.decorateFutureSupplier(tl, futureSupplier))
.orElse(() -> futureSupplier.get().get());
Callable<T> callable = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateCallable(defaultCircuitBreaker, restrictedCall);
.decorateCallable(defaultCircuitBreaker, restrictedCall);
return getAndApplyFallback(callable, fallback);
}
else {
Supplier<T> decorator = io.github.resilience4j.circuitbreaker.CircuitBreaker
.decorateSupplier(defaultCircuitBreaker, toRun);
.decorateSupplier(defaultCircuitBreaker, toRun);
return getAndApplyFallback(decorator, fallback);
}
}
Expand All @@ -147,4 +166,12 @@ private static <T> T getAndApplyFallback(Callable<T> callable, Function<Throwabl
}
}

private Optional<TimeLimiter> loadTimeLimiter() {
if (disableTimeLimiter) {
return Optional.empty();
}
return Optional.of(this.timeLimiterRegistry.find(this.id)
.orElseGet(() -> this.timeLimiterRegistry.find(this.groupName)
.orElseGet(() -> this.timeLimiterRegistry.timeLimiter(this.id, this.timeLimiterConfig, this.tags))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ private Resilience4JCircuitBreaker create(String id, String groupName,
else {
return new Resilience4JCircuitBreaker(id, groupName, circuitBreakerConfig, timeLimiterConfig,
circuitBreakerRegistry, timeLimiterRegistry, circuitBreakerExecutorService,
Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider);
Optional.ofNullable(circuitBreakerCustomizers.get(id)), bulkheadProvider,
this.resilience4JConfigurationProperties.isDisableTimeLimiter());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class Resilience4JConfigurationProperties {

private boolean disableThreadPool = false;

private boolean disableTimeLimiter = false;

public boolean isEnableGroupMeterFilter() {
return enableGroupMeterFilter;
}
Expand Down Expand Up @@ -64,4 +66,12 @@ public void setDisableThreadPool(boolean disableThreadPool) {
this.disableThreadPool = disableThreadPool;
}


boolean isDisableTimeLimiter() {
return disableTimeLimiter;
}

void setDisableTimeLimiter(boolean disableTimeLimiter) {
this.disableTimeLimiter = disableTimeLimiter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,12 @@ private boolean useSemaphoreBulkhead(String id) {
|| (bulkheadRegistry.find(id).isPresent() && threadPoolBulkheadRegistry.find(id).isEmpty());
}

private <T> Callable<T> decorateTimeLimiter(final Supplier<CompletionStage<T>> supplier, TimeLimiter timeLimiter) {
private static <T> Callable<T> decorateTimeLimiter(final Supplier<? extends CompletionStage<T>> supplier, TimeLimiter timeLimiter) {
final Supplier<Future<T>> futureSupplier = () -> supplier.get().toCompletableFuture();
if (timeLimiter == null) {
/* execute without time-limiter */
return () -> futureSupplier.get().get();
}
return timeLimiter.decorateFutureSupplier(futureSupplier);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class ReactiveResilience4JAutoConfigurationWithoutMetricsTest {

static ReactiveResilience4JCircuitBreakerFactory circuitBreakerFactory = spy(
new ReactiveResilience4JCircuitBreakerFactory(CircuitBreakerRegistry.ofDefaults(),
TimeLimiterRegistry.ofDefaults()));
TimeLimiterRegistry.ofDefaults(), new Resilience4JConfigurationProperties()));

@Test
public void testWithoutMetrics() {
Expand Down
Loading

0 comments on commit 5c7026a

Please sign in to comment.