diff --git a/src/main/java/rx/internal/schedulers/NewThreadWorker.java b/src/main/java/rx/internal/schedulers/NewThreadWorker.java index 094c94892f1..65e56bc8e8b 100644 --- a/src/main/java/rx/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/rx/internal/schedulers/NewThreadWorker.java @@ -27,6 +27,8 @@ import rx.plugins.*; import rx.subscriptions.*; +import static rx.internal.util.PlatformDependent.ANDROID_API_VERSION_IS_NOT_ANDROID; + /** * @warn class description missing */ @@ -39,8 +41,7 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription { /** Force the use of purge (true/false). */ private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force"; private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-"; - /** Forces the use of purge even if setRemoveOnCancelPolicy is available. */ - private static final boolean PURGE_FORCE; + private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY; /** The purge frequency in milliseconds. */ public static final int PURGE_FREQUENCY; private static final ConcurrentHashMap EXECUTORS; @@ -48,8 +49,17 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription { static { EXECUTORS = new ConcurrentHashMap(); PURGE = new AtomicReference(); - PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY); PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000); + + // Forces the use of purge even if setRemoveOnCancelPolicy is available + final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY); + + final int androidApiVersion = PlatformDependent.getAndroidApiVersion(); + + // According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean) + // setRemoveOnCancelPolicy available since Android API 21 + SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce + && (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21); } /** * Registers the given executor service and starts the purge thread if not already started. @@ -102,32 +112,77 @@ static void purgeExecutors() { RxJavaPlugins.getInstance().getErrorHandler().handleError(t); } } - - /** + + /** + * Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}. + * Also, it works even for inheritance: {@link Method} of base class can be invoked on instance of child class. + */ + private static volatile Object cachedSetRemoveOnCancelPolicyMethod; + + private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object(); + + /** * Tries to enable the Java 7+ setRemoveOnCancelPolicy. *

{@code public} visibility reason: called from other package(s) within RxJava. * If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may * be called to enable the backup option of purging the executors. - * @param exec the executor to call setRemoveOnCaneclPolicy if available. + * @param executor the executor to call setRemoveOnCaneclPolicy if available. * @return true if the policy was successfully enabled */ - public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) { - if (!PURGE_FORCE) { - for (Method m : exec.getClass().getMethods()) { - if (m.getName().equals("setRemoveOnCancelPolicy") - && m.getParameterTypes().length == 1 - && m.getParameterTypes()[0] == Boolean.TYPE) { - try { - m.invoke(exec, true); - return true; - } catch (Exception ex) { - RxJavaPlugins.getInstance().getErrorHandler().handleError(ex); - } + public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) { + if (SHOULD_TRY_ENABLE_CANCEL_POLICY) { + final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor; + + final Method methodToCall; + + if (isInstanceOfScheduledThreadPoolExecutor) { + final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod; + + if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) { + return false; + } + + if (localSetRemoveOnCancelPolicyMethod == null) { + Method method = findSetRemoveOnCancelPolicyMethod(executor); + + cachedSetRemoveOnCancelPolicyMethod = method != null + ? method + : SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED; + + methodToCall = method; + } else { + methodToCall = (Method) localSetRemoveOnCancelPolicyMethod; + } + } else { + methodToCall = findSetRemoveOnCancelPolicyMethod(executor); + } + + if (methodToCall != null) { + try { + methodToCall.invoke(executor, true); + return true; + } catch (Exception e) { + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); } } } + return false; } + + static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) { + for (final Method method : executor.getClass().getMethods()) { + if (method.getName().equals("setRemoveOnCancelPolicy")) { + final Class[] parameterTypes = method.getParameterTypes(); + + if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) { + return method; + } + } + } + + return null; + } /* package */ public NewThreadWorker(ThreadFactory threadFactory) { diff --git a/src/main/java/rx/internal/util/PlatformDependent.java b/src/main/java/rx/internal/util/PlatformDependent.java index c6b7f3c28f9..628d9704802 100644 --- a/src/main/java/rx/internal/util/PlatformDependent.java +++ b/src/main/java/rx/internal/util/PlatformDependent.java @@ -20,31 +20,57 @@ /** * Allow platform dependent logic such as checks for Android. - * + * * Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java */ public final class PlatformDependent { - private static final boolean IS_ANDROID = isAndroid0(); + /** + * Constant value of Android API Version which means that current platform is not Android. + * + * @see #getAndroidApiVersion() + */ + public static final int ANDROID_API_VERSION_IS_NOT_ANDROID = 0; + + private static final int ANDROID_API_VERSION = resolveAndroidApiVersion(); + + private static final boolean IS_ANDROID = ANDROID_API_VERSION != ANDROID_API_VERSION_IS_NOT_ANDROID; /** - * Returns {@code true} if and only if the current platform is Android + * Returns {@code true} if and only if the current platform is Android. */ public static boolean isAndroid() { return IS_ANDROID; } - private static boolean isAndroid0() { - boolean android; + /** + * Returns version of Android API. + * + * @return version of Android API or {@link #ANDROID_API_VERSION_IS_NOT_ANDROID } if version + * can not be resolved or if current platform is not Android. + */ + public static int getAndroidApiVersion() { + return ANDROID_API_VERSION; + } + + /** + * Resolves version of Android API. + * + * @return version of Android API or {@code null} if version can not be resolved. + * @see Documentation + * @see #ANDROID_API_VERSION_IS_NOT_ANDROID + */ + private static int resolveAndroidApiVersion() { try { - Class.forName("android.app.Application", false, getSystemClassLoader()); - android = true; + return (Integer) Class + .forName("android.os.Build$VERSION", true, getSystemClassLoader()) + .getField("SDK_INT") + .get(null); } catch (Exception e) { - // Failed to load the class uniquely available in Android. - android = false; + // Can not resolve version of Android API, maybe current platform is not Android + // or API of resolving current Version of Android API has changed in some release of Android + return ANDROID_API_VERSION_IS_NOT_ANDROID; } - - return android; } /** diff --git a/src/test/java/rx/internal/schedulers/NewThreadWorkerTest.java b/src/test/java/rx/internal/schedulers/NewThreadWorkerTest.java new file mode 100644 index 00000000000..b2047426ea1 --- /dev/null +++ b/src/test/java/rx/internal/schedulers/NewThreadWorkerTest.java @@ -0,0 +1,118 @@ +package rx.internal.schedulers; + +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.lang.reflect.Modifier.FINAL; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +public class NewThreadWorkerTest { + + @Test + public void findSetRemoveOnCancelPolicyMethodShouldFindMethod() { + ScheduledExecutorService executor = spy(new ScheduledThreadPoolExecutor(1)); + Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor); + + assertNotNull(setRemoveOnCancelPolicyMethod); + assertEquals("setRemoveOnCancelPolicy", setRemoveOnCancelPolicyMethod.getName()); + assertEquals(1, setRemoveOnCancelPolicyMethod.getParameterTypes().length); + assertEquals(Boolean.TYPE, setRemoveOnCancelPolicyMethod.getParameterTypes()[0]); + verifyZeroInteractions(executor); + } + + @Test + public void findSetRemoveOnCancelPolicyMethodShouldNotFindMethod() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor); + assertNull(setRemoveOnCancelPolicyMethod); + verifyZeroInteractions(executor); + } + + @Test + public void tryEnableCancelPolicyShouldInvokeMethodOnExecutor() { + final AtomicBoolean setRemoveOnCancelPolicyWasInvoked = new AtomicBoolean(); + + ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1) { + // @Override is not used here, to prevent compilation error on JDK 6 + public void setRemoveOnCancelPolicy(boolean value) { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + + // Check to ensure that this method was called from NewThreadWorker, + // on some versions of JDK ScheduledThreadPoolExecutor may call this method itself and break the test. + for (StackTraceElement stackTraceElement : stackTraceElements) { + if (stackTraceElement.getClassName().equals(NewThreadWorker.class.getCanonicalName())) { + setRemoveOnCancelPolicyWasInvoked.set(true); + break; + } + } + } + }; + + boolean result = NewThreadWorker.tryEnableCancelPolicy(executor); + + assertTrue(result); + assertTrue(setRemoveOnCancelPolicyWasInvoked.get()); + } + + @Test + public void tryEnableCancelPolicyShouldNotInvokeMethodOnExecutor() { + ScheduledExecutorService executor = mock(ScheduledExecutorService.class); + + boolean result = NewThreadWorker.tryEnableCancelPolicy(executor); + + assertFalse(result); + verifyZeroInteractions(executor); + } + + @Test + public void tryEnableCancelPolicyShouldBeSkipped() throws NoSuchFieldException, IllegalAccessException { + Field shouldTryEnableCancelPolicyField = NewThreadWorker + .class + .getDeclaredField("SHOULD_TRY_ENABLE_CANCEL_POLICY"); + + shouldTryEnableCancelPolicyField.setAccessible(true); + + Field modifiersFieldOfField = Field + .class + .getDeclaredField("modifiers"); + + modifiersFieldOfField.setAccessible(true); + + // Removing final flag from the field + modifiersFieldOfField.set( + shouldTryEnableCancelPolicyField, + shouldTryEnableCancelPolicyField.getModifiers() & ~FINAL + ); + + final boolean initialValue = shouldTryEnableCancelPolicyField.getBoolean(null); + + try { + // Setting FALSE to the SHOULD_TRY_ENABLE_CANCEL_POLICY + shouldTryEnableCancelPolicyField.setBoolean(null, false); + + // Notice, that this executor has "setRemoveOnCancelPolicy" method + ScheduledThreadPoolExecutor executor = spy(new ScheduledThreadPoolExecutor(1)); + + boolean result = NewThreadWorker.tryEnableCancelPolicy(executor); + assertFalse(result); + + verifyZeroInteractions(executor); + } finally { + // Reverting value back for other tests + shouldTryEnableCancelPolicyField.setBoolean(null, initialValue); + + // Restoring "final" modifier, probably it'll make JIT on VM that runs Tests happier + modifiersFieldOfField.set( + shouldTryEnableCancelPolicyField, + shouldTryEnableCancelPolicyField.getModifiers() & FINAL + ); + } + } +}