Skip to content

Commit

Permalink
Merge pull request #736 from akarnokd/FlatMapOverloads
Browse files Browse the repository at this point in the history
MergeMap with Iterable and resultSelector overloads
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents 9650eb1 + 892e27a commit 17307e4
Show file tree
Hide file tree
Showing 3 changed files with 727 additions and 0 deletions.
62 changes: 62 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
Expand Down Expand Up @@ -4059,6 +4060,67 @@ public <R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extend
return merge(map(func));
}

/**
* Create an Observable that applies a function to the pair of values from the source
* Observable and the collection Observable.
* @param <U> the element type of the collection Observable
* @param <R> the result type
* @param collectionSelector function that returns an Observable sequence for each value in the source Observable
* @param resultSelector function that combines the values of the source and collection Observable
* @return an Observable that applies a function to the pair of values from the source
* Observable and the collection Observable.
*/
public <U, R> Observable<R> mergeMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return create(OperationFlatMap.flatMap(this, collectionSelector, resultSelector));
}

/**
* Create an Observable that merges the values of the iterables returned by the
* collectionSelector for each source value.
* @param <R> the result value type
* @param collectionSelector function that returns an Iterable sequence of values for
* each source value.
* @return an Observable that merges the values of the iterables returned by the
* collectionSelector for each source value.
*/
public <R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return merge(map(OperationFlatMap.flatMapIterableFunc(collectionSelector)));
}

/**
* Create an Observable that applies a function to the pair of values from the source
* Observable and the collection Iterable sequence.
* @param <U> the collection element type
* @param <R> the result type
* @param collectionSelector function that returns an Iterable sequence of values for
* each source value.
* @param resultSelector function that combines the values of the source and collection Iterable
* @return n Observable that applies a function to the pair of values from the source
* Observable and the collection Iterable sequence.
*/
public <U, R> Observable<R> mergeMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
Func2<? super T, ? super U, ? extends R> resultSelector) {
return mergeMap(OperationFlatMap.flatMapIterableFunc(collectionSelector), resultSelector);
}

/**
* Create an Observable that projects the notification of an observable sequence to an observable
* sequence and merges the results into one.
* @param <R> the result type
* @param onNext function returning a collection to merge for each onNext event of the source
* @param onError function returning a collection to merge for an onError event
* @param onCompleted function returning a collection to merge for an onCompleted event
* @return an Observable that projects the notification of an observable sequence to an observable
* sequence and merges the results into one.
*/
public <R> Observable<R> mergeMap(
Func1<? super T, ? extends Observable<? extends R>> onNext,
Func1<? super Throwable, ? extends Observable<? extends R>> onError,
Func0<? extends Observable<? extends R>> onCompleted) {
return create(OperationFlatMap.flatMap(this, onNext, onError, onCompleted));
}

/**
* Creates a new Observable by applying a function that you supply to each
* item emitted by the source Observable, where that function returns an
Expand Down
Loading

0 comments on commit 17307e4

Please sign in to comment.