Skip to content

Commit

Permalink
Add Single#zipDelayError operator (#1515)
Browse files Browse the repository at this point in the history
Motivation:
We have Single#zip but not the zipDelayError variant.

Modifications:
- Add Single#zipDelayError operator to match Single#zip use cases

Result:
Single#zipDelayError operator exists.
  • Loading branch information
Scottmitch authored Apr 24, 2021
1 parent ba16bee commit d97df5f
Show file tree
Hide file tree
Showing 10 changed files with 468 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ public final Publisher<T> concat(Publisher<? extends T> next) {

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code singles}.
* {@code this} and {@code other}.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
Expand All @@ -890,14 +890,40 @@ public final Publisher<T> concat(Publisher<? extends T> next) {
* @param <T2> The type of {@code other}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code singles}.
* {@code this} and {@code other}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public final <T2, R> Single<R> zipWith(Single<? extends T2> other,
BiFunction<? super T, ? super T2, ? extends R> zipper) {
return zip(this, other, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code this} and {@code other}. If any of the {@link Single}s terminate with an error, the returned
* {@link Single} will wait for termination till all the other {@link Single}s have been subscribed and terminated,
* and then terminate with the first error.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
* CompletableFuture<T> f1 = ...; // this
* CompletableFuture<T2> other = ...;
* CompletableFuture.allOf(f1, other).get(); // wait for all futures to complete
* return zipper.apply(f1.get(), other.get());
* }</pre>
* @param other The other {@link Single} to zip with.
* @param zipper Used to combine the completed results for each item from {@code singles}.
* @param <T2> The type of {@code other}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code this} and {@code other}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public final <T2, R> Single<R> zipWithDelayError(Single<? extends T2> other,
BiFunction<? super T, ? super T2, ? extends R> zipper) {
return zipDelayError(this, other, zipper);
}

/**
* Re-subscribes to this {@link Single} if an error is emitted and the passed {@link BiIntPredicate} returns
* {@code true}.
Expand Down Expand Up @@ -2228,7 +2254,7 @@ public static <T> Single<T> anyOf(final Iterable<Single<? extends T>> singles) {

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted
* by {@code singles}.
* by {@code s1} and {@code s2}.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
Expand All @@ -2244,17 +2270,45 @@ public static <T> Single<T> anyOf(final Iterable<Single<? extends T>> singles) {
* @param <T2> The type for the second {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code singles}.
* {@code s1} and {@code s2}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return SingleZipper.zip(s1, s2, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted
* by {@code s1} and {@code s2}. If any of the {@link Single}s terminate with an error, the returned {@link Single}
* will wait for termination till all the other {@link Single}s have been subscribed and terminated, and then
* terminate with the first error.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
* CompletableFuture<T1> f1 = ...; // s1
* CompletableFuture<T2> f2 = ...; // s2
* CompletableFuture.allOf(f1, f2).get(); // wait for all futures to complete
* return zipper.apply(f1.get(), f2.get());
* }</pre>
* @param s1 The first {@link Single} to zip.
* @param s2 The second {@link Single} to zip.
* @param zipper Used to combine the completed results for each item from {@code singles}.
* @param <T1> The type for the first {@link Single}.
* @param <T2> The type for the second {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link BiFunction} to items emitted by
* {@code s1} and {@code s2}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, R> Single<R> zipDelayError(Single<? extends T1> s1, Single<? extends T2> s2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return SingleZipper.zipDelayError(s1, s2, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function3} to items emitted by
* {@code singles}.
* {@code s1}, {@code s2}, and {@code s3}.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
Expand All @@ -2273,7 +2327,7 @@ public static <T1, T2, R> Single<R> zip(Single<? extends T1> s1, Single<? extend
* @param <T3> The type for the third {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link Function3} to items emitted by
* {@code singles}.
* {@code s1}, {@code s2}, and {@code s3}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, T3, R> Single<R> zip(
Expand All @@ -2282,9 +2336,41 @@ public static <T1, T2, T3, R> Single<R> zip(
return SingleZipper.zip(s1, s2, s3, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function3} to items emitted by
* {@code s1}, {@code s2}, and {@code s3}. If any of the {@link Single}s terminate with an error, the returned
* {@link Single} will wait for termination till all the other {@link Single}s have been subscribed and terminated,
* and then terminate with the first error.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
* CompletableFuture<T1> f1 = ...; // s1
* CompletableFuture<T2> f2 = ...; // s2
* CompletableFuture<T3> f3 = ...; // s3
* CompletableFuture.allOf(f1, f2, f3).get(); // wait for all futures to complete
* return zipper.apply(f1.get(), f2.get(), f3.get());
* }</pre>
* @param s1 The first {@link Single} to zip.
* @param s2 The second {@link Single} to zip.
* @param s3 The third {@link Single} to zip.
* @param zipper Used to combine the completed results for each item from {@code singles}.
* @param <T1> The type for the first {@link Single}.
* @param <T2> The type for the second {@link Single}.
* @param <T3> The type for the third {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link Function3} to items emitted by
* {@code s1}, {@code s2}, and {@code s3}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, T3, R> Single<R> zipDelayError(
Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
return SingleZipper.zipDelayError(s1, s2, s3, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function4} to items emitted by
* {@code singles}.
* {@code s1}, {@code s2}, {@code s3}, and {@code s4}.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
Expand All @@ -2306,7 +2392,7 @@ public static <T1, T2, T3, R> Single<R> zip(
* @param <T4> The type for the fourth {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link Function4} to items emitted by
* {@code singles}.
* {@code s1}, {@code s2}, {@code s3}, and {@code s4}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, T3, T4, R> Single<R> zip(
Expand All @@ -2315,6 +2401,41 @@ public static <T1, T2, T3, T4, R> Single<R> zip(
return SingleZipper.zip(s1, s2, s3, s4, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function4} to items emitted by
* {@code s1}, {@code s2}, {@code s3}, and {@code s4}. If any of the {@link Single}s terminate with an error, the
* returned {@link Single} will wait for termination till all the other {@link Single}s have been subscribed and
* terminated, and then terminate with the first error.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
* CompletableFuture<T1> f1 = ...; // s1
* CompletableFuture<T2> f2 = ...; // s2
* CompletableFuture<T3> f3 = ...; // s3
* CompletableFuture<T4> f4 = ...; // s3
* CompletableFuture.allOf(f1, f2, f3, f4).get(); // wait for all futures to complete
* return zipper.apply(f1.get(), f2.get(), f3.get(), f4.get());
* }</pre>
* @param s1 The first {@link Single} to zip.
* @param s2 The second {@link Single} to zip.
* @param s3 The third {@link Single} to zip.
* @param s4 The fourth {@link Single} to zip.
* @param zipper Used to combine the completed results for each item from {@code singles}.
* @param <T1> The type for the first {@link Single}.
* @param <T2> The type for the second {@link Single}.
* @param <T3> The type for the third {@link Single}.
* @param <T4> The type for the fourth {@link Single}.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link Function4} to items emitted by
* {@code s1}, {@code s2}, {@code s3}, and {@code s4}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <T1, T2, T3, T4, R> Single<R> zipDelayError(
Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4,
Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipper) {
return SingleZipper.zipDelayError(s1, s2, s3, s4, zipper);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function} to items emitted by
* {@code singles}.
Expand All @@ -2338,6 +2459,31 @@ public static <R> Single<R> zip(Function<? super Object[], ? extends R> zipper,
return SingleZipper.zip(zipper, singles);
}

/**
* Create a new {@link Single} that emits the results of a specified zipper {@link Function} to items emitted by
* {@code singles}. If any of the {@link Single}s terminate with an error, the returned {@link Single} will wait for
* termination till all the other {@link Single}s have been subscribed and terminated, and then terminate with the
* first error.
* <p>
* From a sequential programming point of view this method is roughly equivalent to the following:
* <pre>{@code
* Function<? super CompletableFuture<?>[], ? extends R> zipper = ...;
* CompletableFuture<?>[] futures = ...; // Provided Futures (analogous to the Singles here)
* CompletableFuture.allOf(futures).get(); // wait for all futures to complete
* return zipper.apply(futures);
* }</pre>
* @param zipper Used to combine the completed results for each item from {@code singles}.
* @param singles The collection of {@link Single}s that when complete provides the results to "zip" (aka combine)
* together.
* @param <R> The result type of the zipper.
* @return a new {@link Single} that emits the results of a specified zipper {@link Function} to items emitted by
* {@code singles}.
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX zip operator.</a>
*/
public static <R> Single<R> zipDelayError(Function<? super Object[], ? extends R> zipper, Single<?>... singles) {
return SingleZipper.zipDelayError(zipper, singles);
}

//
// Static Utility Methods End
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ static <T1, T2, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s
}).map(array -> zipper.apply((T1) array[0], (T2) array[1]));
}

@SuppressWarnings("unchecked")
static <T1, T2, R> Single<R> zipDelayError(Single<? extends T1> s1, Single<? extends T2> s2,
BiFunction<? super T1, ? super T2, ? extends R> zipper) {
return from(s1.map(v -> new ZipArg(0, v)), s2.map(v -> new ZipArg(1, v)))
.flatMapMergeSingleDelayError(identity(), 2, 2)
.collect(() -> new Object[2], (array, zipArg) -> {
array[zipArg.index] = zipArg.value;
return array;
}).map(array -> zipper.apply((T1) array[0], (T2) array[1]));
}

@SuppressWarnings("unchecked")
static <T1, T2, T3, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
Expand All @@ -48,6 +59,18 @@ static <T1, T2, T3, R> Single<R> zip(Single<? extends T1> s1, Single<? extends T
}).map(array -> zipper.apply((T1) array[0], (T2) array[1], (T3) array[2]));
}

@SuppressWarnings("unchecked")
static <T1, T2, T3, R> Single<R> zipDelayError(
Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3,
Function3<? super T1, ? super T2, ? super T3, ? extends R> zipper) {
return from(s1.map(v -> new ZipArg(0, v)), s2.map(v -> new ZipArg(1, v)), s3.map(v -> new ZipArg(2, v)))
.flatMapMergeSingleDelayError(identity(), 3, 3)
.collect(() -> new Object[3], (array, zipArg) -> {
array[zipArg.index] = zipArg.value;
return array;
}).map(array -> zipper.apply((T1) array[0], (T2) array[1], (T3) array[2]));
}

@SuppressWarnings("unchecked")
static <T1, T2, T3, T4, R> Single<R> zip(
Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4,
Expand All @@ -61,6 +84,19 @@ static <T1, T2, T3, T4, R> Single<R> zip(
}).map(array -> zipper.apply((T1) array[0], (T2) array[1], (T3) array[2], (T4) array[3]));
}

@SuppressWarnings("unchecked")
static <T1, T2, T3, T4, R> Single<R> zipDelayError(
Single<? extends T1> s1, Single<? extends T2> s2, Single<? extends T3> s3, Single<? extends T4> s4,
Function4<? super T1, ? super T2, ? super T3, ? super T4, ? extends R> zipper) {
return from(s1.map(v -> new ZipArg(0, v)), s2.map(v -> new ZipArg(1, v)),
s3.map(v -> new ZipArg(2, v)), s4.map(v -> new ZipArg(3, v)))
.flatMapMergeSingleDelayError(identity(), 4, 4)
.collect(() -> new Object[4], (array, zipArg) -> {
array[zipArg.index] = zipArg.value;
return array;
}).map(array -> zipper.apply((T1) array[0], (T2) array[1], (T3) array[2], (T4) array[3]));
}

static <R> Single<R> zip(Function<? super Object[], ? extends R> zipper, Single<?>... singles) {
@SuppressWarnings("unchecked")
Single<ZipArg>[] mappedSingles = new Single[singles.length];
Expand All @@ -76,6 +112,21 @@ static <R> Single<R> zip(Function<? super Object[], ? extends R> zipper, Single<
}).map(zipper);
}

static <R> Single<R> zipDelayError(Function<? super Object[], ? extends R> zipper, Single<?>... singles) {
@SuppressWarnings("unchecked")
Single<ZipArg>[] mappedSingles = new Single[singles.length];
for (int i = 0; i < singles.length; ++i) {
final int finalI = i;
mappedSingles[i] = singles[i].map(v -> new ZipArg(finalI, v));
}
return from(mappedSingles)
.flatMapMergeSingleDelayError(identity(), mappedSingles.length, mappedSingles.length)
.collect(() -> new Object[mappedSingles.length], (array, zipArg) -> {
array[zipArg.index] = zipArg.value;
return array;
}).map(zipper);
}

private static final class ZipArg {
private final int index;
@Nullable
Expand Down
Loading

0 comments on commit d97df5f

Please sign in to comment.