Skip to content

Commit

Permalink
Issue ReactiveX#357: Fixed a bug in ResilienceBaseSubscriber. If a su…
Browse files Browse the repository at this point in the history
…bscriber is is not permitted to subscribe, the subscription must be canceled and the ResilienceBaseSubscriber must call onSubscribe on the target Subscriber with an EmptySubscription followed by a call to onError with the supplied error. (ReactiveX#452)

Issue ReactiveX#357: Fixed a bug in ResilienceBaseSubscriber. If a subscriber is is not permitted to subscribe, the subscription must be canceled and the ResilienceBaseSubscriber must call onSubscribe on the target Subscriber with an EmptySubscription followed by a call to onError with the supplied error. The bug was that the ResilienceBaseSubscriber cancelled it's own subscription as well.
  • Loading branch information
RobWin committed May 10, 2019
1 parent 3bc6137 commit 0407044
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class CircuitBreakerConfig {
public static final int DEFAULT_WAIT_DURATION_IN_OPEN_STATE = 60; // Seconds
public static final int DEFAULT_RING_BUFFER_SIZE_IN_HALF_OPEN_STATE = 10;
public static final int DEFAULT_RING_BUFFER_SIZE_IN_CLOSED_STATE = 100;
private static final Predicate<Throwable> DEFAULT_RECORD_FAILURE_PREDICATE = (throwable) -> true;
private static final Predicate<Throwable> DEFAULT_RECORD_FAILURE_PREDICATE = throwable -> true;

@SuppressWarnings("unchecked")
private Class<? extends Throwable>[] recordExceptions = new Class[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,7 @@ public final void onSubscribe(Subscription s) {
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(getThrowable());
Operators.error(actual, Operators.onOperatorError(s, getThrowable(), actual.currentContext()));
}
}
catch (Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package io.github.resilience4j.reactor;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

import org.junit.Test;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadConfig;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import io.github.resilience4j.circuitbreaker.CircuitBreaker;
import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig;
import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import io.github.resilience4j.ratelimiter.RateLimiter;
import io.github.resilience4j.ratelimiter.RateLimiterConfig;
import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator;
Expand All @@ -19,11 +13,16 @@
import io.github.resilience4j.reactor.retry.RetryOperator;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;

public class CombinedOperatorsTest {

private final RateLimiter rateLimiter = RateLimiter.of("test",
Expand Down Expand Up @@ -110,7 +109,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorDuringSub
.transform(CircuitBreakerOperator.of(circuitBreaker))
.transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
.transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
).expectError(CircuitBreakerOpenException.class)
).expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));
}

Expand All @@ -122,7 +121,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubs
.transform(CircuitBreakerOperator.of(circuitBreaker))
.transform(BulkheadOperator.of(bulkhead, Schedulers.immediate()))
.transform(RateLimiterOperator.of(rateLimiter, Schedulers.immediate()))
).expectError(CircuitBreakerOpenException.class)
).expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.github.resilience4j.reactor.circuitbreaker.operator;

import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -65,7 +65,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() {
StepVerifier.create(
Flux.just("Event 1", "Event 2")
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand All @@ -77,7 +77,7 @@ public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
StepVerifier.create(
Flux.error(new IOException("BAM!"), true)
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand All @@ -89,7 +89,7 @@ public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe()
StepVerifier.create(
Flux.error(new IOException("BAM!"))
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.github.resilience4j.reactor.circuitbreaker.operator;

import io.github.resilience4j.circuitbreaker.CircuitBreakerOpenException;
import io.github.resilience4j.circuitbreaker.CallNotPermittedException;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -66,7 +66,7 @@ public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorNotOnSubscribe() {
StepVerifier.create(
Mono.error(new IOException("BAM!")).delayElement(Duration.ofMillis(1))
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand All @@ -78,7 +78,7 @@ public void shouldEmitCircuitBreakerOpenExceptionEvenWhenErrorDuringSubscribe()
StepVerifier.create(
Mono.error(new IOException("BAM!"))
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand All @@ -90,7 +90,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() {
StepVerifier.create(
Mono.just("Event")
.transform(CircuitBreakerOperator.of(circuitBreaker)))
.expectError(CircuitBreakerOpenException.class)
.expectError(CallNotPermittedException.class)
.verify(Duration.ofSeconds(1));

assertNoRegisteredCall();
Expand Down

0 comments on commit 0407044

Please sign in to comment.