Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: add doAfterNext & doAfterSuccess to the other types #4835

Merged
merged 1 commit into from
Nov 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2273,6 +2273,25 @@ public final Maybe<T> delaySubscription(long delay, TimeUnit unit, Scheduler sch
return delaySubscription(Flowable.timer(delay, unit, scheduler));
}

/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
* <p>Note that the {@code onAfterNext} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Maybe instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Maybe<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null");
return RxJavaPlugins.onAssembly(new MaybeDoAfterSuccess<T>(this, onAfterSuccess));
}

/**
* Registers an {@link Action} to be called when this Maybe invokes either
* {@link MaybeObserver#onComplete onSuccess},
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6416,6 +6416,27 @@ public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
}

/**
* Calls the specified consumer with the current item after this item has been emitted to the downstream.
* <p>Note that the {@code onAfterNext} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterNext} does not operate by default on a particular {@link Scheduler}.</dd>
* <td><b>Operator-fusion:</b></dt>
* <dd>This operator supports boundary-limited synchronous or asynchronous queue-fusion.</dd>
* </dl>
* @param onAfterNext the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Observable instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext) {
ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
return RxJavaPlugins.onAssembly(new ObservableDoAfterNext<T>(this, onAfterNext));
}

/**
* Registers an {@link Action} to be called when this ObservableSource invokes either
* {@link Observer#onComplete onComplete} or {@link Observer#onError onError}.
Expand Down
19 changes: 19 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1716,6 +1716,25 @@ public final <U> Single<T> delaySubscription(long time, TimeUnit unit, Scheduler
return delaySubscription(Observable.timer(time, unit, scheduler));
}

/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
* <p>Note that the {@code doAfterSuccess} action is shared between subscriptions and as such
* should be thread-safe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doAfterSuccess} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onAfterSuccess the Consumer that will be called after emitting an item from upstream to the downstream
* @return the new Single instance
* @since 2.0.1 - experimental
*/
@SchedulerSupport(SchedulerSupport.NONE)
@Experimental
public final Single<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
ObjectHelper.requireNonNull(onAfterSuccess, "doAfterSuccess is null");
return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess<T>(this, onAfterSuccess));
}

/**
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
* the downstream.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class MaybeDoAfterSuccess<T> extends AbstractMaybeWithUpstream<T, T> {

final Consumer<? super T> onAfterSuccess;

public MaybeDoAfterSuccess(MaybeSource<T> source, Consumer<? super T> onAfterSuccess) {
super(source);
this.onAfterSuccess = onAfterSuccess;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterSuccess));
}

static final class DoAfterObserver<T> implements MaybeObserver<T>, Disposable {

final MaybeObserver<? super T> actual;

final Consumer<? super T> onAfterSuccess;

Disposable d;

DoAfterObserver(MaybeObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.actual = actual;
this.onAfterSuccess = onAfterSuccess;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T t) {
actual.onSuccess(t);

try {
onAfterSuccess.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// remember, onSuccess is a terminal event and we can't call onError
RxJavaPlugins.onError(ex);
}
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
actual.onComplete();
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.observers.BasicFuseableObserver;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class ObservableDoAfterNext<T> extends AbstractObservableWithUpstream<T, T> {

final Consumer<? super T> onAfterNext;

public ObservableDoAfterNext(ObservableSource<T> source, Consumer<? super T> onAfterNext) {
super(source);
this.onAfterNext = onAfterNext;
}

@Override
protected void subscribeActual(Observer<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterNext));
}

static final class DoAfterObserver<T> extends BasicFuseableObserver<T, T> {

final Consumer<? super T> onAfterNext;

DoAfterObserver(Observer<? super T> actual, Consumer<? super T> onAfterNext) {
super(actual);
this.onAfterNext = onAfterNext;
}

@Override
public void onNext(T t) {
actual.onNext(t);

if (sourceMode == NONE) {
try {
onAfterNext.accept(t);
} catch (Throwable ex) {
fail(ex);
}
}
}

@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}

@Override
public T poll() throws Exception {
T v = qs.poll();
if (v != null) {
onAfterNext.accept(v);
}
return v;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import io.reactivex.*;
import io.reactivex.annotations.Experimental;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Calls a consumer after pushing the current item to the downstream.
* @param <T> the value type
* @since 2.0.1 - experimental
*/
@Experimental
public final class SingleDoAfterSuccess<T> extends Single<T> {

final SingleSource<T> source;

final Consumer<? super T> onAfterSuccess;

public SingleDoAfterSuccess(SingleSource<T> source, Consumer<? super T> onAfterSuccess) {
this.source = source;
this.onAfterSuccess = onAfterSuccess;
}

@Override
protected void subscribeActual(SingleObserver<? super T> s) {
source.subscribe(new DoAfterObserver<T>(s, onAfterSuccess));
}

static final class DoAfterObserver<T> implements SingleObserver<T>, Disposable {

final SingleObserver<? super T> actual;

final Consumer<? super T> onAfterSuccess;

Disposable d;

DoAfterObserver(SingleObserver<? super T> actual, Consumer<? super T> onAfterSuccess) {
this.actual = actual;
this.onAfterSuccess = onAfterSuccess;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;

actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T t) {
actual.onSuccess(t);

try {
onAfterSuccess.accept(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// remember, onSuccess is a terminal event and we can't call onError
RxJavaPlugins.onError(ex);
}
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void dispose() {
d.dispose();
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}
}
}
Loading