[Tiered Caching] Adding took time for QuerySearchResult #10510

Closed
Changes from 5 commits
@@ -131,13 +131,15 @@ public void preProcess(SearchContext context) {
}

public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
Contributor: Hi @peteralfonsi, could you please evaluate the use of the Timer class for this change? I'd like to hear your thoughts on whether utilizing the existing Timer class is a feasible choice. Thank you.

final long startTime = System.nanoTime();
Collaborator: Sorry, I missed this earlier. Given we don't report this as stats anywhere, I'm wondering if millis would suffice for our use case. nanoTime is generally much more expensive.

Contributor: We need an accurate elapsed time for the query phase so that we can make decisions based on it, so we need nanos instead of millis.

Collaborator: As per my understanding, since we need this time to evaluate whether to store the result on disk or not, and disk access can take a few ms, I don't completely understand the reason for using nanos instead of millis. It should be fine to use nanos as well; I just want to make sure my understanding isn't lacking.

Contributor: It is not just about whether disk access takes a few ms. System.currentTimeMillis() is tied to the system clock, so using it can be error-prone, while System.nanoTime() measures time relative to an arbitrary point and is not affected by system clock skew. Elapsed time is calculated using nanoTime across OpenSearch, while millis are used to display human-readable dates to users.
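The distinction the contributor describes can be shown in a few lines. This is a standalone sketch, not OpenSearch code; the class and method names are illustrative:

```java
// Standalone demo of why elapsed time uses the monotonic System.nanoTime():
// its origin is arbitrary, so only differences are meaningful, but it never
// jumps when NTP or a user adjusts the wall clock, unlike
// System.currentTimeMillis().
public class ElapsedTimeDemo {

    // Times a task with the monotonic clock, the same pattern QueryPhase uses.
    static long elapsedNanos(Runnable task) {
        final long start = System.nanoTime();
        task.run();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        long took = elapsedNanos(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException ignored) {
            }
        });
        // sleep(20) occupies at least ~20 ms of monotonic time
        System.out.println(took >= 20_000_000L);
    }
}
```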

if (searchContext.hasOnlySuggest()) {
suggestProcessor.process(searchContext);
searchContext.queryResult()
.topDocs(
new TopDocsAndMaxScore(new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
new DocValueFormat[0]
);
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
return;
}

@@ -165,6 +167,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
);
searchContext.queryResult().profileResults(shardResults);
}
searchContext.queryResult().setTookTimeNanos(System.nanoTime() - startTime);
Collaborator: A couple of questions:

  1. The query phase time is also computed in SearchOperationListenerExecutor, but I see that you need it in the QueryResult, which is stored in the cache before that listener records the time.
  2. If we store this time in the QueryResult, then on eviction we will need to deserialize each evicted value to get the tookTime and decide whether to keep it in the disk tier. I'm wondering if the cache should instead hold it in a separate object wrapping the QueryResult, to make it easily available. Is there a need to expose this value in the QueryResult? If yes, then it should probably use the value computed by SearchOperationListenerExecutor to keep it consistent with search stats.

Contributor Author: For 1), I wasn't aware of that; let me see if it's doable to get that stored time into the QueryResult.
For 2), in my tiered caching policies PR, which should hopefully be raised soon, there is a separate wrapper with just the important info needed to decide whether to add a result to a disk tier. This gets written first, before the actual result, so the policy can just read that wrapper and doesn't have to spend time deserializing the whole result. I will see if I can get the other value in there instead.

Collaborator: For 2, I'm not sure I understood correctly. If you already have a wrapper concept, then that wrapper can hold the computed tookTime and the computation can be done here instead. That way QueryResult can be kept agnostic of tookTime.
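The reviewer's wrapper idea can be sketched roughly as follows. All names here are hypothetical illustrations, not the actual classes from the tiered-caching PR:

```java
// Hypothetical sketch of the reviewer's suggestion: a small wrapper holds the
// policy metadata (tookTime) so a cached entry can be judged for disk-tier
// admission without deserializing the full query result. None of these names
// are real OpenSearch APIs.
public class CachedQueryResult<V> {
    private final long tookTimeNanos; // cheap to read at eviction time
    private final V payload;          // the (possibly large) result

    public CachedQueryResult(long tookTimeNanos, V payload) {
        this.tookTimeNanos = tookTimeNanos;
        this.payload = payload;
    }

    public long tookTimeNanos() {
        return tookTimeNanos;
    }

    public V payload() {
        return payload;
    }

    // Disk-tier admission decision made from the metadata alone: only results
    // that were expensive enough to produce are worth keeping on disk.
    public boolean worthKeepingOnDisk(long thresholdNanos) {
        return tookTimeNanos >= thresholdNanos;
    }
}
```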

}

// making public for testing
@@ -292,7 +295,6 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q
queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
}

return shouldRescore;
} finally {
// Search phase has finished, no longer need to check for timeout
@@ -34,6 +34,7 @@

import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
import org.opensearch.Version;
import org.opensearch.common.io.stream.DelayableWriteable;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.core.common.io.stream.StreamInput;
@@ -87,6 +88,7 @@ public final class QuerySearchResult extends SearchPhaseResult {
private int nodeQueueSize = -1;

private final boolean isNull;
private Long tookTimeNanos = null;
Collaborator: Ideally tookTimeNanos should also be final, since we never modify it once set. Although I'm not sure how we can communicate that intent while invoking setTookTimeNanos from QueryPhase.

Collaborator: You can define this field with SetOnce.
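Lucene ships org.apache.lucene.util.SetOnce for exactly this intent. A minimal stdlib equivalent of the pattern, shown here as a sketch rather than the Lucene class itself, looks like this:

```java
import java.util.concurrent.atomic.AtomicReference;

// Minimal sketch of the set-once pattern the reviewer suggests: the value can
// be written exactly once, communicating "effectively final" for a field that
// cannot be assigned in the constructor.
public class SetOnceValue<T> {
    private final AtomicReference<T> ref = new AtomicReference<>(null);

    // Succeeds only on the first call; later calls fail loudly.
    public void set(T value) {
        if (!ref.compareAndSet(null, value)) {
            throw new IllegalStateException("value already set");
        }
    }

    // Returns null until set() has been called.
    public T get() {
        return ref.get();
    }
}
```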


public QuerySearchResult() {
this(false);
@@ -364,6 +366,11 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOException {
nodeQueueSize = in.readInt();
setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new));
setRescoreDocIds(new RescoreDocIds(in));
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
tookTimeNanos = in.readOptionalLong();
} else {
tookTimeNanos = null;
}
}

@Override
@@ -406,6 +413,9 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeInt(nodeQueueSize);
out.writeOptionalWriteable(getShardSearchRequest());
getRescoreDocIds().writeTo(out);
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalLong(tookTimeNanos);
}
}
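The read/write pair above follows a standard version-gated optional-field pattern: both sides gate on the same version check so older nodes never see the extra bytes. A standalone sketch with plain java.io (an illustrative version constant, not OpenSearch's StreamInput/StreamOutput) shows the idea:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch of a version-gated optional long on the wire: a presence byte plus
// the value, written and read only when both sides are on the new version.
public class OptionalLongWire {
    static final int V_3_0_0 = 3_000_000; // illustrative, not the real constant

    static byte[] write(int streamVersion, Long tookTimeNanos) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            if (streamVersion >= V_3_0_0) { // mirror of onOrAfter(V_3_0_0)
                out.writeBoolean(tookTimeNanos != null);
                if (tookTimeNanos != null) {
                    out.writeLong(tookTimeNanos);
                }
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Long read(int streamVersion, byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            // Old streams carry no presence byte, so skip the read entirely.
            if (streamVersion >= V_3_0_0 && in.readBoolean()) {
                return in.readLong();
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```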

public TotalHits getTotalHits() {
@@ -415,4 +425,12 @@ public TotalHits getTotalHits() {
public float getMaxScore() {
return maxScore;
}

public Long getTookTimeNanos() {
return tookTimeNanos;
}

void setTookTimeNanos(long tookTime) {
tookTimeNanos = tookTime;
}
}
@@ -85,9 +85,14 @@
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.Strings;
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
@@ -103,9 +108,11 @@
import org.opensearch.lucene.queries.MinDocQuery;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.collapse.CollapseBuilder;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.ScrollContext;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchRequest;
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.test.TestSearchContext;
import org.opensearch.threadpool.ThreadPool;
@@ -115,6 +122,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -1145,6 +1153,114 @@ public void testQueryTimeoutChecker() throws Exception {
createTimeoutCheckerThenWaitThenRun(timeCacheLifespan / 4, timeCacheLifespan / 2 + timeTolerance, false, true);
}

public void testQuerySearchResultTookTime() throws IOException {
int sleepMillis = randomIntBetween(10, 100); // between 0.01 and 0.1 sec
DelayedQueryPhaseSearcher delayedQueryPhaseSearcher = new DelayedQueryPhaseSearcher(sleepMillis);

// we need to test queryPhase.execute(), not executeInternal(), since that's what the timer wraps around
// for that we must set up a searchContext with more functionality than the TestSearchContext,
// which requires a bit of complexity with test classes

Directory dir = newDirectory();
final Sort sort = new Sort(new SortField("rank", SortField.Type.INT));
IndexWriterConfig iwc = newIndexWriterConfig().setIndexSort(sort);
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
Document doc = new Document();
for (int i = 0; i < 10; i++) {
doc.add(new StringField("foo", Integer.toString(i), Store.NO));
}
w.addDocument(doc);
w.close();
IndexReader reader = DirectoryReader.open(dir);

QueryShardContext queryShardContext = mock(QueryShardContext.class);
when(queryShardContext.fieldMapper("user")).thenReturn(
new NumberFieldType("user", NumberType.INTEGER, true, false, true, false, null, Collections.emptyMap())
);

Index index = new Index("IndexName", "UUID");
ShardId shardId = new ShardId(index, 0);
long nowInMillis = System.currentTimeMillis();
String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10);
SearchRequest searchRequest = new SearchRequest();
searchRequest.allowPartialSearchResults(randomBoolean());
ShardSearchRequest request = new ShardSearchRequest(
OriginalIndices.NONE,
searchRequest,
shardId,
1,
AliasFilter.EMPTY,
1f,
nowInMillis,
clusterAlias,
Strings.EMPTY_ARRAY
);
TestSearchContextWithRequest searchContext = new TestSearchContextWithRequest(
queryShardContext,
indexShard,
newEarlyTerminationContextSearcher(reader, 0, executor),
request
);

QueryPhase queryPhase = new QueryPhase(delayedQueryPhaseSearcher);
queryPhase.execute(searchContext);
Long tookTime = searchContext.queryResult().getTookTimeNanos();
assertTrue(tookTime >= (long) sleepMillis * 1000000);
reader.close();
dir.close();
}

private class TestSearchContextWithRequest extends TestSearchContext {
ShardSearchRequest request;
Query query;

public TestSearchContextWithRequest(
QueryShardContext queryShardContext,
IndexShard indexShard,
ContextIndexSearcher searcher,
ShardSearchRequest request
) {
super(queryShardContext, indexShard, searcher);
this.request = request;
this.query = new TermQuery(new Term("foo", "bar"));
}

@Override
public ShardSearchRequest request() {
return request;
}

@Override
public Query query() {
return this.query;
}
}

private class DelayedQueryPhaseSearcher extends QueryPhase.DefaultQueryPhaseSearcher implements QueryPhaseSearcher {
// add delay into searchWith
private final int sleepMillis;

public DelayedQueryPhaseSearcher(int sleepMillis) {
super();
this.sleepMillis = sleepMillis;
}

@Override
public boolean searchWith(
SearchContext searchContext,
ContextIndexSearcher searcher,
Query query,
LinkedList<QueryCollectorContext> collectors,
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException {
try {
Thread.sleep(sleepMillis);
} catch (Exception ignored) {}
return super.searchWith(searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout);
Comment on lines +1257 to +1260
Collaborator: Can this be written as a try-finally block?

Contributor Author: When I put the return statement in a finally block, it creates a warning which causes ./gradlew precommit to fail.

Collaborator: Ah! In general it is bad coding practice, since it can swallow exceptions, although here we are doing that intentionally.

}
}
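The warning the author hit exists because a return inside finally discards any exception thrown in the try block. A minimal standalone demo (not part of the PR's code):

```java
// Demonstrates the "return in finally" hazard that precommit flags: the
// exception thrown in the try block is silently discarded because the
// finally clause completes abruptly with a return.
public class FinallyReturnDemo {

    static int swallows() {
        try {
            throw new RuntimeException("this exception is lost");
        } finally {
            return 1; // abrupt completion discards the pending exception
        }
    }

    public static void main(String[] args) {
        // Prints 1; no exception propagates to the caller.
        System.out.println(swallows());
    }
}
```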

private void createTimeoutCheckerThenWaitThenRun(
long timeout,
long sleepAfterCreation,
@@ -56,6 +56,8 @@
import org.opensearch.search.suggest.SuggestTests;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;

import static java.util.Collections.emptyList;

public class QuerySearchResultTests extends OpenSearchTestCase {
@@ -99,25 +101,36 @@ private static QuerySearchResult createTestInstance() throws Exception {
if (randomBoolean()) {
result.aggregations(InternalAggregationsTests.createTestInstance());
}
assertNull(result.getTookTimeNanos());
return result;
}

public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new);
assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
assertEquals(querySearchResult.from(), deserialized.from());
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs().expand();
Aggregations deserializedAggs = deserialized.consumeAggs().expand();
assertEquals(aggs.asList(), deserializedAggs.asList());
HashMap<Boolean, Long> expectedValues = new HashMap<>(); // map contains whether to set took time, and if so, to what value
expectedValues.put(false, null);
expectedValues.put(true, 1000L);
for (Boolean doSetTookTime : expectedValues.keySet()) {
QuerySearchResult querySearchResult = createTestInstance();
Collaborator: You could instead update createTestInstance to randomly add the tookTime, then assert that the created instance's tookTime matches the one in the deserialized instance.

if (doSetTookTime) {
querySearchResult.setTookTimeNanos(expectedValues.get(doSetTookTime));
}
QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new);
assertEquals(querySearchResult.getContextId().getId(), deserialized.getContextId().getId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
assertEquals(querySearchResult.from(), deserialized.from());
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs().expand();
Aggregations deserializedAggs = deserialized.consumeAggs().expand();
assertEquals(aggs.asList(), deserializedAggs.asList());
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
assertEquals(querySearchResult.getTookTimeNanos(), deserialized.getTookTimeNanos());
assertEquals(expectedValues.get(doSetTookTime), querySearchResult.getTookTimeNanos());
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
}

public void testNullResponse() throws Exception {