Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish and PublishLast overloads #738

Merged
merged 2 commits into from
Jan 14, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,11 @@
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -5226,6 +5226,59 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Create a connectable observable sequence that shares a single
* subscription to the underlying sequence and starts with initialValue.
* @param initialValue the initial value of the underlying BehaviorSubject
* @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
*/
public ConnectableObservable<T> publish(T initialValue) {
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on.
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return PublishSubject.create();
}
}, selector);
}

/**
* Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on
* @param initialValue the initial value of the underlying BehaviorSubject
* @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return BehaviorSubject.create(initialValue);
}
}, selector);
}

/**
* Returns a {@link ConnectableObservable} that emits only the last item
* emitted by the source Observable.
Expand All @@ -5238,6 +5291,30 @@ public ConnectableObservable<T> publish() {
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will only receive the last notification of the source
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
*/
public <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return AsyncSubject.create();
}
}, selector);
}

/**
* Synonymous with <code>reduce()</code>.
Expand Down