Skip to content

Commit

Permalink
Improve performance of NewThreadWorker.tryEnableCancelPolicy().
Browse files Browse the repository at this point in the history
Disable search for ScheduledExecutorService.setRemoveOnCancelPolicy() on Android API < 21
  • Loading branch information
artem-zinnatullin committed Jul 30, 2015
1 parent 3494c00 commit 81b1951
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 29 deletions.
91 changes: 73 additions & 18 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import rx.plugins.*;
import rx.subscriptions.*;

import static rx.internal.util.PlatformDependent.ANDROID_API_VERSION_IS_NOT_ANDROID;

/**
* @warn class description missing
*/
Expand All @@ -39,17 +41,25 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription {
/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "rx.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
/** Forces the use of purge even if setRemoveOnCancelPolicy is available. */
private static final boolean PURGE_FORCE;
private static final boolean SHOULD_TRY_ENABLE_CANCEL_POLICY;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY);
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);

// Forces the use of purge even if setRemoveOnCancelPolicy is available
final boolean purgeForce = Boolean.getBoolean(PURGE_FORCE_KEY);

final int androidApiVersion = PlatformDependent.getAndroidApiVersion();

// According to http://developer.android.com/reference/java/util/concurrent/ScheduledThreadPoolExecutor.html#setRemoveOnCancelPolicy(boolean)
// setRemoveOnCancelPolicy available since Android API 21
SHOULD_TRY_ENABLE_CANCEL_POLICY = !purgeForce
&& (androidApiVersion == ANDROID_API_VERSION_IS_NOT_ANDROID || androidApiVersion >= 21);
}
/**
* Registers the given executor service and starts the purge thread if not already started.
Expand Down Expand Up @@ -102,32 +112,77 @@ static void purgeExecutors() {
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
}
}

/**

/**
* Improves performance of {@link #tryEnableCancelPolicy(ScheduledExecutorService)}.
* Also, it works even for inheritance: {@link Method} of base class can be invoked on instance of child class.
*/
private static volatile Object cachedSetRemoveOnCancelPolicyMethod;

private static final Object SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED = new Object();

/**
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
* be called to enable the backup option of purging the executors.
* @param exec the executor to call setRemoveOnCaneclPolicy if available.
* @param executor the executor to call setRemoveOnCaneclPolicy if available.
* @return true if the policy was successfully enabled
*/
public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) {
if (!PURGE_FORCE) {
for (Method m : exec.getClass().getMethods()) {
if (m.getName().equals("setRemoveOnCancelPolicy")
&& m.getParameterTypes().length == 1
&& m.getParameterTypes()[0] == Boolean.TYPE) {
try {
m.invoke(exec, true);
return true;
} catch (Exception ex) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
}
public static boolean tryEnableCancelPolicy(ScheduledExecutorService executor) {
if (SHOULD_TRY_ENABLE_CANCEL_POLICY) {
final boolean isInstanceOfScheduledThreadPoolExecutor = executor instanceof ScheduledThreadPoolExecutor;

final Method methodToCall;

if (isInstanceOfScheduledThreadPoolExecutor) {
final Object localSetRemoveOnCancelPolicyMethod = cachedSetRemoveOnCancelPolicyMethod;

if (localSetRemoveOnCancelPolicyMethod == SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED) {
return false;
}

if (localSetRemoveOnCancelPolicyMethod == null) {
Method method = findSetRemoveOnCancelPolicyMethod(executor);

cachedSetRemoveOnCancelPolicyMethod = method != null
? method
: SET_REMOVE_ON_CANCEL_POLICY_METHOD_NOT_SUPPORTED;

methodToCall = method;
} else {
methodToCall = (Method) localSetRemoveOnCancelPolicyMethod;
}
} else {
methodToCall = findSetRemoveOnCancelPolicyMethod(executor);
}

if (methodToCall != null) {
try {
methodToCall.invoke(executor, true);
return true;
} catch (Exception e) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
}
}

return false;
}

static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executor) {
for (final Method method : executor.getClass().getMethods()) {
if (method.getName().equals("setRemoveOnCancelPolicy")) {
final Class<?>[] parameterTypes = method.getParameterTypes();

if (parameterTypes.length == 1 && parameterTypes[0] == Boolean.TYPE) {
return method;
}
}
}

return null;
}

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
Expand Down
48 changes: 37 additions & 11 deletions src/main/java/rx/internal/util/PlatformDependent.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,57 @@

/**
* Allow platform dependent logic such as checks for Android.
*
*
* Modeled after Netty with some code copy/pasted from: https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/PlatformDependent.java
*/
public final class PlatformDependent {

private static final boolean IS_ANDROID = isAndroid0();
/**
* Constant value of Android API Version which means that current platform is not Android.
*
* @see #getAndroidApiVersion()
*/
public static final int ANDROID_API_VERSION_IS_NOT_ANDROID = 0;

private static final int ANDROID_API_VERSION = resolveAndroidApiVersion();

private static final boolean IS_ANDROID = ANDROID_API_VERSION != ANDROID_API_VERSION_IS_NOT_ANDROID;

/**
* Returns {@code true} if and only if the current platform is Android
* Returns {@code true} if and only if the current platform is Android.
*/
public static boolean isAndroid() {
return IS_ANDROID;
}

private static boolean isAndroid0() {
boolean android;
/**
* Returns version of Android API.
*
* @return version of Android API or {@link #ANDROID_API_VERSION_IS_NOT_ANDROID } if version
* can not be resolved or if current platform is not Android.
*/
public static int getAndroidApiVersion() {
return ANDROID_API_VERSION;
}

/**
* Resolves version of Android API.
*
* @return version of Android API or {@code null} if version can not be resolved.
* @see <a href="http://developer.android.com/reference/android/os/Build.VERSION.html#SDK_INT">Documentation</a>
* @see #ANDROID_API_VERSION_IS_NOT_ANDROID
*/
private static int resolveAndroidApiVersion() {
try {
Class.forName("android.app.Application", false, getSystemClassLoader());
android = true;
return (Integer) Class
.forName("android.os.Build$VERSION", true, getSystemClassLoader())
.getField("SDK_INT")
.get(null);
} catch (Exception e) {
// Failed to load the class uniquely available in Android.
android = false;
// Can not resolve version of Android API, maybe current platform is not Android
// or API of resolving current Version of Android API has changed in some release of Android
return ANDROID_API_VERSION_IS_NOT_ANDROID;
}

return android;
}

/**
Expand Down
118 changes: 118 additions & 0 deletions src/test/java/rx/internal/schedulers/NewThreadWorkerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package rx.internal.schedulers;

import org.junit.Test;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.lang.reflect.Modifier.FINAL;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

public class NewThreadWorkerTest {

@Test
public void findSetRemoveOnCancelPolicyMethodShouldFindMethod() {
ScheduledExecutorService executor = spy(new ScheduledThreadPoolExecutor(1));
Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor);

assertNotNull(setRemoveOnCancelPolicyMethod);
assertEquals("setRemoveOnCancelPolicy", setRemoveOnCancelPolicyMethod.getName());
assertEquals(1, setRemoveOnCancelPolicyMethod.getParameterTypes().length);
assertEquals(Boolean.TYPE, setRemoveOnCancelPolicyMethod.getParameterTypes()[0]);
verifyZeroInteractions(executor);
}

@Test
public void findSetRemoveOnCancelPolicyMethodShouldNotFindMethod() {
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

Method setRemoveOnCancelPolicyMethod = NewThreadWorker.findSetRemoveOnCancelPolicyMethod(executor);
assertNull(setRemoveOnCancelPolicyMethod);
verifyZeroInteractions(executor);
}

@Test
public void tryEnableCancelPolicyShouldInvokeMethodOnExecutor() {
final AtomicBoolean setRemoveOnCancelPolicyWasInvoked = new AtomicBoolean();

ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1) {
// @Override is not used here, to prevent compilation error on JDK 6
public void setRemoveOnCancelPolicy(boolean value) {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();

// Check to ensure that this method was called from NewThreadWorker,
// on some versions of JDK ScheduledThreadPoolExecutor may call this method itself and break the test.
for (StackTraceElement stackTraceElement : stackTraceElements) {
if (stackTraceElement.getClassName().equals(NewThreadWorker.class.getCanonicalName())) {
setRemoveOnCancelPolicyWasInvoked.set(true);
break;
}
}
}
};

boolean result = NewThreadWorker.tryEnableCancelPolicy(executor);

assertTrue(result);
assertTrue(setRemoveOnCancelPolicyWasInvoked.get());
}

@Test
public void tryEnableCancelPolicyShouldNotInvokeMethodOnExecutor() {
ScheduledExecutorService executor = mock(ScheduledExecutorService.class);

boolean result = NewThreadWorker.tryEnableCancelPolicy(executor);

assertFalse(result);
verifyZeroInteractions(executor);
}

@Test
public void tryEnableCancelPolicyShouldBeSkipped() throws NoSuchFieldException, IllegalAccessException {
Field shouldTryEnableCancelPolicyField = NewThreadWorker
.class
.getDeclaredField("SHOULD_TRY_ENABLE_CANCEL_POLICY");

shouldTryEnableCancelPolicyField.setAccessible(true);

Field modifiersFieldOfField = Field
.class
.getDeclaredField("modifiers");

modifiersFieldOfField.setAccessible(true);

// Removing final flag from the field
modifiersFieldOfField.set(
shouldTryEnableCancelPolicyField,
shouldTryEnableCancelPolicyField.getModifiers() & ~FINAL
);

final boolean initialValue = shouldTryEnableCancelPolicyField.getBoolean(null);

try {
// Setting FALSE to the SHOULD_TRY_ENABLE_CANCEL_POLICY
shouldTryEnableCancelPolicyField.setBoolean(null, false);

// Notice, that this executor has "setRemoveOnCancelPolicy" method
ScheduledThreadPoolExecutor executor = spy(new ScheduledThreadPoolExecutor(1));

boolean result = NewThreadWorker.tryEnableCancelPolicy(executor);
assertFalse(result);

verifyZeroInteractions(executor);
} finally {
// Reverting value back for other tests
shouldTryEnableCancelPolicyField.setBoolean(null, initialValue);

// Restoring "final" modifier, probably it'll make JIT on VM that runs Tests happier
modifiersFieldOfField.set(
shouldTryEnableCancelPolicyField,
shouldTryEnableCancelPolicyField.getModifiers() & FINAL
);
}
}
}

0 comments on commit 81b1951

Please sign in to comment.