Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: cleanup, fixes, coverage 10/20-1 #4736

Merged
merged 1 commit into from
Oct 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6139,7 +6139,7 @@ public final <B, U extends Collection<? super T>> Flowable<U> buffer(Callable<?
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> cache() {
return FlowableCache.from(this);
return cacheWithInitialCapacity(16);
}

/**
Expand Down Expand Up @@ -6201,7 +6201,7 @@ public final Flowable<T> cache() {
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> cacheWithInitialCapacity(int initialCapacity) {
ObjectHelper.verifyPositive(initialCapacity, "initialCapacity");
return FlowableCache.from(this, initialCapacity);
return RxJavaPlugins.onAssembly(new FlowableCache<T>(this, initialCapacity));
}

/**
Expand Down Expand Up @@ -6466,7 +6466,7 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
return FlowableScalarXMap.scalarXMap(v, mapper);
}
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
}


Expand Down Expand Up @@ -9633,13 +9633,13 @@ public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow) {
* @param capacity number of slots available in the buffer.
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
* @param overflowStrategy how should the {@code Publisher} react to buffer overflows. Null is not allowed.
* @return the source {@code Publisher} modified to buffer items up to the given capacity
* @return the source {@code Flowable} modified to buffer items up to the given capacity
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
* @since 2.0
*/
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
public final Publisher<T> onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) {
public final Flowable<T> onBackpressureBuffer(long capacity, Action onOverflow, BackpressureOverflowStrategy overflowStrategy) {
ObjectHelper.requireNonNull(overflowStrategy, "strategy is null");
ObjectHelper.verifyPositive(capacity, "capacity");
return RxJavaPlugins.onAssembly(new FlowableOnBackpressureBufferStrategy<T>(this, capacity, onOverflow, overflowStrategy));
Expand Down
Loading