Skip to content

Commit

Permalink
Merge pull request ReactiveX#215 from benjchristensen/pull-212-manual…
Browse files Browse the repository at this point in the history
…-merge

Manual Merge of Pull Request ReactiveX#212
  • Loading branch information
benjchristensen committed Mar 31, 2013
2 parents 669f8a7 + 5f852fd commit ce51735
Show file tree
Hide file tree
Showing 4 changed files with 664 additions and 134 deletions.
13 changes: 8 additions & 5 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationTake;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
Expand All @@ -54,7 +56,6 @@
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1779,7 +1780,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTake.takeWhile(items, predicate));
return create(OperationTakeWhile.takeWhile(items, predicate));
}

/**
Expand Down Expand Up @@ -1811,16 +1812,18 @@ public Boolean call(T t) {
* @return
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTake.takeWhileWithIndex(items, predicate));
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
}

public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
{
@Override
public Boolean call(T t, Integer integer) {
public Boolean call(T t, Integer integer)
{
return (Boolean) _f.call(t, integer);
}
}));
Expand Down
256 changes: 256 additions & 0 deletions rxjava-core/src/main/java/rx/operators/AbstractOperation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package rx.operators;

import static org.junit.Assert.*;

import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Common utility functions for operator implementations and tests.
*/
/* package */class AbstractOperation
{
private AbstractOperation() {
}

public static class UnitTest {

public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
{
return new Func1<Observer<T>, Subscription>()
{
@Override
public Subscription call(Observer<T> observer)
{
return source.call(new TestingObserver<T>(observer));
}
};
}

public static class TestingObserver<T> implements Observer<T> {

private final Observer<T> actual;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final AtomicBoolean isInCallback = new AtomicBoolean(false);

public TestingObserver(Observer<T> actual) {
this.actual = actual;
}

@Override
public void onCompleted() {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onCompleted();
isInCallback.set(false);
}

@Override
public void onError(Exception e) {
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onError(e);
isInCallback.set(false);
}

@Override
public void onNext(T args) {
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
actual.onNext(args);
isInCallback.set(false);
}

}

@Test(expected = AssertionError.class)
public void testDoubleCompleted() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onCompleted();
observer.onCompleted();
return Subscriptions.empty();
}
})).lastOrDefault("end");

}

@Test(expected = AssertionError.class)
public void testCompletedError() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onCompleted();
observer.onError(new Exception());
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test(expected = AssertionError.class)
public void testCompletedNext() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onCompleted();
observer.onNext("one");
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test(expected = AssertionError.class)
public void testErrorCompleted() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onError(new Exception());
observer.onCompleted();
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test(expected = AssertionError.class)
public void testDoubleError() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onError(new Exception());
observer.onError(new Exception());
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test(expected = AssertionError.class)
public void testErrorNext() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onError(new Exception());
observer.onNext("one");
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test
public void testNextCompleted() {
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(Observer<String> observer)
{
observer.onNext("one");
observer.onCompleted();
return Subscriptions.empty();
}
})).lastOrDefault("end");
}

@Test
public void testConcurrentNextNext() {
final List<Thread> threads = new ArrayList<Thread>();
final AtomicReference<Throwable> threadFailure = new AtomicReference<Throwable>();
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
{
@Override
public Subscription call(final Observer<String> observer)
{
threads.add(new Thread(new Runnable()
{
@Override
public void run()
{
observer.onNext("one");
}
}));
threads.add(new Thread(new Runnable()
{
@Override
public void run()
{
observer.onNext("two");
}
}));
return Subscriptions.empty();
}
})).subscribe(new SlowObserver());
for (Thread thread : threads) {
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler()
{
@Override
public void uncaughtException(Thread thread, Throwable throwable)
{
threadFailure.set(throwable);
}
});
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException ignored) {
}
}
// Junit seems pretty bad about exposing test failures inside of created threads.
assertNotNull("exception thrown by thread", threadFailure.get());
assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass());
}

private static class SlowObserver implements Observer<String>
{
@Override
public void onCompleted()
{
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}

@Override
public void onError(Exception e)
{
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}

@Override
public void onNext(String args)
{
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
}
}
}
}
}
Loading

0 comments on commit ce51735

Please sign in to comment.