diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java index 4c44fccb49..2b31e241ac 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriber.java @@ -21,7 +21,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.publisher.Operators; +import reactor.core.publisher.BaseSubscriber; import java.util.concurrent.atomic.AtomicReference; @@ -32,71 +32,53 @@ * * @param the value type of the upstream and downstream */ -class BulkheadSubscriber extends Operators.MonoSubscriber { +class BulkheadSubscriber extends BaseSubscriber { + private final CoreSubscriber actual; private final Bulkhead bulkhead; private final AtomicReference permitted = new AtomicReference<>(Permit.PENDING); - private Subscription subscription; - public BulkheadSubscriber(Bulkhead bulkhead, CoreSubscriber actual) { - super(actual); + this.actual = actual; this.bulkhead = requireNonNull(bulkhead); } @Override - public void onSubscribe(Subscription subscription) { - if (Operators.validate(this.subscription, subscription)) { - this.subscription = subscription; - if (acquireCallPermit()) { - actual.onSubscribe(this); - } else { - cancel(); - actual.onSubscribe(this); - actual.onError(new BulkheadFullException( - String.format("Bulkhead '%s' is full", bulkhead.getName()))); - } + public void hookOnSubscribe(Subscription subscription) { + if (acquireCallPermit()) { + actual.onSubscribe(this); + } else { + cancel(); + actual.onSubscribe(this); + actual.onError(new BulkheadFullException( + String.format("Bulkhead '%s' is full", bulkhead.getName()))); } } @Override - public void onNext(T t) { - requireNonNull(t); - - if (isInvocationPermitted()) { + public void hookOnNext(T t) { + if (notCancelled() && wasCallPermitted()) { actual.onNext(t); } } @Override - public void onError(Throwable t) { - requireNonNull(t); - - if (isInvocationPermitted()) { + public void hookOnError(Throwable t) { + if (wasCallPermitted()) { bulkhead.onComplete(); actual.onError(t); } } @Override - public void onComplete() { - if (isInvocationPermitted()) { + public void hookOnComplete() { + if (wasCallPermitted()) { releaseBulkhead(); actual.onComplete(); } } - @Override - public void request(long n) { - subscription.request(n); - } - - @Override - public void cancel() { - super.cancel(); - } - private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { @@ -108,12 +90,8 @@ private boolean acquireCallPermit() { return callPermitted; } - private boolean isInvocationPermitted() { - return notCancelled() && wasCallPermitted(); - } - private boolean notCancelled() { - return !this.isCancelled(); + return !this.isDisposed(); } private boolean wasCallPermitted() { diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java index 82d7d6af10..6e085b94d7 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriber.java @@ -22,7 +22,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.publisher.Operators; +import reactor.core.publisher.BaseSubscriber; import java.util.concurrent.atomic.AtomicReference; @@ -33,65 +33,54 @@ * * @param the value type of the upstream and downstream */ -class CircuitBreakerSubscriber extends Operators.MonoSubscriber { +class CircuitBreakerSubscriber extends BaseSubscriber { + private final CoreSubscriber actual; private final CircuitBreaker circuitBreaker; private final AtomicReference permitted = new AtomicReference<>(Permit.PENDING); private StopWatch stopWatch; - private Subscription subscription; public CircuitBreakerSubscriber(CircuitBreaker circuitBreaker, CoreSubscriber actual) { - super(actual); + this.actual = actual; this.circuitBreaker = requireNonNull(circuitBreaker); } @Override - public void onSubscribe(Subscription subscription) { - if (Operators.validate(this.subscription, subscription)) { - this.subscription = subscription; - - if (acquireCallPermit()) { - actual.onSubscribe(this); - } else { - cancel(); - actual.onSubscribe(this); - actual.onError(new CircuitBreakerOpenException( - String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); - } - } - } - - @Override - public void onNext(T t) { - requireNonNull(t); - - if (isInvocationPermitted()) { - actual.onNext(t); + protected void hookOnSubscribe(Subscription subscription) { + if (acquireCallPermit()) { + actual.onSubscribe(this); + } else { + cancel(); + actual.onSubscribe(this); + actual.onError(new CircuitBreakerOpenException( + String.format("CircuitBreaker '%s' is open", circuitBreaker.getName()))); } } @Override - public void onError(Throwable t) { - requireNonNull(t); - - markFailure(t); - if (isInvocationPermitted()) { - actual.onError(t); + protected void hookOnNext(T value) { + if (notCancelled() && wasCallPermitted()) { + actual.onNext(value); } } @Override - public void onComplete() { + protected void hookOnComplete() { markSuccess(); - if (isInvocationPermitted()) { + if (wasCallPermitted()) { actual.onComplete(); } } @Override - public void request(long n) { - subscription.request(n); + protected void hookOnError(Throwable t) { + requireNonNull(t); + + markFailure(t); + if (wasCallPermitted()) { + actual.onError(t); + } } private boolean acquireCallPermit() { @@ -107,10 +96,6 @@ private boolean acquireCallPermit() { return callPermitted; } - private boolean isInvocationPermitted() { - return !this.isCancelled() && wasCallPermitted(); - } - private void markFailure(Throwable e) { if (wasCallPermitted()) { circuitBreaker.onError(stopWatch.stop().getProcessingDuration().toNanos(), e); @@ -123,6 +108,10 @@ private void markSuccess() { } } + private boolean notCancelled() { + return !this.isDisposed(); + } + private boolean wasCallPermitted() { return permitted.get() == Permit.ACQUIRED; } diff --git a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java index 1d08de30ee..be8a8acb85 100644 --- a/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java +++ b/resilience4j-reactor/src/main/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriber.java @@ -21,7 +21,7 @@ import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; -import reactor.core.publisher.Operators; +import reactor.core.publisher.BaseSubscriber; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -33,39 +33,33 @@ * * @param the value type of the upstream and downstream */ -class RateLimiterSubscriber extends Operators.MonoSubscriber { +class RateLimiterSubscriber extends BaseSubscriber { + private final CoreSubscriber actual; private final RateLimiter rateLimiter; private final AtomicReference permitted = new AtomicReference<>(Permit.PENDING); private final AtomicBoolean firstEvent = new AtomicBoolean(true); - private Subscription subscription; - public RateLimiterSubscriber(RateLimiter rateLimiter, CoreSubscriber actual) { - super(actual); + this.actual = actual; this.rateLimiter = requireNonNull(rateLimiter); } @Override - public void onSubscribe(Subscription subscription) { - if (Operators.validate(this.subscription, subscription)) { - this.subscription = subscription; - if (acquireCallPermit()) { - actual.onSubscribe(this); - } else { - cancel(); - actual.onSubscribe(this); - actual.onError(rateLimitExceededException()); - } + public void hookOnSubscribe(Subscription subscription) { + if (acquireCallPermit()) { + actual.onSubscribe(this); + } else { + cancel(); + actual.onSubscribe(this); + actual.onError(rateLimitExceededException()); } } @Override - public void onNext(T t) { - requireNonNull(t); - - if (isInvocationPermitted()) { + public void hookOnNext(T t) { + if (notCancelled() && wasCallPermitted()) { if (firstEvent.getAndSet(false) || rateLimiter.getPermission(rateLimiter.getRateLimiterConfig().getTimeoutDuration())) { actual.onNext(t); } else { @@ -76,31 +70,19 @@ public void onNext(T t) { } @Override - public void onError(Throwable t) { - requireNonNull(t); - - if (isInvocationPermitted()) { + public void hookOnError(Throwable t) { + if (wasCallPermitted()) { actual.onError(t); } } @Override - public void onComplete() { - if (isInvocationPermitted()) { + public void hookOnComplete() { + if (wasCallPermitted()) { actual.onComplete(); } } - @Override - public void request(long n) { - subscription.request(n); - } - - @Override - public void cancel() { - super.cancel(); - } - private boolean acquireCallPermit() { boolean callPermitted = false; if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) { @@ -112,12 +94,8 @@ private boolean acquireCallPermit() { return callPermitted; } - private boolean isInvocationPermitted() { - return notCancelled() && wasCallPermitted(); - } - private boolean notCancelled() { - return !this.isCancelled(); + return !this.isDisposed(); } private boolean wasCallPermitted() { diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/CombinedOperatorsTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/CombinedOperatorsTest.java new file mode 100644 index 0000000000..eba1bf7b60 --- /dev/null +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/CombinedOperatorsTest.java @@ -0,0 +1,69 @@ +package io.github.resilience4j.reactor; + +import io.github.resilience4j.bulkhead.Bulkhead; +import io.github.resilience4j.bulkhead.BulkheadConfig; +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerConfig; +import io.github.resilience4j.ratelimiter.RateLimiter; +import io.github.resilience4j.ratelimiter.RateLimiterConfig; +import io.github.resilience4j.reactor.bulkhead.operator.BulkheadOperator; +import io.github.resilience4j.reactor.circuitbreaker.operator.CircuitBreakerOperator; +import io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterOperator; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.time.Duration; +import java.time.temporal.ChronoUnit; + +public class CombinedOperatorsTest { + + private final RateLimiter rateLimiter = RateLimiter.of("test", + RateLimiterConfig.custom().limitForPeriod(5).timeoutDuration(Duration.ZERO).limitRefreshPeriod(Duration.ofSeconds(10)).build()); + + private final CircuitBreaker circuitBreaker = CircuitBreaker.of("test", + CircuitBreakerConfig.custom() + .waitDurationInOpenState(Duration.of(10, ChronoUnit.SECONDS)) + .ringBufferSizeInClosedState(4) + .ringBufferSizeInHalfOpenState(4) + .build()); + + private Bulkhead bulkhead = Bulkhead + .of("test", BulkheadConfig.custom().maxConcurrentCalls(1).maxWaitTime(0).build()); + + @Test + public void shouldEmitEvents() { + StepVerifier.create( + Flux.just("Event 1", "Event 2") + .transform(BulkheadOperator.of(bulkhead)) + .transform(RateLimiterOperator.of(rateLimiter)) + .transform(CircuitBreakerOperator.of(circuitBreaker)) + ).expectNext("Event 1") + .expectNext("Event 2") + .verifyComplete(); + } + + @Test + public void shouldEmitEvent() { + StepVerifier.create( + Mono.just("Event 1") + .transform(BulkheadOperator.of(bulkhead)) + .transform(RateLimiterOperator.of(rateLimiter)) + .transform(CircuitBreakerOperator.of(circuitBreaker)) + ).expectNext("Event 1") + .verifyComplete(); + } + + @Test + public void shouldPropagateError() { + StepVerifier.create( + Flux.error(new IOException("BAM!")) + .transform(BulkheadOperator.of(bulkhead)) + .transform(RateLimiterOperator.of(rateLimiter)) + .transform(CircuitBreakerOperator.of(circuitBreaker)) + ).expectError(IOException.class) + .verify(Duration.ofSeconds(1)); + } +} diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriberWhiteboxVerification.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriberWhiteboxVerification.java index e3825855de..f5b262ac23 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriberWhiteboxVerification.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/bulkhead/operator/BulkheadSubscriberWhiteboxVerification.java @@ -33,8 +33,8 @@ public BulkheadSubscriberWhiteboxVerification() { public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { return new io.github.resilience4j.reactor.bulkhead.operator.BulkheadSubscriber(Bulkhead.ofDefaults("verification"), MonoProcessor.create()) { @Override - public void onSubscribe(Subscription subscription) { - super.onSubscribe(subscription); + public void hookOnSubscribe(Subscription subscription) { + super.hookOnSubscribe(subscription); // register a successful Subscription, and create a Puppet, // for the WhiteboxVerification to be able to drive its tests: @@ -53,20 +53,20 @@ public void signalCancel() { } @Override - public void onNext(Integer integer) { - super.onNext(integer); + public void hookOnNext(Integer integer) { + super.hookOnNext(integer); probe.registerOnNext(integer); } @Override - public void onError(Throwable t) { - super.onError(t); + public void hookOnError(Throwable t) { + super.hookOnError(t); probe.registerOnError(t); } @Override - public void onComplete() { - super.onComplete(); + public void hookOnComplete() { + super.hookOnComplete(); probe.registerOnComplete(); } }; diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java index e07283419a..27b3a01176 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/CircuitBreakerSubscriberWhiteboxVerification.java @@ -32,9 +32,10 @@ public CircuitBreakerSubscriberWhiteboxVerification() { @Override public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { return new CircuitBreakerSubscriber(CircuitBreaker.ofDefaults("verification"), MonoProcessor.create()) { + @Override - public void onSubscribe(Subscription subscription) { - super.onSubscribe(subscription); + protected void hookOnSubscribe(Subscription subscription) { + super.hookOnSubscribe(subscription); // register a successful Subscription, and create a Puppet, // for the WhiteboxVerification to be able to drive its tests: @@ -53,20 +54,20 @@ public void signalCancel() { } @Override - public void onNext(Integer integer) { - super.onNext(integer); + public void hookOnNext(Integer integer) { + super.hookOnNext(integer); probe.registerOnNext(integer); } @Override - public void onError(Throwable t) { - super.onError(t); + public void hookOnError(Throwable t) { + super.hookOnError(t); probe.registerOnError(t); } @Override - public void onComplete() { - super.onComplete(); + public void hookOnComplete() { + super.hookOnComplete(); probe.registerOnComplete(); } }; diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java index a1aef8858a..fc70b45f2c 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/FluxCircuitBreakerTest.java @@ -21,6 +21,7 @@ import reactor.test.StepVerifier; import java.io.IOException; +import java.time.Duration; public class FluxCircuitBreakerTest extends CircuitBreakerAssertions { @@ -42,7 +43,7 @@ public void shouldPropagateError() { Flux.error(new IOException("BAM!")) .transform(CircuitBreakerOperator.of(circuitBreaker))) .expectError(IOException.class) - .verify(); + .verify(Duration.ofSeconds(1)); assertSingleFailedCall(); } @@ -54,7 +55,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() { Flux.just("Event 1", "Event 2") .transform(CircuitBreakerOperator.of(circuitBreaker))) .expectError(CircuitBreakerOpenException.class) - .verify(); + .verify(Duration.ofSeconds(1)); assertNoRegisteredCall(); } diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java index 2014a6d240..088475805d 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/circuitbreaker/operator/MonoCircuitBreakerTest.java @@ -21,6 +21,7 @@ import reactor.test.StepVerifier; import java.io.IOException; +import java.time.Duration; public class MonoCircuitBreakerTest extends CircuitBreakerAssertions { @@ -41,7 +42,7 @@ public void shouldPropagateError() { Mono.error(new IOException("BAM!")) .transform(CircuitBreakerOperator.of(circuitBreaker))) .expectError(IOException.class) - .verify(); + .verify(Duration.ofSeconds(1)); assertSingleFailedCall(); } @@ -53,7 +54,7 @@ public void shouldEmitErrorWithCircuitBreakerOpenException() { Mono.just("Event") .transform(CircuitBreakerOperator.of(circuitBreaker))) .expectError(CircuitBreakerOpenException.class) - .verify(); + .verify(Duration.ofSeconds(1)); assertNoRegisteredCall(); } diff --git a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriberWhiteboxVerification.java b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriberWhiteboxVerification.java index b979fafb52..95eeb5635c 100644 --- a/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriberWhiteboxVerification.java +++ b/resilience4j-reactor/src/test/java/io/github/resilience4j/reactor/ratelimiter/operator/RateLimiterSubscriberWhiteboxVerification.java @@ -33,8 +33,8 @@ public RateLimiterSubscriberWhiteboxVerification() { public Subscriber createSubscriber(WhiteboxSubscriberProbe probe) { return new RateLimiterSubscriber(RateLimiter.ofDefaults("verification"), MonoProcessor.create()) { @Override - public void onSubscribe(Subscription subscription) { - super.onSubscribe(subscription); + public void hookOnSubscribe(Subscription subscription) { + super.hookOnSubscribe(subscription); // register a successful Subscription, and create a Puppet, // for the WhiteboxVerification to be able to drive its tests: @@ -53,20 +53,20 @@ public void signalCancel() { } @Override - public void onNext(Integer integer) { - super.onNext(integer); + public void hookOnNext(Integer integer) { + super.hookOnNext(integer); probe.registerOnNext(integer); } @Override - public void onError(Throwable t) { - super.onError(t); + public void hookOnError(Throwable t) { + super.hookOnError(t); probe.registerOnError(t); } @Override - public void onComplete() { - super.onComplete(); + public void hookOnComplete() { + super.hookOnComplete(); probe.registerOnComplete(); } };