Skip to content

Commit

Permalink
fixed?
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jan 3, 2024
1 parent 96e7d34 commit 9fe4d60
Show file tree
Hide file tree
Showing 22 changed files with 62 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@
*/
package org.elasticsearch.plugin.noop.action.search;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.plugin.noop.NoopPlugin;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.profile.SearchProfileResults;
Expand All @@ -35,7 +32,7 @@ public TransportNoopSearchAction(TransportService transportService, ActionFilter
NoopPlugin.NOOP_SEARCH_ACTION.name(),
transportService,
actionFilters,
(Writeable.Reader<SearchRequest>) SearchRequest::new,
SearchRequest::new,
EsExecutors.DIRECT_EXECUTOR_SERVICE
);
}
Expand All @@ -44,7 +41,7 @@ public TransportNoopSearchAction(TransportService transportService, ActionFilter
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> listener) {
listener.onResponse(
new SearchResponse(
new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), 0.0f),
SearchHits.EMPTY_WITH_TOTAL_HITS,
InternalAggregations.EMPTY,
new Suggest(Collections.emptyList()),
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ private static void mergeSuggest(
final int index = fetchResult.counterGetAndIncrement();
assert index < fetchResult.hits().getHits().length
: "not enough hits fetched. index [" + index + "] length: " + fetchResult.hits().getHits().length;
SearchHit hit = fetchResult.hits().getHits()[index];
SearchHit hit = fetchResult.hits().getAt(index);
CompletionSuggestion.Entry.Option suggestOption = suggestionOptions.get(scoreDocIndex - currentOffset);
hit.score(shardDoc.score);
hit.shard(fetchResult.getSearchShardTarget());
Expand Down Expand Up @@ -451,7 +451,7 @@ private static SearchHits getHits(
final int index = fetchResult.counterGetAndIncrement();
assert index < fetchResult.hits().getHits().length
: "not enough hits fetched. index [" + index + "] length: " + fetchResult.hits().getHits().length;
final SearchHit searchHit = fetchResult.hits().getHits()[index];
final SearchHit searchHit = fetchResult.hits().getAt(index);
searchHit.shard(fetchResult.getSearchShardTarget());
if (reducedQueryPhase.rankCoordinatorContext != null) {
assert shardDoc instanceof RankDoc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ private static ShardId extractShardId(ShardSearchFailure failure) {
};

private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits totalHits, Map<ShardIdAndClusterAlias, Integer> shards) {
SearchHit[] hits = searchHits.getHits();
ScoreDoc[] scoreDocs = new ScoreDoc[hits.length];
ScoreDoc[] scoreDocs = new ScoreDoc[searchHits.getHits().length];
final TopDocs topDocs;
if (searchHits.getSortFields() != null) {
if (searchHits.getCollapseField() != null) {
Expand All @@ -303,8 +302,8 @@ private static TopDocs searchHitsToTopDocs(SearchHits searchHits, TotalHits tota
topDocs = new TopDocs(totalHits, scoreDocs);
}

for (int i = 0; i < hits.length; i++) {
SearchHit hit = hits[i];
for (int i = 0; i < searchHits.getHits().length; i++) {
SearchHit hit = searchHits.getAt(i);
SearchShardTarget shard = hit.getShard();
ShardIdAndClusterAlias shardId = new ShardIdAndClusterAlias(shard.getShardId(), shard.getClusterAlias());
shards.putIfAbsent(shardId, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ private static Response wrapSearchResponse(SearchResponse response) {
} else {
hits = new ArrayList<>(response.getHits().getHits().length);
for (SearchHit hit : response.getHits().getHits()) {
// TODO: used pooled hits here
hits.add(new ClientHit(hit.asUnpooled()));
hits.add(new ClientHit(hit));
}
hits = unmodifiableList(hits);
}
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/java/org/elasticsearch/search/SearchHit.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,15 @@ public SearchHit sourceRef(BytesReference source) {
* {@code _source} or if source is disabled in the mapping.
*/
public boolean hasSource() {
assert hasReferences();
return source != null;
}

/**
* The source of the document as string (can be {@code null}).
*/
public String getSourceAsString() {
assert hasReferences();
if (source == null) {
return null;
}
Expand Down Expand Up @@ -496,6 +498,7 @@ public Map<String, Object> getSourceAsMap() {
* The hit field matching the given field name.
*/
public DocumentField field(String fieldName) {
assert hasReferences();
DocumentField result = documentFields.get(fieldName);
if (result != null) {
return result;
Expand Down Expand Up @@ -686,6 +689,7 @@ public Map<String, Float> getMatchedQueriesAndScores() {
* @return Inner hits or <code>null</code> if there are none
*/
public Map<String, SearchHits> getInnerHits() {
assert hasReferences();
return innerHits;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public SearchHits(
@Override
protected void closeInternal() {
for (int i = 0; i < hits.length; i++) {
assert hits[i] != null;
hits[i].decRef();
hits[i] = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public void testMergeProfileResults() throws InterruptedException {
SearchProfileResults profile = SearchProfileResultsTests.createTestItem();
expectedProfile.putAll(profile.getShardResults());
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN, null, null, null),
SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN),
null,
null,
false,
Expand Down Expand Up @@ -409,7 +409,7 @@ public void testMergeCompletionSuggestions() throws InterruptedException {
suggestions.add(completionSuggestion);
Suggest suggest = new Suggest(suggestions);
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN, null, null, null),
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN),
null,
suggest,
false,
Expand Down Expand Up @@ -495,7 +495,7 @@ public void testMergeCompletionSuggestionsTieBreak() throws InterruptedException
suggestions.add(completionSuggestion);
Suggest suggest = new Suggest(suggestions);
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN, null, null, null),
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN),
null,
suggest,
false,
Expand Down Expand Up @@ -564,7 +564,7 @@ public void testMergeEmptyFormat() throws InterruptedException {
Collections.emptyMap()
);

SearchHits searchHits = SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN, null, null, null);
SearchHits searchHits = SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN);
try (
SearchResponseMerger searchResponseMerger = new SearchResponseMerger(
0,
Expand Down Expand Up @@ -645,7 +645,7 @@ public void testMergeAggs() throws InterruptedException {
InternalDateRange range = factory.create(rangeAggName, singletonList(bucket), DocValueFormat.RAW, false, emptyMap());
InternalAggregations aggs = InternalAggregations.from(Arrays.asList(range, max));
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN, null, null, null),
SearchHits.unpooled(SearchHits.EMPTY, null, Float.NaN),
aggs,
null,
false,
Expand Down Expand Up @@ -978,7 +978,7 @@ public void testMergeEmptySearchHitsWithNonEmpty() {
}
{
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN, null, null, null),
SearchHits.unpooled(SearchHits.EMPTY, new TotalHits(0, TotalHits.Relation.EQUAL_TO), Float.NaN),
null,
null,
false,
Expand Down Expand Up @@ -1034,7 +1034,7 @@ public void testMergeOnlyEmptyHits() {
expectedTotalHits = new TotalHits(Math.min(previousValue + totalHits.value, trackTotalHitsUpTo), totalHitsRelation);
}
SearchResponse searchResponse = new SearchResponse(
SearchHits.unpooled(new SearchHit[0], totalHits, Float.NaN, null, null, null),
SearchHits.unpooled(new SearchHit[0], totalHits, Float.NaN),
null,
null,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.inference.TaskType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -105,8 +106,7 @@ public void getModelWithSecrets(String modelId, ActionListener<UnparsedModel> li
return;
}

var hits = searchResponse.getHits().getHits();
delegate.onResponse(UnparsedModel.unparsedModelFromMap(createModelConfigMap(hits, modelId)));
delegate.onResponse(UnparsedModel.unparsedModelFromMap(createModelConfigMap(searchResponse.getHits(), modelId)));
});

QueryBuilder queryBuilder = documentIdQuery(modelId);
Expand All @@ -132,8 +132,7 @@ public void getModel(String modelId, ActionListener<UnparsedModel> listener) {
return;
}

var hits = searchResponse.getHits().getHits();
var modelConfigs = parseHitsAsModels(hits).stream().map(UnparsedModel::unparsedModelFromMap).toList();
var modelConfigs = parseHitsAsModels(searchResponse.getHits()).stream().map(UnparsedModel::unparsedModelFromMap).toList();
assert modelConfigs.size() == 1;
delegate.onResponse(modelConfigs.get(0));
});
Expand Down Expand Up @@ -162,8 +161,7 @@ public void getModelsByTaskType(TaskType taskType, ActionListener<List<UnparsedM
return;
}

var hits = searchResponse.getHits().getHits();
var modelConfigs = parseHitsAsModels(hits).stream().map(UnparsedModel::unparsedModelFromMap).toList();
var modelConfigs = parseHitsAsModels(searchResponse.getHits()).stream().map(UnparsedModel::unparsedModelFromMap).toList();
delegate.onResponse(modelConfigs);
});

Expand Down Expand Up @@ -192,8 +190,7 @@ public void getAllModels(ActionListener<List<UnparsedModel>> listener) {
return;
}

var hits = searchResponse.getHits().getHits();
var modelConfigs = parseHitsAsModels(hits).stream().map(UnparsedModel::unparsedModelFromMap).toList();
var modelConfigs = parseHitsAsModels(searchResponse.getHits()).stream().map(UnparsedModel::unparsedModelFromMap).toList();
delegate.onResponse(modelConfigs);
});

Expand All @@ -212,16 +209,16 @@ public void getAllModels(ActionListener<List<UnparsedModel>> listener) {
client.search(modelSearch, searchListener);
}

private List<ModelConfigMap> parseHitsAsModels(SearchHit[] hits) {
private List<ModelConfigMap> parseHitsAsModels(SearchHits hits) {
var modelConfigs = new ArrayList<ModelConfigMap>();
for (var hit : hits) {
modelConfigs.add(new ModelConfigMap(hit.getSourceAsMap(), Map.of()));
}
return modelConfigs;
}

private ModelConfigMap createModelConfigMap(SearchHit[] hits, String modelId) {
Map<String, SearchHit> mappedHits = Arrays.stream(hits).collect(Collectors.toMap(hit -> {
private ModelConfigMap createModelConfigMap(SearchHits hits, String modelId) {
Map<String, SearchHit> mappedHits = Arrays.stream(hits.getHits()).collect(Collectors.toMap(hit -> {
if (hit.getIndex().startsWith(InferenceIndex.INDEX_NAME)) {
return InferenceIndex.INDEX_NAME;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xcontent.DeprecationHandler;
Expand Down Expand Up @@ -304,7 +303,7 @@ protected ForecastRequestStats getForecastStats(String jobId, String forecastId)
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
searchResponse.getHits().getHits()[0].getSourceRef().streamInput()
searchResponse.getHits().getAt(0).getSourceRef().streamInput()
)
) {
forecastRequestStats.set(ForecastRequestStats.STRICT_PARSER.apply(parser, null));
Expand All @@ -323,8 +322,7 @@ protected List<ForecastRequestStats> getForecastStats() throws Exception {
.filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE))
),
searchResponse -> {
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : searchResponse.getHits()) {
try (
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(
Expand Down Expand Up @@ -368,8 +366,7 @@ protected List<Forecast> getForecasts(String jobId, ForecastRequestStats forecas
)
.addSort(SortBuilders.fieldSort(Result.TIMESTAMP.getPreferredName()).order(SortOrder.ASC)),
searchResponse -> {
SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits) {
for (SearchHit hit : searchResponse.getHits()) {
try (
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import static org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex.createStateIndexAndAliasIfNecessary;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

public class ModelSnapshotRetentionIT extends MlNativeAutodetectIntegTestCase {

Expand Down Expand Up @@ -191,8 +190,7 @@ private List<String> getAvailableModelStateDocIds() throws Exception {
private List<String> getDocIdsFromSearch(SearchRequest searchRequest) throws Exception {
List<String> docIds = new ArrayList<>();
assertResponse(client().execute(TransportSearchAction.TYPE, searchRequest), searchResponse -> {
assertThat(searchResponse.getHits(), notNullValue());
for (SearchHit searchHit : searchResponse.getHits().getHits()) {
for (SearchHit searchHit : searchResponse.getHits()) {
docIds.add(searchHit.getId());
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.StoredFieldsContext;
import org.elasticsearch.search.sort.SortOrder;
Expand Down Expand Up @@ -126,8 +127,7 @@ protected InputStream initScroll(long startTimestamp) throws IOException {
logger.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
scrollId = searchResponse.getScrollId();
SearchHit[] hits = searchResponse.getHits().getHits();
return processAndConsumeSearchHits(hits);
return processAndConsumeSearchHits(searchResponse.getHits());
} finally {
searchResponse.decRef();
}
Expand Down Expand Up @@ -184,17 +184,17 @@ private SearchRequestBuilder buildSearchRequest(long start) {
/**
* IMPORTANT: This is not an idempotent method. This method changes the input array by setting each element to <code>null</code>.
*/
private InputStream processAndConsumeSearchHits(SearchHit[] hits) throws IOException {
private InputStream processAndConsumeSearchHits(SearchHits hits) throws IOException {

if (hits == null || hits.length == 0) {
if (hits.getHits().length == 0) {
hasNext = false;
clearScroll();
return null;
}

BytesStreamOutput outputStream = new BytesStreamOutput();

SearchHit lastHit = hits[hits.length - 1];
SearchHit lastHit = hits.getAt(hits.getHits().length - 1);
lastTimestamp = context.extractedFields.timeFieldValue(lastHit);
try (SearchHitToJsonProcessor hitProcessor = new SearchHitToJsonProcessor(context.extractedFields, outputStream)) {
for (SearchHit hit : hits) {
Expand Down Expand Up @@ -233,8 +233,7 @@ private InputStream continueScroll() throws IOException {
logger.debug("[{}] Search response was obtained", context.jobId);
timingStatsReporter.reportSearchDuration(searchResponse.getTook());
scrollId = searchResponse.getScrollId();
SearchHit[] hits = searchResponse.getHits().getHits();
return processAndConsumeSearchHits(hits);
return processAndConsumeSearchHits(searchResponse.getHits());
} finally {
if (searchResponse != null) {
searchResponse.decRef();
Expand Down
Loading

0 comments on commit 9fe4d60

Please sign in to comment.