Skip to content

Commit

Permalink
3.x: Add concatMapX operators (aliases) (#6879)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Jan 26, 2020
1 parent 4257ef5 commit 57bd1a9
Show file tree
Hide file tree
Showing 10 changed files with 1,063 additions and 241 deletions.
458 changes: 224 additions & 234 deletions docs/Operator-Matrix.md

Large diffs are not rendered by default.

70 changes: 63 additions & 7 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -2828,12 +2828,13 @@ public final <R> Maybe<R> compose(@NonNull MaybeTransformer<? super T, ? extends
* Returns a {@code Maybe} that is based on applying a specified function to the item emitted by the current {@code Maybe},
* where that function returns a {@link MaybeSource}.
* <p>
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.png" alt="">
* <img width="640" height="216" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatMap.png" alt="">
* <p>
* Note that flatMap and concatMap for {@code Maybe} is the same operation.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* <p>Note that flatMap and concatMap for {@code Maybe} is the same operation.
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the current {@code Maybe}, returns a {@code MaybeSource}
Expand All @@ -2845,8 +2846,63 @@ public final <R> Maybe<R> compose(@NonNull MaybeTransformer<? super T, ? extends
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Maybe<R> concatMap(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatten<>(this, mapper));
return flatMap(mapper);
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* current {@code Maybe}, where that function returns a {@code Completable}.
* <p>
* <img width="640" height="304" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatMapCompletable.png" alt="">
* <p>
* This operator is an alias for {@link #flatMapCompletable(Function)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to the item emitted by the current {@code Maybe}, returns a
* {@code Completable}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper);
}

/**
* Returns a {@code Maybe} based on applying a specified function to the item emitted by the
* current {@code Maybe}, where that function returns a {@link Single}.
* When this {@code Maybe} just completes the resulting {@code Maybe} completes as well.
* <p>
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatMapSingle.png" alt="">
* <p>
* This operator is an alias for {@link #flatMapSingleElement(Function)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the current {@code Maybe}, returns a
* {@code Single}
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Maybe<R> concatMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
return flatMapSingleElement(mapper);
}

/**
Expand Down Expand Up @@ -3732,7 +3788,7 @@ public final <R> Flowable<R> flatMapPublisher(@NonNull Function<? super T, ? ext
* current {@code Maybe}, where that function returns a {@code Single}.
* When this {@code Maybe} completes a {@link NoSuchElementException} will be thrown.
* <p>
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
* <img width="640" height="346" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
Expand All @@ -3759,7 +3815,7 @@ public final <R> Single<R> flatMapSingle(@NonNull Function<? super T, ? extends
* current {@code Maybe}, where that function returns a {@link Single}.
* When this {@code Maybe} just completes the resulting {@code Maybe} completes as well.
* <p>
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapSingleElement} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -3787,7 +3843,7 @@ public final <R> Maybe<R> flatMapSingleElement(@NonNull Function<? super T, ? ex
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* current {@code Maybe}, where that function returns a {@code Completable}.
* <p>
* <img width="640" height="267" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapCompletable.png" alt="">
* <img width="640" height="303" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMapCompletable3.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down
80 changes: 80 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2278,6 +2278,86 @@ public final <U> Single<U> cast(@NonNull Class<? extends U> clazz) {
return map(Functions.castFunction(clazz));
}

/**
* Returns a {@code Single} that is based on applying a specified function to the item emitted by the current {@code Single},
* where that function returns a {@link SingleSource}.
* <p>
* <img width="640" height="313" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatMap.png" alt="">
* <p>
* The operator is an alias for {@link #flatMap(Function)}
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the current {@code Single}, returns a {@code SingleSource}
* @return the new {@code Single} returned from {@code mapper} when applied to the item emitted by the current {@code Single}
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Single<R> concatMap(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleFlatMap<>(this, mapper));
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* current {@code Single}, where that function returns a {@link CompletableSource}.
* <p>
* <img width="640" height="298" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatMapCompletable.png" alt="">
* <p>
* The operator is an alias for {@link #flatMapCompletable(Function)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to the item emitted by the current {@code Single}, returns a
* {@code CompletableSource}
* @return the new {@code Completable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(@NonNull Function<? super T, ? extends CompletableSource> mapper) {
return flatMapCompletable(mapper);
}

/**
* Returns a {@link Maybe} that is based on applying a specified function to the item emitted by the current {@code Single},
* where that function returns a {@link MaybeSource}.
* <p>
* <img width="640" height="254" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.concatMapMaybe.png" alt="">
* <p>
* The operator is an alias for {@link #flatMapMaybe(Function)}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the current {@code Single}, returns a {@code MaybeSource}
* @return the new {@code Maybe} returned from {@code mapper} when applied to the item emitted by the current {@code Single}
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Maybe<R> concatMapMaybe(@NonNull Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
return flatMapMaybe(mapper);
}

/**
* Returns a {@link Flowable} that emits the item emitted by the current {@code Single}, then the item emitted by the
* specified {@link SingleSource}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.maybe;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class MaybeConcatMapCompletableTest extends RxJavaTest {

@Test
public void dispose() {
TestHelper.checkDisposed(Maybe.just(1).concatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Exception {
return Completable.complete();
}
}));
}

@Test
public void mapperThrows() {
Maybe.just(1)
.concatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Exception {
throw new TestException();
}
})
.test()
.assertFailure(TestException.class);
}

@Test
public void mapperReturnsNull() {
Maybe.just(1)
.concatMapCompletable(new Function<Integer, Completable>() {
@Override
public Completable apply(Integer v) throws Exception {
return null;
}
})
.test()
.assertFailure(NullPointerException.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.maybe;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class MaybeConcatMapSingleTest extends RxJavaTest {
@Test
public void flatMapSingleElementValue() {
Maybe.just(1).concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
if (integer == 1) {
return Single.just(2);
}

return Single.just(1);
}
})
.test()
.assertResult(2);
}

@Test
public void flatMapSingleElementValueDifferentType() {
Maybe.just(1).concatMapSingle(new Function<Integer, SingleSource<String>>() {
@Override public SingleSource<String> apply(final Integer integer) throws Exception {
if (integer == 1) {
return Single.just("2");
}

return Single.just("1");
}
})
.test()
.assertResult("2");
}

@Test
public void flatMapSingleElementValueNull() {
Maybe.just(1).concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return null;
}
})
.to(TestHelper.<Integer>testConsumer())
.assertNoValues()
.assertError(NullPointerException.class)
.assertErrorMessage("The mapper returned a null SingleSource");
}

@Test
public void flatMapSingleElementValueErrorThrown() {
Maybe.just(1).concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
throw new RuntimeException("something went terribly wrong!");
}
})
.to(TestHelper.<Integer>testConsumer())
.assertNoValues()
.assertError(RuntimeException.class)
.assertErrorMessage("something went terribly wrong!");
}

@Test
public void flatMapSingleElementError() {
RuntimeException exception = new RuntimeException("test");

Maybe.error(exception).concatMapSingle(new Function<Object, SingleSource<Object>>() {
@Override public SingleSource<Object> apply(final Object integer) throws Exception {
return Single.just(new Object());
}
})
.test()
.assertError(exception);
}

@Test
public void flatMapSingleElementEmpty() {
Maybe.<Integer>empty().concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
})
.test()
.assertNoValues()
.assertResult();
}

@Test
public void dispose() {
TestHelper.checkDisposed(Maybe.just(1).concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
}));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Integer>, Maybe<Integer>>() {
@Override
public Maybe<Integer> apply(Maybe<Integer> m) throws Exception {
return m.concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.just(2);
}
});
}
});
}

@Test
public void singleErrors() {
Maybe.just(1)
.concatMapSingle(new Function<Integer, SingleSource<Integer>>() {
@Override
public SingleSource<Integer> apply(final Integer integer) throws Exception {
return Single.error(new TestException());
}
})
.test()
.assertFailure(TestException.class);
}
}
Loading

0 comments on commit 57bd1a9

Please sign in to comment.