From de3184475b48efabb99aea51b4b015b9ecab2f2f Mon Sep 17 00:00:00 2001 From: headinthebox Date: Wed, 26 Feb 2014 18:53:19 -0800 Subject: [PATCH 1/2] Implemented Skip using Lift in Observable and OperatorSkip Deleted non-time part from OperationSkip Moved tests to OperatorSkipTest --- rxjava-core/src/main/java/rx/Observable.java | 84 +------------------ .../main/java/rx/operators/OperationSkip.java | 75 ----------------- .../main/java/rx/operators/OperatorMap.java | 1 + .../main/java/rx/operators/OperatorSkip.java | 49 +++++++++++ .../java/rx/operators/OperationSkipTest.java | 31 ------- .../java/rx/operators/OperatorSkipTest.java | 40 +++++++++ 6 files changed, 92 insertions(+), 188 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorSkip.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 893ec455eb..c244fb5001 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -49,87 +49,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.observers.SafeSubscriber; -import rx.operators.OnSubscribeFromIterable; -import rx.operators.OnSubscribeRange; -import rx.operators.OperationAll; -import rx.operators.OperationAmb; -import rx.operators.OperationAny; -import rx.operators.OperationAsObservable; -import rx.operators.OperationAverage; -import rx.operators.OperationBuffer; -import rx.operators.OperationCache; -import rx.operators.OperationCombineLatest; -import rx.operators.OperationConcat; -import rx.operators.OperationDebounce; -import rx.operators.OperationDefaultIfEmpty; -import rx.operators.OperationDefer; -import rx.operators.OperationDelay; -import rx.operators.OperationDematerialize; -import rx.operators.OperationDistinct; -import rx.operators.OperationDistinctUntilChanged; -import rx.operators.OperationElementAt; -import rx.operators.OperationFinally; -import rx.operators.OperationFlatMap; -import rx.operators.OperationGroupByUntil; -import rx.operators.OperationGroupJoin; -import rx.operators.OperationInterval; -import rx.operators.OperationJoin; -import rx.operators.OperationJoinPatterns; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMergeMaxConcurrent; -import rx.operators.OperationMinMax; -import rx.operators.OperationMulticast; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationOnExceptionResumeNextViaObservable; -import rx.operators.OperationParallelMerge; -import rx.operators.OperationReplay; -import rx.operators.OperationRetry; -import rx.operators.OperationSample; -import rx.operators.OperationSequenceEqual; -import rx.operators.OperationSingle; -import rx.operators.OperationSkip; -import rx.operators.OperationSkipLast; -import rx.operators.OperationSkipUntil; -import rx.operators.OperationSkipWhile; -import rx.operators.OperationSum; -import rx.operators.OperationSwitch; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTakeLast; -import rx.operators.OperationTakeTimed; -import rx.operators.OperationTakeUntil; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottleFirst; -import rx.operators.OperationTimeInterval; -import rx.operators.OperationTimer; -import rx.operators.OperationToMap; -import rx.operators.OperationToMultimap; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationUsing; -import rx.operators.OperationWindow; -import rx.operators.OperatorCast; -import rx.operators.OperatorDoOnEach; -import rx.operators.OperatorFilter; -import rx.operators.OperatorGroupBy; -import rx.operators.OperatorMap; -import rx.operators.OperatorMerge; -import rx.operators.OperatorObserveOn; -import rx.operators.OperatorOnErrorResumeNextViaFunction; -import rx.operators.OperatorOnErrorFlatMap; -import rx.operators.OperatorParallel; -import rx.operators.OperatorRepeat; -import rx.operators.OperatorScan; -import rx.operators.OperatorSubscribeOn; -import rx.operators.OperatorTake; -import rx.operators.OperatorTimeout; -import rx.operators.OperatorTimeoutWithSelector; -import rx.operators.OperatorTimestamp; -import rx.operators.OperatorToObservableList; -import rx.operators.OperatorToObservableSortedList; -import rx.operators.OperatorUnsubscribeOn; -import rx.operators.OperatorZip; -import rx.operators.OperatorZipIterable; +import rx.operators.*; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; @@ -6274,7 +6194,7 @@ public final Observable singleOrDefault(T defaultValue, Func1RxJava Wiki: skip() */ public final Observable skip(int num) { - return create(OperationSkip.skip(this, num)); + return lift(new OperatorSkip(num)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java index 5267322e96..31ffa1c90f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkip.java @@ -39,81 +39,6 @@ */ public final class OperationSkip { - /** - * Skips a specified number of contiguous values from the start of a Observable sequence and then returns the remaining values. - * - * @param items - * @param num - * @return the observable sequence starting after a number of skipped values - * - * @see Observable.Skip(TSource) Method - */ - public static OnSubscribeFunc skip(final Observable items, final int num) { - // wrap in a Observable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take rather than 1 handing both, which is not thread-safe. - return new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer observer) { - return new Skip(items, num).onSubscribe(observer); - } - - }; - } - - /** - * This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads. - *

- * It IS thread-safe from within it while receiving onNext events from multiple threads. - * - * @param - */ - private static class Skip implements OnSubscribeFunc { - private final int num; - private final Observable items; - - Skip(final Observable items, final int num) { - this.num = num; - this.items = items; - } - - public Subscription onSubscribe(Observer observer) { - return items.subscribe(new ItemObserver(observer)); - } - - /** - * Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count. - */ - private class ItemObserver implements Observer { - - private AtomicInteger counter = new AtomicInteger(); - private final Observer observer; - - public ItemObserver(Observer observer) { - this.observer = observer; - } - - @Override - public void onCompleted() { - observer.onCompleted(); - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(T args) { - // skip them until we reach the 'num' value - if (counter.incrementAndGet() > num) { - observer.onNext(args); - } - } - - } - - } - /** * Skip the items after subscription for the given duration. * diff --git a/rxjava-core/src/main/java/rx/operators/OperatorMap.java b/rxjava-core/src/main/java/rx/operators/OperatorMap.java index 6418005521..233e2fcb70 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorMap.java @@ -61,3 +61,4 @@ public void onNext(T t) { } } + diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSkip.java b/rxjava-core/src/main/java/rx/operators/OperatorSkip.java new file mode 100644 index 0000000000..020c488fa6 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSkip.java @@ -0,0 +1,49 @@ +package rx.operators; + +import rx.Observable; +import rx.Subscriber; + +/** + * Returns an Observable that skips the first num items emitted by the source + * Observable. + *

+ * + *

+ * You can ignore the first num items emitted by an Observable and attend only to + * those items that come after, by modifying the Observable with the skip operation. + */ +public final class OperatorSkip implements Observable.Operator { + + int n; + + public OperatorSkip(int n) { + this.n = n; + } + + @Override + public Subscriber call(final Subscriber child) { + return new Subscriber(child) { + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + if(n <= 0) { + child.onNext(t); + } else { + n -= 1; + } + } + + }; + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java index 5999e07fee..c7dad11ce1 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipTest.java @@ -17,7 +17,6 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.OperationSkip.*; import java.util.concurrent.TimeUnit; @@ -31,36 +30,6 @@ public class OperationSkipTest { - @Test - public void testSkip1() { - Observable w = Observable.from("one", "two", "three"); - Observable skip = Observable.create(skip(w, 2)); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - skip.subscribe(observer); - verify(observer, never()).onNext("one"); - verify(observer, never()).onNext("two"); - verify(observer, times(1)).onNext("three"); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); - } - - @Test - public void testSkip2() { - Observable w = Observable.from("one", "two", "three"); - Observable skip = Observable.create(skip(w, 1)); - - @SuppressWarnings("unchecked") - Observer observer = mock(Observer.class); - skip.subscribe(observer); - verify(observer, never()).onNext("one"); - verify(observer, times(1)).onNext("two"); - verify(observer, times(1)).onNext("three"); - verify(observer, never()).onError(any(Throwable.class)); - verify(observer, times(1)).onCompleted(); - } - @Test public void testSkipTimed() { TestScheduler scheduler = new TestScheduler(); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java new file mode 100644 index 0000000000..155faf21eb --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java @@ -0,0 +1,40 @@ +package rx.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observer; + +import static org.mockito.Mockito.*; + +public class OperatorSkipTest { + + @Test + public void testSkip1() { + Observable w = Observable.from("one", "two", "three"); + Observable skip = w.lift(new OperatorSkip(2)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + verify(observer, never()).onNext("one"); + verify(observer, never()).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void testSkip2() { + Observable w = Observable.from("one", "two", "three"); + Observable skip = w.lift(new OperatorSkip(1)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + verify(observer, never()).onNext("one"); + verify(observer, times(1)).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + } +} From 6b25b2304b08dbaf9f4d52054f06d743a9fb4a22 Mon Sep 17 00:00:00 2001 From: headinthebox Date: Thu, 27 Feb 2014 11:31:40 -0800 Subject: [PATCH 2/2] Fixed state capture bug. Added some additional tests --- .../main/java/rx/operators/OperatorSkip.java | 8 +- .../java/rx/operators/OperatorSkipTest.java | 102 ++++++++++++++++-- 2 files changed, 99 insertions(+), 11 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSkip.java b/rxjava-core/src/main/java/rx/operators/OperatorSkip.java index 020c488fa6..cc0ec068c2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSkip.java @@ -14,7 +14,7 @@ */ public final class OperatorSkip implements Observable.Operator { - int n; + final int n; public OperatorSkip(int n) { this.n = n; @@ -24,6 +24,8 @@ public OperatorSkip(int n) { public Subscriber call(final Subscriber child) { return new Subscriber(child) { + int skipped = 0; + @Override public void onCompleted() { child.onCompleted(); @@ -36,10 +38,10 @@ public void onError(Throwable e) { @Override public void onNext(T t) { - if(n <= 0) { + if(skipped >= n) { child.onNext(t); } else { - n -= 1; + skipped += 1; } } diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java index 155faf21eb..f0a9706786 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java @@ -9,24 +9,39 @@ public class OperatorSkipTest { @Test - public void testSkip1() { - Observable w = Observable.from("one", "two", "three"); - Observable skip = w.lift(new OperatorSkip(2)); + public void testSkipNegativeElements() { + + Observable skip = Observable.from("one", "two", "three").lift(new OperatorSkip(-99)); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); skip.subscribe(observer); - verify(observer, never()).onNext("one"); - verify(observer, never()).onNext("two"); + verify(observer, times(1)).onNext("one"); + verify(observer, times(1)).onNext("two"); verify(observer, times(1)).onNext("three"); verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onCompleted(); } @Test - public void testSkip2() { - Observable w = Observable.from("one", "two", "three"); - Observable skip = w.lift(new OperatorSkip(1)); + public void testSkipZeroElements() { + + Observable skip = Observable.from("one", "two", "three").lift(new OperatorSkip(0)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + verify(observer, times(1)).onNext("one"); + verify(observer, times(1)).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void testSkipOneElement() { + + Observable skip = Observable.from("one", "two", "three").lift(new OperatorSkip(1)); @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -37,4 +52,75 @@ public void testSkip2() { verify(observer, never()).onError(any(Throwable.class)); verify(observer, times(1)).onCompleted(); } + + @Test + public void testSkipTwoElements() { + + Observable skip = Observable.from("one", "two", "three").lift(new OperatorSkip(2)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + verify(observer, never()).onNext("one"); + verify(observer, never()).onNext("two"); + verify(observer, times(1)).onNext("three"); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void testSkipEmptyStream() { + + Observable w = Observable.empty(); + Observable skip = w.lift(new OperatorSkip(1)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + verify(observer, never()).onNext(any(String.class)); + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onCompleted(); + } + + @Test + public void testSkipMultipleObservers() { + + Observable skip = Observable.from("one", "two", "three").lift(new OperatorSkip(2)); + + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + skip.subscribe(observer1); + + @SuppressWarnings("unchecked") + Observer observer2 = mock(Observer.class); + skip.subscribe(observer2); + + verify(observer1, times(1)).onNext(any(String.class)); + verify(observer1, never()).onError(any(Throwable.class)); + verify(observer1, times(1)).onCompleted(); + + verify(observer2, times(1)).onNext(any(String.class)); + verify(observer2, never()).onError(any(Throwable.class)); + verify(observer2, times(1)).onCompleted(); + } + + @Test + public void testSkipError() { + + Exception e = new Exception(); + + Observable ok = Observable.from("one"); + Observable error = Observable.error(e); + + Observable skip = Observable.concat(ok, error).lift(new OperatorSkip(100)); + + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + skip.subscribe(observer); + + verify(observer, never()).onNext(any(String.class)); + verify(observer, times(1)).onError(e); + verify(observer, never()).onCompleted(); + + } }