Skip to content

Commit

Permalink
Use reader attributes to control term dict memory usage (#42838)
Browse files Browse the repository at this point in the history
This change makes use of the reader attributes added in LUCENE-8671
to ensure that `_id` fields are always on-heap for best update performance
and term dicts are generally off-heap on Read-Only engines.

Closes #38390
  • Loading branch information
s1monw committed Jun 5, 2019
1 parent 795fa81 commit 4dd5be8
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 210 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
Expand All @@ -42,7 +44,9 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
Expand Down Expand Up @@ -77,6 +81,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
Expand Down Expand Up @@ -2140,10 +2145,21 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
}
}

/**
 * Builds the reader attributes used when opening readers for this engine.
 * If the (unwrapped) directory memory-maps its files, term dictionaries are
 * loaded off-heap by default; the {@code _id} field is always forced on-heap
 * because fast lookups on it are critical for update performance.
 */
static Map<String, String> getReaderAttributes(Directory directory) {
    final Directory delegate = FilterDirectory.unwrap(directory);
    final boolean offHeapByDefault = delegate instanceof MMapDirectory || FsDirectoryFactory.isHybridFs(delegate);
    final String defaultMode = offHeapByDefault ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name();
    return Map.of(
        // if we are using MMAP for term dicts we force all fields off-heap...
        BlockTreeTermsReader.FST_MODE_KEY, defaultMode,
        // ...except the ID field, which stays on-heap for fast updates
        BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME, FSTLoadMode.ON_HEAP.name());
}

private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setReaderAttributes(getReaderAttributes(store.directory()));
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
this.stats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory)) {
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
fillSegmentStats(segmentReader, true, stats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
Expand Down Expand Up @@ -47,7 +48,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand All @@ -62,6 +65,12 @@
*/
public class ReadOnlyEngine extends Engine {

/**
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field.
*/
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
private final TranslogStats translogStats;
Expand Down Expand Up @@ -165,7 +174,7 @@ protected final DirectoryReader wrapReader(DirectoryReader reader,
}

/**
 * Opens a reader on the given commit. A read-only engine never services updates,
 * so term dictionaries are kept off-heap via {@link #OFF_HEAP_READER_ATTRIBUTES}.
 */
protected DirectoryReader open(IndexCommit commit) throws IOException {
    final DirectoryReader reader = DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
    return reader;
}

private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FileSwitchDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockFactory;
Expand Down Expand Up @@ -121,6 +122,14 @@ public String[] listAll() throws IOException {
return directory;
}

/**
 * Checks whether the given directory, after unwrapping any {@link FilterDirectory}
 * layers, is backed by a {@link HybridDirectory}.
 *
 * @param directory the directory to inspect
 * @return {@code true} iff the unwrapped directory is a hybrid fs directory
 */
public static boolean isHybridFs(Directory directory) {
    return FilterDirectory.unwrap(directory) instanceof HybridDirectory;
}

static final class HybridDirectory extends NIOFSDirectory {
private final FSDirectory randomAccessDirectory;

Expand Down
147 changes: 3 additions & 144 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,13 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteBufferIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
Expand Down Expand Up @@ -98,7 +96,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -137,7 +134,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
* this by exploiting lucene internals and wrapping the IndexInput in a simple delegate.
*/
public static final Setting<Boolean> FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false,
Property.IndexScope);
Property.IndexScope, Property.Deprecated);
static final String CODEC = "store";
static final int VERSION_WRITE_THROWABLE= 2; // we write throwable since 2.0
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
Expand Down Expand Up @@ -172,8 +169,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId),
indexSettings.getValue(FORCE_RAM_TERM_DICT));
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;

Expand Down Expand Up @@ -712,12 +708,10 @@ public int refCount() {
static final class StoreDirectory extends FilterDirectory {

private final Logger deletesLogger;
private final boolean forceRamTermDict;

StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger, boolean forceRamTermDict) {
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
super(delegateDirectory);
this.deletesLogger = deletesLogger;
this.forceRamTermDict = forceRamTermDict;
}

/** Estimate the cumulative size of all files in this directory in bytes. */
Expand All @@ -744,18 +738,6 @@ private void innerClose() throws IOException {
super.close();
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput input = super.openInput(name, context);
if (name.endsWith(".tip") || name.endsWith(".cfs")) {
// only do this if we are reading cfs or tip file - all other files don't need this.
if (forceRamTermDict && input instanceof ByteBufferIndexInput) {
return new DeoptimizingIndexInput(input.toString(), input);
}
}
return input;
}

@Override
public String toString() {
return "store(" + in.toString() + ")";
Expand Down Expand Up @@ -1614,127 +1596,4 @@ private static IndexWriterConfig newIndexWriterConfig() {
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE);
}

/**
 * An {@link IndexInput} wrapper that hides the concrete type of its delegate so
 * Lucene cannot apply memory-mapped ({@code ByteBufferIndexInput}) specific
 * optimizations; every call is forwarded verbatim to the wrapped input.
 * See {@link #FORCE_RAM_TERM_DICT} for details on why this deoptimization exists.
 */
private static final class DeoptimizingIndexInput extends IndexInput {

    // The wrapped input; all operations delegate to it unchanged.
    private final IndexInput in;

    private DeoptimizingIndexInput(String resourceDescription, IndexInput in) {
        super(resourceDescription);
        this.in = in;
    }

    @Override
    public IndexInput clone() {
        // Clones must also be deoptimized, so wrap the delegate's clone as well.
        return new DeoptimizingIndexInput(toString(), in.clone());
    }

    @Override
    public void close() throws IOException {
        in.close();
    }

    @Override
    public long getFilePointer() {
        return in.getFilePointer();
    }

    @Override
    public void seek(long pos) throws IOException {
        in.seek(pos);
    }

    @Override
    public long length() {
        return in.length();
    }

    @Override
    public String toString() {
        return in.toString();
    }

    @Override
    public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
        // Slices must also be deoptimized, so wrap the delegate's slice too.
        return new DeoptimizingIndexInput(sliceDescription, in.slice(sliceDescription, offset, length));
    }

    @Override
    public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
        return in.randomAccessSlice(offset, length);
    }

    @Override
    public byte readByte() throws IOException {
        return in.readByte();
    }

    @Override
    public void readBytes(byte[] b, int offset, int len) throws IOException {
        in.readBytes(b, offset, len);
    }

    @Override
    public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
        in.readBytes(b, offset, len, useBuffer);
    }

    @Override
    public short readShort() throws IOException {
        return in.readShort();
    }

    @Override
    public int readInt() throws IOException {
        return in.readInt();
    }

    @Override
    public int readVInt() throws IOException {
        return in.readVInt();
    }

    @Override
    public int readZInt() throws IOException {
        return in.readZInt();
    }

    @Override
    public long readLong() throws IOException {
        return in.readLong();
    }

    @Override
    public long readVLong() throws IOException {
        return in.readVLong();
    }

    @Override
    public long readZLong() throws IOException {
        return in.readZLong();
    }

    @Override
    public String readString() throws IOException {
        return in.readString();
    }

    @Override
    public Map<String, String> readMapOfStrings() throws IOException {
        return in.readMapOfStrings();
    }

    @Override
    public Set<String> readSetOfStrings() throws IOException {
        return in.readSetOfStrings();
    }

    @Override
    public void skipBytes(long numBytes) throws IOException {
        in.skipBytes(numBytes);
    }
}
}
Loading

0 comments on commit 4dd5be8

Please sign in to comment.