diff --git a/src/main/java/io/reactivex/Scheduler.java b/src/main/java/io/reactivex/Scheduler.java index 310e56b849..fcce530803 100644 --- a/src/main/java/io/reactivex/Scheduler.java +++ b/src/main/java/io/reactivex/Scheduler.java @@ -131,16 +131,7 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull Tim final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); - w.schedule(new Runnable() { - @Override - public void run() { - try { - decoratedRun.run(); - } finally { - w.dispose(); - } - } - }, delay, unit); + w.schedule(new DisposeTask(decoratedRun, w), delay, unit); return w; } @@ -440,4 +431,23 @@ public boolean isDisposed() { return disposed; } } + + static final class DisposeTask implements Runnable { + final Runnable decoratedRun; + final Worker w; + + DisposeTask(Runnable decoratedRun, Worker w) { + this.decoratedRun = decoratedRun; + this.w = w; + } + + @Override + public void run() { + try { + decoratedRun.run(); + } finally { + w.dispose(); + } + } + } } diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index 5b4b97f759..755af2d946 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -36,131 +36,50 @@ private Functions() { @SuppressWarnings("unchecked") public static Function toFunction(final BiFunction f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 2) { - throw new IllegalArgumentException("Array of size 2 expected but got " + a.length); - } - return ((BiFunction)f).apply(a[0], a[1]); - } - }; + return new Array2Func(f); } public static Function toFunction(final Function3 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 3) { - throw new IllegalArgumentException("Array of size 3 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2]); - } - }; + return new Array3Func(f); } public static Function toFunction(final Function4 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 4) { - throw new IllegalArgumentException("Array of size 4 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3]); - } - }; + return new Array4Func(f); } public static Function toFunction(final Function5 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 5) { - throw new IllegalArgumentException("Array of size 5 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4]); - } - }; + return new Array5Func(f); } public static Function toFunction( final Function6 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 6) { - throw new IllegalArgumentException("Array of size 6 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5]); - } - }; + return new Array6Func(f); } public static Function toFunction( final Function7 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 7) { - throw new IllegalArgumentException("Array of size 7 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6]); - } - }; + return new Array7Func(f); } public static Function toFunction( final Function8 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 8) { - throw new IllegalArgumentException("Array of size 8 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6], (T8)a[7]); - } - }; + return new Array8Func(f); } public static Function toFunction( final Function9 f) { ObjectHelper.requireNonNull(f, "f is null"); - return new Function() { - @SuppressWarnings("unchecked") - @Override - public R apply(Object[] a) throws Exception { - if (a.length != 9) { - throw new IllegalArgumentException("Array of size 9 expected but got " + a.length); - } - return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6], (T8)a[7], (T9)a[8]); - } - }; + return new Array9Func(f); } /** A singleton identity function. */ - static final Function IDENTITY = new Function() { - @Override - public Object apply(Object v) { - return v; - } - - @Override - public String toString() { - return "IdentityFunction"; - } - }; + static final Function IDENTITY = new Identity(); /** * Returns an identity function that simply returns its argument. @@ -172,35 +91,11 @@ public static Function identity() { return (Function)IDENTITY; } - public static final Runnable EMPTY_RUNNABLE = new Runnable() { - @Override - public void run() { } + public static final Runnable EMPTY_RUNNABLE = new EmptyRunnable(); - @Override - public String toString() { - return "EmptyRunnable"; - } - }; + public static final Action EMPTY_ACTION = new EmptyAction(); - public static final Action EMPTY_ACTION = new Action() { - @Override - public void run() { } - - @Override - public String toString() { - return "EmptyAction"; - } - }; - - static final Consumer EMPTY_CONSUMER = new Consumer() { - @Override - public void accept(Object v) { } - - @Override - public String toString() { - return "EmptyConsumer"; - } - }; + static final Consumer EMPTY_CONSUMER = new EmptyConsumer(); /** * Returns an empty consumer that does nothing. @@ -212,57 +107,23 @@ public static Consumer emptyConsumer() { return (Consumer)EMPTY_CONSUMER; } - public static final Consumer ERROR_CONSUMER = new Consumer() { - @Override - public void accept(Throwable error) { - RxJavaPlugins.onError(error); - } - }; + public static final Consumer ERROR_CONSUMER = new ErrorConsumer(); /** * Wraps the consumed Throwable into an OnErrorNotImplementedException and * signals it to the plugin error handler. */ - public static final Consumer ON_ERROR_MISSING = new Consumer() { - @Override - public void accept(Throwable error) { - RxJavaPlugins.onError(new OnErrorNotImplementedException(error)); - } - }; + public static final Consumer ON_ERROR_MISSING = new OnErrorMissingConsumer(); - public static final LongConsumer EMPTY_LONG_CONSUMER = new LongConsumer() { - @Override - public void accept(long v) { } - }; + public static final LongConsumer EMPTY_LONG_CONSUMER = new EmptyLongConsumer(); - static final Predicate ALWAYS_TRUE = new Predicate() { - @Override - public boolean test(Object o) { - return true; - } - }; + static final Predicate ALWAYS_TRUE = new TruePredicate(); - static final Predicate ALWAYS_FALSE = new Predicate() { - @Override - public boolean test(Object o) { - return false; - } - }; + static final Predicate ALWAYS_FALSE = new FalsePredicate(); - static final Callable NULL_SUPPLIER = new Callable() { - @Override - public Object call() { - return null; - } - }; + static final Callable NULL_SUPPLIER = new NullCallable(); - static final Comparator NATURAL_COMPARATOR = new Comparator() { - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public int compare(Object a, Object b) { - return ((Comparable)a).compareTo(b); - } - }; + static final Comparator NATURAL_COMPARATOR = new NaturalObjectComparator(); @SuppressWarnings("unchecked") public static Predicate alwaysTrue() { @@ -634,7 +495,7 @@ public static Comparator naturalComparator() { } static final class ListSorter implements Function, List> { - private final Comparator comparator; + final Comparator comparator; ListSorter(Comparator comparator) { this.comparator = comparator; @@ -651,10 +512,237 @@ public static Function, List> listSorter(final Comparator(comparator); } - public static final Consumer REQUEST_MAX = new Consumer() { + public static final Consumer REQUEST_MAX = new MaxRequestSubscription(); + + static final class Array2Func implements Function { + final BiFunction f; + + Array2Func(BiFunction f) { + this.f = f; + } + + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 2) { + throw new IllegalArgumentException("Array of size 2 expected but got " + a.length); + } + return ((BiFunction) f).apply(a[0], a[1]); + } + } + + static final class Array3Func implements Function { + final Function3 f; + + Array3Func(Function3 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 3) { + throw new IllegalArgumentException("Array of size 3 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2]); + } + } + + static final class Array4Func implements Function { + final Function4 f; + + Array4Func(Function4 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 4) { + throw new IllegalArgumentException("Array of size 4 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3]); + } + } + + static final class Array5Func implements Function { + private final Function5 f; + + Array5Func(Function5 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 5) { + throw new IllegalArgumentException("Array of size 5 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4]); + } + } + + static final class Array6Func implements Function { + final Function6 f; + + Array6Func(Function6 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 6) { + throw new IllegalArgumentException("Array of size 6 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5]); + } + } + + static final class Array7Func implements Function { + final Function7 f; + + Array7Func(Function7 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 7) { + throw new IllegalArgumentException("Array of size 7 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6]); + } + } + + static final class Array8Func implements Function { + final Function8 f; + + Array8Func(Function8 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 8) { + throw new IllegalArgumentException("Array of size 8 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6], (T8)a[7]); + } + } + + static final class Array9Func implements Function { + final Function9 f; + + Array9Func(Function9 f) { + this.f = f; + } + + @SuppressWarnings("unchecked") + @Override + public R apply(Object[] a) throws Exception { + if (a.length != 9) { + throw new IllegalArgumentException("Array of size 9 expected but got " + a.length); + } + return f.apply((T1)a[0], (T2)a[1], (T3)a[2], (T4)a[3], (T5)a[4], (T6)a[5], (T7)a[6], (T8)a[7], (T9)a[8]); + } + } + + static final class Identity implements Function { + @Override + public Object apply(Object v) { + return v; + } + + @Override + public String toString() { + return "IdentityFunction"; + } + } + + static final class EmptyRunnable implements Runnable { + @Override + public void run() { } + + @Override + public String toString() { + return "EmptyRunnable"; + } + } + + static final class EmptyAction implements Action { + @Override + public void run() { } + + @Override + public String toString() { + return "EmptyAction"; + } + } + + static final class EmptyConsumer implements Consumer { + @Override + public void accept(Object v) { } + + @Override + public String toString() { + return "EmptyConsumer"; + } + } + + static final class ErrorConsumer implements Consumer { + @Override + public void accept(Throwable error) { + RxJavaPlugins.onError(error); + } + } + + static final class OnErrorMissingConsumer implements Consumer { + @Override + public void accept(Throwable error) { + RxJavaPlugins.onError(new OnErrorNotImplementedException(error)); + } + } + + static final class EmptyLongConsumer implements LongConsumer { + @Override + public void accept(long v) { } + } + + static final class TruePredicate implements Predicate { + @Override + public boolean test(Object o) { + return true; + } + } + + static final class FalsePredicate implements Predicate { + @Override + public boolean test(Object o) { + return false; + } + } + + static final class NullCallable implements Callable { + @Override + public Object call() { + return null; + } + } + + static final class NaturalObjectComparator implements Comparator { + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public int compare(Object a, Object b) { + return ((Comparable)a).compareTo(b); + } + } + + static final class MaxRequestSubscription implements Consumer { @Override public void accept(Subscription t) throws Exception { t.request(Long.MAX_VALUE); } - }; + } } diff --git a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java index 2abdb1bc51..b9ce56560e 100644 --- a/src/main/java/io/reactivex/internal/functions/ObjectHelper.java +++ b/src/main/java/io/reactivex/internal/functions/ObjectHelper.java @@ -80,12 +80,7 @@ public static int compare(long v1, long v2) { return v1 < v2 ? -1 : (v1 > v2 ? 1 : 0); } - static final BiPredicate EQUALS = new BiPredicate() { - @Override - public boolean test(Object o1, Object o2) { - return ObjectHelper.equals(o1, o2); - } - }; + static final BiPredicate EQUALS = new BiObjectPredicate(); /** * Returns a BiPredicate that compares its parameters via Objects.equals(). @@ -127,4 +122,10 @@ public static long verifyPositive(long value, String paramName) { return value; } + static final class BiObjectPredicate implements BiPredicate { + @Override + public boolean test(Object o1, Object o2) { + return ObjectHelper.equals(o1, o2); + } + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java index 6ce46c44f7..dd27bbd778 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ExecutorScheduler.java @@ -80,12 +80,7 @@ public Disposable scheduleDirect(@NonNull Runnable run, final long delay, final final DelayedRunnable dr = new DelayedRunnable(decoratedRun); - Disposable delayed = HELPER.scheduleDirect(new Runnable() { - @Override - public void run() { - dr.direct.replace(scheduleDirect(dr)); - } - }, delay, unit); + Disposable delayed = HELPER.scheduleDirect(new DelayedDispose(dr), delay, unit); dr.timed.replace(delayed); @@ -167,12 +162,7 @@ public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); - ScheduledRunnable sr = new ScheduledRunnable(new Runnable() { - @Override - public void run() { - mar.replace(schedule(decoratedRun)); - } - }, tasks); + ScheduledRunnable sr = new ScheduledRunnable(new SequentialDispose(mar, decoratedRun), tasks); tasks.add(sr); if (executor instanceof ScheduledExecutorService) { @@ -278,6 +268,20 @@ public boolean isDisposed() { } } + final class SequentialDispose implements Runnable { + private final SequentialDisposable mar; + private final Runnable decoratedRun; + + SequentialDispose(SequentialDisposable mar, Runnable decoratedRun) { + this.mar = mar; + this.decoratedRun = decoratedRun; + } + + @Override + public void run() { + mar.replace(schedule(decoratedRun)); + } + } } static final class DelayedRunnable extends AtomicReference implements Runnable, Disposable { @@ -322,4 +326,16 @@ public void dispose() { } } + final class DelayedDispose implements Runnable { + private final DelayedRunnable dr; + + DelayedDispose(DelayedRunnable dr) { + this.dr = dr; + } + + @Override + public void run() { + dr.direct.replace(scheduleDirect(dr)); + } + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java index 129687657d..54e8a537cf 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java @@ -65,23 +65,7 @@ public static void start() { ScheduledExecutorService next = Executors.newScheduledThreadPool(1, new RxThreadFactory("RxSchedulerPurge")); if (PURGE_THREAD.compareAndSet(curr, next)) { - next.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - try { - for (ScheduledThreadPoolExecutor e : new ArrayList(POOLS.keySet())) { - if (e.isShutdown()) { - POOLS.remove(e); - } else { - e.purge(); - } - } - } catch (Throwable e) { - // Exceptions.throwIfFatal(e); nowhere to go - RxJavaPlugins.onError(e); - } - } - }, PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS); + next.scheduleAtFixedRate(new ScheduledTask(), PURGE_PERIOD_SECONDS, PURGE_PERIOD_SECONDS, TimeUnit.SECONDS); return; } else { @@ -131,4 +115,22 @@ public static ScheduledExecutorService create(ThreadFactory factory) { } return exec; } + + static final class ScheduledTask implements Runnable { + @Override + public void run() { + try { + for (ScheduledThreadPoolExecutor e : new ArrayList(POOLS.keySet())) { + if (e.isShutdown()) { + POOLS.remove(e); + } else { + e.purge(); + } + } + } catch (Throwable e) { + // Exceptions.throwIfFatal(e); nowhere to go + RxJavaPlugins.onError(e); + } + } + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java index 826e3c7d9f..bdcc2cb160 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerWhen.java @@ -138,56 +138,10 @@ public Worker createWorker() { // the subscribe to off the workerQueue. final FlowableProcessor actionProcessor = UnicastProcessor.create().toSerialized(); // convert the work of scheduling all the actions into a completable - Flowable actions = actionProcessor.map(new Function() { - @Override - public Completable apply(final ScheduledAction action) { - return new Completable() { - @Override - protected void subscribeActual(CompletableObserver actionCompletable) { - actionCompletable.onSubscribe(action); - action.call(actualWorker, actionCompletable); - } - }; - } - }); + Flowable actions = actionProcessor.map(new CreateWorkerFunction(actualWorker)); // a worker that queues the action to the actionQueue subject. - Worker worker = new Worker() { - private final AtomicBoolean unsubscribed = new AtomicBoolean(); - - @Override - public void dispose() { - // complete the actionQueue when worker is unsubscribed to make - // room for the next worker in the workerQueue. - if (unsubscribed.compareAndSet(false, true)) { - actionProcessor.onComplete(); - actualWorker.dispose(); - } - } - - @Override - public boolean isDisposed() { - return unsubscribed.get(); - } - - @NonNull - @Override - public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit unit) { - // send a scheduled action to the actionQueue - DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); - actionProcessor.onNext(delayedAction); - return delayedAction; - } - - @NonNull - @Override - public Disposable schedule(@NonNull final Runnable action) { - // send a scheduled action to the actionQueue - ImmediateAction immediateAction = new ImmediateAction(action); - actionProcessor.onNext(immediateAction); - return immediateAction; - } - }; + Worker worker = new QueueWorker(actionProcessor, actualWorker); // enqueue the completable that process actions put in reply subject workerProcessor.onNext(actions); @@ -196,16 +150,7 @@ public Disposable schedule(@NonNull final Runnable action) { return worker; } - static final Disposable SUBSCRIBED = new Disposable() { - @Override - public void dispose() { - } - - @Override - public boolean isDisposed() { - return false; - } - }; + static final Disposable SUBSCRIBED = new SubscribedDisposable(); static final Disposable DISPOSED = Disposables.disposed(); @@ -317,4 +262,87 @@ public void run() { } } } + + static final class CreateWorkerFunction implements Function { + final Worker actualWorker; + + CreateWorkerFunction(Worker actualWorker) { + this.actualWorker = actualWorker; + } + + @Override + public Completable apply(final ScheduledAction action) { + return new WorkerCompletable(action); + } + + final class WorkerCompletable extends Completable { + final ScheduledAction action; + + WorkerCompletable(ScheduledAction action) { + this.action = action; + } + + @Override + protected void subscribeActual(CompletableObserver actionCompletable) { + actionCompletable.onSubscribe(action); + action.call(actualWorker, actionCompletable); + } + } + } + + static final class QueueWorker extends Worker { + private final AtomicBoolean unsubscribed; + private final FlowableProcessor actionProcessor; + private final Worker actualWorker; + + QueueWorker(FlowableProcessor actionProcessor, Worker actualWorker) { + this.actionProcessor = actionProcessor; + this.actualWorker = actualWorker; + unsubscribed = new AtomicBoolean(); + } + + @Override + public void dispose() { + // complete the actionQueue when worker is unsubscribed to make + // room for the next worker in the workerQueue. + if (unsubscribed.compareAndSet(false, true)) { + actionProcessor.onComplete(); + actualWorker.dispose(); + } + } + + @Override + public boolean isDisposed() { + return unsubscribed.get(); + } + + @NonNull + @Override + public Disposable schedule(@NonNull final Runnable action, final long delayTime, @NonNull final TimeUnit unit) { + // send a scheduled action to the actionQueue + DelayedAction delayedAction = new DelayedAction(action, delayTime, unit); + actionProcessor.onNext(delayedAction); + return delayedAction; + } + + @NonNull + @Override + public Disposable schedule(@NonNull final Runnable action) { + // send a scheduled action to the actionQueue + ImmediateAction immediateAction = new ImmediateAction(action); + actionProcessor.onNext(immediateAction); + return immediateAction; + } + } + + static final class SubscribedDisposable implements Disposable { + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + } } diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index c63fdf2dac..25f164081e 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -117,13 +117,7 @@ Disposable enqueue(Runnable action, long execTime) { return EmptyDisposable.INSTANCE; } else { // queue wasn't empty, a parent is already processing so we just add to the end of the queue - return Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - timedRunnable.disposed = true; - queue.remove(timedRunnable); - } - }); + return Disposables.fromRunnable(new AppendToQueueTask(timedRunnable)); } } @@ -136,6 +130,20 @@ public void dispose() { public boolean isDisposed() { return disposed; } + + final class AppendToQueueTask implements Runnable { + final TimedRunnable timedRunnable; + + AppendToQueueTask(TimedRunnable timedRunnable) { + this.timedRunnable = timedRunnable; + } + + @Override + public void run() { + timedRunnable.disposed = true; + queue.remove(timedRunnable); + } + } } static final class TimedRunnable implements Comparable { diff --git a/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java b/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java index 22edc8ac61..283ad2d2a9 100644 --- a/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java +++ b/src/main/java/io/reactivex/internal/subscriptions/FullArbiter.java @@ -36,16 +36,7 @@ public final class FullArbiter extends FullArbiterPad2 implements Subscriptio long requested; volatile Subscription s; - static final Subscription INITIAL = new Subscription() { - @Override - public void request(long n) { - // deliberately no op - } - @Override - public void cancel() { - // deliberately no op - } - }; + static final Subscription INITIAL = new InitialSubscription(); Disposable resource; @@ -198,6 +189,18 @@ void drain() { } } } + + static final class InitialSubscription implements Subscription { + @Override + public void request(long n) { + // deliberately no op + } + + @Override + public void cancel() { + // deliberately no op + } + } } /** Pads the object header away. */ diff --git a/src/main/java/io/reactivex/internal/util/ExceptionHelper.java b/src/main/java/io/reactivex/internal/util/ExceptionHelper.java index 27b697123b..8cce972a65 100644 --- a/src/main/java/io/reactivex/internal/util/ExceptionHelper.java +++ b/src/main/java/io/reactivex/internal/util/ExceptionHelper.java @@ -49,15 +49,7 @@ public static RuntimeException wrapOrThrow(Throwable error) { * A singleton instance of a Throwable indicating a terminal state for exceptions, * don't leak this. */ - public static final Throwable TERMINATED = new Throwable("No further exceptions") { - - private static final long serialVersionUID = -4649703670690200604L; - - @Override - public Throwable fillInStackTrace() { - return this; - } - }; + public static final Throwable TERMINATED = new Termination(); public static boolean addThrowable(AtomicReference field, Throwable exception) { for (;;) { @@ -113,4 +105,18 @@ public static List flatten(Throwable t) { return list; } + + final static class Termination extends Throwable { + + private static final long serialVersionUID = -4649703670690200604L; + + Termination() { + super("No further exceptions"); + } + + @Override + public Throwable fillInStackTrace() { + return this; + } + } } diff --git a/src/main/java/io/reactivex/schedulers/Schedulers.java b/src/main/java/io/reactivex/schedulers/Schedulers.java index 967acce36d..7f621c6071 100644 --- a/src/main/java/io/reactivex/schedulers/Schedulers.java +++ b/src/main/java/io/reactivex/schedulers/Schedulers.java @@ -67,35 +67,15 @@ static final class NewThreadHolder { } static { - SINGLE = RxJavaPlugins.initSingleScheduler(new Callable() { - @Override - public Scheduler call() throws Exception { - return SingleHolder.DEFAULT; - } - }); - - COMPUTATION = RxJavaPlugins.initComputationScheduler(new Callable() { - @Override - public Scheduler call() throws Exception { - return ComputationHolder.DEFAULT; - } - }); - - IO = RxJavaPlugins.initIoScheduler(new Callable() { - @Override - public Scheduler call() throws Exception { - return IoHolder.DEFAULT; - } - }); + SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask()); + + COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask()); + + IO = RxJavaPlugins.initIoScheduler(new IOTask()); TRAMPOLINE = TrampolineScheduler.instance(); - NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new Callable() { - @Override - public Scheduler call() throws Exception { - return NewThreadHolder.DEFAULT; - } - }); + NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask()); } /** Utility class. */ @@ -215,4 +195,32 @@ public static void start() { trampoline().start(); SchedulerPoolFactory.start(); } + + static final class IOTask implements Callable { + @Override + public Scheduler call() throws Exception { + return IoHolder.DEFAULT; + } + } + + static final class NewThreadTask implements Callable { + @Override + public Scheduler call() throws Exception { + return NewThreadHolder.DEFAULT; + } + } + + static final class SingleTask implements Callable { + @Override + public Scheduler call() throws Exception { + return SingleHolder.DEFAULT; + } + } + + static final class ComputationTask implements Callable { + @Override + public Scheduler call() throws Exception { + return ComputationHolder.DEFAULT; + } + } } diff --git a/src/main/java/io/reactivex/schedulers/TestScheduler.java b/src/main/java/io/reactivex/schedulers/TestScheduler.java index d8700e4770..929fa22432 100644 --- a/src/main/java/io/reactivex/schedulers/TestScheduler.java +++ b/src/main/java/io/reactivex/schedulers/TestScheduler.java @@ -148,12 +148,7 @@ public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeU final TimedRunnable timedAction = new TimedRunnable(this, time + unit.toNanos(delayTime), run, counter++); queue.add(timedAction); - return Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - queue.remove(timedAction); - } - }); + return Disposables.fromRunnable(new QueueRemove(timedAction)); } @NonNull @@ -164,12 +159,7 @@ public Disposable schedule(@NonNull Runnable run) { } final TimedRunnable timedAction = new TimedRunnable(this, 0, run, counter++); queue.add(timedAction); - return Disposables.fromRunnable(new Runnable() { - @Override - public void run() { - queue.remove(timedAction); - } - }); + return Disposables.fromRunnable(new QueueRemove(timedAction)); } @Override @@ -177,5 +167,17 @@ public long now(@NonNull TimeUnit unit) { return TestScheduler.this.now(unit); } + final class QueueRemove implements Runnable { + final TimedRunnable timedAction; + + QueueRemove(TimedRunnable timedAction) { + this.timedAction = timedAction; + } + + @Override + public void run() { + queue.remove(timedAction); + } + } } }