-
Notifications
You must be signed in to change notification settings - Fork 25k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a NoopEngine implementation #31163
Changes from 1 commit
c345e54
7d469cc
5c0ed00
2985692
19b50b0
7d8e9de
21de3a4
ce634e9
e49a854
2629329
191eae0
5ed1bae
6931981
8f59a7e
de63a5e
c2c820a
ba56f25
023effb
a4d2e3b
78ae62d
13c06b9
4e428d2
37398ac
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,4 +24,9 @@ public class InternalEngineFactory implements EngineFactory { | |
public Engine newReadWriteEngine(EngineConfig config) { | ||
return new InternalEngine(config); | ||
} | ||
|
||
@Override | ||
public Engine newNoopEngine(EngineConfig config) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you take a look at how we handle the following engine in CCR and see if that extension point makes sense instead of adding this here? It feels conceptually wrong that the internal engine factory returns something other than an internal engine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to make this a plugin? I was thinking built-in but I guess we could move it to a module if we wanted to, I wasn't sure if it's a good idea to do engine implementations inside of modules There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at this again, it appears that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it would be great if we could make it a module, I am always happy to see isolation when it makes sense and here I think it can. Note that CCR has an engine implementation inside a module. 😇 We have talked exactly about the possibility of reusing the engine plugin work in the context of closed indices many months ago so we could bring that work out of the CCR branch and into 6.x/master. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I've removed the changes to the |
||
return new NoopEngine(config); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,339 @@ | ||
/* | ||
* Licensed to Elasticsearch under one or more contributor | ||
* license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright | ||
* ownership. Elasticsearch licenses this file to you under | ||
* the Apache License, Version 2.0 (the "License"); you may | ||
* not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package org.elasticsearch.index.engine; | ||
|
||
import org.apache.lucene.index.DirectoryReader; | ||
import org.apache.lucene.index.IndexCommit; | ||
import org.apache.lucene.index.SegmentInfos; | ||
import org.elasticsearch.common.Nullable; | ||
import org.elasticsearch.core.internal.io.IOUtils; | ||
import org.elasticsearch.index.seqno.LocalCheckpointTracker; | ||
import org.elasticsearch.index.seqno.SequenceNumbers; | ||
import org.elasticsearch.index.translog.Translog; | ||
import org.elasticsearch.index.translog.TranslogConfig; | ||
import org.elasticsearch.index.translog.TranslogCorruptedException; | ||
import org.elasticsearch.index.translog.TranslogDeletionPolicy; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.function.BiFunction; | ||
import java.util.function.LongSupplier; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* NoopEngine is an engine implementation that does nothing but the bare minimum | ||
* required in order to have an engine. All attempts to do something (search, | ||
* index, get), throw {@link UnsupportedOperationException}. This does maintain | ||
* a translog with a deletion policy so that when flushing, no translog is | ||
* retained on disk (setting a retention size and age of 0). | ||
* | ||
* It's also important to notice that this does list the commits of the Store's | ||
* Directory so that the last commit's user data can be read for the historyUUID | ||
* and last committed segment info. | ||
*/ | ||
public class NoopEngine extends Engine { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can this class be final and maybe pkg private? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, though I'll un-final it when I work on the second half of this, but it can be final for now :) |
||
|
||
private final Translog translog; | ||
private final IndexCommit lastCommit; | ||
private final LocalCheckpointTracker localCheckpointTracker; | ||
private final String historyUUID; | ||
private SegmentInfos lastCommittedSegmentInfos; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this can be final no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep |
||
|
||
public NoopEngine(EngineConfig engineConfig) { | ||
super(engineConfig); | ||
|
||
store.incRef(); | ||
boolean success = false; | ||
Translog translog = null; | ||
|
||
// The deletion policy for the translog should not keep any translogs around, so the min age/size is set to -1 | ||
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(-1, -1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we move this into the try block please? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Certainly |
||
|
||
try { | ||
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); | ||
translog = openTranslog(engineConfig, translogDeletionPolicy, engineConfig.getGlobalCheckpointSupplier()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need a translog? all we want is to validate that the translog has a the right uuid and that it's empty? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We use the translog all over the place in other methods in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I looked at these methods and I agree it's on the edge - there are quite a few. I would still prefer we replace them with unsupported operations or noop returns (ex empty sanpshots). This will lock things down and prevent things that aren't supposed to happen - I think that's good. An alternative is to implement a NoopTranslog but that's another rabbit hole.
Why is that? I think it's good to only close indices that have no ongoing indexing (like our plan for frozen index). Regardless - why can't we do the flush / trim when we close the open engine and convert it to a noop engine? I can see one thing down the road because we may close an index on recovery where it has broken settings (TBD). In that case I would still prefer to make utilities methods like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Yes, but even with no ongoing indexing, a translog still remains (due to retention policy)
That would require setting a new retention policy on an existing engine (making a part of InternalEngine mutable which makes me In order to do this though, we'll have to remove the |
||
assert translog.getGeneration() != null; | ||
this.translog = translog; | ||
List<IndexCommit> indexCommits = DirectoryReader.listCommits(store.directory()); | ||
lastCommit = indexCommits.get(indexCommits.size()-1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Formatting nit: |
||
historyUUID = lastCommit.getUserData().get(HISTORY_UUID_KEY); | ||
// We don't want any translogs hanging around for recovery, so we need to set these accordingly | ||
final long lastGen = Long.parseLong(lastCommit.getUserData().get(Translog.TRANSLOG_GENERATION_KEY)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not used? Also - I think it's to validate integrity - i.e. open the translog, see that it's uuid matches, see that it's empty and shut it down? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay I pushed a commit removing the unused |
||
translogDeletionPolicy.setTranslogGenerationOfLastCommit(lastGen); | ||
translogDeletionPolicy.setMinTranslogGenerationForRecovery(lastGen); | ||
|
||
localCheckpointTracker = createLocalCheckpointTracker(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't really need a local checkpoint tracker here, instead can we work on master to not expose the localCheckpointTracker out of engine (similar to how we don't expose the translog) and then we can avoid creating it? We should assert that maxSeq == localCheckpoint when opening lucene and otherwise fail. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean something like this: #31213 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. thanks. |
||
success = true; | ||
} catch (IOException | TranslogCorruptedException e) { | ||
throw new EngineCreationFailureException(shardId, "failed to create engine", e); | ||
} finally { | ||
if (success == false) { | ||
IOUtils.closeWhileHandlingException(translog); | ||
if (isClosed.get() == false) { | ||
// failure we need to dec the store reference | ||
store.decRef(); | ||
} | ||
} | ||
} | ||
logger.trace("created new NoopEngine"); | ||
} | ||
|
||
private Translog openTranslog(EngineConfig engineConfig, TranslogDeletionPolicy translogDeletionPolicy, | ||
LongSupplier globalCheckpointSupplier) throws IOException { | ||
final TranslogConfig translogConfig = engineConfig.getTranslogConfig(); | ||
final String translogUUID = loadTranslogUUIDFromLastCommit(); | ||
// We expect that this shard already exists, so it must already have an existing translog else something is badly wrong! | ||
return new Translog(translogConfig, translogUUID, translogDeletionPolicy, globalCheckpointSupplier, | ||
engineConfig.getPrimaryTermSupplier()); | ||
} | ||
|
||
/** | ||
* Reads the current stored translog ID from the last commit data. | ||
*/ | ||
@Nullable | ||
private String loadTranslogUUIDFromLastCommit() { | ||
final Map<String, String> commitUserData = lastCommittedSegmentInfos.getUserData(); | ||
if (commitUserData.containsKey(Translog.TRANSLOG_GENERATION_KEY) == false) { | ||
throw new IllegalStateException("commit doesn't contain translog generation id"); | ||
} | ||
return commitUserData.get(Translog.TRANSLOG_UUID_KEY); | ||
} | ||
|
||
private LocalCheckpointTracker createLocalCheckpointTracker() { | ||
final long maxSeqNo; | ||
final long localCheckpoint; | ||
final SequenceNumbers.CommitInfo seqNoStats = | ||
SequenceNumbers.loadSeqNoInfoFromLuceneCommit(lastCommittedSegmentInfos.userData.entrySet()); | ||
maxSeqNo = seqNoStats.maxSeqNo; | ||
localCheckpoint = seqNoStats.localCheckpoint; | ||
logger.trace("recovered maximum sequence number [{}] and local checkpoint [{}]", maxSeqNo, localCheckpoint); | ||
return new LocalCheckpointTracker(maxSeqNo, localCheckpoint); | ||
} | ||
|
||
@Override | ||
protected SegmentInfos getLastCommittedSegmentInfos() { | ||
return lastCommittedSegmentInfos; | ||
} | ||
|
||
@Override | ||
public String getHistoryUUID() { | ||
return historyUUID; | ||
} | ||
|
||
@Override | ||
public long getWritingBytes() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public boolean isNoopEngine() { | ||
return true; | ||
} | ||
|
||
@Override | ||
public long getIndexThrottleTimeInMillis() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public boolean isThrottled() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public IndexResult index(Index index) { | ||
throw new UnsupportedOperationException("indexing is not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public DeleteResult delete(Delete delete) { | ||
throw new UnsupportedOperationException("deletion is not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public NoOpResult noOp(NoOp noOp) { | ||
throw new UnsupportedOperationException("noop is not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public SyncedFlushResult syncFlush(String syncId, CommitId expectedCommitId) throws EngineException { | ||
throw new UnsupportedOperationException("synced flush is not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException { | ||
throw new UnsupportedOperationException("gets are not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { | ||
throw new UnsupportedOperationException("searching is not supported on a noop engine"); | ||
} | ||
|
||
@Override | ||
public Translog getTranslog() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this increases visibilty |
||
return translog; | ||
} | ||
|
||
@Override | ||
public boolean ensureTranslogSynced(Stream<Translog.Location> locations) { | ||
return false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this can throw an unsupported operation exception too? |
||
} | ||
|
||
@Override | ||
public void syncTranslog() { | ||
} | ||
|
||
@Override | ||
public LocalCheckpointTracker getLocalCheckpointTracker() { | ||
return localCheckpointTracker; | ||
} | ||
|
||
@Override | ||
public long getIndexBufferRAMBytesUsed() { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public List<Segment> segments(boolean verbose) { | ||
return Arrays.asList(getSegmentInfo(lastCommittedSegmentInfos, verbose)); | ||
} | ||
|
||
@Override | ||
public void refresh(String source) throws EngineException { | ||
} | ||
|
||
// Override the refreshNeeded method so that we don't attempt to acquire a searcher checking if we need to refresh | ||
@Override | ||
public boolean refreshNeeded() { | ||
// We never need to refresh a noop engine so always return false | ||
return false; | ||
} | ||
|
||
@Override | ||
public void writeIndexingBuffer() throws EngineException { | ||
} | ||
|
||
@Override | ||
public boolean shouldPeriodicallyFlush() { | ||
return false; | ||
} | ||
|
||
@Override | ||
public CommitId flush(boolean force, boolean waitIfOngoing) throws EngineException { | ||
return new CommitId(lastCommittedSegmentInfos.getId()); | ||
} | ||
|
||
@Override | ||
public CommitId flush() throws EngineException { | ||
try { | ||
translog.rollGeneration(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this looks wrong. We don't write anything here why do we need to modify the translog? I think this should be read-only There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In order for flushing to clear existing translogs (because we don't want any translog operations to be replayed for peer or store recovery) we want the flush method to remove the translog, this was added so that flushing the new engine would ensure that we don't have any translog operations around that could cause UOEs during recovery There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can remove this for now and re-introduce it later when adding the state transition part, if that makes it better, but we still need to be able to completely remove translog ops before doing recovery since we have no way to do operation-based recovery. What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah I'd like to remove it for now I can 't see in this change why it's needed There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay, I've removed that change from this PR |
||
translog.trimUnreferencedReaders(); | ||
} catch (IOException e) { | ||
maybeFailEngine("flush", e); | ||
throw new FlushFailedEngineException(shardId, e); | ||
} | ||
return new CommitId(lastCommittedSegmentInfos.getId()); | ||
} | ||
|
||
@Override | ||
public void trimTranslog() throws EngineException { | ||
} | ||
|
||
@Override | ||
public void rollTranslogGeneration() throws EngineException { | ||
} | ||
|
||
@Override | ||
public void forceMerge(boolean flush, int maxNumSegments, boolean onlyExpungeDeletes, boolean upgrade, | ||
boolean upgradeOnlyAncientSegments) throws EngineException { | ||
} | ||
|
||
@Override | ||
public IndexCommitRef acquireLastIndexCommit(boolean flushFirst) throws EngineException { | ||
return new Engine.IndexCommitRef(lastCommit, () -> {}); | ||
} | ||
|
||
@Override | ||
public IndexCommitRef acquireSafeIndexCommit() throws EngineException { | ||
return acquireLastIndexCommit(false); | ||
} | ||
|
||
/** | ||
* Closes the engine without acquiring the write lock. This should only be | ||
* called while the write lock is hold or in a disaster condition ie. if the engine | ||
* is failed. | ||
*/ | ||
@Override | ||
protected final void closeNoLock(String reason, CountDownLatch closedLatch) { | ||
if (isClosed.compareAndSet(false, true)) { | ||
assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : | ||
"Either the write lock must be held or the engine must be currently be failing itself"; | ||
try { | ||
IOUtils.close(translog); | ||
} catch (Exception e) { | ||
logger.warn("Failed to close translog", e); | ||
} finally { | ||
try { | ||
store.decRef(); | ||
logger.debug("engine closed [{}]", reason); | ||
} finally { | ||
closedLatch.countDown(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
@Override | ||
public void activateThrottling() { | ||
throw new UnsupportedOperationException("closed engine can't throttle"); | ||
} | ||
|
||
@Override | ||
public void deactivateThrottling() { | ||
throw new UnsupportedOperationException("closed engine can't throttle"); | ||
} | ||
|
||
@Override | ||
public void restoreLocalCheckpointFromTranslog() { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extra NL |
||
} | ||
|
||
@Override | ||
public int fillSeqNoGaps(long primaryTerm) { | ||
return 0; | ||
} | ||
|
||
@Override | ||
public Engine recoverFromTranslog() { | ||
return this; | ||
} | ||
|
||
@Override | ||
public void skipTranslogRecovery() { | ||
} | ||
|
||
@Override | ||
public void maybePruneDeletes() { | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels wrong to me. It looks like it's only used for testing? Is this really needed? We did not feel the need to add something like this for the following engine in CCR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry this snuck in from me extracting this from my other branch (where this is actually used), I'll remove it for now and re-add it for the next set of work