diff --git a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 9fb63d0de019d..e013939374fea 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -20,6 +20,8 @@ package org.elasticsearch.index.engine; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader; +import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode; import org.apache.lucene.document.Field; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DirectoryReader; @@ -42,7 +44,9 @@ import org.apache.lucene.search.TermQuery; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.LockObtainFailedException; +import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.InfoStream; import org.elasticsearch.Assertions; @@ -77,6 +81,7 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.ElasticsearchMergePolicy; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.FsDirectoryFactory; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogConfig; @@ -2140,10 +2145,21 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx } } + static Map getReaderAttributes(Directory directory) { + Directory unwrap = FilterDirectory.unwrap(directory); + boolean defaultOffHeap = FsDirectoryFactory.isHybridFs(unwrap) || unwrap instanceof MMapDirectory; + return Map.of( + BlockTreeTermsReader.FST_MODE_KEY, // if we are using MMAP for term dics we force all off heap unless it's the ID field + defaultOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name() + , BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME, // always force ID field on-heap for fast updates + FSTLoadMode.ON_HEAP.name()); + } + private IndexWriterConfig getIndexWriterConfig() { final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer()); iwc.setCommitOnClose(false); // we by default don't commit on close iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND); + iwc.setReaderAttributes(getReaderAttributes(store.directory())); iwc.setIndexDeletionPolicy(combinedDeletionPolicy); // with tests.verbose, lucene sets this up: plumb to align with filesystem stream boolean verbose = false; diff --git a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java index a41f07e994b93..7f474d1be24c7 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java @@ -46,7 +46,7 @@ public NoOpEngine(EngineConfig config) { super(config, null, null, true, Function.identity()); this.stats = new SegmentsStats(); Directory directory = store.directory(); - try (DirectoryReader reader = DirectoryReader.open(directory)) { + try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) { for (LeafReaderContext ctx : reader.getContext().leaves()) { SegmentReader segmentReader = Lucene.segmentReader(ctx.reader()); fillSegmentStats(segmentReader, true, stats); diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 9d5f6054243e4..5cd3747286e74 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.engine; +import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; @@ -47,7 +48,9 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.function.BiFunction; import java.util.function.Function; @@ -62,6 +65,12 @@ */ public class ReadOnlyEngine extends Engine { + /** + * Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an + * ID field. + */ + public static final Map OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, + BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()); private final SegmentInfos lastCommittedSegmentInfos; private final SeqNoStats seqNoStats; private final TranslogStats translogStats; @@ -165,7 +174,7 @@ protected final DirectoryReader wrapReader(DirectoryReader reader, } protected DirectoryReader open(IndexCommit commit) throws IOException { - return DirectoryReader.open(commit); + return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES); } private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) { diff --git a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java index 84bb4c49b27d4..c86206763caab 100644 --- a/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java +++ b/server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FileSwitchDirectory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.LockFactory; @@ -121,6 +122,14 @@ public String[] listAll() throws IOException { return directory; } + /** + * Returns true iff the directory is a hybrid fs directory + */ + public static boolean isHybridFs(Directory directory) { + Directory unwrap = FilterDirectory.unwrap(directory); + return unwrap instanceof HybridDirectory; + } + static final class HybridDirectory extends NIOFSDirectory { private final FSDirectory randomAccessDirectory; diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index 5f1f7d23a8c6a..410774114bd78 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -38,7 +38,6 @@ import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.ByteBufferIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -46,7 +45,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.Lock; -import org.apache.lucene.store.RandomAccessInput; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; @@ -98,7 +96,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -137,7 +134,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref * this by exploiting lucene internals and wrapping the IndexInput in a simple delegate. */ public static final Setting FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false, - Property.IndexScope); + Property.IndexScope, Property.Deprecated); static final String CODEC = "store"; static final int VERSION_WRITE_THROWABLE= 2; // we write throwable since 2.0 static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0 @@ -172,8 +169,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory, final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING); logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval); ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval); - this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId), - indexSettings.getValue(FORCE_RAM_TERM_DICT)); + this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId)); this.shardLock = shardLock; this.onClose = onClose; @@ -712,12 +708,10 @@ public int refCount() { static final class StoreDirectory extends FilterDirectory { private final Logger deletesLogger; - private final boolean forceRamTermDict; - StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger, boolean forceRamTermDict) { + StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) { super(delegateDirectory); this.deletesLogger = deletesLogger; - this.forceRamTermDict = forceRamTermDict; } /** Estimate the cumulative size of all files in this directory in bytes. */ @@ -744,18 +738,6 @@ private void innerClose() throws IOException { super.close(); } - @Override - public IndexInput openInput(String name, IOContext context) throws IOException { - IndexInput input = super.openInput(name, context); - if (name.endsWith(".tip") || name.endsWith(".cfs")) { - // only do this if we are reading cfs or tip file - all other files don't need this. - if (forceRamTermDict && input instanceof ByteBufferIndexInput) { - return new DeoptimizingIndexInput(input.toString(), input); - } - } - return input; - } - @Override public String toString() { return "store(" + in.toString() + ")"; @@ -1614,127 +1596,4 @@ private static IndexWriterConfig newIndexWriterConfig() { // we also don't specify a codec here and merges should use the engines for this index .setMergePolicy(NoMergePolicy.INSTANCE); } - - /** - * see {@link #FORCE_RAM_TERM_DICT} for details - */ - private static final class DeoptimizingIndexInput extends IndexInput { - - private final IndexInput in; - - private DeoptimizingIndexInput(String resourceDescription, IndexInput in) { - super(resourceDescription); - this.in = in; - } - - @Override - public IndexInput clone() { - return new DeoptimizingIndexInput(toString(), in.clone()); - } - - @Override - public void close() throws IOException { - in.close(); - } - - @Override - public long getFilePointer() { - return in.getFilePointer(); - } - - @Override - public void seek(long pos) throws IOException { - in.seek(pos); - } - - @Override - public long length() { - return in.length(); - } - - @Override - public String toString() { - return in.toString(); - } - - @Override - public IndexInput slice(String sliceDescription, long offset, long length) throws IOException { - return new DeoptimizingIndexInput(sliceDescription, in.slice(sliceDescription, offset, length)); - } - - @Override - public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException { - return in.randomAccessSlice(offset, length); - } - - @Override - public byte readByte() throws IOException { - return in.readByte(); - } - - @Override - public void readBytes(byte[] b, int offset, int len) throws IOException { - in.readBytes(b, offset, len); - } - - @Override - public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException { - in.readBytes(b, offset, len, useBuffer); - } - - @Override - public short readShort() throws IOException { - return in.readShort(); - } - - @Override - public int readInt() throws IOException { - return in.readInt(); - } - - @Override - public int readVInt() throws IOException { - return in.readVInt(); - } - - @Override - public int readZInt() throws IOException { - return in.readZInt(); - } - - @Override - public long readLong() throws IOException { - return in.readLong(); - } - - @Override - public long readVLong() throws IOException { - return in.readVLong(); - } - - @Override - public long readZLong() throws IOException { - return in.readZLong(); - } - - @Override - public String readString() throws IOException { - return in.readString(); - } - - @Override - public Map readMapOfStrings() throws IOException { - return in.readMapOfStrings(); - } - - @Override - public Set readSetOfStrings() throws IOException { - return in.readSetOfStrings(); - } - - @Override - public void skipBytes(long numBytes) throws IOException { - in.skipBytes(numBytes); - } - } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index b213da097ce5e..ce64111ecec2b 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -61,8 +61,11 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHitCountCollector; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.store.BaseDirectoryWrapper; import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.Lock; +import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; @@ -98,6 +101,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; import org.elasticsearch.index.codec.CodecService; @@ -123,7 +127,9 @@ import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.index.shard.ShardUtils; +import org.elasticsearch.index.store.FsDirectoryFactory; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.translog.SnapshotMatchers; import org.elasticsearch.index.translog.Translog; @@ -5678,4 +5684,59 @@ public void testRefreshAndFailEngineConcurrently() throws Exception { } assertThat(engine.config().getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING).getUsed(), equalTo(0L)); } + + public void testGetReaderAttributes() throws IOException { + try(BaseDirectoryWrapper dir = newFSDirectory(createTempDir())) { + Directory unwrap = FilterDirectory.unwrap(dir); + boolean isMMap = unwrap instanceof MMapDirectory; + Map readerAttributes = InternalEngine.getReaderAttributes(dir); + assertEquals(2, readerAttributes.size()); + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id")); + if (isMMap) { + assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst")); + } else { + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst")); + } + } + + try(MMapDirectory dir = new MMapDirectory(createTempDir())) { + Map readerAttributes = + InternalEngine.getReaderAttributes(randomBoolean() ? dir : + new MockDirectoryWrapper(random(), dir)); + assertEquals(2, readerAttributes.size()); + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id")); + assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst")); + } + + Settings.Builder settingsBuilder = Settings.builder() + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); + Settings settings = settingsBuilder.build(); + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("foo", settings); + FsDirectoryFactory service = new FsDirectoryFactory(); + Path tempDir = createTempDir().resolve(indexSettings.getUUID()).resolve("0"); + ShardPath path = new ShardPath(false, tempDir, tempDir, new ShardId(indexSettings.getIndex(), 0)); + try (Directory directory = service.newDirectory(indexSettings, path)) { + + Map readerAttributes = + InternalEngine.getReaderAttributes(randomBoolean() ? directory : + new MockDirectoryWrapper(random(), directory)); + assertEquals(2, readerAttributes.size()); + + switch (IndexModule.defaultStoreType(true)) { + case HYBRIDFS: + case MMAPFS: + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id")); + assertEquals("OFF_HEAP", readerAttributes.get("blocktree.terms.fst")); + break; + case NIOFS: + case SIMPLEFS: + case FS: + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst._id")); + assertEquals("ON_HEAP", readerAttributes.get("blocktree.terms.fst")); + break; + default: + fail("unknownw type"); + } + } + } } diff --git a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java index f03500e6e1250..de32e3e43077d 100644 --- a/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java +++ b/server/src/test/java/org/elasticsearch/index/engine/NoOpEngineTests.java @@ -144,7 +144,7 @@ public void testNoOpEngineStats() throws Exception { assertEquals(expectedDocStats.getTotalSizeInBytes(), noOpEngine.docStats().getTotalSizeInBytes()); assertEquals(expectedDocStats.getAverageSizeInBytes(), noOpEngine.docStats().getAverageSizeInBytes()); assertEquals(expectedSegmentStats.getCount(), noOpEngine.segmentsStats(includeFileSize, true).getCount()); - assertEquals(expectedSegmentStats.getMemoryInBytes(), noOpEngine.segmentsStats(includeFileSize, true).getMemoryInBytes()); + // don't compare memory in bytes since we load the index with term-dict off-heap assertEquals(expectedSegmentStats.getFileSizes().size(), noOpEngine.segmentsStats(includeFileSize, true).getFileSizes().size()); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 9e8fae209dd81..dc2557a1f6e13 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -40,14 +40,12 @@ import org.apache.lucene.index.SnapshotDeletionPolicy; import org.apache.lucene.index.Term; import org.apache.lucene.store.BaseDirectoryWrapper; -import org.apache.lucene.store.ByteBufferIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; -import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.RAMDirectory; import org.apache.lucene.util.BytesRef; @@ -1081,48 +1079,6 @@ public void testHistoryUUIDCanBeForced() throws IOException { } } - public void testDeoptimizeMMap() throws IOException { - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("index", - Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .put(Store.FORCE_RAM_TERM_DICT.getKey(), true).build()); - final ShardId shardId = new ShardId("index", "_na_", 1); - String file = "test." + (randomBoolean() ? "tip" : "cfs"); - try (Store store = new Store(shardId, indexSettings, new MMapDirectory(createTempDir()), new DummyShardLock(shardId))) { - try (IndexOutput output = store.directory().createOutput(file, IOContext.DEFAULT)) { - output.writeInt(0); - } - try (IndexOutput output = store.directory().createOutput("someOtherFile.txt", IOContext.DEFAULT)) { - output.writeInt(0); - } - try (IndexInput input = store.directory().openInput(file, IOContext.DEFAULT)) { - assertFalse(input instanceof ByteBufferIndexInput); - assertFalse(input.clone() instanceof ByteBufferIndexInput); - assertFalse(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput); - } - - try (IndexInput input = store.directory().openInput("someOtherFile.txt", IOContext.DEFAULT)) { - assertTrue(input instanceof ByteBufferIndexInput); - assertTrue(input.clone() instanceof ByteBufferIndexInput); - assertTrue(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput); - } - } - - indexSettings = IndexSettingsModule.newIndexSettings("index", - Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT) - .put(Store.FORCE_RAM_TERM_DICT.getKey(), false).build()); - - try (Store store = new Store(shardId, indexSettings, new MMapDirectory(createTempDir()), new DummyShardLock(shardId))) { - try (IndexOutput output = store.directory().createOutput(file, IOContext.DEFAULT)) { - output.writeInt(0); - } - try (IndexInput input = store.directory().openInput(file, IOContext.DEFAULT)) { - assertTrue(input instanceof ByteBufferIndexInput); - assertTrue(input.clone() instanceof ByteBufferIndexInput); - assertTrue(input.slice("foo", 1, 1) instanceof ByteBufferIndexInput); - } - } - } - public void testGetPendingFiles() throws IOException { final ShardId shardId = new ShardId("index", "_na_", 1); final String testfile = "testfile"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 50f1125b275f1..631bd0b9ef9d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -78,7 +78,7 @@ public FrozenEngine(EngineConfig config) { boolean success = false; Directory directory = store.directory(); - try (DirectoryReader reader = DirectoryReader.open(directory)) { + try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) { canMatchReader = ElasticsearchDirectoryReader.wrap(new RewriteCachingDirectoryReader(directory, reader.leaves()), config.getShardId()); // we record the segment stats here - that's what the reader needs when it's open and it give the user @@ -168,7 +168,7 @@ private synchronized DirectoryReader getOrOpenReader() throws IOException { for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { listeners.beforeRefresh(); } - reader = DirectoryReader.open(engineConfig.getStore().directory()); + reader = DirectoryReader.open(engineConfig.getStore().directory(), OFF_HEAP_READER_ATTRIBUTES); processReader(reader); reader = lastOpenedReader = wrapReader(reader, Function.identity()); reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java index 102bcde9dc77b..2129b14993d36 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshot.java @@ -38,6 +38,7 @@ import org.apache.lucene.util.FixedBitSet; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.engine.ReadOnlyEngine; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -80,7 +81,8 @@ public synchronized List syncSnapshot(IndexCommit commit) throws IOExcep List createdFiles = new ArrayList<>(); String segmentFileName; try (Lock writeLock = targetDirectory.obtainLock(IndexWriter.WRITE_LOCK_NAME); - StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit)) { + StandardDirectoryReader reader = (StandardDirectoryReader) DirectoryReader.open(commit, + ReadOnlyEngine.OFF_HEAP_READER_ATTRIBUTES)) { SegmentInfos segmentInfos = reader.getSegmentInfos().clone(); DirectoryReader wrappedReader = wrapReader(reader); List newInfos = new ArrayList<>(); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index bb5819e1bda43..b2fb833f34bb9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -135,7 +135,7 @@ protected void closeInternal() { final long maxDoc = segmentInfos.totalMaxDoc(); tempStore.bootstrapNewHistory(maxDoc, maxDoc); store.incRef(); - try (DirectoryReader reader = DirectoryReader.open(tempStore.directory())) { + try (DirectoryReader reader = DirectoryReader.open(tempStore.directory(), ReadOnlyEngine.OFF_HEAP_READER_ATTRIBUTES)) { IndexCommit indexCommit = reader.getIndexCommit(); super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus); } finally { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 17019b0ac18cf..4cf21cfbecd19 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -19,6 +19,7 @@ import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.seqno.SequenceNumbers; +import org.elasticsearch.index.shard.DocsStats; import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -146,12 +147,17 @@ public void testCircuitBreakerAccounting() throws IOException { null, listener, null, globalCheckpoint::get, new HierarchyCircuitBreakerService(defaultSettings.getSettings(), new ClusterSettings(defaultSettings.getNodeSettings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))); CircuitBreaker breaker = config.getCircuitBreakerService().getBreaker(CircuitBreaker.ACCOUNTING); - long expectedUse; + final int docs; try (InternalEngine engine = createEngine(config)) { - addDocuments(globalCheckpoint, engine); + docs = addDocuments(globalCheckpoint, engine); engine.flush(false, true); // first flush to make sure we have a commit that we open in the frozen engine blow. engine.refresh("test"); // pull the reader to account for RAM in the breaker. + } + final long expectedUse; + try (ReadOnlyEngine readOnlyEngine = new ReadOnlyEngine(config, null, null, true, i -> i)) { expectedUse = breaker.getUsed(); + DocsStats docsStats = readOnlyEngine.docStats(); + assertEquals(docs, docsStats.getCount()); } assertTrue(expectedUse > 0); assertEquals(0, breaker.getUsed()); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 39407ef735974..90d7109cc3843 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -48,7 +48,6 @@ import java.util.Collection; import java.util.EnumSet; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -66,7 +65,7 @@ protected Collection> getPlugins() { return pluginList(XPackPlugin.class); } - public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedException { + public void testCloseFreezeAndOpen() { createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -107,7 +106,7 @@ public void testCloseFreezeAndOpen() throws ExecutionException, InterruptedExcep } while (searchResponse.getHits().getHits().length > 0); } - public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException, ExecutionException { + public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException { XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("_doc") .startObject("properties").startObject("field").field("type", "text").field("term_vector", "with_positions_offsets_payloads") .endObject().endObject() @@ -150,7 +149,7 @@ public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOEx assertEquals(numRefreshes, index.getTotal().refresh.getTotal()); } - public void testFreezeAndUnfreeze() throws InterruptedException, ExecutionException { + public void testFreezeAndUnfreeze() { createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); client().prepareIndex("index", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -190,7 +189,7 @@ private void assertIndexFrozen(String idx) { assertTrue(FrozenEngine.INDEX_FROZEN.get(indexService.getIndexSettings().getSettings())); } - public void testDoubleFreeze() throws ExecutionException, InterruptedException { + public void testDoubleFreeze() { createIndex("test-idx", Settings.builder().put("index.number_of_shards", 2).build()); assertAcked(client().execute(FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest("test-idx")).actionGet()); ResourceNotFoundException exception = expectThrows(ResourceNotFoundException.class, @@ -200,7 +199,7 @@ public void testDoubleFreeze() throws ExecutionException, InterruptedException { assertEquals("no index found to freeze", exception.getMessage()); } - public void testUnfreezeClosedIndices() throws ExecutionException, InterruptedException { + public void testUnfreezeClosedIndices() { createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build()); client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); createIndex("idx-closed", Settings.builder().put("index.number_of_shards", 1).build()); @@ -215,7 +214,7 @@ public void testUnfreezeClosedIndices() throws ExecutionException, InterruptedEx assertHitCount(client().prepareSearch().get(), 1L); } - public void testFreezePattern() throws ExecutionException, InterruptedException { + public void testFreezePattern() { createIndex("test-idx", Settings.builder().put("index.number_of_shards", 1).build()); client().prepareIndex("test-idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); createIndex("test-idx-1", Settings.builder().put("index.number_of_shards", 1).build()); @@ -238,7 +237,7 @@ public void testFreezePattern() throws ExecutionException, InterruptedException assertEquals(0, index.getTotal().refresh.getTotal()); } - public void testCanMatch() throws ExecutionException, InterruptedException, IOException { + public void testCanMatch() throws IOException { createIndex("index"); client().prepareIndex("index", "_doc", "1").setSource("field", "2010-01-05T02:00").setRefreshPolicy(IMMEDIATE).execute() .actionGet(); @@ -294,7 +293,7 @@ public void testCanMatch() throws ExecutionException, InterruptedException, IOEx } } - public void testWriteToFrozenIndex() throws ExecutionException, InterruptedException { + public void testWriteToFrozenIndex() { createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build()); client().prepareIndex("idx", "_doc", "1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); assertAcked(client().execute(FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest("idx")).actionGet()); @@ -303,7 +302,7 @@ public void testWriteToFrozenIndex() throws ExecutionException, InterruptedExcep client().prepareIndex("idx", "_doc", "2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get()); } - public void testIgnoreUnavailable() throws ExecutionException, InterruptedException { + public void testIgnoreUnavailable() { createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build()); createIndex("idx-close", Settings.builder().put("index.number_of_shards", 1).build()); assertAcked(client().admin().indices().prepareClose("idx-close")); @@ -314,7 +313,7 @@ public void testIgnoreUnavailable() throws ExecutionException, InterruptedExcept client().admin().cluster().prepareState().get().getState().metaData().index("idx-close").getState()); } - public void testUnfreezeClosedIndex() throws ExecutionException, InterruptedException { + public void testUnfreezeClosedIndex() { createIndex("idx", Settings.builder().put("index.number_of_shards", 1).build()); assertAcked(client().execute(FreezeIndexAction.INSTANCE, new TransportFreezeIndexAction.FreezeRequest("idx")).actionGet()); assertAcked(client().admin().indices().prepareClose("idx")); @@ -331,7 +330,7 @@ public void testUnfreezeClosedIndex() throws ExecutionException, InterruptedExce client().admin().cluster().prepareState().get().getState().metaData().index("idx").getState()); } - public void testFreezeIndexIncreasesIndexSettingsVersion() throws ExecutionException, InterruptedException { + public void testFreezeIndexIncreasesIndexSettingsVersion() { final String index = "test"; createIndex(index, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0).build()); client().prepareIndex(index, "_doc").setSource("field", "value").execute().actionGet(); @@ -371,7 +370,7 @@ public void testFreezeEmptyIndexWithTranslogOps() throws Exception { assertIndexFrozen(indexName); } - public void testRecoveryState() throws ExecutionException, InterruptedException { + public void testRecoveryState() { final String indexName = "index_recovery_state"; createIndex(indexName, Settings.builder() .put("index.number_of_replicas", 0)