Skip to content

Commit

Permalink
Merge pull request #738 from akarnokd/PublishAndPublishLast2
Browse files Browse the repository at this point in the history
Publish and PublishLast overloads
  • Loading branch information
benjchristensen committed Jan 14, 2014
2 parents 17307e4 + 0ad8980 commit 0b91c5b
Showing 1 changed file with 78 additions and 1 deletion.
79 changes: 78 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@
import rx.operators.OperationZip;
import rx.operators.SafeObservableSubscription;
import rx.operators.SafeObserver;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.schedulers.Schedulers;
import rx.subjects.AsyncSubject;
import rx.subjects.BehaviorSubject;
import rx.subjects.PublishSubject;
import rx.subjects.ReplaySubject;
import rx.subjects.Subject;
Expand Down Expand Up @@ -5491,6 +5491,59 @@ public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Create a connectable observable sequence that shares a single
* subscription to the underlying sequence and starts with initialValue.
* @param initialValue the initial value of the underlying BehaviorSubject
* @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
*/
public ConnectableObservable<T> publish(T initialValue) {
return OperationMulticast.multicast(this, BehaviorSubject.<T> create(initialValue));
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on.
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence.
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return PublishSubject.create();
}
}, selector);
}

/**
* Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will receive all notifications of the source from the time
* of the subscription on
* @param initialValue the initial value of the underlying BehaviorSubject
* @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue
*/
public <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return BehaviorSubject.create(initialValue);
}
}, selector);
}

/**
* Returns a {@link ConnectableObservable} that emits only the last item
* emitted by the source Observable.
Expand All @@ -5503,6 +5556,30 @@ public ConnectableObservable<T> publish() {
public ConnectableObservable<T> publishLast() {
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
}

/**
* Create an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
* @param <R> the result type
* @param selector function which can use the multicasted source
* sequence as many times as needed, without causing multiple
* subscriptions to the source sequence. Subscribers to the given
* source will only receive the last notification of the source
* @return an observable sequence that is the result of invoking the
* selector on a connectable observable sequence that shares a single
* subscription to the underlying sequence containing only the last
* notification.
*/
public <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public Subject<T, T> call() {
return AsyncSubject.create();
}
}, selector);
}

/**
* Synonymous with <code>reduce()</code>.
Expand Down

0 comments on commit 0b91c5b

Please sign in to comment.