diff --git a/src/test/java/rx/BackpressureTests.java b/src/test/java/rx/BackpressureTests.java index 0760350f67..f54b8b67d5 100644 --- a/src/test/java/rx/BackpressureTests.java +++ b/src/test/java/rx/BackpressureTests.java @@ -15,28 +15,29 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import rx.Observable.OnSubscribe; import rx.exceptions.MissingBackpressureException; -import rx.functions.Func1; -import rx.functions.Func2; +import rx.functions.*; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.test.TestObstructionDetection; public class BackpressureTests { + @After + public void doAfterTest() { + TestObstructionDetection.checkObstruction(); + } + @Test public void testObserveOn() { int NUM = (int) (RxRingBuffer.SIZE * 2.1); @@ -163,6 +164,7 @@ public Observable call(Integer i) { } @Test + @Ignore // the test is non-deterministic and can't be made deterministic public void testFlatMapAsync() { int NUM = (int) (RxRingBuffer.SIZE * 2.1); AtomicInteger c = new AtomicInteger(); diff --git a/src/test/java/rx/test/TestObstructionDetection.java b/src/test/java/rx/test/TestObstructionDetection.java new file mode 100644 index 0000000000..859d138dd0 --- /dev/null +++ b/src/test/java/rx/test/TestObstructionDetection.java @@ -0,0 +1,90 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.test; + +import java.util.*; +import java.util.concurrent.*; + +import rx.Scheduler; +import rx.functions.Action0; +import rx.schedulers.Schedulers; + +/** + * Check if there is an obstruction in the computation scheduler. + * Put the following code into test classes: + *
+ * @org.junit.After
+ * public void doAfterTest() {
+ *     rx.test.TestObstructionDetection.checkObstruction();
+ * }
+ * 
+ * or + *

+ * @org.junit.AfterClass
+ * public static void doAfterClass() {
+ *     rx.test.TestObstructionDetection.checkObstruction();
+ * }
+ * 
+ */ +public final class TestObstructionDetection { + private TestObstructionDetection() { + throw new IllegalStateException("No instances!"); + } + /** + * Checks if tasks can be immediately executed on the computation scheduler. + * @throws ObstructionExceptio if the schedulers don't respond within 1 second + */ + public static void checkObstruction() { + final int ncpu = Runtime.getRuntime().availableProcessors(); + + final CountDownLatch cdl = new CountDownLatch(ncpu); + final List workers = new ArrayList(); + final Action0 task = new Action0() { + @Override + public void call() { + cdl.countDown(); + } + }; + + for (int i = 0; i < ncpu; i++) { + workers.add(Schedulers.computation().createWorker()); + } + for (Scheduler.Worker w : workers) { + w.schedule(task); + } + try { + if (!cdl.await(1, TimeUnit.SECONDS)) { + throw new ObstructionException("Obstruction/Timeout detected!"); + } + } catch (InterruptedException ex) { + throw new ObstructionException("Interrupted: " + ex); + } finally { + for (Scheduler.Worker w : workers) { + w.unsubscribe(); + } + } + } + /** + * Exception thrown if obstruction was detected. + */ + public static final class ObstructionException extends RuntimeException { + /** */ + private static final long serialVersionUID = -6380717994471291795L; + public ObstructionException(String message) { + super(message); + } + } +} diff --git a/src/test/java/rx/test/TestObstructionDetectionTest.java b/src/test/java/rx/test/TestObstructionDetectionTest.java new file mode 100644 index 0000000000..f6db1db597 --- /dev/null +++ b/src/test/java/rx/test/TestObstructionDetectionTest.java @@ -0,0 +1,74 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.test; + +import org.junit.*; + +import rx.*; +import rx.Scheduler.Worker; +import rx.functions.Action0; +import rx.schedulers.Schedulers; +import rx.test.TestObstructionDetection.ObstructionException; + +public class TestObstructionDetectionTest { + private static Scheduler.Worker w; + @org.junit.After + public void doAfterTest() { + rx.test.TestObstructionDetection.checkObstruction(); + } + @AfterClass + public static void afterClass() { + Worker w2 = w; + if (w2 != null) { + w2.unsubscribe(); + } + } + @Test(timeout = 10000, expected = ObstructionException.class) + public void testObstruction() { + Scheduler.Worker w = Schedulers.computation().createWorker(); + + try { + w.schedule(new Action0() { + @Override + public void call() { + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + + } + } + }); + TestObstructionDetection.checkObstruction(); + } finally { + w.unsubscribe(); + } + } + @Test(timeout = 10000) + public void testNoObstruction() { + w = Schedulers.computation().createWorker(); + + w.schedule(new Action0() { + @Override + public void call() { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + + } + } + }); + } +}