-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Tiered Caching] Adding took time for QuerySearchResult #10510
Changes from 5 commits
08a4f6d
dfd9128
a9ab327
4e57f4c
1d47f38
81dc9ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -131,13 +131,15 @@ public void preProcess(SearchContext context) { | |
} | ||
|
||
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException { | ||
final long startTime = System.nanoTime(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry missed this earlier. Given we don't report this as stats anywhere, wondering if millis suffice for our use case. Nano is generally much more expensive: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need an accurate elapsed time for queryPhase so that we can take decisions based on that. So we need nanos for that instead of millis. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As per my understanding, since we need this time for evaluating whether to store the result on disk or not, and disk access can be a few ms, I don't completely understand the reason for using nanos instead of millis. Although it should be fine to use nanos as well, I just want to ensure my understanding is not lacking. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not just about whether disk access would be a few ms; System.currentTimeMillis() is tied to the system clock, so using this might be a bit error prone. System.nanoTime measures time relative to an arbitrary point in time and is not affected by system clock skew. Elapsed time is calculated using nanoTime across OpenSearch, while millis are used to display human-readable dates to users. |
||
if (searchContext.hasOnlySuggest()) { | ||
suggestProcessor.process(searchContext); | ||
searchContext.queryResult() | ||
.topDocs( | ||
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN), | ||
new DocValueFormat[0] | ||
); | ||
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime); | ||
return; | ||
} | ||
|
||
|
@@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep | |
); | ||
searchContext.queryResult().profileResults(shardResults); | ||
} | ||
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. couple of questions:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For 1), I wasn't aware of that - let me see if it's doable to get that stored time into the QueryResult. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For 2, not sure if I understood correctly. If you already have a wrapper concept, then that wrapper can hold to the computed |
||
} | ||
|
||
// making public for testing | ||
|
@@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q | |
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); | ||
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); | ||
} | ||
|
||
return shouldRescore; | ||
} finally { | ||
// Search phase has finished, no longer need to check for timeout | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,6 +34,7 @@ | |
|
||
import org.apache.lucene.search.FieldDoc; | ||
import org.apache.lucene.search.TotalHits; | ||
import org.opensearch.Version; | ||
import org.opensearch.common.io.stream.DelayableWriteable; | ||
import org.opensearch.common.lucene.search.TopDocsAndMaxScore; | ||
import org.opensearch.core.common.io.stream.StreamInput; | ||
|
@@ -87,6 +88,7 @@ public final class QuerySearchResult extends SearchPhaseResult { | |
private int nodeQueueSize = -1; | ||
|
||
private final boolean isNull; | ||
private Long tookTimeNanos = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally the tookTimeNanos should also be final, since we never modify it once set. Though I'm not sure how we can communicate that intent while invoking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can define this field with SetOnce |
||
|
||
public QuerySearchResult() { | ||
this(false); | ||
|
@@ -364,6 +366,11 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc | |
nodeQueueSize = in.readInt(); | ||
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); | ||
setRescoreDocIds(new RescoreDocIds(in)); | ||
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { | ||
tookTimeNanos = in.readOptionalLong(); | ||
} else { | ||
tookTimeNanos = null; | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -406,6 +413,9 @@ public void writeToNoId(StreamOutput out) throws IOException { | |
out.writeInt(nodeQueueSize); | ||
out.writeOptionalWriteable(getShardSearchRequest()); | ||
getRescoreDocIds().writeTo(out); | ||
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { | ||
out.writeOptionalLong(tookTimeNanos); | ||
} | ||
} | ||
|
||
public TotalHits getTotalHits() { | ||
|
@@ -415,4 +425,12 @@ public TotalHits getTotalHits() { | |
public float getMaxScore() { | ||
return maxScore; | ||
} | ||
|
||
public Long getTookTimeNanos() { | ||
return tookTimeNanos; | ||
} | ||
|
||
void setTookTimeNanos(long tookTime) { | ||
tookTimeNanos = tookTime; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,9 +85,14 @@ | |
import org.apache.lucene.tests.index.RandomIndexWriter; | ||
import org.apache.lucene.util.BytesRef; | ||
import org.apache.lucene.util.FixedBitSet; | ||
import org.opensearch.action.OriginalIndices; | ||
import org.opensearch.action.search.SearchRequest; | ||
import org.opensearch.action.search.SearchShardTask; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.unit.TimeValue; | ||
import org.opensearch.core.common.Strings; | ||
import org.opensearch.core.index.Index; | ||
import org.opensearch.core.index.shard.ShardId; | ||
import org.opensearch.core.tasks.TaskCancelledException; | ||
import org.opensearch.index.mapper.DateFieldMapper; | ||
import org.opensearch.index.mapper.MappedFieldType; | ||
|
@@ -103,9 +108,11 @@ | |
import org.opensearch.lucene.queries.MinDocQuery; | ||
import org.opensearch.search.DocValueFormat; | ||
import org.opensearch.search.collapse.CollapseBuilder; | ||
import org.opensearch.search.internal.AliasFilter; | ||
import org.opensearch.search.internal.ContextIndexSearcher; | ||
import org.opensearch.search.internal.ScrollContext; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.internal.ShardSearchRequest; | ||
import org.opensearch.search.sort.SortAndFormats; | ||
import org.opensearch.test.TestSearchContext; | ||
import org.opensearch.threadpool.ThreadPool; | ||
|
@@ -115,6 +122,7 @@ | |
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.Collections; | ||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
|
@@ -1145,6 +1153,114 @@ public void testQueryTimeoutChecker() throws Exception { | |
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true); | ||
} | ||
|
||
public void testQuerySearchResultTookTime() throws IOException { | ||
int sleepMillis = randomIntBetween(10, 100); // between 0.01 and 0.1 sec | ||
DelayedQueryPhaseSearcher delayedQueryPhaseSearcher = new DelayedQueryPhaseSearcher(sleepMillis); | ||
|
||
// we need to test queryPhase.execute(), not executeInternal(), since that's what the timer wraps around | ||
// for that we must set up a searchContext with more functionality than the TestSearchContext, | ||
// which requires a bit of complexity with test classes | ||
|
||
Directory dir = newDirectory(); | ||
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT)); | ||
IndexWriterConfig iwc = newIndexWriterConfig().setIndexSort(sort); | ||
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); | ||
Document doc = new Document(); | ||
for (int i = 0; i < 10; i++) { | ||
doc.add(new StringField("foo", Integer.toString(i), Store.NO)); | ||
} | ||
w.addDocument(doc); | ||
w.close(); | ||
IndexReader reader = DirectoryReader.open(dir); | ||
|
||
QueryShardContext queryShardContext = mock(QueryShardContext.class); | ||
when(queryShardContext.fieldMapper("user")).thenReturn( | ||
new NumberFieldType("user", NumberType.INTEGER, true, false, true, false, null, Collections.emptyMap()) | ||
); | ||
|
||
Index index = new Index("IndexName", "UUID"); | ||
ShardId shardId = new ShardId(index, 0); | ||
long nowInMillis = System.currentTimeMillis(); | ||
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10); | ||
SearchRequest searchRequest = new SearchRequest(); | ||
searchRequest.allowPartialSearchResults(randomBoolean()); | ||
ShardSearchRequest request = new ShardSearchRequest( | ||
OriginalIndices.NONE, | ||
searchRequest, | ||
shardId, | ||
1, | ||
AliasFilter.EMPTY, | ||
1f, | ||
nowInMillis, | ||
clusterAlias, | ||
Strings.EMPTY_ARRAY | ||
); | ||
TestSearchContextWithRequest searchContext = new TestSearchContextWithRequest( | ||
queryShardContext, | ||
indexShard, | ||
newEarlyTerminationContextSearcher(reader, 0, executor), | ||
request | ||
); | ||
|
||
QueryPhase queryPhase = new QueryPhase(delayedQueryPhaseSearcher); | ||
queryPhase.execute(searchContext); | ||
Long tookTime = searchContext.queryResult().getTookTimeNanos(); | ||
assertTrue(tookTime >= (long) sleepMillis * 1000000); | ||
reader.close(); | ||
dir.close(); | ||
} | ||
|
||
private class TestSearchContextWithRequest extends TestSearchContext { | ||
ShardSearchRequest request; | ||
Query query; | ||
|
||
public TestSearchContextWithRequest( | ||
QueryShardContext queryShardContext, | ||
IndexShard indexShard, | ||
ContextIndexSearcher searcher, | ||
ShardSearchRequest request | ||
) { | ||
super(queryShardContext, indexShard, searcher); | ||
this.request = request; | ||
this.query = new TermQuery(new Term("foo", "bar")); | ||
} | ||
|
||
@Override | ||
public ShardSearchRequest request() { | ||
return request; | ||
} | ||
|
||
@Override | ||
public Query query() { | ||
return this.query; | ||
} | ||
} | ||
|
||
private class DelayedQueryPhaseSearcher extends QueryPhase.DefaultQueryPhaseSearcher implements QueryPhaseSearcher { | ||
// add delay into searchWith | ||
private final int sleepMillis; | ||
|
||
public DelayedQueryPhaseSearcher(int sleepMillis) { | ||
super(); | ||
this.sleepMillis = sleepMillis; | ||
} | ||
|
||
@Override | ||
public boolean searchWith( | ||
SearchContext searchContext, | ||
ContextIndexSearcher searcher, | ||
Query query, | ||
LinkedList<QueryCollectorContext> collectors, | ||
boolean hasFilterCollector, | ||
boolean hasTimeout | ||
) throws IOException { | ||
try { | ||
Thread.sleep(sleepMillis); | ||
} catch (Exception ignored) {} | ||
return super.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout); | ||
Comment on lines
+1257
to
+1260
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be written as a try-finally block? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I put the return statement in a finally block, it creates a warning which causes ./gradlew precommit to fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah! In general, it is bad coding practice as it can eat exceptions, although here we are intentionally doing that. |
||
} | ||
} | ||
|
||
private void createTimeoutCheckerThenWaitThenRun( | ||
long timeout, | ||
long sleepAfterCreation, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,6 +56,8 @@ | |
import org.opensearch.search.suggest.SuggestTests; | ||
import org.opensearch.test.OpenSearchTestCase; | ||
|
||
import java.util.HashMap; | ||
|
||
import static java.util.Collections.emptyList; | ||
|
||
public class QuerySearchResultTests extends OpenSearchTestCase { | ||
|
@@ -99,25 +101,36 @@ private static QuerySearchResult createTestInstance() throws Exception { | |
if (randomBoolean()) { | ||
result.aggregations(InternalAggregationsTests.createTestInstance()); | ||
} | ||
assertNull(result.getTookTimeNanos()); | ||
return result; | ||
} | ||
|
||
public void testSerialization() throws Exception { | ||
QuerySearchResult querySearchResult = createTestInstance(); | ||
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new); | ||
assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId()); | ||
assertNull(deserialized.getSearchShardTarget()); | ||
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); | ||
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits); | ||
assertEquals(querySearchResult.from(), deserialized.from()); | ||
assertEquals(querySearchResult.size(), deserialized.size()); | ||
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); | ||
if (deserialized.hasAggs()) { | ||
Aggregations aggs = querySearchResult.consumeAggs().expand(); | ||
Aggregations deserializedAggs = deserialized.consumeAggs().expand(); | ||
assertEquals(aggs.asList(), deserializedAggs.asList()); | ||
HashMap<Boolean, Long> expectedValues = new HashMap<>(); // map contains whether to set took time, and if so, to what value | ||
expectedValues.put(false, null); | ||
expectedValues.put(true, 1000L); | ||
for (Boolean doSetTookTime : expectedValues.keySet()) { | ||
QuerySearchResult querySearchResult = createTestInstance(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can instead update the |
||
if (doSetTookTime) { | ||
querySearchResult.setTookTimeNanos(expectedValues.get(doSetTookTime)); | ||
} | ||
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new); | ||
assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId()); | ||
assertNull(deserialized.getSearchShardTarget()); | ||
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); | ||
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits); | ||
assertEquals(querySearchResult.from(), deserialized.from()); | ||
assertEquals(querySearchResult.size(), deserialized.size()); | ||
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); | ||
if (deserialized.hasAggs()) { | ||
Aggregations aggs = querySearchResult.consumeAggs().expand(); | ||
Aggregations deserializedAggs = deserialized.consumeAggs().expand(); | ||
assertEquals(aggs.asList(), deserializedAggs.asList()); | ||
} | ||
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); | ||
assertEquals(querySearchResult.getTookTimeNanos(), deserialized.getTookTimeNanos()); | ||
assertEquals(expectedValues.get(doSetTookTime), querySearchResult.getTookTimeNanos()); | ||
} | ||
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); | ||
} | ||
|
||
public void testNullResponse() throws Exception { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @peteralfonsi, could you please evaluate the use of Timer class for this change? I'd like to hear your thoughts on whether utilizing the existing Timer class is a feasible choice. Thank you.