
Add prefetching support to stored fields. #13424

Merged: jpountz merged 1 commit into apache:main on Jun 3, 2024

Conversation

jpountz (Contributor) commented on May 27, 2024

This adds StoredFields#prefetch(int), which mostly delegates to IndexInput#prefetch. Callers can take advantage of this API to parallelize I/O across multiple stored documents by first calling StoredFields#prefetch on all doc IDs before calling StoredFields#document on all doc IDs.
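
For example, a caller that needs several documents could split the work into two passes, along the lines of the following sketch (the class, method, and parameter names here are made up for illustration):

import java.io.IOException;

import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.StoredFields;

// Minimal sketch of the two-pass pattern described above.
class PrefetchThenFetch {
  static void fetchAll(IndexReader reader, int[] docIDs) throws IOException {
    StoredFields storedFields = reader.storedFields();
    for (int doc : docIDs) {
      storedFields.prefetch(doc); // start I/O for this document's block
    }
    for (int doc : docIDs) {
      // By now the block is hopefully in the page cache, so this mostly decompresses.
      Document document = storedFields.document(doc);
      // ... consume `document`
    }
  }
}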

I added a cache of recently prefetched blocks to the default codec, in order to avoid prefetching the same block multiple times in a short period of time. This felt sensible given that doc ID reordering via recursive graph bisection or index sorting is likely to result in search results being clustered.
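
For illustration, the idea can be sketched as a tiny ring buffer of recently prefetched block offsets. This is not the codec's actual code, and it assumes IndexInput#prefetch(long offset, long length):

import java.io.IOException;
import java.util.Arrays;

import org.apache.lucene.store.IndexInput;

// Illustrative sketch only, not the codec's actual implementation: remember
// the start offsets of recently prefetched blocks and skip redundant
// IndexInput#prefetch calls, since clustered doc IDs tend to hit the same
// stored-fields blocks repeatedly.
class RecentPrefetchCache {
  private final long[] recentBlockStarts = new long[16]; // tiny ring buffer
  private int nextSlot = 0;

  RecentPrefetchCache() {
    Arrays.fill(recentBlockStarts, -1L); // -1 marks an empty slot; 0 is a valid offset
  }

  void prefetch(IndexInput in, long blockStart, long blockLength) throws IOException {
    for (long start : recentBlockStarts) {
      if (start == blockStart) {
        return; // prefetched a moment ago, don't issue another hint to the OS
      }
    }
    recentBlockStarts[nextSlot] = blockStart;
    nextSlot = (nextSlot + 1) % recentBlockStarts.length;
    in.prefetch(blockStart, blockLength);
  }
}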

jpountz added this to the 10.0.0 milestone on May 27, 2024
jpountz (Contributor, Author) commented on May 27, 2024

As with previous changes, I wrote a synthetic benchmark to make sure that this new API actually helps.

This benchmark simulates fetching 20 random stored documents in parallel. The index it creates is 39GB, while my page cache only has a capacity of 25GB, so a significant share of the reads must actually hit the disk.
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FilterMergePolicy;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.StoredFields;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class StoredFieldsPrefetchBench {

  public static int DUMMY; // sink that keeps the JIT from optimizing the reads away

  public static void main(String[] args) throws Exception {
    Path dirPath = Paths.get(args[0]);
    Directory dir = FSDirectory.open(dirPath);
    if (DirectoryReader.indexExists(dir) == false) {
      // Disable compound files so that each segment's files stay separate.
      MergePolicy mergePolicy = new FilterMergePolicy(new TieredMergePolicy()) {
        @Override
        public boolean useCompoundFile(SegmentInfos infos, SegmentCommitInfo mergedInfo, MergeContext mergeContext)
            throws IOException {
          return false;
        }
      };
      try (IndexWriter w = new IndexWriter(dir, new IndexWriterConfig().setUseCompoundFile(false).setMergePolicy(mergePolicy))) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        AtomicLong indexed = new AtomicLong(0);
        // 1,000 tasks x 40,000 docs x 1KB of random bytes per doc: ~40M docs total.
        for (int task = 0; task < 1000; ++task) {
          executor.execute(() -> {
            Random r = ThreadLocalRandom.current();
            for (int i = 0; i < 40_000; ++i) {
              Document doc = new Document();
              byte[] bytes = new byte[1024];
              r.nextBytes(bytes);
              doc.add(new StoredField("content", bytes));
              try {
                w.addDocument(doc);
              } catch (IOException e) {
                throw new UncheckedIOException(e);
              }
              final long actualIndexed = indexed.incrementAndGet();
              if (actualIndexed % 1_000_000 == 0) {
                System.out.println("Indexed: " + actualIndexed);
                try {
                  // Opening (and immediately closing) an NRT reader forces a flush.
                  DirectoryReader.open(w).close();
                } catch (IOException e) {
                  throw new UncheckedIOException(e);
                }
              }
            }
          });
        }

        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.DAYS);
        w.commit();
        System.out.println("Done indexing");
      }
    }
    List<Long> latencies = new ArrayList<>();
    try (IndexReader reader = DirectoryReader.open(dir)) {

      Random r = ThreadLocalRandom.current();
      for (int i = 0; i < 10_000; ++i) {
        StoredFields storedFields = reader.storedFields();

        long start = System.nanoTime();

        // Pick 20 random doc IDs.
        int[] ids = new int[20];
        for (int j = 0; j < ids.length; ++j) {
          ids[j] = r.nextInt(reader.maxDoc());
        }
        // First pass: hint all reads so the I/O can proceed in parallel.
        for (int doc : ids) {
          storedFields.prefetch(doc);
        }
        // Second pass: fetch the documents, which should now mostly hit the page cache.
        for (int doc : ids) {
          DUMMY += storedFields.document(doc).getBinaryValue("content").hashCode();
        }

        long end = System.nanoTime();
        latencies.add((end - start) / 1000); // nanoseconds -> microseconds
      }
    }
    latencies.sort(null);
    System.out.println("P50: " + latencies.get(latencies.size() / 2));
    System.out.println("P90: " + latencies.get(latencies.size() * 9 / 10));
    System.out.println("P99: " + latencies.get(latencies.size() * 99 / 100));
  }

}

Before the change (latencies in microseconds):
P50: 2942
P90: 3900
P99: 4726

After the change (latencies in microseconds):
P50: 650
P90: 801
P99: 970
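
That is, prefetching cuts the p50 latency by roughly 4.5x (2942µs to 650µs) and the p99 by almost 5x, in a setup where the index is too large to fit entirely in the page cache.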

jpountz merged commit edd7747 into apache:main on Jun 3, 2024 (3 checks passed).
jpountz deleted the prefetch_stored_fields branch on Jun 3, 2024.