Use reader attributes to control term dict memory usage #42838

Merged (9 commits) on Jun 5, 2019
@@ -20,6 +20,8 @@
package org.elasticsearch.index.engine;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
@@ -42,7 +44,9 @@
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.InfoStream;
import org.elasticsearch.Assertions;
@@ -77,6 +81,7 @@
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.FsDirectoryFactory;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogConfig;
@@ -2140,10 +2145,21 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
}
}

static Map<String, String> getReaderAttributes(Directory directory) {
Directory unwrap = FilterDirectory.unwrap(directory);
boolean defaultOffHeap = FsDirectoryFactory.isHybridFs(unwrap) || unwrap instanceof MMapDirectory;
return Map.of(
BlockTreeTermsReader.FST_MODE_KEY, // if we are using MMAP for term dictionaries we force them all off-heap unless it's the ID field
defaultOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name()
, BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME, // always force ID field on-heap for fast updates
FSTLoadMode.ON_HEAP.name());
}

private IndexWriterConfig getIndexWriterConfig() {
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
iwc.setCommitOnClose(false); // we by default don't commit on close
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
iwc.setReaderAttributes(getReaderAttributes(store.directory()));
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
boolean verbose = false;
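As a usage note on the new plumbing: a rough, same-package sketch of what getReaderAttributes returns for an mmap-backed store (class name and index path are hypothetical; the keys and enum values are the ones referenced in the change above):

package org.elasticsearch.index.engine; // same package, since getReaderAttributes is package-private

import java.nio.file.Paths;
import java.util.Map;

import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.index.mapper.IdFieldMapper;

public class ReaderAttributesSketch {               // hypothetical class name
    public static void main(String[] args) throws Exception {
        try (MMapDirectory dir = new MMapDirectory(Paths.get("/tmp/shard-0/index"))) { // hypothetical path
            Map<String, String> attrs = InternalEngine.getReaderAttributes(dir);
            // mmap-backed store: term dictionary FSTs default to off-heap,
            // but the _id field stays on-heap for fast updates
            assert FSTLoadMode.OFF_HEAP.name().equals(attrs.get(BlockTreeTermsReader.FST_MODE_KEY));
            assert FSTLoadMode.ON_HEAP.name().equals(
                attrs.get(BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME));
        }
    }
}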
@@ -46,7 +46,7 @@ public NoOpEngine(EngineConfig config) {
super(config, null, null, true, Function.identity());
this.stats = new SegmentsStats();
Directory directory = store.directory();
try (DirectoryReader reader = DirectoryReader.open(directory)) {
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
for (LeafReaderContext ctx : reader.getContext().leaves()) {
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
fillSegmentStats(segmentReader, true, stats);
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.engine;

import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexReader;
@@ -47,7 +48,9 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Function;
@@ -62,6 +65,12 @@
*/
public class ReadOnlyEngine extends Engine {

/**
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
* ID field.
*/
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name());
private final SegmentInfos lastCommittedSegmentInfos;
private final SeqNoStats seqNoStats;
private final TranslogStats translogStats;
@@ -165,7 +174,7 @@ protected final DirectoryReader wrapReader(DirectoryReader reader,
}

protected DirectoryReader open(IndexCommit commit) throws IOException {
return DirectoryReader.open(commit);
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
}

private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {
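Because a read-only engine never performs updates, it leaves even the _id terms off heap. A minimal standalone sketch that mirrors the NoOpEngine change, opening a reader with the constant defined above (class name and path are hypothetical; the directory must already contain an index):

import java.nio.file.Paths;

import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.index.engine.ReadOnlyEngine;

public class OffHeapReaderSketch {                  // hypothetical class name
    public static void main(String[] args) throws Exception {
        // hypothetical path to an existing shard index
        try (Directory dir = new MMapDirectory(Paths.get("/var/data/frozen-shard/index"));
             DirectoryReader reader = DirectoryReader.open(dir, ReadOnlyEngine.OFF_HEAP_READER_ATTRIBUTES)) {
            // every term dictionary FST, including the _id field's, stays off heap (mmap-backed)
            System.out.println("segments: " + reader.leaves().size());
        }
    }
}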
@@ -22,6 +22,7 @@
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FileSwitchDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.LockFactory;
@@ -121,6 +122,14 @@ public String[] listAll() throws IOException {
return directory;
}

/**
* Returns true iff the directory is a hybrid fs directory
*/
public static boolean isHybridFs(Directory directory) {
Directory unwrap = FilterDirectory.unwrap(directory);
return unwrap instanceof HybridDirectory;
}

static final class HybridDirectory extends NIOFSDirectory {
private final FSDirectory randomAccessDirectory;

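Both isHybridFs and getReaderAttributes rely on FilterDirectory.unwrap peeling off every filter layer (such as Store's ByteSizeCachingDirectory wrapper) before the instanceof checks. A small sketch of that behaviour (wrapper and path are hypothetical):

import java.nio.file.Paths;

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.MMapDirectory;
import org.elasticsearch.index.store.FsDirectoryFactory;

public class UnwrapSketch {                          // hypothetical class name
    public static void main(String[] args) throws Exception {
        try (Directory mmap = new MMapDirectory(Paths.get("/tmp/index"))) { // hypothetical path
            // two nested filter layers around the real directory
            Directory wrapped = new FilterDirectory(new FilterDirectory(mmap) {}) {};
            // unwrap walks through every FilterDirectory layer down to the delegate
            assert FilterDirectory.unwrap(wrapped) == mmap;
            // no HybridDirectory underneath, so this would not count as a hybridfs store
            assert FsDirectoryFactory.isHybridFs(wrapped) == false;
        }
    }
}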
147 changes: 3 additions & 144 deletions server/src/main/java/org/elasticsearch/index/store/Store.java
@@ -38,15 +38,13 @@
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteBufferIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.Lock;
import org.apache.lucene.store.RandomAccessInput;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
@@ -98,7 +96,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -137,7 +134,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, RefCounted {
* this by exploiting lucene internals and wrapping the IndexInput in a simple delegate.
*/
public static final Setting<Boolean> FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false,
Property.IndexScope);
Property.IndexScope, Property.Deprecated);
static final String CODEC = "store";
static final int VERSION_WRITE_THROWABLE = 2; // we write throwable since 2.0
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
@@ -172,8 +169,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId),
indexSettings.getValue(FORCE_RAM_TERM_DICT));
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;

@@ -712,12 +708,10 @@ public int refCount() {
static final class StoreDirectory extends FilterDirectory {

private final Logger deletesLogger;
private final boolean forceRamTermDict;

StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger, boolean forceRamTermDict) {
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
super(delegateDirectory);
this.deletesLogger = deletesLogger;
this.forceRamTermDict = forceRamTermDict;
}

/** Estimate the cumulative size of all files in this directory in bytes. */
@@ -744,18 +738,6 @@ private void innerClose() throws IOException {
super.close();
}

@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
IndexInput input = super.openInput(name, context);
if (name.endsWith(".tip") || name.endsWith(".cfs")) {
// only do this if we are reading cfs or tip file - all other files don't need this.
if (forceRamTermDict && input instanceof ByteBufferIndexInput) {
return new DeoptimizingIndexInput(input.toString(), input);
}
}
return input;
}

@Override
public String toString() {
return "store(" + in.toString() + ")";
@@ -1614,127 +1596,4 @@ private static IndexWriterConfig newIndexWriterConfig() {
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE);
}

/**
* see {@link #FORCE_RAM_TERM_DICT} for details
*/
private static final class DeoptimizingIndexInput extends IndexInput {

private final IndexInput in;

private DeoptimizingIndexInput(String resourceDescription, IndexInput in) {
super(resourceDescription);
this.in = in;
}

@Override
public IndexInput clone() {
return new DeoptimizingIndexInput(toString(), in.clone());
}

@Override
public void close() throws IOException {
in.close();
}

@Override
public long getFilePointer() {
return in.getFilePointer();
}

@Override
public void seek(long pos) throws IOException {
in.seek(pos);
}

@Override
public long length() {
return in.length();
}

@Override
public String toString() {
return in.toString();
}

@Override
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
return new DeoptimizingIndexInput(sliceDescription, in.slice(sliceDescription, offset, length));
}

@Override
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
return in.randomAccessSlice(offset, length);
}

@Override
public byte readByte() throws IOException {
return in.readByte();
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
in.readBytes(b, offset, len);
}

@Override
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
in.readBytes(b, offset, len, useBuffer);
}

@Override
public short readShort() throws IOException {
return in.readShort();
}

@Override
public int readInt() throws IOException {
return in.readInt();
}

@Override
public int readVInt() throws IOException {
return in.readVInt();
}

@Override
public int readZInt() throws IOException {
return in.readZInt();
}

@Override
public long readLong() throws IOException {
return in.readLong();
}

@Override
public long readVLong() throws IOException {
return in.readVLong();
}

@Override
public long readZLong() throws IOException {
return in.readZLong();
}

@Override
public String readString() throws IOException {
return in.readString();
}

@Override
public Map<String, String> readMapOfStrings() throws IOException {
return in.readMapOfStrings();
}

@Override
public Set<String> readSetOfStrings() throws IOException {
return in.readSetOfStrings();
}

@Override
public void skipBytes(long numBytes) throws IOException {
in.skipBytes(numBytes);
}
}
}