Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into 6171-py-TableService
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind committed Oct 24, 2024
2 parents bea6155 + 306153a commit ab74e91
Show file tree
Hide file tree
Showing 128 changed files with 8,385 additions and 2,853 deletions.
2 changes: 2 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
open-pull-requests-limit: 99
schedule:
interval: "weekly"
commit-message:
Expand All @@ -15,6 +16,7 @@ updates:
- "stanbrub"
- package-ecosystem: "gradle"
directory: "/"
open-pull-requests-limit: 99
schedule:
interval: "weekly"
commit-message:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/cla.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
steps:
- name: "CLA Assistant"
if: (github.event.comment.body == 'recheck' || github.event.comment.body == 'I have read the CLA Document and I hereby sign the CLA') || github.event_name == 'pull_request_target'
uses: cla-assistant/github-action@v2.5.1
uses: cla-assistant/github-action@v2.6.1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PERSONAL_ACCESS_TOKEN : ${{ secrets.CLA_PERSONAL_ACCESS_TOKEN }}
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/nightly-check-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ on:
# 2AM EST == 6AM UTC
- cron: '0 6 * * *'
push:
branches: [ 'nightly/**', 'release/v*' ]
branches: [ 'nightly/**', 'release/v*', 'dependabot/**' ]

jobs:
nightly:
Expand Down Expand Up @@ -90,7 +90,7 @@ jobs:

- name: Publish Test Results
uses: scacap/action-surefire-report@v1
if: ${{ github.repository_owner == 'deephaven' }}
if: ${{ github.repository_owner == 'deephaven' && github.ref == 'refs/heads/main' }}
env:
NODE_OPTIONS: '--max_old_space_size=4096'
with:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static void main(String[] args) throws ClassNotFoundException, IOExceptio
excludes.add("description");
excludes.add("firstValidDate");
excludes.add("lastValidDate");
excludes.add("clearCache");

StaticCalendarMethodsGenerator gen =
new StaticCalendarMethodsGenerator(gradleTask, packageName, className, imports,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.deephaven.base.reference.WeakReferenceWrapper;
import io.deephaven.base.verify.Assert;
import io.deephaven.base.verify.Require;
import org.apache.commons.lang3.function.TriConsumer;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand Down Expand Up @@ -219,6 +220,36 @@ public final <NOTIFICATION_TYPE> boolean deliverNotification(
return initialSize > 0 && size == 0;
}

/**
* Dispatch a notification to all subscribers. Clean up any GC'd subscriptions.
*
* @param procedure The notification procedure to invoke
* @param firstNotification The first item to deliver
* @param secondNotification The second item to deliver (must be of the same type as {@code firstNotification})
* @param activeOnly Whether to restrict this notification to active subscriptions only
* @return Whether this operation caused the set to become <b>empty</b>
*/
public final <NOTIFICATION_TYPE> boolean deliverNotification(
@NotNull final TriConsumer<LISTENER_TYPE, NOTIFICATION_TYPE, NOTIFICATION_TYPE> procedure,
@Nullable final NOTIFICATION_TYPE firstNotification,
@Nullable final NOTIFICATION_TYPE secondNotification,
final boolean activeOnly) {
final int initialSize = size;
for (int si = 0; si < size;) {
final Entry currentEntry = subscriptions[si];
final LISTENER_TYPE currentListener = currentEntry.getListener();
if (currentListener == null) {
removeAt(si);
continue; // si is not incremented in this case - we'll reconsider the same slot if necessary.
}
if (!activeOnly || currentEntry.isActive()) {
procedure.accept(currentListener, firstNotification, secondNotification);
}
++si;
}
return initialSize > 0 && size == 0;
}

private void removeAt(final int subscriptionIndex) {
final int lastSubscriptionIndex = --size;
subscriptions[subscriptionIndex] = subscriptions[lastSubscriptionIndex];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,21 @@
import io.deephaven.base.reference.CleanupReference;
import io.deephaven.base.verify.Require;
import io.deephaven.configuration.Configuration;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.Utils;
import io.deephaven.util.annotations.TestUseOnly;
import io.deephaven.internal.log.LoggerFactory;
import org.jetbrains.annotations.NotNull;

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
* Utility for draining a reference queue of {@link CleanupReference}s and invoking their cleanup methods.
Expand Down Expand Up @@ -57,9 +64,9 @@ public interface ExceptionHandler {
private final ExceptionHandler exceptionHandler;

/**
* The reference queue from the most recent initialization.
* The drain queue from the most recent initialization.
*/
private volatile ReferenceQueue<?> referenceQueue;
private volatile DrainQueue drainQueue;

/**
* The cleaner thread from the most recent initialization, guarded by the lock on {@code this}.
Expand Down Expand Up @@ -98,29 +105,113 @@ public CleanupReferenceProcessor(
* {@link CleanupReferenceProcessor} instance
*/
public <RT> ReferenceQueue<RT> getReferenceQueue() {
ReferenceQueue localQueue;
if ((localQueue = referenceQueue) == null) {
return getDrainQueue().referenceQueue();
}

private DrainQueue getDrainQueue() {
DrainQueue localQueue;
if ((localQueue = drainQueue) == null) {
synchronized (this) {
if ((localQueue = referenceQueue) == null) {
referenceQueue = localQueue = new ReferenceQueue<>();
cleanerThread = new Thread(new DrainQueue(localQueue),
if ((localQueue = drainQueue) == null) {
drainQueue = localQueue = new DrainQueue(new ReferenceQueue<>());
cleanerThread = new Thread(localQueue,
"CleanupReferenceProcessor-" + name + "-drainingThread");
cleanerThread.setDaemon(true);
cleanerThread.start();
}
}
}
// noinspection unchecked
return localQueue;
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes phantom
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become phantom reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become phantom reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes phantom reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerPhantom(T referent, Runnable action) {
return getDrainQueue().registerPhantom(referent, action);
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes weakly
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become weakly reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become weakly reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes weakly reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerWeak(T referent, Runnable action) {
return getDrainQueue().registerWeak(referent, action);
}

/**
* Registers a {@code referent} and a cleaning {@code action} to run when the {@code referent} becomes softly
* reachable.
*
* <p>
* The most efficient use is to explicitly invoke the {@link CleanupReference#cleanup() cleanup} method when the
* {@code referent} is closed or no longer needed. Otherwise, the cleaning {@code action} will be invoked when
* {@code referent} has become softly reachable. The {@code action} will not be invoked more than once.
*
* <p>
* The cleaning {@code action} must <b>not</b> refer to the {@code referent} being registered. If so, the
* {@code referent} will never become softly reachable and the cleaning {@code action} will never be invoked
* automatically.
*
* <p>
* Note: while the caller is encouraged to hold onto the cleanup reference to allow for explicit
* {@link CleanupReference#cleanup() cleanup} invocation, they are not required to as this cleanup reference
* processor will hold onto the reference.
*
* @param referent the object to monitor
* @param action a {@code Runnable} to invoke when the referent becomes softly reachable
* @return a cleanup reference instance
*/
public <T> CleanupReference<T> registerSoft(T referent, Runnable action) {
return getDrainQueue().registerSoft(referent, action);
}

/**
* Reset this instance so that the next call to {@link #getReferenceQueue()} will re-initialize it and provide a new
* queue. Results in the prompt termination of the daemon thread that may have been draining the existing queue.
*/
@TestUseOnly
public final synchronized void resetForUnitTests() {
referenceQueue = null;
drainQueue = null;
if (cleanerThread != null) {
cleanerThread.interrupt();
cleanerThread = null;
Expand All @@ -132,33 +223,139 @@ public final synchronized void resetForUnitTests() {
*/
private class DrainQueue implements Runnable {

private final ReferenceQueue<?> localQueue;
private final ReferenceQueue<?> referenceQueue;
private final Set<RegisteredCleanupReference<?>> registrations;

private DrainQueue(ReferenceQueue<?> localQueue) {
this.localQueue = localQueue;
private DrainQueue(ReferenceQueue<?> referenceQueue) {
this.referenceQueue = Objects.requireNonNull(referenceQueue);
this.registrations = Collections.newSetFromMap(new ConcurrentHashMap<>());
}

public <T> ReferenceQueue<T> referenceQueue() {
// noinspection unchecked
return (ReferenceQueue<T>) referenceQueue;
}

public <T> CleanupReference<T> registerPhantom(T referent, Runnable action) {
final PhantomCleanupRef<T> ref = new PhantomCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

public <T> CleanupReference<T> registerWeak(T referent, Runnable action) {
final WeakCleanupRef<T> ref = new WeakCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

public <T> CleanupReference<T> registerSoft(T referent, Runnable action) {
final SoftCleanupRef<T> ref = new SoftCleanupRef<>(referent, referenceQueue(), action);
registrations.add(ref);
return ref;
}

@Override
public void run() {
while (localQueue == referenceQueue) {
while (this == drainQueue) {
final Reference<?> reference;
try {
reference = localQueue.remove(shutdownCheckDelayMillis);
reference = referenceQueue.remove(shutdownCheckDelayMillis);
} catch (InterruptedException ignored) {
continue;
}
if (reference instanceof CleanupReference) {
final CleanupReference<?> ref = (CleanupReference<?>) reference;
try {
if (LOG_CLEANED_REFERENCES) {
log.info().append("CleanupReferenceProcessor-").append(name).append(", cleaning ")
.append(Utils.REFERENT_FORMATTER, reference).endl();
}
((CleanupReference<?>) reference).cleanup();
ref.cleanup();
} catch (Exception e) {
exceptionHandler.accept(log, (CleanupReference<?>) reference, e);
exceptionHandler.accept(log, ref, e);
} finally {
if (ref instanceof RegisteredCleanupReference) {
registrations.remove(ref);
}
}
}
}
}
}

interface RegisteredCleanupReference<T> extends CleanupReference<T> {

}

private static class PhantomCleanupRef<T> extends PhantomReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

PhantomCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}

private static class WeakCleanupRef<T> extends WeakReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

WeakCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}

private static class SoftCleanupRef<T> extends SoftReference<T> implements RegisteredCleanupReference<T> {
private Runnable action;

SoftCleanupRef(T referent, ReferenceQueue<? super T> q, Runnable action) {
super(referent, q);
this.action = Objects.requireNonNull(action);
Reference.reachabilityFence(referent);
Reference.reachabilityFence(q);
}

@Override
public void cleanup() {
final Runnable cleanup;
synchronized (this) {
if (action == null) {
return;
}
cleanup = action;
action = null;
}
cleanup.run();
}
}
}
Loading

0 comments on commit ab74e91

Please sign in to comment.