Skip to content

Commit

Permalink
2.x: Implement as() (#5729)
Browse files Browse the repository at this point in the history
* Implement Observable.as()

* Implement Single.as()

* Implement Maybe.as()

* Implement Flowable.as()

* Implement Completable.as()

* Add Experimental annotations

* Add throws doc

* Fix docs and validation errors

* Add @SInCE 2.1.7 - experimental

* ParallelFlowable.as()

* Start ConverterTest

* Fix tests and update validator

* Remove exceptions from signatures

* Remove exception signature from implementations

* Assert the full execution of extend() tests

* Use test() helpers
  • Loading branch information
ZacSweers authored and akarnokd committed Nov 19, 2017
1 parent 9521512 commit 6a44e5d
Show file tree
Hide file tree
Showing 23 changed files with 826 additions and 11 deletions.
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,28 @@ public final Completable andThen(CompletableSource next) {
return concatWith(next);
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the resulting object type
* @param converter the function that receives the current Completable instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull CompletableConverter<? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Subscribes to and awaits the termination of this Completable instance in a blocking manner and
* rethrows any exception emitted.
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/reactivex/CompletableConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link Completable#as} operator to turn a Completable into another
* value fluently.
*
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface CompletableConverter<R> {
/**
* Applies a function to the upstream Completable and returns a converted value of type {@code R}.
*
* @param upstream the upstream Completable instance
* @return the converted value
*/
@NonNull
R apply(@NonNull Completable upstream);
}
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5237,6 +5237,31 @@ public final Single<Boolean> any(Predicate<? super T> predicate) {
return RxJavaPlugins.onAssembly(new FlowableAnySingle<T>(this, predicate));
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The backpressure behavior depends on what happens in the {@code converter} function.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the resulting object type
* @param converter the function that receives the current Flowable instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull FlowableConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Returns the first item emitted by this {@code Flowable}, or throws
* {@code NoSuchElementException} if it emits no items.
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/reactivex/FlowableConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link Flowable#as} operator to turn a Flowable into another
* value fluently.
*
* @param <T> the upstream type
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface FlowableConverter<T, R> {
/**
* Applies a function to the upstream Flowable and returns a converted value of type {@code R}.
*
* @param upstream the upstream Flowable instance
* @return the converted value
*/
@NonNull
R apply(@NonNull Flowable<T> upstream);
}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,28 @@ public final Maybe<T> ambWith(MaybeSource<? extends T> other) {
return ambArray(this, other);
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the resulting object type
* @param converter the function that receives the current Maybe instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull MaybeConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
* null if completed or an exception (which is propagated).
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/reactivex/MaybeConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link Maybe#as} operator to turn a Maybe into another
* value fluently.
*
* @param <T> the upstream type
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface MaybeConverter<T, R> {
/**
* Applies a function to the upstream Maybe and returns a converted value of type {@code R}.
*
* @param upstream the upstream Maybe instance
* @return the converted value
*/
@NonNull
R apply(@NonNull Maybe<T> upstream);
}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4800,6 +4800,28 @@ public final Single<Boolean> any(Predicate<? super T> predicate) {
return RxJavaPlugins.onAssembly(new ObservableAnySingle<T>(this, predicate));
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the resulting object type
* @param converter the function that receives the current Observable instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull ObservableConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Returns the first item emitted by this {@code Observable}, or throws
* {@code NoSuchElementException} if it emits no items.
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/reactivex/ObservableConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link Observable#as} operator to turn an Observable into another
* value fluently.
*
* @param <T> the upstream type
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface ObservableConverter<T, R> {
/**
* Applies a function to the upstream Observable and returns a converted value of type {@code R}.
*
* @param upstream the upstream Observable instance
* @return the converted value
*/
@NonNull
R apply(@NonNull Observable<T> upstream);
}
22 changes: 22 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,28 @@ public final Single<T> ambWith(SingleSource<? extends T> other) {
return ambArray(this, other);
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code as} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the resulting object type
* @param converter the function that receives the current Single instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> R as(@NonNull SingleConverter<T, ? extends R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Hides the identity of the current Single, including the Disposable that is sent
* to the downstream via {@code onSubscribe()}.
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/reactivex/SingleConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link Single#as} operator to turn a Single into another
* value fluently.
*
* @param <T> the upstream type
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface SingleConverter<T, R> {
/**
* Applies a function to the upstream Single and returns a converted value of type {@code R}.
*
* @param upstream the upstream Single instance
* @return the converted value
*/
@NonNull
R apply(@NonNull Single<T> upstream);
}
18 changes: 18 additions & 0 deletions src/main/java/io/reactivex/parallel/ParallelFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
return RxJavaPlugins.onAssembly(new ParallelFromPublisher<T>(source, parallelism, prefetch));
}

/**
* Calls the specified converter function during assembly time and returns its resulting value.
* <p>
* This allows fluent conversion to any other type.
*
* @param <R> the resulting object type
* @param converter the function that receives the current ParallelFlowable instance and returns a value
* @return the converted value
* @throws NullPointerException if converter is null
* @since 2.1.7 - experimental
*/
@Experimental
@CheckReturnValue
@NonNull
public final <R> R as(@NonNull ParallelFlowableConverter<T, R> converter) {
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
}

/**
* Maps the source values on each 'rail' to another value.
* <p>
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.parallel;

import io.reactivex.annotations.*;

/**
* Convenience interface and callback used by the {@link ParallelFlowable#as} operator to turn a ParallelFlowable into
* another value fluently.
*
* @param <T> the upstream type
* @param <R> the output type
* @since 2.1.7 - experimental
*/
@Experimental
public interface ParallelFlowableConverter<T, R> {
/**
* Applies a function to the upstream ParallelFlowable and returns a converted value of type {@code R}.
*
* @param upstream the upstream ParallelFlowable instance
* @return the converted value
*/
@NonNull
R apply(@NonNull ParallelFlowable<T> upstream);
}
Loading

0 comments on commit 6a44e5d

Please sign in to comment.