diff --git a/rxjava-core/src/main/java/org/rx/reactive/Observable.java b/rxjava-core/src/main/java/org/rx/reactive/Observable.java index ff049a87d6..274d8fb49f 100644 --- a/rxjava-core/src/main/java/org/rx/reactive/Observable.java +++ b/rxjava-core/src/main/java/org/rx/reactive/Observable.java @@ -304,7 +304,7 @@ public Subscription subscribe(Observer observer) { * @return a Observable that, when a Observer subscribes to it, will execute the given function */ public static Observable create(Func1> func) { - return wrap(OperationToObservableFunction.toObservableFunction(func)); + return OperationToObservableFunction.toObservableFunction(func); } /** @@ -367,7 +367,7 @@ public static Observable empty() { * @return a Observable object that calls onError when a Observer subscribes */ public static Observable error(Exception exception) { - return wrap(new ThrowObservable(exception)); + return new ThrowObservable(exception); } /** @@ -455,7 +455,7 @@ public static Observable just(T value) { * by the source Observable */ public static Observable last(final Observable that) { - return wrap(OperationLast.last(that)); + return OperationLast.last(that); } /** @@ -477,7 +477,7 @@ public static Observable last(final Observable that) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - return wrap(OperationMap.map(sequence, func)); + return OperationMap.map(sequence, func); } /** @@ -532,7 +532,7 @@ public R call(T t1) { * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, Func1, T> func) { - return wrap(OperationMap.mapMany(sequence, func)); + return OperationMap.mapMany(sequence, func); } /** @@ -598,7 +598,7 @@ public static Observable> materialize(final Observable se * @see MSDN: Observable.Merge Method */ public static Observable merge(List> source) { - return wrap(OperationMerge.merge(source)); + return OperationMerge.merge(source); } /** @@ -616,7 +616,7 @@ public static Observable merge(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable> source) { - return wrap(OperationMerge.merge(source)); + return OperationMerge.merge(source); } /** @@ -634,7 +634,7 @@ public static Observable merge(Observable> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable... source) { - return wrap(OperationMerge.merge(source)); + return OperationMerge.merge(source); } /** @@ -654,7 +654,7 @@ public static Observable merge(Observable... source) { * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(List> source) { - return wrap(OperationMergeDelayError.mergeDelayError(source)); + return OperationMergeDelayError.mergeDelayError(source); } /** @@ -674,7 +674,7 @@ public static Observable mergeDelayError(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable> source) { - return wrap(OperationMergeDelayError.mergeDelayError(source)); + return OperationMergeDelayError.mergeDelayError(source); } /** @@ -694,7 +694,7 @@ public static Observable mergeDelayError(Observable> source * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable... source) { - return wrap(OperationMergeDelayError.mergeDelayError(source)); + return OperationMergeDelayError.mergeDelayError(source); } /** @@ -710,7 +710,7 @@ public static Observable mergeDelayError(Observable... source) { * @return a Observable that never sends any information to a Observer */ public static Observable never() { - return wrap(new NeverObservable()); + return new NeverObservable(); } /** @@ -749,7 +749,7 @@ public static Subscription noOpSubscription() { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Func1, Exception> resumeFunction) { - return wrap(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); + return OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction); } /** @@ -812,7 +812,7 @@ public Observable call(Exception e) { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) { - return wrap(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); + return OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence); } /** @@ -839,7 +839,7 @@ public static Observable onErrorResumeNext(final Observable that, fina * @return the source Observable, with its behavior modified as described */ public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { - return wrap(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); + return OperationOnErrorReturn.onErrorReturn(that, resumeFunction); } /** @@ -870,7 +870,7 @@ public static Observable onErrorReturn(final Observable that, Func1Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, Func2 accumulator) { - return wrap(OperationScan.scan(sequence, accumulator).last()); + return OperationScan.scan(sequence, accumulator).last(); } /** @@ -941,7 +941,7 @@ public T call(T t1, T t2) { * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { - return wrap(OperationScan.scan(sequence, initialValue, accumulator).last()); + return OperationScan.scan(sequence, initialValue, accumulator).last(); } /** @@ -1004,7 +1004,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, Func2 accumulator) { - return wrap(OperationScan.scan(sequence, accumulator)); + return OperationScan.scan(sequence, accumulator); } /** @@ -1061,7 +1061,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { - return wrap(OperationScan.scan(sequence, initialValue, accumulator)); + return OperationScan.scan(sequence, initialValue, accumulator); } /** @@ -1114,7 +1114,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Skip Method */ public static Observable skip(final Observable items, int num) { - return wrap(OperationSkip.skip(items, num)); + return OperationSkip.skip(items, num); } /** @@ -1132,7 +1132,7 @@ public static Observable skip(final Observable items, int num) { * @return a Observable that is a chronologically well-behaved version of the source Observable */ public static Observable synchronize(Observable observable) { - return wrap(OperationSynchronize.synchronize(observable)); + return OperationSynchronize.synchronize(observable); } /** @@ -1155,7 +1155,7 @@ public static Observable synchronize(Observable observable) { * Observable */ public static Observable take(final Observable items, final int num) { - return wrap(OperationTake.take(items, num)); + return OperationTake.take(items, num); } /** @@ -1178,7 +1178,7 @@ public static Observable take(final Observable items, final int num) { * items emitted by the source Observable */ public static Observable> toList(final Observable that) { - return wrap(OperationToObservableList.toObservableList(that)); + return OperationToObservableList.toObservableList(that); } /** @@ -1198,7 +1198,7 @@ public static Observable> toList(final Observable that) { * @return a Observable that emits each item in the source Iterable sequence */ public static Observable toObservable(Iterable iterable) { - return wrap(OperationToObservableIterable.toObservableIterable(iterable)); + return OperationToObservableIterable.toObservableIterable(iterable); } /** @@ -1233,7 +1233,7 @@ public static Observable toObservable(T... items) { * @return */ public static Observable> toSortedList(Observable sequence) { - return wrap(OperationToObservableSortedList.toSortedList(sequence)); + return OperationToObservableSortedList.toSortedList(sequence); } /** @@ -1247,7 +1247,7 @@ public static Observable> toSortedList(Observable sequence) { * @return */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { - return wrap(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); + return OperationToObservableSortedList.toSortedList(sequence, sortFunction); } /** @@ -1261,40 +1261,14 @@ public static Observable> toSortedList(Observable sequence, Func2 * @return */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { - return wrap(OperationToObservableSortedList.toSortedList(sequence, new Func2() { + return OperationToObservableSortedList.toSortedList(sequence, new Func2() { @Override public Integer call(T t1, T t2) { return Functions.execute(sortFunction, t1, t2); } - })); - } - - /** - * Allow wrapping responses with the AbstractObservable so that we have all of - * the utility methods available for subscribing. - *

- * This is not expected to benefit Java usage, but is intended for dynamic script which are a primary target of the Observable operations. - *

- * Since they are dynamic they can execute the "hidden" methods on AbstractObservable while appearing to only receive an Observable without first casting. - * - * @param o - * @return - */ - private static Observable wrap(final Observable o) { - if (o instanceof Observable) { - // if the Observable is already an AbstractObservable, don't wrap it again. - return (Observable) o; - } - return new Observable() { - - @Override - public Subscription subscribe(Observer observer) { - return o.subscribe(observer); - } - - }; + }); } /** @@ -1322,7 +1296,7 @@ public Subscription subscribe(Observer observer) { * @return a Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { - return wrap(OperationZip.zip(w0, w1, reduceFunction)); + return OperationZip.zip(w0, w1, reduceFunction); } /** @@ -1389,7 +1363,7 @@ public R call(T0 t0, T1 t1) { * @return a Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { - return wrap(OperationZip.zip(w0, w1, w2, function)); + return OperationZip.zip(w0, w1, w2, function); } /** @@ -1461,7 +1435,7 @@ public R call(T0 t0, T1 t1, T2 t2) { * @return a Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { - return wrap(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); + return OperationZip.zip(w0, w1, w2, w3, reduceFunction); } /**