Skip to content

Commit

Permalink
2.x: add resilient versions of parallel map(), filter() & doOnNext()
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Mar 20, 2017
1 parent 21a7a05 commit dc08894
Show file tree
Hide file tree
Showing 11 changed files with 2,173 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,294 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.parallel;

import org.reactivestreams.*;

import io.reactivex.exceptions.*;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.ConditionalSubscriber;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.parallel.*;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls a Consumer for each upstream value passing by
* and handles any failure with a handler function.
*
* @param <T> the input value type
* @since 2.0.8 - experimental
*/
public final class ParallelDoOnNextTry<T> extends ParallelFlowable<T> {

final ParallelFlowable<T> source;

final Consumer<? super T> onNext;

final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;

public ParallelDoOnNextTry(ParallelFlowable<T> source, Consumer<? super T> onNext,
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.source = source;
this.onNext = onNext;
this.errorHandler = errorHandler;
}

@Override
public void subscribe(Subscriber<? super T>[] subscribers) {
if (!validate(subscribers)) {
return;
}

int n = subscribers.length;
@SuppressWarnings("unchecked")
Subscriber<? super T>[] parents = new Subscriber[n];

for (int i = 0; i < n; i++) {
Subscriber<? super T> a = subscribers[i];
if (a instanceof ConditionalSubscriber) {
parents[i] = new ParallelDoOnNextConditionalSubscriber<T>((ConditionalSubscriber<? super T>)a, onNext, errorHandler);
} else {
parents[i] = new ParallelDoOnNextSubscriber<T>(a, onNext, errorHandler);
}
}

source.subscribe(parents);
}

@Override
public int parallelism() {
return source.parallelism();
}

static final class ParallelDoOnNextSubscriber<T> implements ConditionalSubscriber<T>, Subscription {

final Subscriber<? super T> actual;

final Consumer<? super T> onNext;

final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;

Subscription s;

boolean done;

ParallelDoOnNextSubscriber(Subscriber<? super T> actual, Consumer<? super T> onNext,
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.actual = actual;
this.onNext = onNext;
this.errorHandler = errorHandler;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (!tryOnNext(t)) {
s.request(1);
}
}

@Override
public boolean tryOnNext(T t) {
if (done) {
return false;
}
long retries = 0;

for (;;) {
try {
onNext.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

ParallelFailureHandling h;

try {
h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item");
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
cancel();
onError(new CompositeException(ex, exc));
return false;
}

switch (h) {
case RETRY:
continue;
case SKIP:
return false;
case STOP:
cancel();
onComplete();
return false;
default:
cancel();
onError(ex);
return false;
}
}

actual.onNext(t);
return true;
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

}
static final class ParallelDoOnNextConditionalSubscriber<T> implements ConditionalSubscriber<T>, Subscription {

final ConditionalSubscriber<? super T> actual;

final Consumer<? super T> onNext;

final BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler;
Subscription s;

boolean done;

ParallelDoOnNextConditionalSubscriber(ConditionalSubscriber<? super T> actual,
Consumer<? super T> onNext,
BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
this.actual = actual;
this.onNext = onNext;
this.errorHandler = errorHandler;
}

@Override
public void request(long n) {
s.request(n);
}

@Override
public void cancel() {
s.cancel();
}

@Override
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.s, s)) {
this.s = s;

actual.onSubscribe(this);
}
}

@Override
public void onNext(T t) {
if (!tryOnNext(t) && !done) {
s.request(1);
}
}

@Override
public boolean tryOnNext(T t) {
if (done) {
return false;
}
long retries = 0;

for (;;) {
try {
onNext.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

ParallelFailureHandling h;

try {
h = ObjectHelper.requireNonNull(errorHandler.apply(++retries, ex), "The errorHandler returned a null item");
} catch (Throwable exc) {
Exceptions.throwIfFatal(exc);
cancel();
onError(new CompositeException(ex, exc));
return false;
}

switch (h) {
case RETRY:
continue;
case SKIP:
return false;
case STOP:
cancel();
onComplete();
return false;
default:
cancel();
onError(ex);
return false;
}
}

return actual.tryOnNext(t);
}
}

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
actual.onComplete();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public final void cancel() {

@Override
public final void onNext(T t) {
if (!tryOnNext(t)) {
if (!tryOnNext(t) && !done) {
s.request(1);
}
}
Expand Down
Loading

0 comments on commit dc08894

Please sign in to comment.