Skip to content

Commit

Permalink
3.x: Add doOnLifecycle to M/S/C (#6877)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 26, 2020
1 parent db0bd71 commit 4257ef5
Show file tree
Hide file tree
Showing 9 changed files with 817 additions and 20 deletions.
34 changes: 14 additions & 20 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

29 changes: 29 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
Expand Down Expand Up @@ -1751,6 +1752,34 @@ public final Completable doOnEvent(@NonNull Consumer<@Nullable ? super Throwable
return RxJavaPlugins.onAssembly(new CompletableDoOnEvent(this, onEvent));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link CompletableObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="257" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link CompletableObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Completable doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
return doOnLifecycle(onSubscribe, Functions.emptyConsumer(),
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
Functions.EMPTY_ACTION, onDispose);
}

/**
* Returns a {@code Completable} instance that calls the various callbacks upon the specific
* lifecycle events.
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3388,6 +3388,34 @@ public final Maybe<T> doOnEvent(@NonNull BiConsumer<@Nullable ? super T, @Nullab
return RxJavaPlugins.onAssembly(new MaybeDoOnEvent<>(this, onEvent));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link MaybeObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="183" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link MaybeObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Maybe<T> doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
Objects.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new MaybeDoOnLifecycle<>(this, onSubscribe, onDispose));
}

/**
* Calls the shared {@link Consumer} with the {@link Disposable} sent through the {@code onSubscribe} for each
* {@link MaybeObserver} that subscribes to the current {@code Maybe}.
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,34 @@ public final Single<T> doFinally(@NonNull Action onFinally) {
return RxJavaPlugins.onAssembly(new SingleDoFinally<>(this, onFinally));
}

/**
* Calls the appropriate {@code onXXX} method (shared between all {@link SingleObserver}s) for the lifecycle events of
* the sequence (subscription, disposal).
* <p>
* <img width="640" height="232" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.doOnLifecycle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnLifecycle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onSubscribe
* a {@link Consumer} called with the {@link Disposable} sent via {@link SingleObserver#onSubscribe(Disposable)}
* @param onDispose
* called when the downstream disposes the {@code Disposable} via {@code dispose()}
* @return the new {@code Single} instance
* @throws NullPointerException if {@code onSubscribe} or {@code onDispose} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since 3.0.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final Single<T> doOnLifecycle(@NonNull Consumer<? super Disposable> onSubscribe, @NonNull Action onDispose) {
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
Objects.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new SingleDoOnLifecycle<>(this, onSubscribe, onDispose));
}

/**
* Calls the shared consumer with the {@link Disposable} sent through the {@code onSubscribe} for each
* {@link SingleObserver} that subscribes to the current {@code Single}.
Expand Down Expand Up @@ -3455,6 +3483,7 @@ public final Flowable<T> mergeWith(@NonNull SingleSource<? extends T> other) {
* @return the new {@link Maybe} instance
* @throws NullPointerException if {@code clazz} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.maybe;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Invokes callbacks upon {@code onSubscribe} from upstream and
* {@code dispose} from downstream.
*
* @param <T> the element type of the flow
* @since 3.0.0
*/
public final class MaybeDoOnLifecycle<T> extends AbstractMaybeWithUpstream<T, T> {

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

public MaybeDoOnLifecycle(Maybe<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new MaybeLifecycleObserver<>(observer, onSubscribe, onDispose));
}

static final class MaybeLifecycleObserver<T> implements MaybeObserver<T>, Disposable {

final MaybeObserver<? super T> downstream;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

Disposable upstream;

MaybeLifecycleObserver(MaybeObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
}
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}

@Override
public void onSuccess(@NonNull T t) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onSuccess(t);
}
}

@Override
public void onError(@NonNull Throwable e) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onComplete();
}
}

@Override
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.internal.operators.single;

import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.Exceptions;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.*;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;

/**
* Invokes callbacks upon {@code onSubscribe} from upstream and
* {@code dispose} from downstream.
*
* @param <T> the element type of the flow
* @since 3.0.0
*/
public final class SingleDoOnLifecycle<T> extends Single<T> {

final Single<T> source;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

public SingleDoOnLifecycle(Single<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onDispose) {
this.source = upstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new SingleLifecycleObserver<>(observer, onSubscribe, onDispose));
}

static final class SingleLifecycleObserver<T> implements SingleObserver<T>, Disposable {

final SingleObserver<? super T> downstream;

final Consumer<? super Disposable> onSubscribe;

final Action onDispose;

Disposable upstream;

SingleLifecycleObserver(SingleObserver<? super T> downstream, Consumer<? super Disposable> onSubscribe, Action onDispose) {
this.downstream = downstream;
this.onSubscribe = onSubscribe;
this.onDispose = onDispose;
}

@Override
public void onSubscribe(@NonNull Disposable d) {
// this way, multiple calls to onSubscribe can show up in tests that use doOnSubscribe to validate behavior
try {
onSubscribe.accept(d);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
d.dispose();
this.upstream = DisposableHelper.DISPOSED;
EmptyDisposable.error(e, downstream);
return;
}
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}

@Override
public void onSuccess(@NonNull T t) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onSuccess(t);
}
}

@Override
public void onError(@NonNull Throwable e) {
if (upstream != DisposableHelper.DISPOSED) {
upstream = DisposableHelper.DISPOSED;
downstream.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void dispose() {
try {
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
upstream.dispose();
upstream = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
}
}
Loading

0 comments on commit 4257ef5

Please sign in to comment.