Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lower memory pressure when submitting many tasks with limited queue s… #12934

Merged
merged 6 commits into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public DispatchThreadPool() {

public DispatchThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue<Runnable> queue, final Thread.UncaughtExceptionHandler handler) {
super(PreferencesFactory.get().getInteger("threading.pool.size.max") == size ? new DispatchExecutorService() :
DefaultThreadPool.createExecutor(prefix, size, priority, queue, handler));
DefaultThreadPool.createExecutor(prefix, size, priority, queue, new DefaultThreadPool.CustomCallerPolicy(), handler));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ public DefaultBackgroundExecutor() {
}

public DefaultBackgroundExecutor(final Thread.UncaughtExceptionHandler handler) {
this(ThreadPool.DEFAULT_THREAD_NAME_PREFIX, handler);
this(ThreadPool.DEFAULT_THREAD_NAME_PREFIX, Integer.MAX_VALUE, handler);
}

public DefaultBackgroundExecutor(final String prefix) {
this(prefix, new LoggingUncaughtExceptionHandler());
this(prefix, Integer.MAX_VALUE, new LoggingUncaughtExceptionHandler());
}

public DefaultBackgroundExecutor(final String prefix, final Thread.UncaughtExceptionHandler handler) {
this(ThreadPoolFactory.get(prefix, handler));
public DefaultBackgroundExecutor(final String prefix, final int size, final Thread.UncaughtExceptionHandler handler) {
this(ThreadPoolFactory.get(prefix, size, handler));
}

public DefaultBackgroundExecutor(final ThreadPool concurrentExecutor) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import ch.cyberduck.core.preferences.PreferencesFactory;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -74,18 +78,21 @@ public DefaultThreadPool(final String prefix, final int size, final Thread.Uncau
}

public DefaultThreadPool(final String prefix, final int size, final Priority priority, final Thread.UncaughtExceptionHandler handler) {
super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(), handler));
super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(size), new CustomCallerPolicy(), handler));
}

public DefaultThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue<Runnable> queue, final Thread.UncaughtExceptionHandler handler) {
super(createExecutor(prefix, size, priority, queue, handler));
public DefaultThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue<Runnable> queue,
final RejectedExecutionHandler policy, final Thread.UncaughtExceptionHandler handler) {
super(createExecutor(prefix, size, priority, queue, policy, handler));
}

public static ThreadPoolExecutor createExecutor(final String prefix, final int size, final Priority priority, final BlockingQueue<Runnable> queue, final Thread.UncaughtExceptionHandler handler) {
public static ThreadPoolExecutor createExecutor(final String prefix, final int size, final Priority priority,
final BlockingQueue<Runnable> queue,
final RejectedExecutionHandler policy,
final Thread.UncaughtExceptionHandler handler) {
return new ThreadPoolExecutor(size, size,
PreferencesFactory.get().getLong("threading.pool.keepalive.seconds"), TimeUnit.SECONDS,
queue,
new NamedThreadFactory(prefix, priority, handler)) {
PreferencesFactory.get().getLong("threading.pool.keepalive.seconds"), TimeUnit.SECONDS,
queue, new NamedThreadFactory(prefix, priority, handler), policy) {
@Override
protected void afterExecute(final Runnable r, final Throwable t) {
if(t != null) {
Expand All @@ -94,4 +101,22 @@ protected void afterExecute(final Runnable r, final Throwable t) {
}
};
}

public static final class CustomCallerPolicy extends ThreadPoolExecutor.AbortPolicy {
private static final Logger log = LogManager.getLogger(CustomCallerPolicy.class);

@Override
public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) {
if(!e.isShutdown()) {
log.warn(String.format("Run %s on caller thread", r));
r.run();
}
else {
log.error(String.format("Rejected execution of %s", r));
// Reject
super.rejectedExecution(r, e);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,18 @@ public ThreadPoolFactory() {
}

/**
* @param prefix Thread name
* @param size Maximum pool size
* @param priority Thread priority
* @param queue Queue with pending tasks
* @param handler Uncaught thread exception handler
* @return Thread pool
*/
protected ThreadPool create(final String prefix, final Integer size, final ThreadPool.Priority priority,
final BlockingQueue<Runnable> queue, final Thread.UncaughtExceptionHandler handler) {
try {
final Constructor<ThreadPool> constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz,
prefix.getClass(), size.getClass(), priority.getClass(), queue.getClass(), handler.getClass());
prefix.getClass(), size.getClass(), priority.getClass(), queue.getClass(), handler.getClass());
if(null == constructor) {
log.warn(String.format("No matching constructor for parameter %s", handler.getClass()));
// Call default constructor for disabled implementations
Expand Down Expand Up @@ -100,7 +103,7 @@ public static ThreadPool get(final String prefix, final int size, final Thread.U
}

public static ThreadPool get(final String prefix, final int size, final ThreadPool.Priority priority, final Thread.UncaughtExceptionHandler handler) {
return get(prefix, size, priority, new LinkedBlockingQueue<>(), handler);
return get(prefix, size, priority, new LinkedBlockingQueue<>(size), handler);
}

public static ThreadPool get(final String prefix, final int size, final ThreadPool.Priority priority, final BlockingQueue<Runnable> queue, final Thread.UncaughtExceptionHandler handler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -76,16 +78,12 @@ public Object call() {

@Test
public void testFifoOrderSingleThread() throws Exception {
final DefaultThreadPool p = new DefaultThreadPool(1);
final List<Future<Integer>> wait = new ArrayList<Future<Integer>>();
final DefaultThreadPool p = new DefaultThreadPool("t", 1, ThreadPool.Priority.norm,
new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy(), new LoggingUncaughtExceptionHandler());
final List<Future<Integer>> wait = new ArrayList<>();
final AtomicInteger counter = new AtomicInteger(0);
for(int i = 0; i < 1000; i++) {
wait.add(p.execute(new Callable<Integer>() {
@Override
public Integer call() {
return counter.incrementAndGet();
}
}));
wait.add(p.execute(counter::incrementAndGet));
}
int i = 1;
for(Future f : wait) {
Expand All @@ -98,15 +96,10 @@ public Integer call() {
@Test
public void testShutdownGracefully() {
final DefaultThreadPool p = new DefaultThreadPool(Integer.MAX_VALUE);
final List<Future<Integer>> wait = new ArrayList<Future<Integer>>();
final List<Future<Integer>> wait = new ArrayList<>();
final AtomicInteger counter = new AtomicInteger(0);
for(int i = 0; i < 1000; i++) {
wait.add(p.execute(new Callable<Integer>() {
@Override
public Integer call() {
return counter.incrementAndGet();
}
}));
wait.add(p.execute(counter::incrementAndGet));
}
p.shutdown(true);
assertEquals(1000, counter.get());
Expand Down