diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 3c3c4040ca..34008f1a0c 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -908,6 +908,28 @@ public final Completable andThen(CompletableSource next) { return concatWith(next); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Completable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(@NonNull CompletableConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Subscribes to and awaits the termination of this Completable instance in a blocking manner and * rethrows any exception emitted. diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java new file mode 100644 index 0000000000..39ec9b452b --- /dev/null +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -0,0 +1,35 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link Completable#as} operator to turn a Completable into another + * value fluently. + * + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface CompletableConverter { + /** + * Applies a function to the upstream Completable and returns a converted value of type {@code R}. + * + * @param upstream the upstream Completable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Completable upstream); +} diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 7bc45def95..b2da55c6fa 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5237,6 +5237,31 @@ public final Single any(Predicate predicate) { return RxJavaPlugins.onAssembly(new FlowableAnySingle(this, predicate)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Backpressure:
+ *
The backpressure behavior depends on what happens in the {@code converter} function.
+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Flowable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @BackpressureSupport(BackpressureKind.SPECIAL) + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(@NonNull FlowableConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Returns the first item emitted by this {@code Flowable}, or throws * {@code NoSuchElementException} if it emits no items. diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java new file mode 100644 index 0000000000..541e335bcd --- /dev/null +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link Flowable#as} operator to turn a Flowable into another + * value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface FlowableConverter { + /** + * Applies a function to the upstream Flowable and returns a converted value of type {@code R}. + * + * @param upstream the upstream Flowable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Flowable upstream); +} diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 13922ed8d3..c2a88aacb3 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1989,6 +1989,28 @@ public final Maybe ambWith(MaybeSource other) { return ambArray(this, other); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Maybe instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(@NonNull MaybeConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Waits in a blocking fashion until the current Maybe signals a success value (which is returned), * null if completed or an exception (which is propagated). diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java new file mode 100644 index 0000000000..e156ed5944 --- /dev/null +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link Maybe#as} operator to turn a Maybe into another + * value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface MaybeConverter { + /** + * Applies a function to the upstream Maybe and returns a converted value of type {@code R}. + * + * @param upstream the upstream Maybe instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Maybe upstream); +} diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index a124159e07..bfa3a19e4e 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4800,6 +4800,28 @@ public final Single any(Predicate predicate) { return RxJavaPlugins.onAssembly(new ObservableAnySingle(this, predicate)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Observable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(@NonNull ObservableConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Returns the first item emitted by this {@code Observable}, or throws * {@code NoSuchElementException} if it emits no items. diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java new file mode 100644 index 0000000000..b413de69de --- /dev/null +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link Observable#as} operator to turn an Observable into another + * value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface ObservableConverter { + /** + * Applies a function to the upstream Observable and returns a converted value of type {@code R}. + * + * @param upstream the upstream Observable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Observable upstream); +} diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 03b659a71a..72ee2872c6 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1522,6 +1522,28 @@ public final Single ambWith(SingleSource other) { return ambArray(this, other); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Single instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(@NonNull SingleConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Hides the identity of the current Single, including the Disposable that is sent * to the downstream via {@code onSubscribe()}. diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java new file mode 100644 index 0000000000..9938b22cc7 --- /dev/null +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link Single#as} operator to turn a Single into another + * value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface SingleConverter { + /** + * Applies a function to the upstream Single and returns a converted value of type {@code R}. + * + * @param upstream the upstream Single instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Single upstream); +} diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b1f6d60322..4dd6dfd93d 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -122,6 +122,24 @@ public static ParallelFlowable from(@NonNull Publisher sourc return RxJavaPlugins.onAssembly(new ParallelFromPublisher(source, parallelism, prefetch)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + * + * @param the resulting object type + * @param converter the function that receives the current ParallelFlowable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @NonNull + public final R as(@NonNull ParallelFlowableConverter converter) { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } + /** * Maps the source values on each 'rail' to another value. *

diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java new file mode 100644 index 0000000000..f782eb7bb0 --- /dev/null +++ b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link ParallelFlowable#as} operator to turn a ParallelFlowable into + * another value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface ParallelFlowableConverter { + /** + * Applies a function to the upstream ParallelFlowable and returns a converted value of type {@code R}. + * + * @param upstream the upstream ParallelFlowable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull ParallelFlowable upstream); +} diff --git a/src/test/java/io/reactivex/ConverterTest.java b/src/test/java/io/reactivex/ConverterTest.java new file mode 100644 index 0000000000..d2f174cd2f --- /dev/null +++ b/src/test/java/io/reactivex/ConverterTest.java @@ -0,0 +1,268 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.exceptions.TestException; +import io.reactivex.parallel.*; + +public final class ConverterTest { + + @Test + public void flowableConverterThrows() { + try { + Flowable.just(1).as(new FlowableConverter() { + @Override + public Integer apply(Flowable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void observableConverterThrows() { + try { + Observable.just(1).as(new ObservableConverter() { + @Override + public Integer apply(Observable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void singleConverterThrows() { + try { + Single.just(1).as(new SingleConverter() { + @Override + public Integer apply(Single v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void maybeConverterThrows() { + try { + Maybe.just(1).as(new MaybeConverter() { + @Override + public Integer apply(Maybe v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void completableConverterThrows() { + try { + Completable.complete().as(new CompletableConverter() { + @Override + public Completable apply(Completable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + // Test demos for signature generics in compose() methods. Just needs to compile. + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void observableGenericsSignatureTest() { + A a = new A() { }; + + Observable.just(a).as((ObservableConverter)ConverterTest.testObservableConverterCreator()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void singleGenericsSignatureTest() { + A a = new A() { }; + + Single.just(a).as((SingleConverter)ConverterTest.testSingleConverterCreator()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void maybeGenericsSignatureTest() { + A a = new A() { }; + + Maybe.just(a).as((MaybeConverter)ConverterTest.testMaybeConverterCreator()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void flowableGenericsSignatureTest() { + A a = new A() { }; + + Flowable.just(a).as((FlowableConverter)ConverterTest.testFlowableConverterCreator()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void parallelFlowableGenericsSignatureTest() { + A a = new A() { }; + + Flowable.just(a).parallel().as((ParallelFlowableConverter)ConverterTest.testParallelFlowableConverterCreator()); + } + + @Test + public void compositeTest() { + CompositeConverter converter = new CompositeConverter(); + + Flowable.just(1) + .as(converter) + .test() + .assertValue(1); + + Observable.just(1) + .as(converter) + .test() + .assertValue(1); + + Maybe.just(1) + .as(converter) + .test() + .assertValue(1); + + Single.just(1) + .as(converter) + .test() + .assertValue(1); + + Completable.complete() + .as(converter) + .test() + .assertComplete(); + + Flowable.just(1) + .parallel() + .as(converter) + .test() + .assertValue(1); + } + + interface A { } + interface B { } + + private static ObservableConverter, B> testObservableConverterCreator() { + return new ObservableConverter, B>() { + @Override + public B apply(Observable> a) { + return new B() { + }; + } + }; + } + + private static SingleConverter, B> testSingleConverterCreator() { + return new SingleConverter, B>() { + @Override + public B apply(Single> a) { + return new B() { + }; + } + }; + } + + private static MaybeConverter, B> testMaybeConverterCreator() { + return new MaybeConverter, B>() { + @Override + public B apply(Maybe> a) { + return new B() { + }; + } + }; + } + + private static FlowableConverter, B> testFlowableConverterCreator() { + return new FlowableConverter, B>() { + @Override + public B apply(Flowable> a) { + return new B() { + }; + } + }; + } + + private static ParallelFlowableConverter, B> testParallelFlowableConverterCreator() { + return new ParallelFlowableConverter, B>() { + @Override + public B apply(ParallelFlowable> a) { + return new B() { + }; + } + }; + } + + static class CompositeConverter + implements ObservableConverter>, + ParallelFlowableConverter>, + FlowableConverter>, + MaybeConverter>, + SingleConverter>, + CompletableConverter> { + @Override + public Flowable apply(ParallelFlowable upstream) { + return upstream.sequential(); + } + + @Override + public Flowable apply(Completable upstream) { + return upstream.toFlowable(); + } + + @Override + public Observable apply(Flowable upstream) { + return upstream.toObservable(); + } + + @Override + public Flowable apply(Maybe upstream) { + return upstream.toFlowable(); + } + + @Override + public Flowable apply(Observable upstream) { + return upstream.toFlowable(BackpressureStrategy.MISSING); + } + + @Override + public Flowable apply(Single upstream) { + return upstream.toFlowable(); + } + } +} diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index 6fef3b0273..5ad5afbd96 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -560,6 +560,46 @@ public void checkParallelFlowable() { defaultValues.put(ParallelFailureHandling.class, ParallelFailureHandling.ERROR); + @SuppressWarnings("rawtypes") + class MixedConverters implements FlowableConverter, ObservableConverter, SingleConverter, + MaybeConverter, CompletableConverter, ParallelFlowableConverter { + + @Override + public Object apply(ParallelFlowable upstream) { + return upstream; + } + + @Override + public Object apply(Completable upstream) { + return upstream; + } + + @Override + public Object apply(Maybe upstream) { + return upstream; + } + + @Override + public Object apply(Single upstream) { + return upstream; + } + + @Override + public Object apply(Observable upstream) { + return upstream; + } + + @Override + public Object apply(Flowable upstream) { + return upstream; + } + } + + MixedConverters mc = new MixedConverters(); + for (Class c : MixedConverters.class.getInterfaces()) { + defaultValues.put(c, mc); + } + // ----------------------------------------------------------------------------------- defaultInstances = new HashMap, List>(); diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index a764d682f1..9b0b914a66 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2783,17 +2783,42 @@ public void timeoutOtherNull() { @Test(timeout = 5000) public void toNormal() { - Flowable flow = normal.completable.to(new Function>() { - @Override - public Flowable apply(Completable c) { - return c.toFlowable(); - } - }); + normal.completable + .to(new Function>() { + @Override + public Flowable apply(Completable c) { + return c.toFlowable(); + } + }) + .test() + .assertComplete() + .assertNoValues(); + } + + @Test(timeout = 5000) + public void asNormal() { + normal.completable + .as(new CompletableConverter>() { + @Override + public Flowable apply(Completable c) { + return c.toFlowable(); + } + }) + .test() + .assertComplete() + .assertNoValues(); + } - flow.blockingForEach(new Consumer() { + @Test + public void as() { + Completable.complete().as(new CompletableConverter>() { @Override - public void accept(Object e) { } - }); + public Flowable apply(Completable v) { + return v.toFlowable(); + } + }) + .test() + .assertComplete(); } @Test(expected = NullPointerException.class) @@ -2801,6 +2826,11 @@ public void toNull() { normal.completable.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + normal.completable.as(null); + } + @Test(timeout = 5000) public void toFlowableNormal() { normal.completable.toFlowable().blockingForEach(Functions.emptyConsumer()); diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 72fc7289d6..fdba369d99 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -2351,6 +2351,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void toListNull() { just1.toList(null); diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 29e0d6974a..84c86a8504 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -20,6 +20,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.Observable; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -1113,7 +1114,7 @@ public void testForEachWithNull() { public void testExtend() { final TestSubscriber subscriber = new TestSubscriber(); final Object value = new Object(); - Flowable.just(value).to(new Function, Object>() { + Object returned = Flowable.just(value).to(new Function, Object>() { @Override public Object apply(Flowable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1123,6 +1124,36 @@ public Object apply(Flowable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); + } + + @Test + public void testAsExtend() { + final TestSubscriber subscriber = new TestSubscriber(); + final Object value = new Object(); + Object returned = Flowable.just(value).as(new FlowableConverter() { + @Override + public Object apply(Flowable onSubscribe) { + onSubscribe.subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertComplete(); + subscriber.assertValue(value); + return subscriber.values().get(0); + } + }); + assertSame(returned, value); + } + + @Test + public void as() { + Flowable.just(1).as(new FlowableConverter>() { + @Override + public Observable apply(Flowable v) { + return v.toObservable(); + } + }) + .test() + .assertResult(1); } @Test diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index e715b37c1e..c493f956b1 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -381,11 +381,28 @@ public Flowable apply(Maybe v) throws Exception { .assertResult(1); } + @Test + public void as() { + Maybe.just(1).as(new MaybeConverter>() { + @Override + public Flowable apply(Maybe v) { + return v.toFlowable(); + } + }) + .test() + .assertResult(1); + } + @Test(expected = NullPointerException.class) public void toNull() { Maybe.just(1).to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + Maybe.just(1).as(null); + } + @Test public void compose() { Maybe.just(1).compose(new MaybeTransformer() { diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 7f58c9a057..8816bcfe10 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -2408,6 +2408,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void toListNull() { just1.toList(null); diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 24653d1ff3..fc79edaa9c 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1153,7 +1153,7 @@ public void testForEachWithNull() { public void testExtend() { final TestObserver subscriber = new TestObserver(); final Object value = new Object(); - Observable.just(value).to(new Function, Object>() { + Object returned = Observable.just(value).to(new Function, Object>() { @Override public Object apply(Observable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1163,6 +1163,36 @@ public Object apply(Observable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); + } + + @Test + public void testAsExtend() { + final TestObserver subscriber = new TestObserver(); + final Object value = new Object(); + Object returned = Observable.just(value).as(new ObservableConverter() { + @Override + public Object apply(Observable onSubscribe) { + onSubscribe.subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertComplete(); + subscriber.assertValue(value); + return subscriber.values().get(0); + } + }); + assertSame(returned, value); + } + + @Test + public void as() { + Observable.just(1).as(new ObservableConverter>() { + @Override + public Flowable apply(Observable v) { + return v.toFlowable(BackpressureStrategy.MISSING); + } + }) + .test() + .assertResult(1); } @Test diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index 4cda73adbc..577d4be1cd 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -1100,6 +1100,20 @@ public Flowable apply(ParallelFlowable pf) throws Exception { .assertResult(1, 2, 3, 4, 5); } + @Test + public void as() { + Flowable.range(1, 5) + .parallel() + .as(new ParallelFlowableConverter>() { + @Override + public Flowable apply(ParallelFlowable pf) { + return pf.sequential(); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + @Test(expected = TestException.class) public void toThrows() { Flowable.range(1, 5) @@ -1112,6 +1126,18 @@ public Flowable apply(ParallelFlowable pf) throws Exception { }); } + @Test(expected = TestException.class) + public void asThrows() { + Flowable.range(1, 5) + .parallel() + .as(new ParallelFlowableConverter>() { + @Override + public Flowable apply(ParallelFlowable pf) { + throw new TestException(); + } + }); + } + @Test public void compose() { Flowable.range(1, 5) diff --git a/src/test/java/io/reactivex/single/SingleNullTests.java b/src/test/java/io/reactivex/single/SingleNullTests.java index 405054243c..374d7ee313 100644 --- a/src/test/java/io/reactivex/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/single/SingleNullTests.java @@ -844,6 +844,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void zipWithNull() { just1.zipWith(null, new BiFunction() { diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index 3fc650d276..ac9df2fafb 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -543,6 +543,18 @@ public Integer apply(Single v) throws Exception { }).intValue()); } + @Test + public void as() { + Single.just(1).as(new SingleConverter>() { + @Override + public Flowable apply(Single v) { + return v.toFlowable(); + } + }) + .test() + .assertResult(1); + } + @Test(expected = NullPointerException.class) public void fromObservableNull() { Single.fromObservable(null);