Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ForkJoinPool and provide builder API to limit threads count. #1092

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion objectbox-java/src/main/java/io/objectbox/BoxStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -227,7 +228,7 @@ public static boolean isSyncServerAvailable() {
private final int[] allEntityTypeIds;
private final Map<Class<?>, Box<?>> boxes = new ConcurrentHashMap<>();
private final Set<Transaction> transactions = Collections.newSetFromMap(new WeakHashMap<>());
private final ExecutorService threadPool = new ObjectBoxThreadPool(this);
private final ExecutorService threadPool;
private final ObjectClassPublisher objectClassPublisher;
final boolean debugTxRead;
final boolean debugTxWrite;
Expand Down Expand Up @@ -257,6 +258,8 @@ public static boolean isSyncServerAvailable() {
private SyncClient syncClient;

BoxStore(BoxStoreBuilder builder) {
threadPool = Executors.unconfigurableExecutorService(
new ObjectBoxThreadPool(this, builder.executorServiceParallelism));
context = builder.context;
relinker = builder.relinker;
NativeLibraryLoader.ensureLoaded();
Expand Down
12 changes: 12 additions & 0 deletions objectbox-java/src/main/java/io/objectbox/BoxStoreBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class BoxStoreBuilder {
int maxReaders;
boolean noReaderThreadLocals;

int executorServiceParallelism = ForkJoinPool.getCommonPoolParallelism();

int queryAttempts;

/** For DebugCursor. */
Expand Down Expand Up @@ -319,6 +322,15 @@ public BoxStoreBuilder noReaderThreadLocals() {
return this;
}

/**
* Sets the maximum allowed level of parallelism allowed by executor service
* used by BoxStore. The default value is equal to {@ref ForkJoinPool#getCommonPoolParallelism())}
*/
public BoxStoreBuilder executorServiceParallelism(int parallelism) {
this.executorServiceParallelism = parallelism;
return this;
}

@Internal
public void entity(EntityInfo<?> entityInfo) {
entityInfoList.add(entityInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,84 @@
package io.objectbox.internal;

import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.objectbox.BoxStore;
import io.objectbox.annotation.apihint.Internal;

/**
* Custom thread pool similar to {@link Executors#newCachedThreadPool()} with the following adjustments:
* Custom executor service similar to {@link Executors#newWorkStealingPool()} with the following adjustments:
* <ul>
* <li>Release thread local resources ({@link BoxStore#closeThreadResources()})</li>
* <li>Reduce keep-alive time for threads to 20 seconds</li>
* <li>Uses a ThreadFactory to name threads like "ObjectBox-1-Thread-1"</li>
* <li>Release thread local resources ({@link BoxStore#closeThreadResources()}) after task execution</li>
* <li>Uses a custom thread factory to name threads like "ObjectBox-ForkJoinPool-1-Thread-1"</li>
* </ul>
*
*/
@Internal
public class ObjectBoxThreadPool extends ThreadPoolExecutor {
public final class ObjectBoxThreadPool extends AbstractExecutorService {
private final BoxStore boxStore;
private final ExecutorService executorImpl;

public ObjectBoxThreadPool(BoxStore boxStore) {
super(0, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ObjectBoxThreadFactory());
public ObjectBoxThreadPool(BoxStore boxStore, int parallelism) {
this.boxStore = boxStore;
this.executorImpl = Executors.unconfigurableExecutorService(
new ForkJoinPool(
parallelism,
pool -> {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
// Priority and daemon status are inherited from calling thread; ensure to reset if required
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
if (thread.isDaemon()) {
thread.setDaemon(false);
}
thread.setName("ObjectBox-" + thread.getName());
return thread;
},
null,
false));
}


@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
boxStore.closeThreadResources();
public void shutdown() {
executorImpl.shutdown();
}

static class ObjectBoxThreadFactory implements ThreadFactory {
private static final AtomicInteger POOL_COUNT = new AtomicInteger();
@Override
public List<Runnable> shutdownNow() {
return executorImpl.shutdownNow();
}

private final ThreadGroup group;
private final String namePrefix = "ObjectBox-" + POOL_COUNT.incrementAndGet() + "-Thread-";
private final AtomicInteger threadCount = new AtomicInteger();
@Override
public boolean isShutdown() {
return executorImpl.isShutdown();
}

ObjectBoxThreadFactory() {
SecurityManager securityManager = System.getSecurityManager();
group = (securityManager != null) ? securityManager.getThreadGroup() :
Thread.currentThread().getThreadGroup();
}
@Override
public boolean isTerminated() {
return executorImpl.isTerminated();
}

public Thread newThread(Runnable runnable) {
String name = namePrefix + threadCount.incrementAndGet();
Thread thread = new Thread(group, runnable, name);
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorImpl.awaitTermination(timeout, unit);
}

// Priority and daemon status are inherited from calling thread; ensure to reset if required
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
if (thread.isDaemon()) {
thread.setDaemon(false);
@Override
public void execute(Runnable command) {
executorImpl.execute(() -> {
try {
command.run();
} finally {
boxStore.closeThreadResources();
}
return thread;
}
});
}
}