Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: coverage, fixes, cleanup 10/15-1 #4708

Merged
merged 2 commits into from
Oct 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 2 additions & 52 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8305,7 +8305,7 @@ public final Observable<T> onTerminateDetach() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableObservable<T> publish() {
return publish(bufferSize());
return ObservablePublish.create(this);
}

/**
Expand All @@ -8329,58 +8329,8 @@ public final ConnectableObservable<T> publish() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector) {
return publish(selector, bufferSize());
}

/**
* Returns an Observable that emits the results of invoking a specified selector on items emitted by a
* {@link ConnectableObservable} that shares a single subscription to the underlying sequence.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R>
* the type of items emitted by the resulting ObservableSource
* @param selector
* a function that can use the multicasted source sequence as many times as needed, without
* causing multiple subscriptions to the source sequence. Observers to the given source will
* receive all notifications of the source from the time of the subscription forward.
* @param bufferSize
* the number of elements to prefetch from the current Observable
* @return an Observable that emits the results of invoking the selector on the items emitted by a {@link ConnectableObservable} that shares a single subscription to the underlying sequence
* @see <a href="http://reactivex.io/documentation/operators/publish.html">ReactiveX operators documentation: Publish</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(selector, "selector is null");
return ObservablePublish.create(this, selector, bufferSize);
}

/**
* Returns a {@link ConnectableObservable}, which is a variety of ObservableSource that waits until its
* {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those
* {@link Observer}s that have subscribed to it.
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param bufferSize
* the number of elements to prefetch from the current Observable
* @return a {@link ConnectableObservable} that upon connection causes the source ObservableSource to emit items
* to its {@link Observer}s
* @see <a href="http://reactivex.io/documentation/operators/publish.html">ReactiveX operators documentation: Publish</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final ConnectableObservable<T> publish(int bufferSize) {
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return ObservablePublish.create(this, bufferSize);
return new ObservablePublishSelector<T, R>(this, selector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,15 @@ public boolean isDisposed() {
@Override
public void onNext(T t) {
U b = buffer;
if (b == null) {
return;
}

b.add(t);
if (b != null) {
b.add(t);

if (++size >= count) {
actual.onNext(b);
if (++size >= count) {
actual.onNext(b);

size = 0;
createBuffer();
size = 0;
createBuffer();
}
}
}

Expand Down Expand Up @@ -185,21 +183,14 @@ public void onNext(T t) {
U b;

try {
b = bufferSupplier.call();
b = ObjectHelper.requireNonNull(bufferSupplier.call(), "The bufferSupplier returned a null collection. Null values are generally not allowed in 2.x operators and sources.");
} catch (Throwable e) {
buffers.clear();
s.dispose();
actual.onError(e);
return;
}

if (b == null) {
buffers.clear();
s.dispose();
actual.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}

buffers.offer(b);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,8 @@ void disposeAll() {

for (;;) {

try {
inner = observers.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
inner = observers.poll();

if (inner == null) {
return;
}
Expand Down
Loading