Skip to content

Commit

Permalink
minor code cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
brettwooldridge committed Nov 29, 2024
1 parent 1164493 commit 9fa66d1
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions src/main/java/com/zaxxer/hikari/util/ConcurrentBag.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ public class ConcurrentBag<T extends IConcurrentBagEntry> implements AutoCloseab
private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentBag.class);

private final CopyOnWriteArrayList<T> sharedList;
private final boolean weakThreadLocals;
private final boolean useWeakThreadLocals;

private final ThreadLocal<List<Object>> threadList;
private final ThreadLocal<List<Object>> threadLocalList;
private final IBagStateListener listener;
private final AtomicInteger waiters;
private volatile boolean closed;
Expand Down Expand Up @@ -95,17 +95,14 @@ public interface IBagStateListener
public ConcurrentBag(final IBagStateListener listener)
{
this.listener = listener;
this.weakThreadLocals = useWeakThreadLocals();
this.useWeakThreadLocals = useWeakThreadLocals();

this.handoffQueue = new SynchronousQueue<>(true);
this.waiters = new AtomicInteger();
this.sharedList = new CopyOnWriteArrayList<>();
if (weakThreadLocals) {
this.threadList = ThreadLocal.withInitial(() -> new ArrayList<>(16));
}
else {
this.threadList = ThreadLocal.withInitial(() -> new FastList<>(IConcurrentBagEntry.class, 16));
}
this.threadLocalList = ThreadLocal.withInitial(() ->
useWeakThreadLocals ? new ArrayList<>(16) : new FastList<>(IConcurrentBagEntry.class, 16)
);
}

/**
Expand All @@ -120,18 +117,18 @@ public ConcurrentBag(final IBagStateListener listener)
public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
final var list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final var list = threadLocalList.get();
for (var i = list.size() - 1; i >= 0; i--) {
final var entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
final T bagEntry = useWeakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}

// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
final var waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
Expand Down Expand Up @@ -188,9 +185,9 @@ else if ((i & 0xff) == 0xff) {
}
}

final var threadLocalList = threadList.get();
if (threadLocalList.size() < 50) {
threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
final var threadLocalEntries = this.threadLocalList.get();
if (threadLocalEntries.size() < 16) {
threadLocalEntries.add(useWeakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
}
}

Expand Down Expand Up @@ -230,12 +227,12 @@ public boolean remove(final T bagEntry)
return false;
}

final boolean removed = sharedList.remove(bagEntry);
final var removed = sharedList.remove(bagEntry);
if (!removed && !closed) {
LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
}

threadList.get().remove(bagEntry);
threadLocalList.get().remove(bagEntry);

return removed;
}
Expand Down Expand Up @@ -307,7 +304,7 @@ public void unreserve(final T bagEntry)
{
if (bagEntry.compareAndSet(STATE_RESERVED, STATE_NOT_IN_USE)) {
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
while (waiters.get() > 0 && bagEntry.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(bagEntry)) {
Thread.yield();
}
}
Expand Down

0 comments on commit 9fa66d1

Please sign in to comment.