From 7374d21d9915442c0df9acf08b0545874cb7f1d7 Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 13:49:31 +0100 Subject: [PATCH 1/6] Lower memory pressure when submitting many tasks with limited queue size equal to the number of threads in the pool and run the task on the caller thread in case the queue fills up to allow catching up. --- .../cyberduck/core/threading/DefaultThreadPool.java | 11 ++++++----- .../cyberduck/core/threading/ThreadPoolFactory.java | 2 +- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java index b7f9328caf8..ef568b42574 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java +++ b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java @@ -74,18 +74,19 @@ public DefaultThreadPool(final String prefix, final int size, final Thread.Uncau } public DefaultThreadPool(final String prefix, final int size, final Priority priority, final Thread.UncaughtExceptionHandler handler) { - super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(), handler)); + super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(size), handler)); } public DefaultThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { super(createExecutor(prefix, size, priority, queue, handler)); } - public static ThreadPoolExecutor createExecutor(final String prefix, final int size, final Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { + public static ThreadPoolExecutor createExecutor(final String prefix, final int size, final Priority priority, + final BlockingQueue queue, + final Thread.UncaughtExceptionHandler handler) { return new ThreadPoolExecutor(size, size, - PreferencesFactory.get().getLong("threading.pool.keepalive.seconds"), TimeUnit.SECONDS, - queue, - new NamedThreadFactory(prefix, priority, handler)) { + PreferencesFactory.get().getLong("threading.pool.keepalive.seconds"), TimeUnit.SECONDS, + queue, new NamedThreadFactory(prefix, priority, handler), new ThreadPoolExecutor.CallerRunsPolicy()) { @Override protected void afterExecute(final Runnable r, final Throwable t) { if(t != null) { diff --git a/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java b/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java index 7059acdc152..6b9415c0e3b 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java +++ b/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java @@ -100,7 +100,7 @@ public static ThreadPool get(final String prefix, final int size, final Thread.U } public static ThreadPool get(final String prefix, final int size, final ThreadPool.Priority priority, final Thread.UncaughtExceptionHandler handler) { - return get(prefix, size, priority, new LinkedBlockingQueue<>(), handler); + return get(prefix, size, priority, new LinkedBlockingQueue<>(size), handler); } public static ThreadPool get(final String prefix, final int size, final ThreadPool.Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { From 7e7284281b6f0fad5f8f0142f97ee3215b9f287a Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 15:10:18 +0100 Subject: [PATCH 2/6] Do not limit queue size for tasks scheduled from main thread. --- .../core/threading/DefaultBackgroundExecutor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/ch/cyberduck/core/threading/DefaultBackgroundExecutor.java b/core/src/main/java/ch/cyberduck/core/threading/DefaultBackgroundExecutor.java index 25c02dc7318..244800db2b7 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/DefaultBackgroundExecutor.java +++ b/core/src/main/java/ch/cyberduck/core/threading/DefaultBackgroundExecutor.java @@ -41,15 +41,15 @@ public DefaultBackgroundExecutor() { } public DefaultBackgroundExecutor(final Thread.UncaughtExceptionHandler handler) { - this(ThreadPool.DEFAULT_THREAD_NAME_PREFIX, handler); + this(ThreadPool.DEFAULT_THREAD_NAME_PREFIX, Integer.MAX_VALUE, handler); } public DefaultBackgroundExecutor(final String prefix) { - this(prefix, new LoggingUncaughtExceptionHandler()); + this(prefix, Integer.MAX_VALUE, new LoggingUncaughtExceptionHandler()); } - public DefaultBackgroundExecutor(final String prefix, final Thread.UncaughtExceptionHandler handler) { - this(ThreadPoolFactory.get(prefix, handler)); + public DefaultBackgroundExecutor(final String prefix, final int size, final Thread.UncaughtExceptionHandler handler) { + this(ThreadPoolFactory.get(prefix, size, handler)); } public DefaultBackgroundExecutor(final ThreadPool concurrentExecutor) { From cc88deca27ea7e24ae12f7ef3af8001b43d4d060 Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 15:48:52 +0100 Subject: [PATCH 3/6] Fix test. --- .../core/threading/DefaultThreadPoolTest.java | 23 +++++++------------ 1 file changed, 8 insertions(+), 15 deletions(-) diff --git a/core/src/test/java/ch/cyberduck/core/threading/DefaultThreadPoolTest.java b/core/src/test/java/ch/cyberduck/core/threading/DefaultThreadPoolTest.java index bc5d394ef3b..e86f5d4ad99 100644 --- a/core/src/test/java/ch/cyberduck/core/threading/DefaultThreadPoolTest.java +++ b/core/src/test/java/ch/cyberduck/core/threading/DefaultThreadPoolTest.java @@ -24,7 +24,9 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.assertEquals; @@ -76,16 +78,12 @@ public Object call() { @Test public void testFifoOrderSingleThread() throws Exception { - final DefaultThreadPool p = new DefaultThreadPool(1); - final List> wait = new ArrayList>(); + final DefaultThreadPool p = new DefaultThreadPool("t", 1, ThreadPool.Priority.norm, + new LinkedBlockingQueue<>(), new ThreadPoolExecutor.AbortPolicy(), new LoggingUncaughtExceptionHandler()); + final List> wait = new ArrayList<>(); final AtomicInteger counter = new AtomicInteger(0); for(int i = 0; i < 1000; i++) { - wait.add(p.execute(new Callable() { - @Override - public Integer call() { - return counter.incrementAndGet(); - } - })); + wait.add(p.execute(counter::incrementAndGet)); } int i = 1; for(Future f : wait) { @@ -98,15 +96,10 @@ public Integer call() { @Test public void testShutdownGracefully() { final DefaultThreadPool p = new DefaultThreadPool(Integer.MAX_VALUE); - final List> wait = new ArrayList>(); + final List> wait = new ArrayList<>(); final AtomicInteger counter = new AtomicInteger(0); for(int i = 0; i < 1000; i++) { - wait.add(p.execute(new Callable() { - @Override - public Integer call() { - return counter.incrementAndGet(); - } - })); + wait.add(p.execute(counter::incrementAndGet)); } p.shutdown(true); assertEquals(1000, counter.get()); From ced265dcea76d9540903156a6c64567f66f60c91 Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 15:52:02 +0100 Subject: [PATCH 4/6] Custom caller policy to reject when queue is full and executor is shut down. --- .../core/threading/DefaultThreadPool.java | 32 ++++++++++++++++--- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java index ef568b42574..1a5d972b134 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java +++ b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java @@ -20,8 +20,12 @@ import ch.cyberduck.core.preferences.PreferencesFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -74,19 +78,21 @@ public DefaultThreadPool(final String prefix, final int size, final Thread.Uncau } public DefaultThreadPool(final String prefix, final int size, final Priority priority, final Thread.UncaughtExceptionHandler handler) { - super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(size), handler)); + super(createExecutor(prefix, size, priority, new LinkedBlockingQueue<>(size), new CustomCallerPolicy(), handler)); } - public DefaultThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { - super(createExecutor(prefix, size, priority, queue, handler)); + public DefaultThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue queue, + final RejectedExecutionHandler policy, final Thread.UncaughtExceptionHandler handler) { + super(createExecutor(prefix, size, priority, queue, policy, handler)); } public static ThreadPoolExecutor createExecutor(final String prefix, final int size, final Priority priority, final BlockingQueue queue, + final RejectedExecutionHandler policy, final Thread.UncaughtExceptionHandler handler) { return new ThreadPoolExecutor(size, size, PreferencesFactory.get().getLong("threading.pool.keepalive.seconds"), TimeUnit.SECONDS, - queue, new NamedThreadFactory(prefix, priority, handler), new ThreadPoolExecutor.CallerRunsPolicy()) { + queue, new NamedThreadFactory(prefix, priority, handler), policy) { @Override protected void afterExecute(final Runnable r, final Throwable t) { if(t != null) { @@ -95,4 +101,22 @@ protected void afterExecute(final Runnable r, final Throwable t) { } }; } + + private static final class CustomCallerPolicy extends ThreadPoolExecutor.AbortPolicy { + private static final Logger log = LogManager.getLogger(CustomCallerPolicy.class); + + @Override + public void rejectedExecution(final Runnable r, final ThreadPoolExecutor e) { + if(!e.isShutdown()) { + log.warn(String.format("Run %s on caller thread", r)); + r.run(); + } + else { + log.error(String.format("Rejected execution of %s", r)); + // Reject + super.rejectedExecution(r, e); + } + } + } + } From e09043e59f71978dce156cdae4545746dc3c10ee Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 19:49:49 +0100 Subject: [PATCH 5/6] Add custom caller policy. --- .../java/ch/cyberduck/core/threading/DispatchThreadPool.java | 2 +- .../java/ch/cyberduck/core/threading/DefaultThreadPool.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/dylib/src/main/java/ch/cyberduck/core/threading/DispatchThreadPool.java b/core/dylib/src/main/java/ch/cyberduck/core/threading/DispatchThreadPool.java index 192c6e11033..e69a260d4cc 100644 --- a/core/dylib/src/main/java/ch/cyberduck/core/threading/DispatchThreadPool.java +++ b/core/dylib/src/main/java/ch/cyberduck/core/threading/DispatchThreadPool.java @@ -27,6 +27,6 @@ public DispatchThreadPool() { public DispatchThreadPool(final String prefix, final int size, final Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { super(PreferencesFactory.get().getInteger("threading.pool.size.max") == size ? new DispatchExecutorService() : - DefaultThreadPool.createExecutor(prefix, size, priority, queue, handler)); + DefaultThreadPool.createExecutor(prefix, size, priority, queue, new DefaultThreadPool.CustomCallerPolicy(), handler)); } } diff --git a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java index 1a5d972b134..4d4532a63d0 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java +++ b/core/src/main/java/ch/cyberduck/core/threading/DefaultThreadPool.java @@ -102,7 +102,7 @@ protected void afterExecute(final Runnable r, final Throwable t) { }; } - private static final class CustomCallerPolicy extends ThreadPoolExecutor.AbortPolicy { + public static final class CustomCallerPolicy extends ThreadPoolExecutor.AbortPolicy { private static final Logger log = LogManager.getLogger(CustomCallerPolicy.class); @Override From e6112a3ecd8e2a4c46142c0759ca2274781319c2 Mon Sep 17 00:00:00 2001 From: David Kocher Date: Thu, 3 Mar 2022 15:10:22 +0100 Subject: [PATCH 6/6] Javadoc. --- .../java/ch/cyberduck/core/threading/ThreadPoolFactory.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java b/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java index 6b9415c0e3b..6fd43da8ecd 100644 --- a/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java +++ b/core/src/main/java/ch/cyberduck/core/threading/ThreadPoolFactory.java @@ -38,15 +38,18 @@ public ThreadPoolFactory() { } /** + * @param prefix Thread name * @param size Maximum pool size * @param priority Thread priority + * @param queue Queue with pending tasks * @param handler Uncaught thread exception handler + * @return Thread pool */ protected ThreadPool create(final String prefix, final Integer size, final ThreadPool.Priority priority, final BlockingQueue queue, final Thread.UncaughtExceptionHandler handler) { try { final Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, - prefix.getClass(), size.getClass(), priority.getClass(), queue.getClass(), handler.getClass()); + prefix.getClass(), size.getClass(), priority.getClass(), queue.getClass(), handler.getClass()); if(null == constructor) { log.warn(String.format("No matching constructor for parameter %s", handler.getClass())); // Call default constructor for disabled implementations