Skip to content

Commit

Permalink
Fixed state capture bug.
Browse files Browse the repository at this point in the history
Added some additional tests
  • Loading branch information
AppliedDuality committed Feb 27, 2014
1 parent de31844 commit 6b25b23
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 11 deletions.
8 changes: 5 additions & 3 deletions rxjava-core/src/main/java/rx/operators/OperatorSkip.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
public final class OperatorSkip<T> implements Observable.Operator<T, T> {

int n;
final int n;

public OperatorSkip(int n) {
this.n = n;
Expand All @@ -24,6 +24,8 @@ public OperatorSkip(int n) {
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {

int skipped = 0;

@Override
public void onCompleted() {
child.onCompleted();
Expand All @@ -36,10 +38,10 @@ public void onError(Throwable e) {

@Override
public void onNext(T t) {
if(n <= 0) {
if(skipped >= n) {
child.onNext(t);
} else {
n -= 1;
skipped += 1;
}
}

Expand Down
102 changes: 94 additions & 8 deletions rxjava-core/src/test/java/rx/operators/OperatorSkipTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,39 @@
public class OperatorSkipTest {

@Test
public void testSkip1() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = w.lift(new OperatorSkip<String>(2));
public void testSkipNegativeElements() {

Observable<String> skip = Observable.from("one", "two", "three").lift(new OperatorSkip<String>(-99));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkip2() {
Observable<String> w = Observable.from("one", "two", "three");
Observable<String> skip = w.lift(new OperatorSkip<String>(1));
public void testSkipZeroElements() {

Observable<String> skip = Observable.from("one", "two", "three").lift(new OperatorSkip<String>(0));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, times(1)).onNext("one");
verify(observer, times(1)).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipOneElement() {

Observable<String> skip = Observable.from("one", "two", "three").lift(new OperatorSkip<String>(1));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -37,4 +52,75 @@ public void testSkip2() {
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipTwoElements() {

Observable<String> skip = Observable.from("one", "two", "three").lift(new OperatorSkip<String>(2));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext("one");
verify(observer, never()).onNext("two");
verify(observer, times(1)).onNext("three");
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipEmptyStream() {

Observable<String> w = Observable.empty();
Observable<String> skip = w.lift(new OperatorSkip<String>(1));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);
verify(observer, never()).onNext(any(String.class));
verify(observer, never()).onError(any(Throwable.class));
verify(observer, times(1)).onCompleted();
}

@Test
public void testSkipMultipleObservers() {

Observable<String> skip = Observable.from("one", "two", "three").lift(new OperatorSkip<String>(2));

@SuppressWarnings("unchecked")
Observer<String> observer1 = mock(Observer.class);
skip.subscribe(observer1);

@SuppressWarnings("unchecked")
Observer<String> observer2 = mock(Observer.class);
skip.subscribe(observer2);

verify(observer1, times(1)).onNext(any(String.class));
verify(observer1, never()).onError(any(Throwable.class));
verify(observer1, times(1)).onCompleted();

verify(observer2, times(1)).onNext(any(String.class));
verify(observer2, never()).onError(any(Throwable.class));
verify(observer2, times(1)).onCompleted();
}

@Test
public void testSkipError() {

Exception e = new Exception();

Observable<String> ok = Observable.from("one");
Observable<String> error = Observable.error(e);

Observable<String> skip = Observable.concat(ok, error).lift(new OperatorSkip<String>(100));

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
skip.subscribe(observer);

verify(observer, never()).onNext(any(String.class));
verify(observer, times(1)).onError(e);
verify(observer, never()).onCompleted();

}
}

0 comments on commit 6b25b23

Please sign in to comment.