diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableTimeout.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableTimeout.java index fcfe2731d1..9788e1cf1c 100644 --- a/src/main/java/io/reactivex/internal/operators/completable/CompletableTimeout.java +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableTimeout.java @@ -57,7 +57,7 @@ static final class TimeOutObserver implements CompletableObserver { private final AtomicBoolean once; private final CompletableObserver s; - public TimeOutObserver(CompositeDisposable set, AtomicBoolean once, CompletableObserver s) { + TimeOutObserver(CompositeDisposable set, AtomicBoolean once, CompletableObserver s) { this.set = set; this.once = once; this.s = s; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java index 69325cc015..0d0a37db3e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDelaySubscriptionOther.java @@ -87,7 +87,7 @@ public void onComplete() { final class DelaySubscription implements Subscription { private final Subscription s; - public DelaySubscription(Subscription s) { + DelaySubscription(Subscription s) { this.s = s; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java index 200b1e042f..4127d9b9a5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java @@ -241,7 +241,7 @@ public static Function>, Publisher implements Callable> { private final Flowable parent; - public ReplayCallable(Flowable parent) { + ReplayCallable(Flowable parent) { this.parent = parent; } @@ -310,7 +310,7 @@ static final class ReplayFunction implements Function, Publish private final Function, ? extends Publisher> selector; private final Scheduler scheduler; - public ReplayFunction(Function, ? extends Publisher> selector, Scheduler scheduler) { + ReplayFunction(Function, ? extends Publisher> selector, Scheduler scheduler) { this.selector = selector; this.scheduler = scheduler; } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 73e4819e6e..e5b7c09cc8 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -833,7 +833,7 @@ static final class SubjectWork { final class Completion implements Runnable { private final UnicastProcessor processor; - public Completion(UnicastProcessor processor) { + Completion(UnicastProcessor processor) { this.processor = processor; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java index 066d8cef87..adfbbd4b3f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDelay.java @@ -118,7 +118,7 @@ public void run() { final class OnError implements Runnable { private final Throwable throwable; - public OnError(Throwable throwable) { + OnError(Throwable throwable) { this.throwable = throwable; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java index 135d8bfb80..02b6c9d845 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java @@ -341,7 +341,7 @@ static final class BufferedReplayCallable implements Callable parent; private final int bufferSize; - public BufferedReplayCallable(Observable parent, int bufferSize) { + BufferedReplayCallable(Observable parent, int bufferSize) { this.parent = parent; this.bufferSize = bufferSize; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java index e78a19ae4c..7387fbaacd 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDelay.java @@ -82,7 +82,7 @@ public void run() { final class OnError implements Runnable { private final Throwable e; - public OnError(Throwable e) { + OnError(Throwable e) { this.e = e; } diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java index 36777ec3f6..0ed22f4c1b 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleEquals.java @@ -39,45 +39,54 @@ protected void subscribeActual(final SingleObserver s) { final CompositeDisposable set = new CompositeDisposable(); s.onSubscribe(set); - class InnerObserver implements SingleObserver { - final int index; - InnerObserver(int index) { - this.index = index; - } - @Override - public void onSubscribe(Disposable d) { - set.add(d); - } + first.subscribe(new InnerObserver(0, set, values, s, count)); + second.subscribe(new InnerObserver(1, set, values, s, count)); + } + + static class InnerObserver implements SingleObserver { + final int index; + final CompositeDisposable set; + final Object[] values; + final SingleObserver s; + final AtomicInteger count; + + InnerObserver(int index, CompositeDisposable set, Object[] values, SingleObserver s, AtomicInteger count) { + this.index = index; + this.set = set; + this.values = values; + this.s = s; + this.count = count; + } + @Override + public void onSubscribe(Disposable d) { + set.add(d); + } - @Override - public void onSuccess(T value) { - values[index] = value; + @Override + public void onSuccess(T value) { + values[index] = value; - if (count.incrementAndGet() == 2) { - s.onSuccess(ObjectHelper.equals(values[0], values[1])); - } + if (count.incrementAndGet() == 2) { + s.onSuccess(ObjectHelper.equals(values[0], values[1])); } + } - @Override - public void onError(Throwable e) { - for (;;) { - int state = count.get(); - if (state >= 2) { - RxJavaPlugins.onError(e); - return; - } - if (count.compareAndSet(state, 2)) { - set.dispose(); - s.onError(e); - return; - } + @Override + public void onError(Throwable e) { + for (;;) { + int state = count.get(); + if (state >= 2) { + RxJavaPlugins.onError(e); + return; + } + if (count.compareAndSet(state, 2)) { + set.dispose(); + s.onError(e); + return; } } - } - first.subscribe(new InnerObserver(0)); - second.subscribe(new InnerObserver(1)); } } diff --git a/src/main/java/io/reactivex/internal/util/ExceptionHelper.java b/src/main/java/io/reactivex/internal/util/ExceptionHelper.java index 8cce972a65..c8b53b0347 100644 --- a/src/main/java/io/reactivex/internal/util/ExceptionHelper.java +++ b/src/main/java/io/reactivex/internal/util/ExceptionHelper.java @@ -106,7 +106,7 @@ public static List flatten(Throwable t) { return list; } - final static class Termination extends Throwable { + static final class Termination extends Throwable { private static final long serialVersionUID = -4649703670690200604L; diff --git a/src/perf/java/io/reactivex/InputWithIncrementingInteger.java b/src/perf/java/io/reactivex/InputWithIncrementingInteger.java index 2b5ebbc720..8a784de957 100644 --- a/src/perf/java/io/reactivex/InputWithIncrementingInteger.java +++ b/src/perf/java/io/reactivex/InputWithIncrementingInteger.java @@ -26,6 +26,73 @@ * Exposes an Observable and Observer that increments n Integers and consumes them in a Blackhole. */ public abstract class InputWithIncrementingInteger { + final class DefaultSubscriberImpl extends DefaultSubscriber { + @Override + public void onComplete() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + bh.consume(t); + } + } + + final class IncrementingIterable implements Iterable { + private final class IncrementingIterator implements Iterator { + int i; + + @Override + public boolean hasNext() { + return i < size; + } + + @Override + public Integer next() { + Blackhole.consumeCPU(10); + return i++; + } + + @Override + public void remove() { + + } + } + + private final int size; + + private IncrementingIterable(int size) { + this.size = size; + } + + @Override + public Iterator iterator() { + return new IncrementingIterator(); + } + } + + final class IncrementingPublisher implements Publisher { + private final int size; + + IncrementingPublisher(int size) { + this.size = size; + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(EmptySubscription.INSTANCE); + for (int i = 0; i < size; i++) { + s.onNext(i); + } + s.onComplete(); + } + } + public Iterable iterable; public Flowable observable; public Flowable firehose; @@ -39,42 +106,8 @@ public void setup(final Blackhole bh) { final int size = getSize(); observable = Flowable.range(0, size); - firehose = Flowable.unsafeCreate(new Publisher() { - - @Override - public void subscribe(Subscriber s) { - s.onSubscribe(EmptySubscription.INSTANCE); - for (int i = 0; i < size; i++) { - s.onNext(i); - } - s.onComplete(); - } - - }); - iterable = new Iterable() { - @Override - public Iterator iterator() { - return new Iterator() { - int i; - - @Override - public boolean hasNext() { - return i < size; - } - - @Override - public Integer next() { - Blackhole.consumeCPU(10); - return i++; - } - - @Override - public void remove() { - - } - }; - } - }; + firehose = Flowable.unsafeCreate(new IncrementingPublisher(size)); + iterable = new IncrementingIterable(size); } @@ -83,24 +116,7 @@ public PerfSubscriber newLatchedObserver() { } public FlowableSubscriber newSubscriber() { - return new DefaultSubscriber() { - - @Override - public void onComplete() { - - } - - @Override - public void onError(Throwable e) { - - } - - @Override - public void onNext(Integer t) { - bh.consume(t); - } - - }; + return new DefaultSubscriberImpl(); } } diff --git a/src/test/java/io/reactivex/NoAnonymousInnerClassesTest.java b/src/test/java/io/reactivex/NoAnonymousInnerClassesTest.java new file mode 100644 index 0000000000..c611000c57 --- /dev/null +++ b/src/test/java/io/reactivex/NoAnonymousInnerClassesTest.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.io.File; +import java.net.URL; +import java.util.*; + +import org.junit.Test; + +public class NoAnonymousInnerClassesTest { + + @Test + public void verify() throws Exception { + URL u = NoAnonymousInnerClassesTest.class.getResource("/"); + File f = new File(u.toURI()); + + StringBuilder b = new StringBuilder("Anonymous inner classes found:"); + + Queue queue = new ArrayDeque(); + + queue.offer(f); + + String prefix = f.getAbsolutePath(); + int count = 0; + while (!queue.isEmpty()) { + + f = queue.poll(); + + if (f.isDirectory()) { + File[] dir = f.listFiles(); + if (dir != null && dir.length != 0) { + for (File g : dir) { + queue.offer(g); + } + } + } else { + String name = f.getName(); + if (name.endsWith(".class") && name.contains("$") + && !name.contains("Perf") && !name.contains("Test") + && !name.startsWith("Test")) { + String[] parts = name.split("\\$"); + for (String s : parts) { + if (Character.isDigit(s.charAt(0))) { + String n = f.getAbsolutePath().substring(prefix.length()).replace('\\', '.').replace('/', '.'); + if (n.startsWith(".")) { + n = n.substring(1); + } + b.append("\r\n").append(n); + count++; + break; + } + } + } + } + } + + if (count != 0) { + throw new AssertionError(b.toString()); + } + } +} diff --git a/src/test/java/io/reactivex/Retry.java b/src/test/java/io/reactivex/Retry.java index cbbcf809c2..e1b2f4b064 100644 --- a/src/test/java/io/reactivex/Retry.java +++ b/src/test/java/io/reactivex/Retry.java @@ -23,6 +23,38 @@ */ public class Retry implements TestRule { + final class RetryStatement extends Statement { + private final Statement base; + private final Description description; + + RetryStatement(Statement base, Description description) { + this.base = base; + this.description = description; + } + + @Override + public void evaluate() throws Throwable { + Throwable caughtThrowable = null; + + for (int i = 0; i < retryCount; i++) { + try { + base.evaluate(); + return; + } catch (Throwable t) { + caughtThrowable = t; + System.err.println(description.getDisplayName() + ": run " + (i + 1) + " failed"); + int n = sleep; + if (backoff && i != 0) { + n = n * (2 << i); + } + Thread.sleep(n); + } + } + System.err.println(description.getDisplayName() + ": giving up after " + retryCount + " failures"); + throw caughtThrowable; + } + } + final int retryCount; final int sleep; @@ -41,28 +73,6 @@ public Statement apply(Statement base, Description description) { } private Statement statement(final Statement base, final Description description) { - return new Statement() { - @Override - public void evaluate() throws Throwable { - Throwable caughtThrowable = null; - - for (int i = 0; i < retryCount; i++) { - try { - base.evaluate(); - return; - } catch (Throwable t) { - caughtThrowable = t; - System.err.println(description.getDisplayName() + ": run " + (i + 1) + " failed"); - int n = sleep; - if (backoff && i != 0) { - n = n * (2 << i); - } - Thread.sleep(n); - } - } - System.err.println(description.getDisplayName() + ": giving up after " + retryCount + " failures"); - throw caughtThrowable; - } - }; + return new RetryStatement(base, description); } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/flowable/Burst.java b/src/test/java/io/reactivex/flowable/Burst.java index 3c7937c775..47b8424e0c 100644 --- a/src/test/java/io/reactivex/flowable/Burst.java +++ b/src/test/java/io/reactivex/flowable/Burst.java @@ -33,10 +33,10 @@ */ public final class Burst extends Flowable { - private final List items; - private final Throwable error; + final List items; + final Throwable error; - private Burst(Throwable error, List items) { + Burst(Throwable error, List items) { if (items.isEmpty()) { throw new IllegalArgumentException("items cannot be empty"); } @@ -51,46 +51,7 @@ private Burst(Throwable error, List items) { @Override protected void subscribeActual(final Subscriber subscriber) { - subscriber.onSubscribe(new Subscription() { - - final Queue q = new ConcurrentLinkedQueue(items); - final AtomicLong requested = new AtomicLong(); - volatile boolean cancelled; - - @Override - public void request(long n) { - if (cancelled) { - // required by reactive-streams-jvm 3.6 - return; - } - if (SubscriptionHelper.validate(n)) { - // just for testing, don't care about perf - // so no attempt made to reduce volatile reads - if (BackpressureHelper.add(requested, n) == 0) { - if (q.isEmpty()) { - return; - } - while (!q.isEmpty() && requested.get() > 0) { - T item = q.poll(); - requested.decrementAndGet(); - subscriber.onNext(item); - } - if (q.isEmpty()) { - if (error != null) { - subscriber.onError(error); - } else { - subscriber.onComplete(); - } - } - } - } - } - - @Override - public void cancel() { - cancelled = true; - } - }); + subscriber.onSubscribe(new BurstSubscription(subscriber)); } @@ -103,12 +64,57 @@ public static Builder items(T... items) { return new Builder(Arrays.asList(items)); } + final class BurstSubscription implements Subscription { + private final Subscriber subscriber; + final Queue q = new ConcurrentLinkedQueue(items); + final AtomicLong requested = new AtomicLong(); + volatile boolean cancelled; + + BurstSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + if (cancelled) { + // required by reactive-streams-jvm 3.6 + return; + } + if (SubscriptionHelper.validate(n)) { + // just for testing, don't care about perf + // so no attempt made to reduce volatile reads + if (BackpressureHelper.add(requested, n) == 0) { + if (q.isEmpty()) { + return; + } + while (!q.isEmpty() && requested.get() > 0) { + T item = q.poll(); + requested.decrementAndGet(); + subscriber.onNext(item); + } + if (q.isEmpty()) { + if (error != null) { + subscriber.onError(error); + } else { + subscriber.onComplete(); + } + } + } + } + } + + @Override + public void cancel() { + cancelled = true; + } + } + public static final class Builder { private final List items; private Throwable error; - private Builder(List items) { + Builder(List items) { this.items = items; } diff --git a/src/test/java/io/reactivex/flowable/FlowableEventStream.java b/src/test/java/io/reactivex/flowable/FlowableEventStream.java index 5fe062ff32..b158d39e8f 100644 --- a/src/test/java/io/reactivex/flowable/FlowableEventStream.java +++ b/src/test/java/io/reactivex/flowable/FlowableEventStream.java @@ -28,19 +28,8 @@ private FlowableEventStream() { } public static Flowable getEventStream(final String type, final int numInstances) { - return Flowable.generate(new Consumer>() { - @Override - public void accept(Emitter s) { - s.onNext(randomEvent(type, numInstances)); - try { - // slow it down somewhat - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - s.onError(e); - } - } - }).subscribeOn(Schedulers.newThread()); + return Flowable.generate(new EventConsumer(type, numInstances)) + .subscribeOn(Schedulers.newThread()); } public static Event randomEvent(String type, int numInstances) { @@ -60,6 +49,28 @@ private static int randomIntFrom0to(int max) { return Math.abs((int) x % max); } + static final class EventConsumer implements Consumer> { + private final String type; + private final int numInstances; + + EventConsumer(String type, int numInstances) { + this.type = type; + this.numInstances = numInstances; + } + + @Override + public void accept(Emitter s) { + s.onNext(randomEvent(type, numInstances)); + try { + // slow it down somewhat + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + s.onError(e); + } + } + } + public static class Event { public final String type; public final String instanceId; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 513d1a9c15..a94152fc03 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -690,7 +690,7 @@ static final class ExceptionData extends Exception { public final Object data; - public ExceptionData(Object data) { + ExceptionData(Object data) { this.data = data; } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/Burst.java b/src/test/java/io/reactivex/internal/operators/observable/Burst.java index 1154282ee7..32a7eb1869 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/Burst.java +++ b/src/test/java/io/reactivex/internal/operators/observable/Burst.java @@ -27,10 +27,10 @@ */ public final class Burst extends Observable { - private final List items; - private final Throwable error; + final List items; + final Throwable error; - private Burst(Throwable error, List items) { + Burst(Throwable error, List items) { this.error = error; this.items = items; } @@ -62,7 +62,7 @@ public static final class Builder { private final List items; private Throwable error; - private Builder(List items) { + Builder(List items) { this.items = items; } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 08dea0e7d1..d30435651c 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -689,7 +689,7 @@ static final class ExceptionData extends Exception { public final Object data; - public ExceptionData(Object data) { + ExceptionData(Object data) { this.data = data; } } diff --git a/src/test/java/io/reactivex/observable/ObservableEventStream.java b/src/test/java/io/reactivex/observable/ObservableEventStream.java index b69bc0c259..b8bef7c54a 100644 --- a/src/test/java/io/reactivex/observable/ObservableEventStream.java +++ b/src/test/java/io/reactivex/observable/ObservableEventStream.java @@ -29,19 +29,7 @@ private ObservableEventStream() { } public static Observable getEventStream(final String type, final int numInstances) { - return Observable.generate(new Consumer>() { - @Override - public void accept(Emitter s) { - s.onNext(randomEvent(type, numInstances)); - try { - // slow it down somewhat - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - s.onError(e); - } - } - }).subscribeOn(Schedulers.newThread()); + return Observable.generate(new EventConsumer(numInstances, type)).subscribeOn(Schedulers.newThread()); } public static Event randomEvent(String type, int numInstances) { @@ -61,6 +49,28 @@ private static int randomIntFrom0to(int max) { return Math.abs((int) x % max); } + static final class EventConsumer implements Consumer> { + private final int numInstances; + private final String type; + + EventConsumer(int numInstances, String type) { + this.numInstances = numInstances; + this.type = type; + } + + @Override + public void accept(Emitter s) { + s.onNext(randomEvent(type, numInstances)); + try { + // slow it down somewhat + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + s.onError(e); + } + } + } + public static class Event { public final String type; public final String instanceId; diff --git a/src/test/java/io/reactivex/observers/ObserverFusion.java b/src/test/java/io/reactivex/observers/ObserverFusion.java index b0061ec944..5e81b02216 100644 --- a/src/test/java/io/reactivex/observers/ObserverFusion.java +++ b/src/test/java/io/reactivex/observers/ObserverFusion.java @@ -42,18 +42,7 @@ public enum ObserverFusion { */ public static Function, TestObserver> test( final int mode, final boolean cancelled) { - return new Function, TestObserver>() { - @Override - public TestObserver apply(Observable t) throws Exception { - TestObserver ts = new TestObserver(); - ts.setInitialFusionMode(mode); - if (cancelled) { - ts.cancel(); - } - t.subscribe(ts); - return ts; - } - }; + return new TestFunctionCallback(mode, cancelled); } /** @@ -74,6 +63,40 @@ public static Consumer> assertFuseable() { return (Consumer)AssertFuseable.INSTANCE; } + static final class AssertFusionConsumer implements Consumer> { + private final int mode; + + AssertFusionConsumer(int mode) { + this.mode = mode; + } + + @Override + public void accept(TestObserver ts) throws Exception { + ts.assertFusionMode(mode); + } + } + + static final class TestFunctionCallback implements Function, TestObserver> { + private final int mode; + private final boolean cancelled; + + TestFunctionCallback(int mode, boolean cancelled) { + this.mode = mode; + this.cancelled = cancelled; + } + + @Override + public TestObserver apply(Observable t) throws Exception { + TestObserver ts = new TestObserver(); + ts.setInitialFusionMode(mode); + if (cancelled) { + ts.cancel(); + } + t.subscribe(ts); + return ts; + } + } + enum AssertFuseable implements Consumer> { INSTANCE; @Override @@ -124,12 +147,7 @@ public void accept(TestObserver ts) throws Exception { * @return the new Consumer instance */ public static Consumer> assertFusionMode(final int mode) { - return new Consumer>() { - @Override - public void accept(TestObserver ts) throws Exception { - ts.assertFusionMode(mode); - } - }; + return new AssertFusionConsumer(mode); } diff --git a/src/test/java/io/reactivex/subscribers/SubscriberFusion.java b/src/test/java/io/reactivex/subscribers/SubscriberFusion.java index 134a1cf8f3..43e15e350b 100644 --- a/src/test/java/io/reactivex/subscribers/SubscriberFusion.java +++ b/src/test/java/io/reactivex/subscribers/SubscriberFusion.java @@ -43,18 +43,7 @@ public enum SubscriberFusion { */ public static Function, TestSubscriber> test( final long initialRequest, final int mode, final boolean cancelled) { - return new Function, TestSubscriber>() { - @Override - public TestSubscriber apply(Flowable t) throws Exception { - TestSubscriber ts = new TestSubscriber(initialRequest); - ts.setInitialFusionMode(mode); - if (cancelled) { - ts.cancel(); - } - t.subscribe(ts); - return ts; - } - }; + return new TestFusionCheckFunction(mode, cancelled, initialRequest); } /** * Returns a Consumer that asserts on its TestSubscriber parameter that @@ -74,6 +63,42 @@ public static Consumer> assertFuseable() { return (Consumer)AssertFuseable.INSTANCE; } + static final class AssertFusionConsumer implements Consumer> { + private final int mode; + + AssertFusionConsumer(int mode) { + this.mode = mode; + } + + @Override + public void accept(TestSubscriber ts) throws Exception { + ts.assertFusionMode(mode); + } + } + + static final class TestFusionCheckFunction implements Function, TestSubscriber> { + private final int mode; + private final boolean cancelled; + private final long initialRequest; + + TestFusionCheckFunction(int mode, boolean cancelled, long initialRequest) { + this.mode = mode; + this.cancelled = cancelled; + this.initialRequest = initialRequest; + } + + @Override + public TestSubscriber apply(Flowable t) throws Exception { + TestSubscriber ts = new TestSubscriber(initialRequest); + ts.setInitialFusionMode(mode); + if (cancelled) { + ts.cancel(); + } + t.subscribe(ts); + return ts; + } + } + enum AssertFuseable implements Consumer> { INSTANCE; @Override @@ -124,12 +149,7 @@ public void accept(TestSubscriber ts) throws Exception { * @return the new Consumer instance */ public static Consumer> assertFusionMode(final int mode) { - return new Consumer>() { - @Override - public void accept(TestSubscriber ts) throws Exception { - ts.assertFusionMode(mode); - } - }; + return new AssertFusionConsumer(mode); } /**