Skip to content

Commit

Permalink
**Make Custom Result Index Name an Alias** (opensearch-project#1225)
Browse files Browse the repository at this point in the history
Previously, we used the configured custom result index name directly as the index name and created that index only when a detector was created. This approach caused several problems for index management:

1. **Index State Management (ISM) Incompatibility**: ISM requires an alias for rolling over an index, but we only had the index name.
2. **Index Recognition Issue**: Even if we managed to roll over an index, AD could not recognize it because the index would not be recreated unless the detector was recreated.
3. **Result History and Top Anomaly Results**: The result history on the dashboard and top anomaly results were reading from a single index instead of an index pattern. Thus, after rolling over an index, the result history and top anomaly results would be lost.

This PR addresses these issues:

1. **Custom Result Index Creation**: We now create a custom result index with the name `<custom-name-history-{now/d}-1>` and use the alias `custom-name` to point to it.
2. **Index Recreation**: We recreate an index when failing to find a result index, updating the configured alias to point to the new index. This ensures continuity when an index is rolled over and new indices do not exist.
3. **Query Index Pattern**: The top anomaly result API now queries an index pattern instead of a single index. The result history on the dashboard follows the same approach. The dashboard code will be posted in a separate PR.

Additionally, this PR updates the custom result index mapping when the mapping is outdated, similar to the existing logic for the default result index.

**Testing Done**:
* Successfully updated the mapping of the custom result index when the mapping is outdated.
* Verified that the frontend can still see old and new results after a rollover.
* Confirmed that the backend can still write to new indices after a rollover.
* Ensured all existing tests pass.

More tests will be added in the following PRs.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Jun 10, 2024
1 parent 16d4a36 commit 396ed41
Show file tree
Hide file tree
Showing 69 changed files with 1,004 additions and 425 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtests.logs=true"
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
;;
hc)
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ADJobProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void validateResultIndexAndRunJob(
ExecuteADResultResponseRecorder recorder,
Config detector
) {
String resultIndex = jobParameter.getCustomResultIndex();
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
if (resultIndex == null) {
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
return;
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/org/opensearch/ad/indices/ADIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum ADIndex implements TimeSeriesIndex {
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings)
),
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings));
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),;

private final String indexName;
// whether we use an alias for the index
Expand All @@ -64,10 +65,4 @@ public boolean isAlias() {
public String getMapping() {
return mapping;
}

@Override
public boolean isJobIndex() {
return CommonName.JOB_INDEX.equals(indexName);
}

}
15 changes: 11 additions & 4 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host AD indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @param xContentRegistry registry for json parser
* @throws IOException
*/
public ADIndexManagement(
Expand All @@ -72,7 +75,8 @@ public ADIndexManagement(
ThreadPool threadPool,
Settings settings,
DiscoveryNodeFilterer nodeFilter,
int maxUpdateRunningTimes
int maxUpdateRunningTimes,
NamedXContentRegistry xContentRegistry
)
throws IOException {
super(
Expand All @@ -87,7 +91,10 @@ public ADIndexManagement(
AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings),
AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD.get(settings),
AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings),
ADIndex.RESULT.getMapping()
ADIndex.RESULT.getMapping(),
xContentRegistry,
AnomalyDetector::parse,
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*"
);
this.clusterService.addLocalNodeClusterManagerListener(this);

Expand Down Expand Up @@ -181,7 +188,7 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
AD_RESULT_HISTORY_INDEX_PATTERN,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
true,
AD_RESULT_HISTORY_INDEX_PATTERN,
true,
ADIndex.RESULT,
actionListener
);
Expand Down Expand Up @@ -270,6 +277,6 @@ protected DeleteRequest createDummyDeleteRequest(String resultIndex) throws IOEx

@Override
public void initCustomResultIndexDirectly(String resultIndex, ActionListener<CreateIndexResponse> actionListener) {
initResultIndexDirectly(resultIndex, null, false, AD_RESULT_HISTORY_INDEX_PATTERN, ADIndex.RESULT, actionListener);
initResultIndexDirectly(getCustomResultIndexPattern(resultIndex), resultIndex, false, false, ADIndex.RESULT, actionListener);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
detector.getLastUpdateTime(),
detector.getCategoryFields(),
detector.getUser(),
detector.getCustomResultIndex(),
detector.getCustomResultIndexOrAlias(),
detector.getImputationOption(),
detector.getRecencyEmphasis(),
detector.getSeasonIntervals(),
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
} else {
this.uiMetadata = null;
}
customResultIndex = input.readOptionalString();
customResultIndexOrAlias = input.readOptionalString();
if (input.readBoolean()) {
this.imputationOption = new ImputationOption(input);
} else {
Expand Down Expand Up @@ -326,7 +326,7 @@ public void writeTo(StreamOutput output) throws IOException {
} else {
output.writeBoolean(false);
}
output.writeOptionalString(customResultIndex);
output.writeOptionalString(customResultIndexOrAlias);
if (imputationOption != null) {
output.writeBoolean(true);
imputationOption.writeTo(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void saveResult(AnomalyResult result, Config config) {
config.getId(),
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
result,
config.getCustomResultIndex()
config.getCustomResultIndexOrAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.settings.ADEnabledSetting;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.timeseries.AbstractSearchAction;
import org.opensearch.timeseries.rest.AbstractSearchAction;

public abstract class AbstractADSearchAction<T extends ToXContentObject> extends AbstractSearchAction<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ protected AnomalyDetector copyConfig(User user, Config config) {
Instant.now(),
config.getCategoryFields(),
user,
config.getCustomResultIndex(),
config.getCustomResultIndexOrAlias(),
config.getImputationOption(),
config.getRecencyEmphasis(),
config.getSeasonIntervals(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ private void detectAnomaly(
user = adTask.getUser().getName();
roles = adTask.getUser().getRoles();
}
String resultIndex = adTask.getDetector().getCustomResultIndex();
String resultIndex = adTask.getDetector().getCustomResultIndexOrAlias();

if (resultIndex == null) {
// if result index is null, store anomaly result directly
Expand Down
61 changes: 48 additions & 13 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,31 +571,66 @@ protected void scaleTaskLaneOnCoordinatingNode(
TransportService transportService,
ActionListener<JobResponse> listener
) {
DiscoveryNode coordinatingNode = getCoordinatingNode(adTask);
transportService
.sendRequest(
coordinatingNode,
getCoordinatingNode(adTask),
ForwardADTaskAction.NAME,
new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS),
transportRequestOptions,
new ActionListenerResponseHandler<>(listener, JobResponse::new)
);
}

/**
* Retrieves the coordinating node for the given ADTask.
*
* This method looks for a node in the list of eligible data nodes that matches the coordinating node ID
* and version specified in the ADTask. The matching criteria are:
* 1. The node ID must be equal to the coordinating node ID.
* 2. Both node versions must be either null or equal.
*
* If the coordinating node and the local node have different software versions, this method will
* throw a ResourceNotFoundException.
*
* @param adTask the ADTask containing the coordinating node information.
* @return the matching DiscoveryNode if found; otherwise a ResourceNotFoundException is thrown,
* which the caller is expected to handle.
* @throws ResourceNotFoundException if the coordinating node has a different version than the local node or if the coordinating node is not found.
*/
private DiscoveryNode getCoordinatingNode(ADTask adTask) {
String coordinatingNode = adTask.getCoordinatingNode();
DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();
DiscoveryNode targetNode = null;
for (DiscoveryNode node : eligibleDataNodes) {
if (node.getId().equals(coordinatingNode)) {
targetNode = node;
break;
try {
String coordinatingNodeId = adTask.getCoordinatingNode();
Version coordinatingNodeVersion = hashRing.getVersion(coordinatingNodeId);
Version localNodeVersion = hashRing.getVersion(clusterService.localNode().getId());
if (!isSameVersion(coordinatingNodeVersion, localNodeVersion)) {
throw new ResourceNotFoundException(
adTask.getConfigId(),
"AD task coordinating node has different version than local node"
);
}
}
if (targetNode == null) {

DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();

for (DiscoveryNode node : eligibleDataNodes) {
String nodeId = node.getId();
if (nodeId == null) {
continue;
}

if (nodeId.equals(coordinatingNodeId)) {
return node;
}
}

throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
} catch (Exception e) {
logger.error("Error locating coordinating node", e);
throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
}
return targetNode;
}

/**
 * Null-safe comparison of two versions.
 *
 * @param version1 first version, may be null
 * @param version2 second version, may be null
 * @return true when both versions are null, or when both are non-null and
 *         {@code compareTo} reports them as equal; false otherwise
 */
private boolean isSameVersion(Version version1, Version version2) {
    // If either side is missing, they match only when both are missing.
    if (version1 == null || version2 == null) {
        return version1 == version2;
    }
    return version1.compareTo(version2) == 0;
}

@Override
Expand Down Expand Up @@ -791,7 +826,7 @@ public <T> void cleanConfigCache(
} catch (ResourceNotFoundException e) {
logger
.warn(
"Task coordinating node left cluster, taskId: {}, detectorId: {}, coordinatingNode: {}",
"Task coordinating node left cluster or has different software version, taskId: {}, detectorId: {}, coordinatingNode: {}",
taskId,
detectorId,
coordinatingNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ADResultBulkTransportAction(
@Override
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
BulkRequest bulkRequest = new BulkRequest();
List<ADResultWriteRequest> results = request.getAnomalyResults();
List<ADResultWriteRequest> results = request.getResults();

if (indexingPressurePercent <= softLimit) {
for (ADResultWriteRequest resultWriteRequest : results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndex()
config.getCustomResultIndexOrAlias()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
SearchRequest searchRequest = generateSearchRequest(request);

// Adding search over any custom result indices
String rawCustomResultIndex = getAdResponse.getDetector().getCustomResultIndex();
String customResultIndex = rawCustomResultIndex == null ? null : rawCustomResultIndex.trim();
if (!Strings.isNullOrEmpty(customResultIndex)) {
searchRequest.indices(defaultIndex, customResultIndex);
String rawCustomResultIndexPattern = getAdResponse.getDetector().getCustomResultIndexPattern();
String customResultIndexPattern = rawCustomResultIndexPattern == null ? null : rawCustomResultIndexPattern.trim();
if (!Strings.isNullOrEmpty(customResultIndexPattern)) {
searchRequest.indices(defaultIndex, customResultIndexPattern);
}

// Utilizing the existing search() from SearchHandler to handle security permissions. Both user role
Expand All @@ -321,7 +321,7 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
clock.millis() + TOP_ANOMALY_RESULT_TIMEOUT_IN_MILLIS,
request.getSize(),
orderType,
customResultIndex
customResultIndexPattern
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADResultWriteRequest;
import org.opensearch.ad.transport.ADResultBulkAction;
import org.opensearch.ad.transport.ADResultBulkRequest;
import org.opensearch.client.Client;
Expand All @@ -27,7 +29,7 @@
import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler;

public class ADIndexMemoryPressureAwareResultHandler extends
IndexMemoryPressureAwareResultHandler<ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
IndexMemoryPressureAwareResultHandler<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
private static final Logger LOG = LogManager.getLogger(ADIndexMemoryPressureAwareResultHandler.class);

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void validateResultIndexAndRunJob(
Exception exception = new EndRunException(configId, e.getMessage(), false);
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector);
});
String resultIndex = jobParameter.getCustomResultIndex();
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
if (resultIndex == null) {
indexManagement.validateDefaultResultIndexForBackendJob(configId, user, roles, () -> {
listener.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class ForecastCommonMessages {
// ======================================
// Used for custom forecast result index
// ======================================
public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index ";
public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX;

// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum ForecastIndex implements TimeSeriesIndex {
ForecastCommonName.FORECAST_STATE_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getStateMappings)
);
),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getResultMappings));

private final String indexName;
// whether we use an alias for the index
Expand All @@ -64,9 +65,4 @@ public boolean isAlias() {
public String getMapping() {
return mapping;
}

@Override
public boolean isJobIndex() {
return CommonName.JOB_INDEX.equals(indexName);
}
}
Loading

0 comments on commit 396ed41

Please sign in to comment.