From 05d12682a39c9df36595cef86fdaf35d10103909 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 17 Dec 2013 09:31:12 -0800 Subject: [PATCH 1/2] Make NewThreadScheduler create Daemon threads This matches the behavior of Schedulers.COMPUTATION_EXECUTOR and Schedulers.IO_EXECUTOR. See https://groups.google.com/forum/#!topic/rxjava/Qe1qi0aHtnE and https://github.com/Netflix/RxJava/issues/431#issuecomment-30767610 --- .../src/main/java/rx/schedulers/NewThreadScheduler.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index b2fdff50d2..7e460923a6 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -52,7 +52,9 @@ private EventLoopScheduler() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + Thread t = new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + t.setDaemon(true); + return t; } }); } From be9841a2925b02e893e48f7ab3e9d04f6964326b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 17 Dec 2013 11:42:41 -0800 Subject: [PATCH 2/2] Fix non-deterministic unit test - there is no guarantee for how many threads Interval will use so useless to assert anything on it --- .../operators/OperationParallelMergeTest.java | 30 +++++-------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java index 907993ef10..7872ae59d5 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java @@ -58,24 +58,8 @@ public void testParallelMerge() { @Test public void testNumberOfThreads() { - final ConcurrentHashMap threads = new ConcurrentHashMap(); - Observable.merge(getStreams()) - .toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); - } - }); - - // without injecting anything, the getStream() method uses Interval which runs on a default scheduler - assertEquals(Runtime.getRuntime().availableProcessors(), threads.keySet().size()); - - // clear - threads.clear(); - - // now we parallelMerge into 3 streams and observeOn for each + final ConcurrentHashMap threads = new ConcurrentHashMap(); + // parallelMerge into 3 streams and observeOn for each // we expect 3 threads in the output OperationParallelMerge.parallelMerge(getStreams(), 3) .flatMap(new Func1, Observable>() { @@ -90,8 +74,8 @@ public Observable call(Observable o) { @Override public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); + System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId()); + threads.put(Thread.currentThread().getId(), Thread.currentThread().getId()); } }); @@ -100,7 +84,7 @@ public void call(String o) { @Test public void testNumberOfThreadsOnScheduledMerge() { - final ConcurrentHashMap threads = new ConcurrentHashMap(); + final ConcurrentHashMap threads = new ConcurrentHashMap(); // now we parallelMerge into 3 streams and observeOn for each // we expect 3 threads in the output @@ -109,8 +93,8 @@ public void testNumberOfThreadsOnScheduledMerge() { @Override public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); + System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId()); + threads.put(Thread.currentThread().getId(), Thread.currentThread().getId()); } });