Skip to content

Commit

Permalink
[pinpoint-apm#11050] Replace StopFlag with CompletableFuture.cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
emeroad committed Jun 4, 2024
1 parent e335631 commit 9e497c8
Showing 1 changed file with 34 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
Expand Down Expand Up @@ -64,16 +64,17 @@ public void appendServerInfo(final Range range, final NodeList source, final Lin
if (CollectionUtils.isEmpty(nodes)) {
return;
}
final AtomicBoolean stopSign = new AtomicBoolean();
final CompletableFuture[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap, stopSign);

final CompletableFuture<ServerGroupList>[] futures = getServerGroupListFutures(range, nodes, linkDataDuplexMap);
CompletableFuture.allOf(futures).cancel(true);
if (-1 == timeoutMillis) {
// Returns the result value when complete
CompletableFuture.allOf(futures).join();
} else {
try {
CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) { // InterruptedException, ExecutionException, TimeoutException
stopSign.set(Boolean.TRUE);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
CompletableFuture.allOf(futures).cancel(true);
String cause = "an error occurred while adding server info";
if (e instanceof TimeoutException) {
cause += " build timed out. timeout=" + timeoutMillis + "ms";
Expand All @@ -83,44 +84,55 @@ public void appendServerInfo(final Range range, final NodeList source, final Lin
}
}

private CompletableFuture[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
List<CompletableFuture<Void>> serverGroupListFutures = new ArrayList<>();
@SuppressWarnings("unchecked")
private CompletableFuture<ServerGroupList>[] getServerGroupListFutures(Range range, Collection<Node> nodes, LinkDataDuplexMap linkDataDuplexMap) {
List<CompletableFuture<ServerGroupList>> serverGroupListFutures = new ArrayList<>();
for (Node node : nodes) {
if (node.getServiceType().isUnknown()) {
// we do not know the server info for unknown nodes
continue;
}
CompletableFuture<Void> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap, stopSign);
CompletableFuture<ServerGroupList> serverGroupListFuture = getServerGroupListFuture(range, node, linkDataDuplexMap);
serverGroupListFutures.add(serverGroupListFuture);
}

return serverGroupListFutures.toArray(new CompletableFuture[0]);
}

private CompletableFuture<Void> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap, AtomicBoolean stopSign) {
CompletableFuture<ServerGroupList> serverGroupListFuture;
private CompletableFuture<ServerGroupList> getServerGroupListFuture(Range range, Node node, LinkDataDuplexMap linkDataDuplexMap) {
CompletableFuture<ServerGroupList> serverGroupListFuture = getServerGroupListFuture0(node, range, linkDataDuplexMap);
serverGroupListFuture.whenComplete((serverGroupList, throwable) -> {
if (throwable != null) {
// error
logger.warn("Failed to get server info for node {}", node, throwable);
node.setServerGroupList(serverGroupListFactory.createEmptyNodeInstanceList());
} else {
logger.trace("ServerGroupList: {}", serverGroupList);
node.setServerGroupList(serverGroupList);
}
});
return serverGroupListFuture;
}

private CompletableFuture<ServerGroupList> getServerGroupListFuture0(Node node, Range range, LinkDataDuplexMap linkDataDuplexMap) {
final Application application = node.getApplication();
final ServiceType nodeServiceType = application.getServiceType();
if (nodeServiceType.isWas()) {
final Instant to = range.getToInstant();
serverGroupListFuture = CompletableFuture.supplyAsync(new Supplier<ServerGroupList>() {
return CompletableFuture.supplyAsync(new Supplier<>() {
@Override
public ServerGroupList get() {
if (Boolean.TRUE == stopSign.get()) { // Stop
return serverGroupListFactory.createEmptyNodeInstanceList();
}
final Instant to = range.getToInstant();
return serverGroupListFactory.createWasNodeInstanceList(node, to);
}
}, executor);
} else if (nodeServiceType.isTerminal() || nodeServiceType.isAlias()) {
// extract information about the terminal node
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(application, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createTerminalNodeInstanceList(application, linkDataDuplexMap));
} else if (nodeServiceType.isQueue()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(application, linkDataDuplexMap));
return CompletableFuture.completedFuture(serverGroupListFactory.createQueueNodeInstanceList(application, linkDataDuplexMap));
} else if (nodeServiceType.isUser()) {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
} else {
serverGroupListFuture = CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
return CompletableFuture.completedFuture(serverGroupListFactory.createUserNodeInstanceList());
}
return serverGroupListFuture.thenAccept(node::setServerGroupList);
}
return CompletableFuture.completedFuture(serverGroupListFactory.createEmptyNodeInstanceList());
}
}

0 comments on commit 9e497c8

Please sign in to comment.