Commit a449603

bck
original-brownbear committed Jan 3, 2024
1 parent c009fda commit a449603
Showing 12 changed files with 56 additions and 64 deletions.
File 1 of 12

@@ -334,7 +334,7 @@ public boolean equals(Object obj) {
}
}

private QueryPage<JobStats> jobsStats;
private final QueryPage<JobStats> jobsStats;

public Response(QueryPage<JobStats> jobsStats) {
super(Collections.emptyList(), Collections.emptyList());
File 2 of 12

@@ -38,6 +38,7 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
@@ -416,8 +417,8 @@ private static ActionListener<SearchResponse> step3VerifyModelPartsArePresent(
String modelId
) {
return ActionListener.wrap(response -> {
SearchHit[] hits = response.getHits().getHits();
if (hits.length == 0) {
SearchHits hits = response.getHits();
if (hits.getHits().length == 0) {
failOrRespondWith0(
() -> new ResourceNotFoundException(Messages.getMessage(Messages.MODEL_DEFINITION_NOT_FOUND, modelId)),
errorIfDefinitionIsMissing,
@@ -428,14 +429,14 @@ private static ActionListener<SearchResponse> step3VerifyModelPartsArePresent(
}

long firstTotalLength;
DocumentField firstTotalLengthField = hits[0].field(TrainedModelDefinitionDoc.TOTAL_DEFINITION_LENGTH.getPreferredName());
DocumentField firstTotalLengthField = hits.getAt(0).field(TrainedModelDefinitionDoc.TOTAL_DEFINITION_LENGTH.getPreferredName());
if (firstTotalLengthField != null && firstTotalLengthField.getValue() instanceof Long firstTotalDefinitionLength) {
firstTotalLength = firstTotalDefinitionLength;
} else {
failOrRespondWith0(
() -> missingFieldsError(
modelId,
hits[0].getId(),
hits.getAt(0).getId(),
List.of(TrainedModelDefinitionDoc.TOTAL_DEFINITION_LENGTH.getPreferredName())
),
errorIfDefinitionIsMissing,
@@ -484,7 +485,7 @@ private static ActionListener<SearchResponse> step3VerifyModelPartsArePresent(
finalTotalLength,
TrainedModelDefinitionDoc.docNum(modelId, Objects.requireNonNull(hit.getId())),
firstTotalLength,
TrainedModelDefinitionDoc.docNum(modelId, Objects.requireNonNull(hits[0].getId()))
TrainedModelDefinitionDoc.docNum(modelId, Objects.requireNonNull(hits.getAt(0).getId()))
),
errorIfDefinitionIsMissing,
modelId,
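
The hunks above replace a copied SearchHit[] local with the SearchHits container itself, reading individual hits through getAt(int). A minimal sketch of that access pattern; the class name and field-name parameter are illustrative, not from the commit:

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.common.document.DocumentField;
    import org.elasticsearch.search.SearchHits;

    class FirstHitCheck {
        // Illustrative only: read a long-valued field from the first hit without copying the hit array.
        static Long readFirstTotalLength(SearchResponse response, String fieldName) {
            SearchHits hits = response.getHits();
            if (hits.getHits().length == 0) {
                return null; // caller decides how to report "no documents found"
            }
            DocumentField field = hits.getAt(0).field(fieldName);
            if (field != null && field.getValue() instanceof Long totalLength) {
                return totalLength;
            }
            return null;
        }
    }
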
File 3 of 12

@@ -260,8 +260,7 @@ public void findDatafeedsByJobIds(
Map<String, DatafeedConfig.Builder> datafeedsByJobId = new HashMap<>();
// There cannot be more than one datafeed per job
assert response.getHits().getTotalHits().value <= jobIds.size();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
DatafeedConfig.Builder builder = parseLenientlyFromSource(hit.getSourceRef());
datafeedsByJobId.put(builder.getJobId(), builder);
}
@@ -505,8 +504,7 @@ public void expandDatafeedConfigs(
ActionListener.<SearchResponse>wrap(response -> {
List<DatafeedConfig.Builder> datafeeds = new ArrayList<>();
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
try {
BytesReference source = hit.getSourceRef();
DatafeedConfig.Builder datafeed = parseLenientlyFromSource(source);
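
Both hunks in this file drop the intermediate SearchHit[] local and consume each hit inside the loop. A small sketch of that shape, with a hypothetical helper that copies each hit's source out eagerly rather than holding on to the hits:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.search.SearchHit;

    class HitSources {
        // Illustrative only: consume each hit inside the loop instead of keeping a SearchHit[] local around.
        static List<String> sourcesAsJson(SearchResponse response) {
            List<String> sources = new ArrayList<>();
            for (SearchHit hit : response.getHits().getHits()) {
                sources.add(hit.getSourceAsString()); // copies the source out of the (possibly pooled) hit
            }
            return sources;
        }
    }
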
File 4 of 12

@@ -232,9 +232,9 @@ void persistProgress(Client clientToUse, String jobId, Runnable runnable) {
String indexOrAlias = AnomalyDetectorsIndex.jobStateIndexWriteAlias();
StoredProgress previous = null;
if (searchResponse.getHits().getHits().length > 0) {
indexOrAlias = searchResponse.getHits().getHits()[0].getIndex();
indexOrAlias = searchResponse.getHits().getAt(0).getIndex();
try {
previous = MlParserUtils.parse(searchResponse.getHits().getHits()[0], StoredProgress.PARSER);
previous = MlParserUtils.parse(searchResponse.getHits().getAt(0), StoredProgress.PARSER);
} catch (Exception ex) {
LOGGER.warn(() -> "[" + jobId + "] failed to parse previously stored progress", ex);
}
File 5 of 12

@@ -154,11 +154,11 @@ public void preview(ActionListener<List<Row>> listener) {
return;
}

final SearchHit[] hits = searchResponse.getHits().getHits();
List<Row> rows = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
String[] extractedValues = extractValues(hit);
rows.add(extractedValues == null ? new Row(null, hit, true) : new Row(extractedValues, hit, false));
List<Row> rows = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
var unpooled = hit.asUnpooled();
String[] extractedValues = extractValues(unpooled);
rows.add(extractedValues == null ? new Row(null, unpooled, true) : new Row(extractedValues, unpooled, false));
}
listener.onResponse(rows);
}, listener::onFailure)
@@ -317,12 +317,13 @@ private String[] extractProcessedValue(ProcessedField processedField, SearchHit
}

private Row createRow(SearchHit hit) {
String[] extractedValues = extractValues(hit);
var unpooled = hit.asUnpooled();
String[] extractedValues = extractValues(unpooled);
if (extractedValues == null) {
return new Row(null, hit, true);
return new Row(null, unpooled, true);
}
boolean isTraining = trainTestSplitter.get().isTraining(extractedValues);
Row row = new Row(extractedValues, hit, isTraining);
Row row = new Row(extractedValues, unpooled, isTraining);
LOGGER.trace(
() -> format(
"[%s] Extracted row: sort key = [%s], is_training = [%s], values = %s",
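
The asUnpooled() calls above appear to detach each hit from the pooled search response before it is stored in a longer-lived Row. A minimal sketch of that idea, using a hypothetical Row record in place of the extractor's real Row type:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.search.SearchHit;

    class RowBuilder {
        // Hypothetical stand-in for the extractor's Row; it retains a hit beyond the response's lifetime.
        record Row(String[] values, SearchHit hit, boolean skipped) {}

        static List<Row> toRows(SearchResponse response) {
            List<Row> rows = new ArrayList<>(response.getHits().getHits().length);
            for (SearchHit hit : response.getHits().getHits()) {
                // Copy the hit out of the pooled response so it stays valid after the response is released.
                SearchHit unpooled = hit.asUnpooled();
                rows.add(new Row(null, unpooled, false));
            }
            return rows;
        }
    }
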
File 6 of 12

@@ -51,6 +51,7 @@
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.Max;
import org.elasticsearch.search.aggregations.metrics.Sum;
@@ -664,7 +665,7 @@ public void getTrainedModel(
ActionListener<SearchResponse> trainedModelSearchHandler = ActionListener.wrap(modelSearchResponse -> {
TrainedModelConfig.Builder builder;
try {
builder = handleHits(modelSearchResponse.getHits().getHits(), modelId, this::parseModelConfigLenientlyFromSource).get(0);
builder = handleHits(modelSearchResponse.getHits(), modelId, this::parseModelConfigLenientlyFromSource).get(0);
} catch (ResourceNotFoundException ex) {
getTrainedModelListener.onFailure(
new ResourceNotFoundException(Messages.getMessage(Messages.INFERENCE_NOT_FOUND, modelId))
@@ -702,7 +703,7 @@ public void getTrainedModel(
ActionListener.wrap(definitionSearchResponse -> {
try {
List<TrainedModelDefinitionDoc> docs = handleHits(
definitionSearchResponse.getHits().getHits(),
definitionSearchResponse.getHits(),
modelId,
(bytes, resourceId) -> ChunkedTrainedModelRestorer.parseModelDefinitionDocLenientlyFromSource(
bytes,
@@ -1269,15 +1270,15 @@ private static Set<String> matchedResourceIds(String[] tokens) {
}

private static <T> List<T> handleHits(
SearchHit[] hits,
SearchHits hits,
String resourceId,
CheckedBiFunction<BytesReference, String, T, Exception> parseLeniently
) throws Exception {
if (hits.length == 0) {
if (hits.getHits().length == 0) {
throw new ResourceNotFoundException(resourceId);
}
List<T> results = new ArrayList<>(hits.length);
String initialIndex = hits[0].getIndex();
List<T> results = new ArrayList<>(hits.getHits().length);
String initialIndex = hits.getAt(0).getIndex();
for (SearchHit hit : hits) {
// We don't want to spread across multiple backing indices
if (hit.getIndex().equals(initialIndex)) {
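
The handleHits change passes the SearchHits container through instead of a SearchHit[] array. A sketch of a helper with that reshaped signature, modeled on the hunk above and assuming SearchHits is Iterable<SearchHit>:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.ResourceNotFoundException;
    import org.elasticsearch.common.CheckedBiFunction;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;

    class HitsParser {
        // Illustrative only: accept SearchHits directly and keep results from a single backing index.
        static <T> List<T> parseHits(
            SearchHits hits,
            String resourceId,
            CheckedBiFunction<BytesReference, String, T, Exception> parseLeniently
        ) throws Exception {
            if (hits.getHits().length == 0) {
                throw new ResourceNotFoundException(resourceId);
            }
            List<T> results = new ArrayList<>(hits.getHits().length);
            String initialIndex = hits.getAt(0).getIndex();
            for (SearchHit hit : hits) {
                if (hit.getIndex().equals(initialIndex)) { // don't spread across multiple backing indices
                    results.add(parseLeniently.apply(hit.getSourceRef(), resourceId));
                }
            }
            return results;
        }
    }
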
File 7 of 12

@@ -428,9 +428,8 @@ public void jobIdMatches(List<String> ids, ActionListener<List<String>> listener
ML_ORIGIN,
searchRequest,
ActionListener.<SearchResponse>wrap(response -> {
SearchHit[] hits = response.getHits().getHits();
List<String> matchedIds = new ArrayList<>();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
matchedIds.add(hit.field(Job.ID.getPreferredName()).getValue());
}
listener.onResponse(matchedIds);
@@ -520,8 +519,7 @@ public void expandJobsIds(
ActionListener.<SearchResponse>wrap(response -> {
SortedSet<String> jobIds = new TreeSet<>();
SortedSet<String> groupsIds = new TreeSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
jobIds.add(hit.field(Job.ID.getPreferredName()).getValue());
List<Object> groups = hit.field(Job.GROUPS.getPreferredName()).getValues();
if (groups != null) {
@@ -591,8 +589,7 @@ public void expandJobs(
List<Job.Builder> jobs = new ArrayList<>();
Set<String> jobAndGroupIds = new HashSet<>();

SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
try {
BytesReference source = hit.getSourceRef();
Job.Builder job = parseJobLenientlyFromSource(source);
@@ -646,8 +643,7 @@ public void expandGroupIds(List<String> groupIds, ActionListener<SortedSet<Strin
searchRequest,
ActionListener.<SearchResponse>wrap(response -> {
SortedSet<String> jobIds = new TreeSet<>();
SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits()) {
jobIds.add(hit.field(Job.ID.getPreferredName()).getValue());
}

@@ -721,11 +717,9 @@ public void findJobsWithCustomRules(ActionListener<List<Job>> listener) {
ActionListener.<SearchResponse>wrap(response -> {
List<Job> jobs = new ArrayList<>();

SearchHit[] hits = response.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits()) {
try {
BytesReference source = hit.getSourceRef();
Job job = parseJobLenientlyFromSource(source).build();
Job job = parseJobLenientlyFromSource(hit.getSourceRef()).build();
jobs.add(job);
} catch (IOException e) {
// TODO A better way to handle this rather than just ignoring the error?
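
Several of the loops in this file read a requested field from each hit rather than parsing _source. A sketch of that field-access pattern, with a hypothetical field name and a generic return type for illustration:

    import java.util.ArrayList;
    import java.util.List;

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.common.document.DocumentField;
    import org.elasticsearch.search.SearchHit;

    class FieldCollector {
        // Illustrative only: pull one requested field's value out of every hit in the response.
        static List<Object> collectFieldValues(SearchResponse response, String fieldName) {
            List<Object> values = new ArrayList<>();
            for (SearchHit hit : response.getHits()) { // SearchHits is Iterable<SearchHit>
                DocumentField field = hit.field(fieldName);
                if (field != null) {
                    values.add(field.getValue());
                }
            }
            return values;
        }
    }
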
File 8 of 12

@@ -351,7 +351,7 @@ public void persistQuantiles(Quantiles quantiles, WriteRequest.RefreshPolicy ref
// - if the document did exist, update it in the index where it resides (not necessarily the current write index)
ActionListener<SearchResponse> searchFormerQuantilesDocListener = ActionListener.wrap(searchResponse -> {
String indexOrAlias = searchResponse.getHits().getHits().length > 0
? searchResponse.getHits().getHits()[0].getIndex()
? searchResponse.getHits().getAt(0).getIndex()
: AnomalyDetectorsIndex.jobStateIndexWriteAlias();

Persistable persistable = new Persistable(indexOrAlias, quantiles.getJobId(), quantiles, quantilesDocId);
File 9 of 12

@@ -218,7 +218,7 @@ public void checkForLeftOverDocuments(Job job, ActionListener<Boolean> listener)
ActionListener<MultiSearchResponse> searchResponseActionListener = new DelegatingActionListener<>(listener) {
@Override
public void onResponse(MultiSearchResponse response) {
List<SearchHit> searchHits = new ArrayList<>();
List<String> searchHits = new ArrayList<>();
// Consider the possibility that some of the responses are exceptions
for (int i = 0; i < response.getResponses().length; i++) {
MultiSearchResponse.Item itemResponse = response.getResponses()[i];
@@ -243,7 +243,9 @@ public void onResponse(MultiSearchResponse response) {
delegate.onFailure(e);
return;
}
searchHits.addAll(Arrays.asList(itemResponse.getResponse().getHits().getHits()));
for (SearchHit hit : itemResponse.getResponse().getHits().getHits()) {
searchHits.add(hit.getId());
}
}

if (searchHits.isEmpty()) {
@@ -252,12 +254,11 @@ public void onResponse(MultiSearchResponse response) {
int quantileDocCount = 0;
int categorizerStateDocCount = 0;
int resultDocCount = 0;
for (SearchHit hit : searchHits) {
if (hit.getId().equals(Quantiles.documentId(job.getId()))
|| hit.getId().equals(Quantiles.v54DocumentId(job.getId()))) {
for (String hitId : searchHits) {
if (hitId.equals(Quantiles.documentId(job.getId())) || hitId.equals(Quantiles.v54DocumentId(job.getId()))) {
quantileDocCount++;
} else if (hit.getId().startsWith(CategorizerState.documentPrefix(job.getId()))
|| hit.getId().startsWith(CategorizerState.v54DocumentPrefix(job.getId()))) {
} else if (hitId.startsWith(CategorizerState.documentPrefix(job.getId()))
|| hitId.startsWith(CategorizerState.v54DocumentPrefix(job.getId()))) {
categorizerStateDocCount++;
} else {
resultDocCount++;
@@ -1071,9 +1072,8 @@ public void categoryDefinitions(
ML_ORIGIN,
searchRequest,
ActionListener.<SearchResponse>wrap(searchResponse -> {
SearchHit[] hits = searchResponse.getHits().getHits();
List<CategoryDefinition> results = new ArrayList<>(hits.length);
for (SearchHit hit : hits) {
List<CategoryDefinition> results = new ArrayList<>(searchResponse.getHits().getHits().length);
for (SearchHit hit : searchResponse.getHits().getHits()) {
BytesReference source = hit.getSourceRef();
try (
InputStream stream = source.streamInput();
@@ -1729,9 +1729,8 @@ public void scheduledEvents(ScheduledEventsQueryBuilder query, ActionListener<Qu
request.request(),
ActionListener.<SearchResponse>wrap(response -> {
List<ScheduledEvent> events = new ArrayList<>();
SearchHit[] hits = response.getHits().getHits();
try {
for (SearchHit hit : hits) {
for (SearchHit hit : response.getHits().getHits()) {
ScheduledEvent.Builder event = MlParserUtils.parse(hit, ScheduledEvent.LENIENT_PARSER);

event.eventId(hit.getId());
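
The checkForLeftOverDocuments hunks stop accumulating whole SearchHit objects and keep only the document IDs, which is all the later classification needs. A small sketch of that counting step, with hypothetical arguments standing in for the Quantiles and CategorizerState document-name helpers:

    import java.util.List;

    class LeftoverDocCounts {
        // Illustrative only: classify leftover documents by ID instead of holding whole SearchHit objects.
        static int[] countByKind(List<String> hitIds, String quantilesDocId, String categorizerStatePrefix) {
            int quantileDocCount = 0;
            int categorizerStateDocCount = 0;
            int resultDocCount = 0;
            for (String hitId : hitIds) {
                if (hitId.equals(quantilesDocId)) {
                    quantileDocCount++;
                } else if (hitId.startsWith(categorizerStatePrefix)) {
                    categorizerStateDocCount++;
                } else {
                    resultDocCount++;
                }
            }
            return new int[] { quantileDocCount, categorizerStateDocCount, resultDocCount };
        }
    }
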
File 10 of 12

@@ -118,14 +118,13 @@ private SearchResponse initScroll() {
private Deque<T> mapHits(SearchResponse searchResponse) {
Deque<T> results = new ArrayDeque<>();

SearchHit[] hits = searchResponse.getHits().getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : searchResponse.getHits().getHits()) {
T mapped = map(hit);
if (mapped != null) {
results.add(mapped);
}
}
count += hits.length;
count += searchResponse.getHits().getHits().length;

if (hasNext() == false && scrollId != null) {
client.prepareClearScroll().setScrollIds(Collections.singletonList(scrollId)).get();
File 11 of 12

@@ -13,6 +13,7 @@
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.xpack.ml.utils.MlIndicesUtils;
@@ -153,7 +154,7 @@ protected SearchResponse executeSearchRequest(SearchRequest searchRequest) {
private Deque<T> mapHits(SearchResponse searchResponse) {
Deque<T> results = new ArrayDeque<>();

SearchHit[] hits = searchResponse.getHits().getHits();
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
T mapped = map(hit);
if (mapped != null) {
@@ -162,12 +163,12 @@ private Deque<T> mapHits(SearchResponse searchResponse) {
}

// fewer hits than we requested, this is the end of the search
if (hits.length < batchSize) {
if (hits.getHits().length < batchSize) {
lastSearchReturnedResults.set(false);
}

if (hits.length > 0) {
extractSearchAfterFields(hits[hits.length - 1]);
if (hits.getHits().length > 0) {
extractSearchAfterFields(hits.getAt(hits.getHits().length - 1));
}

return results;
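
The last hunks adjust a search_after style iterator: a page shorter than the batch size marks the end, and the final hit of the page supplies the sort values for the next request. A sketch under those assumptions; extractSearchAfterFields is a hypothetical hook for the subclass-specific extraction:

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.search.SearchHit;
    import org.elasticsearch.search.SearchHits;

    abstract class SearchAfterPager {
        private final int batchSize;
        private boolean morePages = true;

        SearchAfterPager(int batchSize) {
            this.batchSize = batchSize;
        }

        // Hypothetical hook: remember the last hit's sort values for the next search_after request.
        abstract void extractSearchAfterFields(SearchHit lastHit);

        void onPage(SearchResponse response) {
            SearchHits hits = response.getHits();
            if (hits.getHits().length < batchSize) {
                morePages = false; // fewer hits than requested means this was the final page
            }
            if (hits.getHits().length > 0) {
                extractSearchAfterFields(hits.getAt(hits.getHits().length - 1));
            }
        }

        boolean hasMorePages() {
            return morePages;
        }
    }
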
File 12 of 12: diff not loaded on this page.
