Skip to content

Commit

Permalink
ReactiveX#35 Add Reactor support for circuit breaker, bulkhead and ra…
Browse files Browse the repository at this point in the history
…te limiter. (ReactiveX#205)

* ReactiveX#35 Add Reactor support for circuit breaker, bulkhead and rate limiter.

* Fix Codacity warnings

* Commit build.gradle file for resilence4j-reactor

* Static import assertThat
  • Loading branch information
madgnome authored and RobWin committed Feb 21, 2018
1 parent ff33488 commit 97b695b
Show file tree
Hide file tree
Showing 33 changed files with 1,335 additions and 0 deletions.
14 changes: 14 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,20 @@ Observable.fromCallable(backendService::doSomething)

NOTE: Resilience4j also provides RxJava operators for `RateLimiter`, `Bulkhead` and `Retry`. Find out more in our *http://resilience4j.github.io/resilience4j/[User Guide]*

=== CircuitBreaker and Reactor

The following example shows how to decorate a Mono by using the custom Reactor operator.

[source,java]
----
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("testName");
Mono.fromCallable(backendService::doSomething)
.transform(CircuitBreakerOperator.of(circuitBreaker))
----


NOTE: Resilience4j also provides Reactor operators for `RateLimiter` and `Bulkhead`. Find out more in our *http://resilience4j.github.io/resilience4j/[User Guide]*


[[ratelimiter]]
=== RateLimiter
Expand Down
5 changes: 5 additions & 0 deletions libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ ext {
spockVersion = '1.1-groovy-2.4-rc-4'
retrofitVersion = '2.3.0'
prometheusSimpleClientVersion = '0.0.21'
reactorVersion = '3.1.3.RELEASE'
reactiveStreamsVersion = '1.0.2'

libraries = [
// compile
vavr: "io.vavr:vavr:${vavrVersion}",
slf4j: "org.slf4j:slf4j-api:${slf4jVersion}",
rxjava2: "io.reactivex.rxjava2:rxjava:${rxJavaVersion}",
jcache: "javax.cache:cache-api:${jcacheVersion}",
reactor: "io.projectreactor:reactor-core:${reactorVersion}",

// testCompile
junit: "junit:junit:${junitVersion}",
Expand All @@ -36,6 +39,8 @@ ext {
powermock_api_mockito: "org.powermock:powermock-api-mockito:${powermockVersion}",
powermock_module_junit4: "org.powermock:powermock-module-junit4:${powermockVersion}",
awaitility: "com.jayway.awaitility:awaitility:${awaitilityVersion}",
reactor_test: "io.projectreactor:reactor-test:${reactorVersion}",
reactive_streams_tck: "org.reactivestreams:reactive-streams-tck:${reactiveStreamsVersion}",

// Vert.x addon
vertx: "io.vertx:vertx-core:${vertxVersion}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,19 @@ Observable.fromCallable(backendService::doSomething)

Other reactive types (Flowable, Single, Maybe and Completable) are also supported.

===== Bulkhead and RxJava

The following example shows how to decorate a Mono by using the custom Reactor operator.

[source,java]
----
Bulkhead bulkhead = Bulkhead.ofDefaults("backendName");
Mono.fromCallable(backendService::doSomething)
.transform(BulkheadOperator.of(bulkhead));
----

Flux is also supported.

===== Saturated Bulkhead example

In this example the decorated runnable is not executed because the Bulkhead is saturated and will not allow any more parallel executions. The call to `Try.run` returns a `Failure<Throwable>` Monad so that the chained function is not invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ Observable.fromCallable(backendService::doSomething)

Other reactive types (Flowable, Single, Maybe and Completable) are also supported.

===== CircuitBreaker and Reactor

The following example shows how to decorate a Mono by using the custom Reactor operator.

[source,java]
----
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("backendName");
Mono.fromCallable(backendService::doSomething)
.transform(CircuitBreakerOperator.of(circuitBreaker))
----

Flux is also supported.

===== OPEN CircuitBreaker example

In this example `map` is not invoked, because the CircuitBreaker is OPEN. The call to `Try.of` returns a `Failure<Throwable>` Monad so that the chained function is not invoked.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,19 @@ Observable.fromCallable(backendService::doSomething)

Other reactive types (Flowable, Single, Maybe and Completable) are also supported.

===== RateLimiter and Reactor

The following example shows how to decorate a Mono by using the custom Reactor operator.

[source,java]
----
RateLimiter rateLimiter = RateLimiter.ofDefaults("backendName");
Mono.fromCallable(backendService::doSomething)
.transform(RateLimiterOperator.of(rateLimiter))
----

Flux is also supported.

===== Consume emitted RateLimiterEvents

The RateLimiter emits a stream of RateLimiterEvents. An event can be a successful permission acquire or acquire failure.
Expand Down
15 changes: 15 additions & 0 deletions resilience4j-reactor/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
dependencies {
compileOnly project(':resilience4j-circuitbreaker')
compileOnly project(':resilience4j-ratelimiter')
compileOnly project(':resilience4j-bulkhead')
compileOnly project(':resilience4j-retry')
compile (libraries.reactor)
testCompile project(':resilience4j-test')
testCompile project(':resilience4j-circuitbreaker')
testCompile project(':resilience4j-ratelimiter')
testCompile project(':resilience4j-bulkhead')
testCompile project(':resilience4j-retry')
testCompile (libraries.reactor_test)
testCompile (libraries.assertj)
testCompile (libraries.reactive_streams_tck)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.github.resilience4j.reactor;

import reactor.core.publisher.Flux;

public abstract class FluxResilience<T> extends Flux<T> {

public static <T> Flux<T> onAssembly(Flux<T> source) {
return Flux.onAssembly(source);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.github.resilience4j.reactor;

import reactor.core.publisher.Mono;

public abstract class MonoResilience<T> extends Mono<T> {

public static <T> Mono<T> onAssembly(Mono<T> source) {
return Mono.onAssembly(source);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.github.resilience4j.reactor;

/**
* Represents the possible states of a permit.
*/
public enum Permit {
PENDING, ACQUIRED, REJECTED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.github.resilience4j.reactor.bulkhead.operator;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.reactor.FluxResilience;
import io.github.resilience4j.reactor.MonoResilience;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.function.Function;

/**
* A Reactor operator which wraps a reactive type in a bulkhead.
*
* @param <T> the value type of the upstream and downstream
*/
public class BulkheadOperator<T> implements Function<Publisher<T>, Publisher<T>> {
private final Bulkhead bulkhead;
private final Scheduler scheduler;

private BulkheadOperator(Bulkhead bulkhead, Scheduler scheduler) {
this.bulkhead = bulkhead;
this.scheduler = scheduler;
}

/**
* Creates a BulkheadOperator.
*
* @param <T> the value type of the upstream and downstream
* @param bulkhead the Bulkhead
* @return a BulkheadOperator
*/
public static <T> BulkheadOperator<T> of(Bulkhead bulkhead) {
return of(bulkhead, Schedulers.parallel());
}

/**
* Creates a BulkheadOperator.
*
* @param <T> the value type of the upstream and downstream
* @param bulkhead the Bulkhead
* @param scheduler the {@link Scheduler} where to publish
* @return a BulkheadOperator
*/
public static <T> BulkheadOperator<T> of(Bulkhead bulkhead, Scheduler scheduler) {
return new BulkheadOperator<>(bulkhead, scheduler);
}

@Override
public Publisher<T> apply(Publisher<T> publisher) {
if (publisher instanceof Mono) {
return MonoResilience
.onAssembly(new MonoBulkhead<T>((Mono<? extends T>) publisher, bulkhead, scheduler));
} else if (publisher instanceof Flux) {
return FluxResilience
.onAssembly(new FluxBulkhead<T>((Flux<? extends T>) publisher, bulkhead, scheduler));
}

throw new IllegalStateException("Publisher of type <" + publisher.getClass().getSimpleName()
+ "> are not supported by this operator");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package io.github.resilience4j.reactor.bulkhead.operator;

import io.github.resilience4j.bulkhead.Bulkhead;
import io.github.resilience4j.bulkhead.BulkheadFullException;
import io.github.resilience4j.reactor.Permit;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Operators;

import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;

/**
* A Reactor {@link Subscriber} to wrap another subscriber in a bulkhead.
*
* @param <T> the value type of the upstream and downstream
*/
class BulkheadSubscriber<T> extends Operators.MonoSubscriber<T, T> {

private final Bulkhead bulkhead;
private final AtomicReference<Permit> permitted = new AtomicReference<>(Permit.PENDING);

private Subscription subscription;

public BulkheadSubscriber(Bulkhead bulkhead,
CoreSubscriber<? super T> actual) {
super(actual);
this.bulkhead = requireNonNull(bulkhead);
}

@Override
public void onSubscribe(Subscription subscription) {
if (Operators.validate(this.subscription, subscription)) {
this.subscription = subscription;
if (acquireCallPermit()) {
actual.onSubscribe(this);
} else {
cancel();
actual.onSubscribe(this);
actual.onError(new BulkheadFullException(
String.format("Bulkhead '%s' is full", bulkhead.getName())));
}
}
}

@Override
public void onNext(T t) {
requireNonNull(t);

if (isInvocationPermitted()) {
actual.onNext(t);
}
}

@Override
public void onError(Throwable t) {
requireNonNull(t);

if (isInvocationPermitted()) {
bulkhead.onComplete();
actual.onError(t);
}
}

@Override
public void onComplete() {
if (isInvocationPermitted()) {
releaseBulkhead();
actual.onComplete();
}
}

@Override
public void request(long n) {
subscription.request(n);
}

@Override
public void cancel() {
super.cancel();
}

private boolean acquireCallPermit() {
boolean callPermitted = false;
if (permitted.compareAndSet(Permit.PENDING, Permit.ACQUIRED)) {
callPermitted = bulkhead.isCallPermitted();
if (!callPermitted) {
permitted.set(Permit.REJECTED);
}
}
return callPermitted;
}

private boolean isInvocationPermitted() {
return notCancelled() && wasCallPermitted();
}

private boolean notCancelled() {
return !this.isCancelled();
}

private boolean wasCallPermitted() {
return permitted.get() == Permit.ACQUIRED;
}

private void releaseBulkhead() {
if (wasCallPermitted()) {
bulkhead.onComplete();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.github.resilience4j.reactor.bulkhead.operator;

import io.github.resilience4j.bulkhead.Bulkhead;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.scheduler.Scheduler;

public class FluxBulkhead<T> extends FluxOperator<T, T> {

private final Bulkhead bulkhead;
private final Scheduler scheduler;

public FluxBulkhead(Flux<? extends T> source, Bulkhead bulkhead,
Scheduler scheduler) {
super(source);
this.bulkhead = bulkhead;
this.scheduler = scheduler;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.publishOn(scheduler)
.subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.github.resilience4j.reactor.bulkhead.operator;

import io.github.resilience4j.bulkhead.Bulkhead;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoOperator;
import reactor.core.scheduler.Scheduler;

public class MonoBulkhead<T> extends MonoOperator<T, T> {
private final Bulkhead bulkhead;
private final Scheduler scheduler;

public MonoBulkhead(Mono<? extends T> source, Bulkhead bulkhead,
Scheduler scheduler) {
super(source);
this.bulkhead = bulkhead;
this.scheduler = scheduler;
}

@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.publishOn(scheduler)
.subscribe(new BulkheadSubscriber<>(bulkhead, actual));
}
}
Loading

0 comments on commit 97b695b

Please sign in to comment.