Skip to content

Commit

Permalink
enabling term version check on local state for all admin read actions
Browse files Browse the repository at this point in the history
Signed-off-by: Rajiv Kumar Vaidyanathan <rajivkv@amazon.com>
  • Loading branch information
rajiv-kv committed Jun 17, 2024
1 parent 18c5bb6 commit 30851ff
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchTransportService;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.action.support.replication.TransportReplicationActionTests;
import org.opensearch.action.termvectors.MultiTermVectorsAction;
import org.opensearch.action.termvectors.MultiTermVectorsRequest;
Expand All @@ -93,6 +95,8 @@
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -124,6 +128,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -195,6 +200,7 @@ public void cleanUp() {
}

public void testGetFieldMappings() {

String getFieldMappingsShardAction = GetFieldMappingsAction.NAME + "[index][s]";
interceptTransportActions(getFieldMappingsShardAction);

Expand Down Expand Up @@ -546,6 +552,7 @@ public void testDeleteIndex() {

public void testGetMappings() {
interceptTransportActions(GetMappingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());

GetMappingsRequest getMappingsRequest = new GetMappingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getMappings(getMappingsRequest).actionGet();
Expand All @@ -565,8 +572,9 @@ public void testPutMapping() {
}

public void testGetSettings() {
interceptTransportActions(GetSettingsAction.NAME);

interceptTransportActions(GetSettingsAction.NAME);
stubClusterTermResponse(internalCluster().getClusterManagerName());
GetSettingsRequest getSettingsRequest = new GetSettingsRequest().indices(randomIndicesOrAliases());
internalCluster().coordOnlyNodeClient().admin().indices().getSettings(getSettingsRequest).actionGet();

Expand Down Expand Up @@ -781,6 +789,7 @@ public List<TransportInterceptor> getTransportInterceptors(
}

private final Set<String> actions = new HashSet<>();
private final Map<String, TransportRequestHandler> stubHandlers = new ConcurrentHashMap<>();

private final Map<String, List<TransportRequest>> requests = new HashMap<>();

Expand All @@ -804,6 +813,11 @@ synchronized void interceptTransportActions(String... actions) {

synchronized void clearInterceptedActions() {
actions.clear();
stubHandlers.clear();
}

synchronized void stub(String action, TransportRequestHandler handler) {
stubHandlers.put(action, handler);
}

private class InterceptingRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
Expand All @@ -830,8 +844,25 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
}
}
}
requestHandler.messageReceived(request, channel, task);
if (!stubHandlers.containsKey(action)) {
requestHandler.messageReceived(request, channel, task);
} else {
stubHandlers.get(action).messageReceived(request, channel, task);
}

}
}
}

private void stubClusterTermResponse(String master) {
PluginsService pluginsService = internalCluster().getInstance(PluginsService.class, master);
pluginsService.filterPlugins(InterceptingTransportService.TestPlugin.class).stream().findFirst().get().instance.stub(
GetTermVersionAction.NAME,
(request, channel, task) -> channel.sendResponse(
new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1))
)
);

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse;
import org.opensearch.action.support.clustermanager.term.GetTermVersionAction;
import org.opensearch.action.support.clustermanager.term.GetTermVersionResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.coordination.ClusterStateTermVersion;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.node.IoUsageStats;
import org.opensearch.node.ResourceUsageCollectorService;
import org.opensearch.node.resource.tracker.ResourceTrackerSettings;
import org.opensearch.plugins.Plugin;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
Expand All @@ -29,9 +34,13 @@
import org.opensearch.rest.action.admin.indices.RestGetAliasesAction;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.test.transport.MockTransportService;
import org.opensearch.transport.TransportService;
import org.junit.Before;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -62,6 +71,10 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase {
.put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50)
.build();

protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(MockTransportService.TestPlugin.class);
}

@Before
public void init() {
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(
Expand All @@ -82,12 +95,14 @@ public void init() {
}

public void testAdmissionControlEnforced() throws Exception {
stubClusterTermResponse(internalCluster().getClusterManagerName());
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}"));

stubClusterTermResponse(internalCluster().getClusterManagerName());
// Read API on ClusterManager

GetAliasesRequest aliasesRequest = new GetAliasesRequest();
aliasesRequest.aliases("alias1");
try {
Expand Down Expand Up @@ -156,6 +171,7 @@ public void admissionControlDisabledOnBreach(Settings admission) throws Interrup
}

public void testAdmissionControlResponseStatus() throws Exception {
stubClusterTermResponse(internalCluster().getClusterManagerName());
cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98));

// Write API on ClusterManager
Expand Down Expand Up @@ -195,4 +211,12 @@ Map<String, AdmissionControllerStats> getAdmissionControlStats(AdmissionControlS
}
return acStats;
}

private void stubClusterTermResponse(String master) {
MockTransportService primaryService = (MockTransportService) internalCluster().getInstance(TransportService.class, master);
primaryService.addRequestHandlingBehavior(GetTermVersionAction.NAME, (handler, request, channel, task) -> {
channel.sendResponse(new GetTermVersionResponse(new ClusterStateTermVersion(new ClusterName("test"), "1", -1, -1)));
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -534,4 +534,9 @@ private ClusterHealthResponse clusterHealth(
pendingTaskTimeInQueue
);
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,9 +233,4 @@ private ClusterStateResponse buildResponse(final ClusterStateRequest request, fi

return new ClusterStateResponse(currentState.getClusterName(), builder.build(), false);
}

@Override
protected boolean localExecuteSupportedByAction() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,9 @@ protected void clusterManagerOperation(
logger.trace("done fetching pending tasks from cluster service");
listener.onResponse(new PendingClusterTasksResponse(pendingTasks));
}

@Override
protected boolean localExecuteSupportedByAction() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -267,24 +267,7 @@ protected void doStart(ClusterState clusterState) {
final DiscoveryNodes nodes = clusterState.nodes();
if (nodes.isLocalNodeElectedClusterManager() || localExecute(request)) {
// check for block, if blocked, retry, else, execute locally
final ClusterBlockException blockException = checkBlock(request, clusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(clusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.trace("exception occurred during cluster block checking, accepting state", e);
return true;
}
});
}
} else {
if (!checkForBlock(request, clusterState)) {
threadPool.executor(executor)
.execute(
ActionRunnable.wrap(
Expand Down Expand Up @@ -422,12 +405,39 @@ public GetTermVersionResponse read(StreamInput in) throws IOException {
};
}

private boolean checkForBlock(Request request, ClusterState localClusterState) {
final ClusterBlockException blockException = checkBlock(request, localClusterState);
if (blockException != null) {
if (!blockException.retryable()) {
listener.onFailure(blockException);
} else {
logger.debug("can't execute due to a cluster block, retrying", blockException);
retry(localClusterState, blockException, newState -> {
try {
ClusterBlockException newException = checkBlock(request, newState);
return (newException == null || !newException.retryable());
} catch (Exception e) {
// accept state as block will be rechecked by doStart() and listener.onFailure() then called
logger.trace("exception occurred during cluster block checking, accepting state", e);
return true;
}
});
}
return true;
} else {
return false;
}
}

private void executeOnLocalNode(ClusterState localClusterState) {
Runnable runTask = ActionRunnable.wrap(
getDelegateForLocalExecute(localClusterState),
l -> clusterManagerOperation(task, request, localClusterState, l)
);
threadPool.executor(executor).execute(runTask);
// check for block, if blocked, retry, else, execute locally
if (!checkForBlock(request, localClusterState)) {
Runnable runTask = ActionRunnable.wrap(
getDelegateForLocalExecute(localClusterState),
l -> clusterManagerOperation(task, request, localClusterState, l)
);
threadPool.executor(executor).execute(runTask);
}
}

private void executeOnClusterManager(DiscoveryNode clusterManagerNode, ClusterState clusterState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,9 @@ protected TransportClusterManagerNodeReadAction(
protected final boolean localExecute(Request request) {
return request.local();
}

protected boolean localExecuteSupportedByAction() {
return true;
}

}

0 comments on commit 30851ff

Please sign in to comment.