-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
2.x: Add materialize() and dematerialize() (#6278)
* 2.x: Add materialize() and dematerialize() * Add remaining test cases * Correct dematerialize javadoc * Use dematerialize selector fix some docs
- Loading branch information
Showing
12 changed files
with
685 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
40 changes: 40 additions & 0 deletions
40
src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/** | ||
* Copyright (c) 2016-present, RxJava Contributors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators.completable; | ||
|
||
import io.reactivex.*; | ||
import io.reactivex.annotations.Experimental; | ||
import io.reactivex.internal.operators.mixed.MaterializeSingleObserver; | ||
|
||
/** | ||
* Turn the signal types of a Completable source into a single Notification of | ||
* equal kind. | ||
* | ||
* @param <T> the element type of the source | ||
* @since 2.2.4 - experimental | ||
*/ | ||
@Experimental | ||
public final class CompletableMaterialize<T> extends Single<Notification<T>> { | ||
|
||
final Completable source; | ||
|
||
public CompletableMaterialize(Completable source) { | ||
this.source = source; | ||
} | ||
|
||
@Override | ||
protected void subscribeActual(SingleObserver<? super Notification<T>> observer) { | ||
source.subscribe(new MaterializeSingleObserver<T>(observer)); | ||
} | ||
} |
40 changes: 40 additions & 0 deletions
40
src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
/** | ||
* Copyright (c) 2016-present, RxJava Contributors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators.maybe; | ||
|
||
import io.reactivex.*; | ||
import io.reactivex.annotations.Experimental; | ||
import io.reactivex.internal.operators.mixed.MaterializeSingleObserver; | ||
|
||
/** | ||
* Turn the signal types of a Maybe source into a single Notification of | ||
* equal kind. | ||
* | ||
* @param <T> the element type of the source | ||
* @since 2.2.4 - experimental | ||
*/ | ||
@Experimental | ||
public final class MaybeMaterialize<T> extends Single<Notification<T>> { | ||
|
||
final Maybe<T> source; | ||
|
||
public MaybeMaterialize(Maybe<T> source) { | ||
this.source = source; | ||
} | ||
|
||
@Override | ||
protected void subscribeActual(SingleObserver<? super Notification<T>> observer) { | ||
source.subscribe(new MaterializeSingleObserver<T>(observer)); | ||
} | ||
} |
71 changes: 71 additions & 0 deletions
71
src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
/** | ||
* Copyright (c) 2016-present, RxJava Contributors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators.mixed; | ||
|
||
import io.reactivex.*; | ||
import io.reactivex.annotations.Experimental; | ||
import io.reactivex.disposables.Disposable; | ||
import io.reactivex.internal.disposables.DisposableHelper; | ||
|
||
/** | ||
* A consumer that implements the consumer types of Maybe, Single and Completable | ||
* and turns their signals into Notifications for a SingleObserver. | ||
* @param <T> the element type of the source | ||
* @since 2.2.4 - experimental | ||
*/ | ||
@Experimental | ||
public final class MaterializeSingleObserver<T> | ||
implements SingleObserver<T>, MaybeObserver<T>, CompletableObserver, Disposable { | ||
|
||
final SingleObserver<? super Notification<T>> downstream; | ||
|
||
Disposable upstream; | ||
|
||
public MaterializeSingleObserver(SingleObserver<? super Notification<T>> downstream) { | ||
this.downstream = downstream; | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Disposable d) { | ||
if (DisposableHelper.validate(upstream, d)) { | ||
this.upstream = d; | ||
downstream.onSubscribe(this); | ||
} | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
downstream.onSuccess(Notification.<T>createOnComplete()); | ||
} | ||
|
||
@Override | ||
public void onSuccess(T t) { | ||
downstream.onSuccess(Notification.<T>createOnNext(t)); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
downstream.onSuccess(Notification.<T>createOnError(e)); | ||
} | ||
|
||
@Override | ||
public boolean isDisposed() { | ||
return upstream.isDisposed(); | ||
} | ||
|
||
@Override | ||
public void dispose() { | ||
upstream.dispose(); | ||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/** | ||
* Copyright (c) 2016-present, RxJava Contributors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | ||
* compliance with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software distributed under the License is | ||
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | ||
* the License for the specific language governing permissions and limitations under the License. | ||
*/ | ||
|
||
package io.reactivex.internal.operators.single; | ||
|
||
import io.reactivex.*; | ||
import io.reactivex.annotations.Experimental; | ||
import io.reactivex.disposables.Disposable; | ||
import io.reactivex.exceptions.Exceptions; | ||
import io.reactivex.functions.Function; | ||
import io.reactivex.internal.disposables.DisposableHelper; | ||
import io.reactivex.internal.functions.ObjectHelper; | ||
|
||
/** | ||
* Maps the success value of the source to a Notification, then | ||
* maps it back to the corresponding signal type. | ||
* @param <T> the element type of the source | ||
* @param <R> the element type of the Notification and result | ||
* @since 2.2.4 - experimental | ||
*/ | ||
@Experimental | ||
public final class SingleDematerialize<T, R> extends Maybe<R> { | ||
|
||
final Single<T> source; | ||
|
||
final Function<? super T, Notification<R>> selector; | ||
|
||
public SingleDematerialize(Single<T> source, Function<? super T, Notification<R>> selector) { | ||
this.source = source; | ||
this.selector = selector; | ||
} | ||
|
||
@Override | ||
protected void subscribeActual(MaybeObserver<? super R> observer) { | ||
source.subscribe(new DematerializeObserver<T, R>(observer, selector)); | ||
} | ||
|
||
static final class DematerializeObserver<T, R> implements SingleObserver<T>, Disposable { | ||
|
||
final MaybeObserver<? super R> downstream; | ||
|
||
final Function<? super T, Notification<R>> selector; | ||
|
||
Disposable upstream; | ||
|
||
DematerializeObserver(MaybeObserver<? super R> downstream, | ||
Function<? super T, Notification<R>> selector) { | ||
this.downstream = downstream; | ||
this.selector = selector; | ||
} | ||
|
||
@Override | ||
public void dispose() { | ||
upstream.dispose(); | ||
} | ||
|
||
@Override | ||
public boolean isDisposed() { | ||
return upstream.isDisposed(); | ||
} | ||
|
||
@Override | ||
public void onSubscribe(Disposable d) { | ||
if (DisposableHelper.validate(upstream, d)) { | ||
upstream = d; | ||
downstream.onSubscribe(this); | ||
} | ||
} | ||
|
||
@Override | ||
public void onSuccess(T t) { | ||
Notification<R> notification; | ||
|
||
try { | ||
notification = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Notification"); | ||
} catch (Throwable ex) { | ||
Exceptions.throwIfFatal(ex); | ||
downstream.onError(ex); | ||
return; | ||
} | ||
if (notification.isOnNext()) { | ||
downstream.onSuccess(notification.getValue()); | ||
} else if (notification.isOnComplete()) { | ||
downstream.onComplete(); | ||
} else { | ||
downstream.onError(notification.getError()); | ||
} | ||
} | ||
|
||
@Override | ||
public void onError(Throwable e) { | ||
downstream.onError(e); | ||
} | ||
} | ||
} |
Oops, something went wrong.