Skip to content

Commit

Permalink
Fixing race condition between end-operation and future completion.
Browse files Browse the repository at this point in the history
  • Loading branch information
sdedic committed Sep 2, 2024
1 parent f59beaf commit 9dcefae
Showing 1 changed file with 99 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -135,6 +134,7 @@ public class ProjectReloadInternal {
* events, state releases and other stuff that should not be processed during project reload.
*/
private final Map<Project, ProjectOperations> pendingOperations = new WeakHashMap<>();
private final Set<ProjectOperations> terminatingOperations = new HashSet<>();

/**
* Identity map for ProjectStateData. Each is assigned a special Object held in the handle
Expand Down Expand Up @@ -532,7 +532,7 @@ public Pair<StateRef, ProjectState> getProjectState0(Project p, Lookup context,
oldS.notify();
}
}
endOperation(p, null);
endOperation(p, null, null);
}
}

Expand Down Expand Up @@ -649,7 +649,7 @@ public CompletableFuture<ProjectState> withProjectState2(StateRef refCurrent, Pr
return reload.clientFuture;
} finally {
LOG.log(Level.FINE, "Failed to load project {0} with request {1}", new Object[] { p, stateRequest });
endOperation(p, null);
endOperation(p, null, null);
}
}

Expand Down Expand Up @@ -743,32 +743,62 @@ public void runProjectAction(Project p, Runnable r) {

public void assertNoOperations() {
synchronized (this) {
if (!pendingOperations.isEmpty()) {
Map<Project, ProjectOperations> ops = new HashMap<>(this.pendingOperations);
ops.values().removeAll(this.terminatingOperations);
if (!ops.isEmpty()) {
System.err.println("Pending operations detected");
for (Map.Entry<Project, ProjectOperations> en : pendingOperations.entrySet()) {
for (Map.Entry<Project, ProjectOperations> en : ops.entrySet()) {
ProjectOperations op = en.getValue();
System.err.println(en.getKey() + ": usage " + op.usage + ", pendingReloads: " + op.pendingReloads.size() + ", actions: " + op.postponedActions.size());
for (Reloader r : op.pendingReloads) {
r.getOriginTrace().printStackTrace();
}
}
}
if (!pendingOperations.isEmpty() || this.loaderProcessors.size() != PROJECT_RELOAD_CONCURRENCY) {
if (!ops.isEmpty() || this.loaderProcessors.size() != PROJECT_RELOAD_CONCURRENCY) {
throw new IllegalStateException();
}
}
}

private Collection<IdentityHolder> collectReleases(ProjectOperations op) {
Collection<IdentityHolder> releases = op.releases;
// this is copied from postCleanup, but can be done in batch without
// checking the project operation is in progress for each reference. These are already removed
// from stateIdentity, so just check they did not obtain another one:
releases.removeIf(expired -> {
ProjectStateData d = expired.state.get();
if (d == null) {
return true;
}
IdentityHolder h = stateIdentity.get(d);
return h != null && h != expired;
});
op.releases = new ArrayList<>();
return releases;
}

private void notityReleased(IdentityHolder h) {
ProjectStateData d = h.state.get();
if (d != null) {
h.impl.projectDataReleased(d);
ReloadSpiAccessor.get().release(d);
}
}

/**
* Ends the project operation. Optionally unregisters a reload. If more reloads are pending,
* it starts another one. It ONLY dispatches one reload per project at a time. Multiple
* reloads (for different projects) may be waiting in the dispatcher's queue, if the
* number of reloads exceeds {@link #PROJECT_RELOAD_CONCURRENCY}.
* <p>
* If `reload` is NOT null, the caller MUST execute the returned Runnable.
*
* @param p the project.
* @param reload if not null, specifies the ending reload.
* @return runnable that sends out events and potentially continues next reload.
*/
private void endOperation(Project p, Reloader reload) {
private void endOperation(Project p, Reloader reload, Runnable futureCompleter) {
Reloader nextReloader;
Collection<IdentityHolder> releases = Collections.emptyList();
Collection<Runnable> postponedActions;
Expand All @@ -779,59 +809,73 @@ private void endOperation(Project p, Reloader reload) {
if (op == null) {
throw new IllegalArgumentException();
}
if (!op.removeReloader(reload)) {
if (reload != null && !op.removeReloader(reload)) {
return;
}
--op.usage;
if (op.usage > 0) {
return;
}

// temporary increment
++op.usage;
postponedActions = op.postponedActions;
op.postponedActions = new ArrayList<>();
nextReloader = op.nextReloader();

if (nextReloader != null) {
// schedule the next reload from the same project.
++op.usage;
op.postponedActions = new ArrayList<>();
// will not be releasing ProjectStateData in op.releases now, since they will only queue up again.
} else {
releases = op.releases;
pendingOperations.remove(p);

// this is copied from postCleanup, but can be done in batch without
// checking the project operation is in progress for each reference. These are already removed
// from stateIdentity, so just check they did not obtain another one:
for (Iterator<IdentityHolder> it = releases.iterator(); it.hasNext(); ) {
IdentityHolder expired = it.next();
ProjectStateData d = expired.state.get();
if (d == null) {
it.remove();
}
IdentityHolder h = stateIdentity.get(d);
if (h != null && h != expired) {
it.remove();
}
}
// do not remove from the pendingOperations YET, we want to capture potential reload requests
// until after the events are fired off, so they do not interleave.
releases = collectReleases(op);
}
terminatingOperations.add(op);
}
LOG.log(Level.FINE, "Project {0}: releasing postponed actions", p);


// note: this will eventually queue the cleanup again, if the project enter locked operation in the meantime.
releases.forEach(h -> {
ProjectStateData d = h.state.get();
if (d != null) {
h.impl.projectDataReleased(d);
ReloadSpiAccessor.get().release(d);
}

});
releases.forEach(this::notityReleased);

if (futureCompleter != null) {
futureCompleter.run();
}

postponedActions.forEach(Runnable::run);

releases = null;
postponedActions = null;

synchronized (this) {
terminatingOperations.remove(op);
if (nextReloader == null) {
nextReloader = op.nextReloader();
// if a reload magically appeared, do NOT decrement the usage, as we didn't go through the usage++ in nextReloader != null above.
if (nextReloader == null) {
if (--op.usage == 0) {
// finally remove, but still must process leftovers again
pendingOperations.remove(p);
releases = op.releases;
postponedActions = op.postponedActions;
}
}
} else {
// decrement the temporary inc
op.usage--;
}
}
if (releases != null) {
releases.forEach(this::notityReleased);
postponedActions.forEach(Runnable::run);
}

if (nextReloader == null) {
return;
}

Reloader fNextReloader = nextReloader;

// start (first or next) project reload
LOG.log(Level.FINE, "Project {0}: starting reload", nextReloader);
dispatcher.post(() -> {
Expand All @@ -843,32 +887,33 @@ private void endOperation(Project p, Reloader reload) {
} catch (InterruptedException ex) {
}
}

RequestProcessor floader = loader;

CompletableFuture<ProjectState> f = CompletableFuture.runAsync(() -> nextReloader.initRound(), loader).
thenCompose((v) -> nextReloader.start(floader));
f.whenComplete((a, b) -> {
CompletableFuture<ProjectState> f = CompletableFuture.runAsync(() -> fNextReloader.initRound(), loader).
thenCompose((v) -> fNextReloader.start(floader));
// run this cleanup in the dispatcher thread
f.whenCompleteAsync((result, err) -> {
LOG.log(Level.FINER, "Return RP to the pool", new Object[] { floader });
loaderProcessors.offer(floader);
try {
LOG.log(Level.FINE, "Load end project {0} with request {1}", new Object[] { nextReloader.project, nextReloader.request });
if (b == null) {
nextReloader.completePending.completeAsync(() -> a, RELOAD_RP);
} else {
RELOAD_RP.post(() -> {
nextReloader.completePending.completeExceptionally(b);
});
}
// this will eventually fire postponed events.
endOperation(nextReloader.project, nextReloader);
LOG.log(Level.FINER, "Return RP to the pool", new Object[] { floader });
LOG.log(Level.FINE, "Load end project {0} with request {1}", new Object[] { fNextReloader.project, fNextReloader.request });
// postpone event delivery so that the events observers see the Future as completed.
endOperation(fNextReloader.project, fNextReloader, () -> {
if (err == null) {
fNextReloader.completePending.completeAsync(() -> result, RELOAD_RP);
} else {
RELOAD_RP.post(() -> {
fNextReloader.completePending.completeExceptionally(err);
});
}
});
} catch (ThreadDeath td) {
throw td;
} catch (Throwable t) {
Exceptions.printStackTrace(t);
} finally {
loaderProcessors.offer(floader);
}
});
}, floader);
});
}

Expand Down

0 comments on commit 9dcefae

Please sign in to comment.