diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac2fb7b5d..7b863a03ac 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -107,11 +107,11 @@ import rx.operators.OperationZip; import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; -import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; +import rx.subjects.BehaviorSubject; import rx.subjects.PublishSubject; import rx.subjects.ReplaySubject; import rx.subjects.Subject; @@ -5226,6 +5226,59 @@ public ConnectableObservable publish() { return OperationMulticast.multicast(this, PublishSubject. create()); } + /** + * Create a connectable observable sequence that shares a single + * subscription to the underlying sequence and starts with initialValue. + * @param initialValue the initial value of the underlying BehaviorSubject + * @return a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. + */ + public ConnectableObservable publish(T initialValue) { + return OperationMulticast.multicast(this, BehaviorSubject. create(initialValue)); + } + + /** + * Create an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence. + * @param the result type + * @param selector function which can use the multicasted source + * sequence as many times as needed, without causing multiple + * subscriptions to the source sequence. Subscribers to the given + * source will receive all notifications of the source from the time + * of the subscription on. + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence. + */ + public Observable publish(Func1, ? extends Observable> selector) { + return multicast(new Func0>() { + @Override + public Subject call() { + return PublishSubject.create(); + } + }, selector); + } + + /** + * Create an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue. + * @param the result type + * @param selector function which can use the multicasted source + * sequence as many times as needed, without causing multiple + * subscriptions to the source sequence. Subscribers to the given + * source will receive all notifications of the source from the time + * of the subscription on + * @param initialValue the initial value of the underlying BehaviorSubject + * @return an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue + */ + public Observable publish(Func1, ? extends Observable> selector, final T initialValue) { + return multicast(new Func0>() { + @Override + public Subject call() { + return BehaviorSubject.create(initialValue); + } + }, selector); + } + /** * Returns a {@link ConnectableObservable} that emits only the last item * emitted by the source Observable. @@ -5238,6 +5291,30 @@ public ConnectableObservable publish() { public ConnectableObservable publishLast() { return OperationMulticast.multicast(this, AsyncSubject. create()); } + + /** + * Create an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence containing only the last + * notification. + * @param the result type + * @param selector function which can use the multicasted source + * sequence as many times as needed, without causing multiple + * subscriptions to the source sequence. Subscribers to the given + * source will only receive the last notification of the source + * @return an observable sequence that is the result of invoking the + * selector on a connectable observable sequence that shares a single + * subscription to the underlying sequence containing only the last + * notification. + */ + public Observable publishLast(Func1, ? extends Observable> selector) { + return multicast(new Func0>() { + @Override + public Subject call() { + return AsyncSubject.create(); + } + }, selector); + } /** * Synonymous with reduce().