diff --git a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java index 803d999fe8..2eab100310 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerConcurrencyTests.java @@ -101,44 +101,47 @@ public void testUnsubscribeRecursiveScheduleFromOutside() throws InterruptedExce final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final Worker inner = getScheduler().createWorker(); - - inner.schedule(new Action0() { - - @Override - public void call() { - inner.schedule(new Action0() { - - int i = 0; - - @Override - public void call() { - System.out.println("Run: " + i++); - if (i == 10) { - latch.countDown(); - try { - // wait for unsubscribe to finish so we are not racing it - unsubscribeLatch.await(); - } catch (InterruptedException e) { - // we expect the countDown if unsubscribe is not working - // or to be interrupted if unsubscribe is successful since - // the unsubscribe will interrupt it as it is calling Future.cancel(true) - // so we will ignore the stacktrace + try { + inner.schedule(new Action0() { + + @Override + public void call() { + inner.schedule(new Action0() { + + int i = 0; + + @Override + public void call() { + System.out.println("Run: " + i++); + if (i == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } } + + counter.incrementAndGet(); + inner.schedule(this); } - - counter.incrementAndGet(); - inner.schedule(this); - } - }); - } - - }); - - latch.await(); - inner.unsubscribe(); - unsubscribeLatch.countDown(); - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - assertEquals(10, counter.get()); + }); + } + + }); + + latch.await(); + inner.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } finally { + inner.unsubscribe(); + } } @Test @@ -146,32 +149,36 @@ public void testUnsubscribeRecursiveScheduleFromInside() throws InterruptedExcep final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final Worker inner = getScheduler().createWorker(); - inner.schedule(new Action0() { - - @Override - public void call() { - inner.schedule(new Action0() { - - int i = 0; - - @Override - public void call() { - System.out.println("Run: " + i++); - if (i == 10) { - inner.unsubscribe(); + try { + inner.schedule(new Action0() { + + @Override + public void call() { + inner.schedule(new Action0() { + + int i = 0; + + @Override + public void call() { + System.out.println("Run: " + i++); + if (i == 10) { + inner.unsubscribe(); + } + + counter.incrementAndGet(); + inner.schedule(this); } - - counter.incrementAndGet(); - inner.schedule(this); - } - }); - } - - }); - - unsubscribeLatch.countDown(); - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - assertEquals(10, counter.get()); + }); + } + + }); + + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } finally { + inner.unsubscribe(); + } } @Test @@ -180,91 +187,104 @@ public void testUnsubscribeRecursiveScheduleWithDelay() throws InterruptedExcept final CountDownLatch unsubscribeLatch = new CountDownLatch(1); final AtomicInteger counter = new AtomicInteger(); final Worker inner = getScheduler().createWorker(); - inner.schedule(new Action0() { - - @Override - public void call() { - inner.schedule(new Action0() { - - long i = 1L; - - @Override - public void call() { - if (i++ == 10) { - latch.countDown(); - try { - // wait for unsubscribe to finish so we are not racing it - unsubscribeLatch.await(); - } catch (InterruptedException e) { - // we expect the countDown if unsubscribe is not working - // or to be interrupted if unsubscribe is successful since - // the unsubscribe will interrupt it as it is calling Future.cancel(true) - // so we will ignore the stacktrace + + try { + inner.schedule(new Action0() { + + @Override + public void call() { + inner.schedule(new Action0() { + + long i = 1L; + + @Override + public void call() { + if (i++ == 10) { + latch.countDown(); + try { + // wait for unsubscribe to finish so we are not racing it + unsubscribeLatch.await(); + } catch (InterruptedException e) { + // we expect the countDown if unsubscribe is not working + // or to be interrupted if unsubscribe is successful since + // the unsubscribe will interrupt it as it is calling Future.cancel(true) + // so we will ignore the stacktrace + } } + + counter.incrementAndGet(); + inner.schedule(this, 10, TimeUnit.MILLISECONDS); } - - counter.incrementAndGet(); - inner.schedule(this, 10, TimeUnit.MILLISECONDS); - } - }, 10, TimeUnit.MILLISECONDS); - } - }); - - latch.await(); - inner.unsubscribe(); - unsubscribeLatch.countDown(); - Thread.sleep(200); // let time pass to see if the scheduler is still doing work - assertEquals(10, counter.get()); + }, 10, TimeUnit.MILLISECONDS); + } + }); + + latch.await(); + inner.unsubscribe(); + unsubscribeLatch.countDown(); + Thread.sleep(200); // let time pass to see if the scheduler is still doing work + assertEquals(10, counter.get()); + } finally { + inner.unsubscribe(); + } } @Test public void recursionFromOuterActionAndUnsubscribeInside() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final Worker inner = getScheduler().createWorker(); - inner.schedule(new Action0() { - - int i = 0; - - @Override - public void call() { - i++; - if (i % 100000 == 0) { - System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); - } - if (i < 1000000L) { - inner.schedule(this); - } else { - latch.countDown(); + try { + inner.schedule(new Action0() { + + int i = 0; + + @Override + public void call() { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 1000000L) { + inner.schedule(this); + } else { + latch.countDown(); + } } - } - }); - - latch.await(); + }); + + latch.await(); + } finally { + inner.unsubscribe(); + } } @Test public void testRecursion() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final Worker inner = getScheduler().createWorker(); - inner.schedule(new Action0() { - - private long i = 0; - - @Override - public void call() { - i++; - if (i % 100000 == 0) { - System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); - } - if (i < 1000000L) { - inner.schedule(this); - } else { - latch.countDown(); + try { + inner.schedule(new Action0() { + + private long i = 0; + + @Override + public void call() { + i++; + if (i % 100000 == 0) { + System.out.println(i + " Total Memory: " + Runtime.getRuntime().totalMemory() + " Free: " + Runtime.getRuntime().freeMemory()); + } + if (i < 1000000L) { + inner.schedule(this); + } else { + latch.countDown(); + } } - } - }); - - latch.await(); + }); + + latch.await(); + } finally { + inner.unsubscribe(); + } } @Test @@ -272,72 +292,75 @@ public void testRecursionAndOuterUnsubscribe() throws InterruptedException { // use latches instead of Thread.sleep final CountDownLatch latch = new CountDownLatch(10); final CountDownLatch completionLatch = new CountDownLatch(1); - - Observable obs = Observable.create(new OnSubscribe() { - @Override - public void call(final Subscriber observer) { - final Worker inner = getScheduler().createWorker(); - inner.schedule(new Action0() { - @Override - public void call() { - observer.onNext(42); - latch.countDown(); - - // this will recursively schedule this task for execution again - inner.schedule(this); - } - }); - - observer.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - inner.unsubscribe(); - observer.onCompleted(); - completionLatch.countDown(); - } - - })); - - } - }); - - final AtomicInteger count = new AtomicInteger(); - final AtomicBoolean completed = new AtomicBoolean(false); - Subscription subscribe = obs.subscribe(new Subscriber() { - @Override - public void onCompleted() { - System.out.println("Completed"); - completed.set(true); - } - - @Override - public void onError(Throwable e) { - System.out.println("Error"); + final Worker inner = getScheduler().createWorker(); + try { + Observable obs = Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber observer) { + inner.schedule(new Action0() { + @Override + public void call() { + observer.onNext(42); + latch.countDown(); + + // this will recursively schedule this task for execution again + inner.schedule(this); + } + }); + + observer.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + inner.unsubscribe(); + observer.onCompleted(); + completionLatch.countDown(); + } + + })); + + } + }); + + final AtomicInteger count = new AtomicInteger(); + final AtomicBoolean completed = new AtomicBoolean(false); + Subscription subscribe = obs.subscribe(new Subscriber() { + @Override + public void onCompleted() { + System.out.println("Completed"); + completed.set(true); + } + + @Override + public void onError(Throwable e) { + System.out.println("Error"); + } + + @Override + public void onNext(Integer args) { + count.incrementAndGet(); + System.out.println(args); + } + }); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on onNext latch"); } - - @Override - public void onNext(Integer args) { - count.incrementAndGet(); - System.out.println(args); + + // now unsubscribe and ensure it stops the recursive loop + subscribe.unsubscribe(); + System.out.println("unsubscribe"); + + if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { + fail("Timed out waiting on completion latch"); } - }); - - if (!latch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on onNext latch"); + + // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count + assertTrue(count.get() >= 10); + assertTrue(completed.get()); + } finally { + inner.unsubscribe(); } - - // now unsubscribe and ensure it stops the recursive loop - subscribe.unsubscribe(); - System.out.println("unsubscribe"); - - if (!completionLatch.await(5000, TimeUnit.MILLISECONDS)) { - fail("Timed out waiting on completion latch"); - } - - // the count can be 10 or higher due to thread scheduling of the unsubscribe vs the scheduler looping to emit the count - assertTrue(count.get() >= 10); - assertTrue(completed.get()); } @Test diff --git a/src/test/java/rx/schedulers/AbstractSchedulerTests.java b/src/test/java/rx/schedulers/AbstractSchedulerTests.java index afb7d1617c..53f77abcc6 100644 --- a/src/test/java/rx/schedulers/AbstractSchedulerTests.java +++ b/src/test/java/rx/schedulers/AbstractSchedulerTests.java @@ -58,55 +58,59 @@ public abstract class AbstractSchedulerTests { public void testNestedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final CountDownLatch latch = new CountDownLatch(1); - - final Action0 firstStepStart = mock(Action0.class); - final Action0 firstStepEnd = mock(Action0.class); - - final Action0 secondStepStart = mock(Action0.class); - final Action0 secondStepEnd = mock(Action0.class); - - final Action0 thirdStepStart = mock(Action0.class); - final Action0 thirdStepEnd = mock(Action0.class); - - final Action0 firstAction = new Action0() { - @Override - public void call() { - firstStepStart.call(); - firstStepEnd.call(); - latch.countDown(); - } - }; - final Action0 secondAction = new Action0() { - @Override - public void call() { - secondStepStart.call(); - inner.schedule(firstAction); - secondStepEnd.call(); - - } - }; - final Action0 thirdAction = new Action0() { - @Override - public void call() { - thirdStepStart.call(); - inner.schedule(secondAction); - thirdStepEnd.call(); - } - }; - - InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); - - inner.schedule(thirdAction); - - latch.await(); - - inOrder.verify(thirdStepStart, times(1)).call(); - inOrder.verify(thirdStepEnd, times(1)).call(); - inOrder.verify(secondStepStart, times(1)).call(); - inOrder.verify(secondStepEnd, times(1)).call(); - inOrder.verify(firstStepStart, times(1)).call(); - inOrder.verify(firstStepEnd, times(1)).call(); + try { + final CountDownLatch latch = new CountDownLatch(1); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + latch.countDown(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + inner.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + inner.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + inner.schedule(thirdAction); + + latch.await(); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } finally { + inner.unsubscribe(); + } } @Test @@ -194,31 +198,34 @@ public void testSequenceOfDelayedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final CountDownLatch latch = new CountDownLatch(1); - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - - inner.schedule(new Action0() { - @Override - public void call() { - inner.schedule(first, 30, TimeUnit.MILLISECONDS); - inner.schedule(second, 10, TimeUnit.MILLISECONDS); - inner.schedule(new Action0() { - - @Override - public void call() { - latch.countDown(); - } - }, 40, TimeUnit.MILLISECONDS); - } - }); - - latch.await(); - InOrder inOrder = inOrder(first, second); - - inOrder.verify(second, times(1)).call(); - inOrder.verify(first, times(1)).call(); - + try { + final CountDownLatch latch = new CountDownLatch(1); + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + inner.schedule(new Action0() { + @Override + public void call() { + inner.schedule(first, 30, TimeUnit.MILLISECONDS); + inner.schedule(second, 10, TimeUnit.MILLISECONDS); + inner.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 40, TimeUnit.MILLISECONDS); + } + }); + + latch.await(); + InOrder inOrder = inOrder(first, second); + + inOrder.verify(second, times(1)).call(); + inOrder.verify(first, times(1)).call(); + } finally { + inner.unsubscribe(); + } } @Test @@ -226,85 +233,100 @@ public void testMixOfDelayedAndNonDelayedActions() throws InterruptedException { Scheduler scheduler = getScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final CountDownLatch latch = new CountDownLatch(1); - final Action0 first = mock(Action0.class); - final Action0 second = mock(Action0.class); - final Action0 third = mock(Action0.class); - final Action0 fourth = mock(Action0.class); - - inner.schedule(new Action0() { - @Override - public void call() { - inner.schedule(first); - inner.schedule(second, 300, TimeUnit.MILLISECONDS); - inner.schedule(third, 100, TimeUnit.MILLISECONDS); - inner.schedule(fourth); - inner.schedule(new Action0() { - - @Override - public void call() { - latch.countDown(); - } - }, 400, TimeUnit.MILLISECONDS); - } - }); - - latch.await(); - InOrder inOrder = inOrder(first, second, third, fourth); - - inOrder.verify(first, times(1)).call(); - inOrder.verify(fourth, times(1)).call(); - inOrder.verify(third, times(1)).call(); - inOrder.verify(second, times(1)).call(); + try { + final CountDownLatch latch = new CountDownLatch(1); + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + final Action0 third = mock(Action0.class); + final Action0 fourth = mock(Action0.class); + + inner.schedule(new Action0() { + @Override + public void call() { + inner.schedule(first); + inner.schedule(second, 300, TimeUnit.MILLISECONDS); + inner.schedule(third, 100, TimeUnit.MILLISECONDS); + inner.schedule(fourth); + inner.schedule(new Action0() { + + @Override + public void call() { + latch.countDown(); + } + }, 400, TimeUnit.MILLISECONDS); + } + }); + + latch.await(); + InOrder inOrder = inOrder(first, second, third, fourth); + + inOrder.verify(first, times(1)).call(); + inOrder.verify(fourth, times(1)).call(); + inOrder.verify(third, times(1)).call(); + inOrder.verify(second, times(1)).call(); + } finally { + inner.unsubscribe(); + } } @Test public final void testRecursiveExecution() throws InterruptedException { final Scheduler scheduler = getScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final AtomicInteger i = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - inner.schedule(new Action0() { - - @Override - public void call() { - if (i.incrementAndGet() < 100) { - inner.schedule(this); - } else { - latch.countDown(); + + try { + + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + inner.schedule(new Action0() { + + @Override + public void call() { + if (i.incrementAndGet() < 100) { + inner.schedule(this); + } else { + latch.countDown(); + } } - } - }); - - latch.await(); - assertEquals(100, i.get()); + }); + + latch.await(); + assertEquals(100, i.get()); + } finally { + inner.unsubscribe(); + } } @Test public final void testRecursiveExecutionWithDelayTime() throws InterruptedException { Scheduler scheduler = getScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final AtomicInteger i = new AtomicInteger(); - final CountDownLatch latch = new CountDownLatch(1); - - inner.schedule(new Action0() { - - int state = 0; - - @Override - public void call() { - i.set(state); - if (state++ < 100) { - inner.schedule(this, 1, TimeUnit.MILLISECONDS); - } else { - latch.countDown(); + + try { + final AtomicInteger i = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(1); + + inner.schedule(new Action0() { + + int state = 0; + + @Override + public void call() { + i.set(state); + if (state++ < 100) { + inner.schedule(this, 1, TimeUnit.MILLISECONDS); + } else { + latch.countDown(); + } } - } - - }); - - latch.await(); - assertEquals(100, i.get()); + + }); + + latch.await(); + assertEquals(100, i.get()); + } finally { + inner.unsubscribe(); + } } @Test diff --git a/src/test/java/rx/schedulers/ComputationSchedulerTests.java b/src/test/java/rx/schedulers/ComputationSchedulerTests.java index 32f15b3564..881224cfac 100644 --- a/src/test/java/rx/schedulers/ComputationSchedulerTests.java +++ b/src/test/java/rx/schedulers/ComputationSchedulerTests.java @@ -46,47 +46,51 @@ public void testThreadSafetyWhenSchedulerIsHoppingBetweenThreads() { final HashMap map = new HashMap(); final Scheduler.Worker inner = Schedulers.computation().createWorker(); - - inner.schedule(new Action0() { - - private HashMap statefulMap = map; - int nonThreadSafeCounter = 0; - - @Override - public void call() { - Integer i = statefulMap.get("a"); - if (i == null) { - i = 1; - statefulMap.put("a", i); - statefulMap.put("b", i); - } else { - i++; - statefulMap.put("a", i); - statefulMap.put("b", i); - } - nonThreadSafeCounter++; - statefulMap.put("nonThreadSafeCounter", nonThreadSafeCounter); - if (i < NUM) { - inner.schedule(this); - } else { - latch.countDown(); + + try { + inner.schedule(new Action0() { + + private HashMap statefulMap = map; + int nonThreadSafeCounter = 0; + + @Override + public void call() { + Integer i = statefulMap.get("a"); + if (i == null) { + i = 1; + statefulMap.put("a", i); + statefulMap.put("b", i); + } else { + i++; + statefulMap.put("a", i); + statefulMap.put("b", i); + } + nonThreadSafeCounter++; + statefulMap.put("nonThreadSafeCounter", nonThreadSafeCounter); + if (i < NUM) { + inner.schedule(this); + } else { + latch.countDown(); + } } + }); + + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); } - }); - - try { - latch.await(); - } catch (InterruptedException e) { - e.printStackTrace(); + + System.out.println("Count A: " + map.get("a")); + System.out.println("Count B: " + map.get("b")); + System.out.println("nonThreadSafeCounter: " + map.get("nonThreadSafeCounter")); + + assertEquals(NUM, map.get("a").intValue()); + assertEquals(NUM, map.get("b").intValue()); + assertEquals(NUM, map.get("nonThreadSafeCounter").intValue()); + } finally { + inner.unsubscribe(); } - - System.out.println("Count A: " + map.get("a")); - System.out.println("Count B: " + map.get("b")); - System.out.println("nonThreadSafeCounter: " + map.get("nonThreadSafeCounter")); - - assertEquals(NUM, map.get("a").intValue()); - assertEquals(NUM, map.get("b").intValue()); - assertEquals(NUM, map.get("nonThreadSafeCounter").intValue()); } @Test diff --git a/src/test/java/rx/schedulers/TestSchedulerTest.java b/src/test/java/rx/schedulers/TestSchedulerTest.java index 8ab361f54d..f8a49eb0be 100644 --- a/src/test/java/rx/schedulers/TestSchedulerTest.java +++ b/src/test/java/rx/schedulers/TestSchedulerTest.java @@ -48,37 +48,41 @@ public final void testPeriodicScheduling() { final TestScheduler scheduler = new TestScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - inner.schedulePeriodically(new Action0() { - @Override - public void call() { - System.out.println(scheduler.now()); - calledOp.call(scheduler.now()); - } - }, 1, 2, TimeUnit.SECONDS); - - verify(calledOp, never()).call(anyLong()); - - InOrder inOrder = Mockito.inOrder(calledOp); - - scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(1000L); - - scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(3000L); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(3000L); - - scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(1)).call(5000L); - inOrder.verify(calledOp, times(1)).call(7000L); - - inner.unsubscribe(); - scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); + try { + inner.schedulePeriodically(new Action0() { + @Override + public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + + inner.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + } finally { + inner.unsubscribe(); + } } @SuppressWarnings("unchecked") @@ -90,37 +94,41 @@ public final void testPeriodicSchedulingUnsubscription() { final TestScheduler scheduler = new TestScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final Subscription subscription = inner.schedulePeriodically(new Action0() { - @Override - public void call() { - System.out.println(scheduler.now()); - calledOp.call(scheduler.now()); - } - }, 1, 2, TimeUnit.SECONDS); - - verify(calledOp, never()).call(anyLong()); - - InOrder inOrder = Mockito.inOrder(calledOp); - - scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(1000L); - - scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, never()).call(3000L); - - scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); - inOrder.verify(calledOp, times(1)).call(3000L); - - scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(1)).call(5000L); - inOrder.verify(calledOp, times(1)).call(7000L); - - subscription.unsubscribe(); - scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); - inOrder.verify(calledOp, never()).call(anyLong()); + try { + final Subscription subscription = inner.schedulePeriodically(new Action0() { + @Override + public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + + subscription.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + } finally { + inner.unsubscribe(); + } } @Test @@ -129,76 +137,89 @@ public final void testImmediateUnsubscribes() { final Scheduler.Worker inner = s.createWorker(); final AtomicInteger counter = new AtomicInteger(0); - inner.schedule(new Action0() { - - @Override - public void call() { - counter.incrementAndGet(); - System.out.println("counter: " + counter.get()); - inner.schedule(this); - } - - }); - inner.unsubscribe(); - assertEquals(0, counter.get()); + try { + inner.schedule(new Action0() { + + @Override + public void call() { + counter.incrementAndGet(); + System.out.println("counter: " + counter.get()); + inner.schedule(this); + } + + }); + inner.unsubscribe(); + assertEquals(0, counter.get()); + } finally { + inner.unsubscribe(); + } } @Test public final void testImmediateUnsubscribes2() { TestScheduler s = new TestScheduler(); final Scheduler.Worker inner = s.createWorker(); - final AtomicInteger counter = new AtomicInteger(0); - - final Subscription subscription = inner.schedule(new Action0() { - - @Override - public void call() { - counter.incrementAndGet(); - System.out.println("counter: " + counter.get()); - inner.schedule(this); - } - - }); - subscription.unsubscribe(); - assertEquals(0, counter.get()); + try { + final AtomicInteger counter = new AtomicInteger(0); + + final Subscription subscription = inner.schedule(new Action0() { + + @Override + public void call() { + counter.incrementAndGet(); + System.out.println("counter: " + counter.get()); + inner.schedule(this); + } + + }); + subscription.unsubscribe(); + assertEquals(0, counter.get()); + } finally { + inner.unsubscribe(); + } } @Test public final void testNestedSchedule() { final TestScheduler scheduler = new TestScheduler(); final Scheduler.Worker inner = scheduler.createWorker(); - final Action0 calledOp = mock(Action0.class); - - Observable poller; - poller = Observable.create(new OnSubscribe() { - @Override - public void call(final Subscriber aSubscriber) { - inner.schedule(new Action0() { - @Override - public void call() { - if (!aSubscriber.isUnsubscribed()) { - calledOp.call(); - inner.schedule(this, 5, TimeUnit.SECONDS); + + try { + final Action0 calledOp = mock(Action0.class); + + Observable poller; + poller = Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber aSubscriber) { + inner.schedule(new Action0() { + @Override + public void call() { + if (!aSubscriber.isUnsubscribed()) { + calledOp.call(); + inner.schedule(this, 5, TimeUnit.SECONDS); + } } - } - }); - } - }); - - InOrder inOrder = Mockito.inOrder(calledOp); - - Subscription sub; - sub = poller.subscribe(); - - scheduler.advanceTimeTo(6, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(2)).call(); - - sub.unsubscribe(); - scheduler.advanceTimeTo(11, TimeUnit.SECONDS); - inOrder.verify(calledOp, never()).call(); - - sub = poller.subscribe(); - scheduler.advanceTimeTo(12, TimeUnit.SECONDS); - inOrder.verify(calledOp, times(1)).call(); + }); + } + }); + + InOrder inOrder = Mockito.inOrder(calledOp); + + Subscription sub; + sub = poller.subscribe(); + + scheduler.advanceTimeTo(6, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(2)).call(); + + sub.unsubscribe(); + scheduler.advanceTimeTo(11, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(); + + sub = poller.subscribe(); + scheduler.advanceTimeTo(12, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(); + } finally { + inner.unsubscribe(); + } } } diff --git a/src/test/java/rx/schedulers/TrampolineSchedulerTest.java b/src/test/java/rx/schedulers/TrampolineSchedulerTest.java index 253d3a5382..c628da245f 100644 --- a/src/test/java/rx/schedulers/TrampolineSchedulerTest.java +++ b/src/test/java/rx/schedulers/TrampolineSchedulerTest.java @@ -15,20 +15,17 @@ */ package rx.schedulers; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; -import java.util.ArrayList; -import java.util.Arrays; +import java.util.*; import org.junit.Test; -import rx.Observable; -import rx.Scheduler; +import rx.*; import rx.Scheduler.Worker; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.Observable; +import rx.functions.*; +import rx.subscriptions.CompositeSubscription; public class TrampolineSchedulerTest extends AbstractSchedulerTests { @@ -65,33 +62,40 @@ public void call(String t) { @Test public void testNestedTrampolineWithUnsubscribe() { final ArrayList workDone = new ArrayList(); + final CompositeSubscription workers = new CompositeSubscription(); Worker worker = Schedulers.trampoline().createWorker(); - worker.schedule(new Action0() { - - @Override - public void call() { - doWorkOnNewTrampoline("A", workDone); - } - - }); - - final Worker worker2 = Schedulers.trampoline().createWorker(); - worker2.schedule(new Action0() { - - @Override - public void call() { - doWorkOnNewTrampoline("B", workDone); - // we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker - worker2.unsubscribe(); - } - - }); - - assertEquals(6, workDone.size()); - assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone); + try { + workers.add(worker); + worker.schedule(new Action0() { + + @Override + public void call() { + workers.add(doWorkOnNewTrampoline("A", workDone)); + } + + }); + + final Worker worker2 = Schedulers.trampoline().createWorker(); + workers.add(worker2); + worker2.schedule(new Action0() { + + @Override + public void call() { + workers.add(doWorkOnNewTrampoline("B", workDone)); + // we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker + worker2.unsubscribe(); + } + + }); + + assertEquals(6, workDone.size()); + assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone); + } finally { + workers.unsubscribe(); + } } - private static void doWorkOnNewTrampoline(final String key, final ArrayList workDone) { + private static Worker doWorkOnNewTrampoline(final String key, final ArrayList workDone) { Worker worker = Schedulers.trampoline().createWorker(); worker.schedule(new Action0() { @@ -106,6 +110,7 @@ public void call() { } }); + return worker; } private static Action0 createPrintAction(final String message, final ArrayList workDone) {