Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ScheduledExecutorService: call purge periodically on JDK 6 to avoid #2465

Merged
merged 1 commit into from
Jan 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 105 additions & 13 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
package rx.internal.schedulers;

import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import rx.*;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.internal.util.RxThreadFactory;
import rx.plugins.*;
import rx.subscriptions.Subscriptions;

Expand All @@ -30,24 +34,111 @@ public class NewThreadWorker extends Scheduler.Worker implements Subscription {
private final ScheduledExecutorService executor;
private final RxJavaSchedulersHook schedulersHook;
volatile boolean isUnsubscribed;

/** The purge frequency in milliseconds. */
private static final String FREQUENCY_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-frequency-millis";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These don't match the naming convention using in RxRingBuffer with just the rx prefix: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/util/RxRingBuffer.java#L267

We should probably stick with that convention since it is already set, so:

rx.scheduler.jdk6.purge-frequency-millis
rx.scheduler.jdk6.purge-force

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to fix it in master but then it conflicts with #2579, which by itself also adds parameters with unconventional naming.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we fix this then go work on #2579 which still needs to be reviewed and can be rebased?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hystrix could use the enhancements in #2579 and since it affects the same file, it is much easier to change the values there in a plain commit than rebasing.

/** Force the use of purge (true/false). */
private static final String PURGE_FORCE_KEY = "io.reactivex.rxjava.scheduler.jdk6.purge-force";
private static final String PURGE_THREAD_PREFIX = "RxSchedulerPurge-";
/** Forces the use of purge even if setRemoveOnCancelPolicy is available. */
private static final boolean PURGE_FORCE;
/** The purge frequency in milliseconds. */
public static final int PURGE_FREQUENCY;
private static final ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor> EXECUTORS;
private static final AtomicReference<ScheduledExecutorService> PURGE;
static {
EXECUTORS = new ConcurrentHashMap<ScheduledThreadPoolExecutor, ScheduledThreadPoolExecutor>();
PURGE = new AtomicReference<ScheduledExecutorService>();
PURGE_FORCE = Boolean.getBoolean(PURGE_FORCE_KEY);
PURGE_FREQUENCY = Integer.getInteger(FREQUENCY_KEY, 1000);
}
/**
* Registers the given executor service and starts the purge thread if not already started.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void registerExecutor(ScheduledThreadPoolExecutor service) {
do {
ScheduledExecutorService exec = PURGE.get();
if (exec != null) {
break;
}
exec = Executors.newScheduledThreadPool(1, new RxThreadFactory(PURGE_THREAD_PREFIX));
if (PURGE.compareAndSet(null, exec)) {
exec.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
purgeExecutors();
}
}, PURGE_FREQUENCY, PURGE_FREQUENCY, TimeUnit.MILLISECONDS);

break;
}
} while (true);

EXECUTORS.putIfAbsent(service, service);
}
/**
* Deregisters the executor service.
* <p>{@code public} visibility reason: called from other package(s) within RxJava
* @param service a scheduled thread pool executor instance
*/
public static void deregisterExecutor(ScheduledExecutorService service) {
EXECUTORS.remove(service);
}
/** Purges each registered executor and eagerly evicts shutdown executors. */
static void purgeExecutors() {
try {
Iterator<ScheduledThreadPoolExecutor> it = EXECUTORS.keySet().iterator();
while (it.hasNext()) {
ScheduledThreadPoolExecutor exec = it.next();
if (!exec.isShutdown()) {
exec.purge();
} else {
it.remove();
}
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
}
}

/**
* Tries to enable the Java 7+ setRemoveOnCancelPolicy.
* <p>{@code public} visibility reason: called from other package(s) within RxJava.
* If the method returns false, the {@link #registerExecutor(ScheduledThreadPoolExecutor)} may
* be called to enable the backup option of purging the executors.
* @param exec the executor to call setRemoveOnCaneclPolicy if available.
* @return true if the policy was successfully enabled
*/
public static boolean tryEnableCancelPolicy(ScheduledExecutorService exec) {
if (!PURGE_FORCE) {
for (Method m : exec.getClass().getMethods()) {
if (m.getName().equals("setRemoveOnCancelPolicy")
&& m.getParameterTypes().length == 1
&& m.getParameterTypes()[0] == Boolean.TYPE) {
try {
m.invoke(exec, true);
return true;
} catch (Exception ex) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
}
}
}
}
return false;
}

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
executor = Executors.newScheduledThreadPool(1, threadFactory);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
for (Method m : executor.getClass().getMethods()) {
if (m.getName().equals("setRemoveOnCancelPolicy")
&& m.getParameterTypes().length == 1
&& m.getParameterTypes()[0] == Boolean.TYPE) {
try {
m.invoke(executor, true);
} catch (Exception ex) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(ex);
}
break;
}
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
}

@Override
Expand Down Expand Up @@ -88,6 +179,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
public void unsubscribe() {
isUnsubscribed = true;
executor.shutdownNow();
deregisterExecutor(executor);
}

@Override
Expand Down
1 change: 1 addition & 0 deletions src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public Worker createWorker() {
private static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final ThreadWorker threadWorker;
@SuppressWarnings("unused")
volatile int once;
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/rx/schedulers/GenericScheduledExecutorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
package rx.schedulers;

import rx.Scheduler;
import rx.internal.schedulers.NewThreadWorker;
import rx.internal.util.RxThreadFactory;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.*;

/**
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
Expand Down Expand Up @@ -49,7 +47,13 @@ private GenericScheduledExecutorService() {
if (count > 8) {
count = 8;
}
executor = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
if (exec instanceof ScheduledThreadPoolExecutor) {
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
}
}
executor = exec;
}

/**
Expand Down
91 changes: 43 additions & 48 deletions src/test/java/rx/schedulers/CachedThreadSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.Observable;
import rx.Scheduler;
import rx.functions.*;
import rx.internal.schedulers.NewThreadWorker;
import static org.junit.Assert.assertTrue;

public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
Expand Down Expand Up @@ -73,55 +74,49 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru

@Test(timeout = 30000)
public void testCancelledTaskRetention() throws InterruptedException {
try {
ScheduledThreadPoolExecutor.class.getMethod("setRemoveOnCancelPolicy", Boolean.TYPE);

System.out.println("Wait before GC");
Thread.sleep(1000);

System.out.println("GC");
System.gc();

Thread.sleep(1000);


MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 750000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
System.out.println("Wait before GC");
Thread.sleep(1000);

System.out.println("GC");
System.gc();

Thread.sleep(1000);


MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
long initial = memHeap.getUsed();

System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);

Scheduler.Worker w = Schedulers.io().createWorker();
for (int i = 0; i < 750000; i++) {
if (i % 50000 == 0) {
System.out.println(" -> still scheduling: " + i);
}

memHeap = memoryMXBean.getHeapMemoryUsage();
long after = memHeap.getUsed();
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);

w.unsubscribe();

System.out.println("Wait before second GC");
Thread.sleep(1000);

System.out.println("Second GC");
System.gc();

Thread.sleep(1000);

memHeap = memoryMXBean.getHeapMemoryUsage();
long finish = memHeap.getUsed();
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);

if (finish > initial * 5) {
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
}
} catch (NoSuchMethodException ex) {
// not supported, no reason to test for it
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
}

memHeap = memoryMXBean.getHeapMemoryUsage();
long after = memHeap.getUsed();
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);

w.unsubscribe();

System.out.println("Wait before second GC");
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);

System.out.println("Second GC");
System.gc();

Thread.sleep(1000);

memHeap = memoryMXBean.getHeapMemoryUsage();
long finish = memHeap.getUsed();
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);

if (finish > initial * 5) {
Assert.fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
}
}

Expand Down