Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Custom Result Index Name an Alias #1225

Merged
merged 1 commit into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ jobs:
-Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \
-Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \
-Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \
-Dtests.timeoutSuite=3600000! -Dtests.logs=true"
-Dtests.timeoutSuite=3600000! -Dtest.logs=true"
;;
hc)
su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ADJobProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ protected void validateResultIndexAndRunJob(
ExecuteADResultResponseRecorder recorder,
Config detector
) {
String resultIndex = jobParameter.getCustomResultIndex();
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
if (resultIndex == null) {
runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector);
return;
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/org/opensearch/ad/indices/ADIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum ADIndex implements TimeSeriesIndex {
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings)
),
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings));
STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),;

private final String indexName;
// whether we use an alias for the index
Expand All @@ -64,10 +65,4 @@ public boolean isAlias() {
public String getMapping() {
return mapping;
}

@Override
public boolean isJobIndex() {
return CommonName.JOB_INDEX.equals(indexName);
}

}
15 changes: 11 additions & 4 deletions src/main/java/org/opensearch/ad/indices/ADIndexManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -64,6 +66,7 @@ public class ADIndexManagement extends IndexManagement<ADIndex> {
* @param settings OS cluster setting
* @param nodeFilter Used to filter eligible nodes to host AD indices
* @param maxUpdateRunningTimes max number of retries to update index mapping and setting
* @param xContentRegistry registry for json parser
* @throws IOException
*/
public ADIndexManagement(
Expand All @@ -72,7 +75,8 @@ public ADIndexManagement(
ThreadPool threadPool,
Settings settings,
DiscoveryNodeFilterer nodeFilter,
int maxUpdateRunningTimes
int maxUpdateRunningTimes,
NamedXContentRegistry xContentRegistry
)
throws IOException {
super(
Expand All @@ -87,7 +91,10 @@ public ADIndexManagement(
AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings),
AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD.get(settings),
AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings),
ADIndex.RESULT.getMapping()
ADIndex.RESULT.getMapping(),
xContentRegistry,
AnomalyDetector::parse,
ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*"
);
this.clusterService.addLocalNodeClusterManagerListener(this);

Expand Down Expand Up @@ -181,7 +188,7 @@ public void initDefaultResultIndexDirectly(ActionListener<CreateIndexResponse> a
AD_RESULT_HISTORY_INDEX_PATTERN,
ADCommonName.ANOMALY_RESULT_INDEX_ALIAS,
true,
AD_RESULT_HISTORY_INDEX_PATTERN,
true,
ADIndex.RESULT,
actionListener
);
Expand Down Expand Up @@ -270,6 +277,6 @@ protected DeleteRequest createDummyDeleteRequest(String resultIndex) throws IOEx

@Override
public void initCustomResultIndexDirectly(String resultIndex, ActionListener<CreateIndexResponse> actionListener) {
initResultIndexDirectly(resultIndex, null, false, AD_RESULT_HISTORY_INDEX_PATTERN, ADIndex.RESULT, actionListener);
initResultIndexDirectly(getCustomResultIndexPattern(resultIndex), resultIndex, false, false, ADIndex.RESULT, actionListener);
}
}
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept
detector.getLastUpdateTime(),
detector.getCategoryFields(),
detector.getUser(),
detector.getCustomResultIndex(),
detector.getCustomResultIndexOrAlias(),
detector.getImputationOption(),
detector.getRecencyEmphasis(),
detector.getSeasonIntervals(),
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/model/AnomalyDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ public AnomalyDetector(StreamInput input) throws IOException {
} else {
this.uiMetadata = null;
}
customResultIndex = input.readOptionalString();
customResultIndexOrAlias = input.readOptionalString();
if (input.readBoolean()) {
this.imputationOption = new ImputationOption(input);
} else {
Expand Down Expand Up @@ -326,7 +326,7 @@ public void writeTo(StreamOutput output) throws IOException {
} else {
output.writeBoolean(false);
}
output.writeOptionalString(customResultIndex);
output.writeOptionalString(customResultIndexOrAlias);
if (imputationOption != null) {
output.writeBoolean(true);
imputationOption.writeTo(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void saveResult(AnomalyResult result, Config config) {
config.getId(),
result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM,
result,
config.getCustomResultIndex()
config.getCustomResultIndexOrAlias()
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.settings.ADEnabledSetting;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.timeseries.AbstractSearchAction;
import org.opensearch.timeseries.rest.AbstractSearchAction;

public abstract class AbstractADSearchAction<T extends ToXContentObject> extends AbstractSearchAction<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ protected AnomalyDetector copyConfig(User user, Config config) {
Instant.now(),
config.getCategoryFields(),
user,
config.getCustomResultIndex(),
config.getCustomResultIndexOrAlias(),
config.getImputationOption(),
config.getRecencyEmphasis(),
config.getSeasonIntervals(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ private void detectAnomaly(
user = adTask.getUser().getName();
roles = adTask.getUser().getRoles();
}
String resultIndex = adTask.getDetector().getCustomResultIndex();
String resultIndex = adTask.getDetector().getCustomResultIndexOrAlias();

if (resultIndex == null) {
// if result index is null, store anomaly result directly
Expand Down
61 changes: 48 additions & 13 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,31 +571,66 @@ protected void scaleTaskLaneOnCoordinatingNode(
TransportService transportService,
ActionListener<JobResponse> listener
) {
DiscoveryNode coordinatingNode = getCoordinatingNode(adTask);
transportService
.sendRequest(
coordinatingNode,
getCoordinatingNode(adTask),
ForwardADTaskAction.NAME,
new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS),
transportRequestOptions,
new ActionListenerResponseHandler<>(listener, JobResponse::new)
);
}

/**
* Retrieves the coordinating node for the given ADTask.
*
* This method looks for a node in the list of eligible data nodes that matches the coordinating node ID
* and version specified in the ADTask. The matching criteria are:
* 1. The node ID must be equal to the coordinating node ID.
* 2. Both node versions must be either null or equal.
*
* If the coordinating node ID and the local node have different software versions, this method will
* throw a ResourceNotFoundException.
*
* @param adTask the ADTask containing the coordinating node information.
* @return a DiscoveryNode containing the matching DiscoveryNode if found, or throws ResourceNotFoundException if no match is found.
* The caller is supposed to handle the thrown exception.
* @throws ResourceNotFoundException if the coordinating node has a different version than the local node or if the coordinating node is not found.
*/
private DiscoveryNode getCoordinatingNode(ADTask adTask) {
String coordinatingNode = adTask.getCoordinatingNode();
DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();
DiscoveryNode targetNode = null;
for (DiscoveryNode node : eligibleDataNodes) {
if (node.getId().equals(coordinatingNode)) {
targetNode = node;
break;
try {
String coordinatingNodeId = adTask.getCoordinatingNode();
Version coordinatingNodeVersion = hashRing.getVersion(coordinatingNodeId);
Version localNodeVersion = hashRing.getVersion(clusterService.localNode().getId());
if (!isSameVersion(coordinatingNodeVersion, localNodeVersion)) {
throw new ResourceNotFoundException(
adTask.getConfigId(),
"AD task coordinating node has different version than local node"
);
}
}
if (targetNode == null) {

DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes();

for (DiscoveryNode node : eligibleDataNodes) {
String nodeId = node.getId();
if (nodeId == null) {
continue;
}

if (nodeId.equals(coordinatingNodeId)) {
return node;
}
}

throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
} catch (Exception e) {
logger.error("Error locating coordinating node", e);
throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found");
}
return targetNode;
}

private boolean isSameVersion(Version version1, Version version2) {
return (version1 == null && version2 == null) || (version1 != null && version2 != null && version1.compareTo(version2) == 0);
}

@Override
Expand Down Expand Up @@ -791,7 +826,7 @@ public <T> void cleanConfigCache(
} catch (ResourceNotFoundException e) {
logger
.warn(
"Task coordinating node left cluster, taskId: {}, detectorId: {}, coordinatingNode: {}",
"Task coordinating node left cluster or has different software version, taskId: {}, detectorId: {}, coordinatingNode: {}",
taskId,
detectorId,
coordinatingNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public ADResultBulkTransportAction(
@Override
protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) {
BulkRequest bulkRequest = new BulkRequest();
List<ADResultWriteRequest> results = request.getAnomalyResults();
List<ADResultWriteRequest> results = request.getResults();

if (indexingPressurePercent <= softLimit) {
for (ADResultWriteRequest resultWriteRequest : results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul
config.getId(),
RequestPriority.MEDIUM,
result,
config.getCustomResultIndex()
config.getCustomResultIndexOrAlias()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,10 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
SearchRequest searchRequest = generateSearchRequest(request);

// Adding search over any custom result indices
String rawCustomResultIndex = getAdResponse.getDetector().getCustomResultIndex();
String customResultIndex = rawCustomResultIndex == null ? null : rawCustomResultIndex.trim();
if (!Strings.isNullOrEmpty(customResultIndex)) {
searchRequest.indices(defaultIndex, customResultIndex);
String rawCustomResultIndexPattern = getAdResponse.getDetector().getCustomResultIndexPattern();
String customResultIndexPattern = rawCustomResultIndexPattern == null ? null : rawCustomResultIndexPattern.trim();
if (!Strings.isNullOrEmpty(customResultIndexPattern)) {
searchRequest.indices(defaultIndex, customResultIndexPattern);
}

// Utilizing the existing search() from SearchHandler to handle security permissions. Both user role
Expand All @@ -321,7 +321,7 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio
clock.millis() + TOP_ANOMALY_RESULT_TIMEOUT_IN_MILLIS,
request.getSize(),
orderType,
customResultIndex
customResultIndexPattern
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.ratelimit.ADResultWriteRequest;
import org.opensearch.ad.transport.ADResultBulkAction;
import org.opensearch.ad.transport.ADResultBulkRequest;
import org.opensearch.client.Client;
Expand All @@ -27,7 +29,7 @@
import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler;

public class ADIndexMemoryPressureAwareResultHandler extends
IndexMemoryPressureAwareResultHandler<ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
IndexMemoryPressureAwareResultHandler<AnomalyResult, ADResultWriteRequest, ADResultBulkRequest, ResultBulkResponse, ADIndex, ADIndexManagement> {
private static final Logger LOG = LogManager.getLogger(ADIndexMemoryPressureAwareResultHandler.class);

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected void validateResultIndexAndRunJob(
Exception exception = new EndRunException(configId, e.getMessage(), false);
handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector);
});
String resultIndex = jobParameter.getCustomResultIndex();
String resultIndex = jobParameter.getCustomResultIndexOrAlias();
if (resultIndex == null) {
indexManagement.validateDefaultResultIndexForBackendJob(configId, user, roles, () -> {
listener.onResponse(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class ForecastCommonMessages {
// ======================================
// Used for custom forecast result index
// ======================================
public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index ";
public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX;

// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public enum ForecastIndex implements TimeSeriesIndex {
ForecastCommonName.FORECAST_STATE_INDEX,
false,
ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getStateMappings)
);
),
CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getResultMappings));

private final String indexName;
// whether we use an alias for the index
Expand All @@ -64,9 +65,4 @@ public boolean isAlias() {
public String getMapping() {
return mapping;
}

@Override
public boolean isJobIndex() {
return CommonName.JOB_INDEX.equals(indexName);
}
}
Loading
Loading