Replace EngineClosedException with AlreadyClosedException (#22631)
`EngineClosedException` is an ES-level exception used to indicate that the engine is closed when an operation starts. It doesn't add much value, and we can use `AlreadyClosedException` from Lucene instead (which may already bubble up if things go wrong during operations). Having two exceptions just adds confusion and leads to bugs, such as the wrong handling of `EngineClosedException` when dealing with document-level failures. The latter was exposed by `IndexWithShadowReplicasIT`.
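
As a hedged illustration of the pattern this change converges on (the class below is a standalone sketch with illustrative names, not code from this commit): once the engine-level guard throws Lucene's `AlreadyClosedException` directly, callers only need a single catch clause to tolerate a concurrently closed engine.

```java
import org.apache.lucene.store.AlreadyClosedException;

import java.util.concurrent.atomic.AtomicBoolean;

// Standalone sketch; class and method names are illustrative, not from this commit.
class ClosableEngineSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void ensureOpen() {
        // Direction of the change: throw Lucene's exception directly instead of
        // a dedicated ES-level EngineClosedException.
        if (closed.get()) {
            throw new AlreadyClosedException("engine is closed");
        }
    }

    void refresh() {
        try {
            ensureOpen();
            // ... refresh searchers here ...
        } catch (AlreadyClosedException ex) {
            // fine - the engine was closed concurrently; nothing to refresh
        }
    }

    void close() {
        closed.set(true);
    }
}
```

The same single-exception handling is what the `IndexShard` and `InternalEngine` hunks below move to.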

This PR also removes the `AwaitsFix` from the `IndexWithShadowReplicasIT` tests (which is what caused this to be discovered). While debugging the source of the issue I found some mismatches in document uid management in the tests: the term that was passed to the engine didn't correspond to the uid in the parsed doc. Those are fixed as well.
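
A minimal sketch of the uid consistency the new `Engine.Index` assertion enforces (simplified: the real constructor takes a `ParsedDocument`; here the doc is stood in for by its uid bytes):

```java
import org.apache.lucene.index.Term;
import org.apache.lucene.util.BytesRef;

// Standalone sketch of the check added to Engine.Index's constructor.
class UidConsistencySketch {

    static void checkUidMatches(Term uid, BytesRef docUid) {
        // The term passed to the engine must identify the same document as the parsed doc,
        // otherwise versioning and delete handling would operate on the wrong uid.
        assert uid.bytes().equals(docUid) : "term uid " + uid + " doesn't match doc uid " + docUid;
    }

    public static void main(String[] args) {
        BytesRef docUid = new BytesRef("test#1"); // uid stored on the parsed document (type#id)
        Term term = new Term("_uid", docUid);     // term the test passes to the engine
        checkUidMatches(term, docUid);            // holds: both refer to test#1
    }
}
```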
bleskes committed Jan 19, 2017
1 parent 90355c9 commit 98db0a7
Showing 18 changed files with 412 additions and 390 deletions.
3 changes: 1 addition & 2 deletions core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -44,7 +44,6 @@
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
@@ -668,7 +667,7 @@ private void maybeFSyncTranslogs() {
if (translog.syncNeeded()) {
translog.sync();
}
} catch (EngineClosedException | AlreadyClosedException ex) {
} catch (AlreadyClosedException ex) {
// fine - continue;
} catch (IOException e) {
logger.warn("failed to sync translog", e);
7 changes: 4 additions & 3 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -355,7 +355,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {
manager.release(searcher);
}
}
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed
@@ -373,7 +373,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {

protected void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine.get());
throw new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());
}
}

@@ -860,6 +860,7 @@ public static class Index extends Operation {
public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime,
long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, version, versionType, origin, startTime);
assert uid.bytes().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1102,7 +1103,7 @@ public void flushAndClose() throws IOException {
logger.debug("flushing shard on close - this might take some time to sync files to disk");
try {
flush(); // TODO we might force a flush in the future since we have the write lock already even though recoveries are running.
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
logger.debug("engine already closed - skipping flushAndClose");
}
} finally {
@@ -33,6 +33,7 @@
*
*
*/
@Deprecated
public class EngineClosedException extends IndexShardClosedException {

public EngineClosedException(ShardId shardId) {
@@ -61,7 +61,6 @@
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.merge.OnGoingMerge;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
@@ -415,7 +414,11 @@ public void index(Index index) {
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new IndexFailedEngineException(shardId, index.type(), index.id(), e);
if (e instanceof AlreadyClosedException) {
throw (AlreadyClosedException)e;
} else {
throw new IndexFailedEngineException(shardId, index.type(), index.id(), e);
}
}
}

@@ -579,7 +582,11 @@ public void delete(Delete delete) throws EngineException {
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw new DeleteFailedEngineException(shardId, delete, e);
if (e instanceof AlreadyClosedException) {
throw (AlreadyClosedException)e;
} else {
throw new DeleteFailedEngineException(shardId, delete, e);
}
}

maybePruneDeletedTombstones();
@@ -647,8 +654,6 @@ public void refresh(String source) throws EngineException {
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("refresh failed", e);
@@ -695,8 +700,6 @@ public void writeIndexingBuffer() throws EngineException {
} catch (AlreadyClosedException e) {
failOnTragicEvent(e);
throw e;
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
failEngine("writeIndexingBuffer failed", e);
@@ -875,7 +878,7 @@ private void pruneDeletedTombstones() {

@Override
public void forceMerge(final boolean flush, int maxNumSegments, boolean onlyExpungeDeletes,
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, EngineClosedException, IOException {
final boolean upgrade, final boolean upgradeOnlyAncientSegments) throws EngineException, IOException {
/*
* We do NOT acquire the readlock here since we are waiting on the merges to finish
* that's fine since the IW.rollback should stop all the threads and trigger an IOException
@@ -961,7 +964,8 @@ public IndexCommit acquireIndexCommit(final boolean flushFirst) throws EngineExc
}

@SuppressWarnings("finally")
private void failOnTragicEvent(AlreadyClosedException ex) {
private boolean failOnTragicEvent(AlreadyClosedException ex) {
final boolean engineFailed;
// if we are already closed due to some tragic exception
// we need to fail the engine. it might have already been failed before
// but we are double-checking it's failed and closed
@@ -974,14 +978,19 @@ private void failOnTragicEvent(AlreadyClosedException ex) {
}
} else {
failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
engineFailed = true;
}
} else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException());
} else if (failedEngine.get() == null) { // we are closed but the engine is not failed yet?
engineFailed = true;
} else if (failedEngine.get() == null && isClosed.get() == false) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
} else {
engineFailed = false;
}
return engineFailed;
}

@Override
@@ -994,8 +1003,7 @@ protected boolean maybeFailEngine(String source, Exception e) {
// exception that should only be thrown in a tragic event. we pass on the checks to failOnTragicEvent which will
// throw and AssertionError if the tragic event condition is not met.
if (e instanceof AlreadyClosedException) {
failOnTragicEvent((AlreadyClosedException)e);
return true;
return failOnTragicEvent((AlreadyClosedException)e);
} else if (e != null &&
((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
@@ -191,9 +191,6 @@ public void refresh(String source) throws EngineException {
ensureOpen();
searcherManager.maybeRefreshBlocking();
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} catch (EngineClosedException e) {
throw e;
} catch (Exception e) {
try {
20 changes: 9 additions & 11 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -76,7 +76,6 @@
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.engine.CommitStats;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.EngineFactory;
@@ -612,7 +611,7 @@ public Engine.GetResult get(Engine.Get get) {
}

/**
* Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link EngineClosedException}.
* Writes all indexing changes to disk and opens a new searcher reflecting all changes. This can throw {@link AlreadyClosedException}.
*/
public void refresh(String source) {
verifyNotClosed();
@@ -1250,7 +1249,7 @@ boolean shouldFlush() {
try {
Translog translog = engine.getTranslog();
return translog.sizeInBytes() > indexSettings.getFlushThresholdSize().getBytes();
} catch (AlreadyClosedException | EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// that's fine we are already close - no need to flush
}
}
@@ -1289,21 +1288,21 @@ public IndexEventListener getIndexEventListener() {
public void activateThrottling() {
try {
getEngine().activateThrottling();
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// ignore
}
}

public void deactivateThrottling() {
try {
getEngine().deactivateThrottling();
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// ignore
}
}

private void handleRefreshException(Exception e) {
if (e instanceof EngineClosedException) {
if (e instanceof AlreadyClosedException) {
// ignore
} else if (e instanceof RefreshFailedEngineException) {
RefreshFailedEngineException rfee = (RefreshFailedEngineException) e;
@@ -1438,7 +1437,7 @@ private void doCheckIndex() throws IOException {
Engine getEngine() {
Engine engine = getEngineOrNull();
if (engine == null) {
throw new EngineClosedException(shardId);
throw new AlreadyClosedException("engine is closed");
}
return engine;
}
@@ -1575,7 +1574,7 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
private Engine createNewEngine(EngineConfig config) {
synchronized (mutex) {
if (state == IndexShardState.CLOSED) {
throw new EngineClosedException(shardId);
throw new AlreadyClosedException(shardId + " can't create engine - shard is closed");
}
assert this.currentEngineReference.get() == null;
Engine engine = newEngine(config);
@@ -1677,7 +1676,7 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
try {
final Engine engine = getEngine();
engine.getTranslog().ensureSynced(candidates.stream().map(Tuple::v1));
} catch (EngineClosedException ex) {
} catch (AlreadyClosedException ex) {
// that's fine since we already synced everything on engine close - this also is conform with the methods
// documentation
} catch (IOException ex) { // if this fails we are in deep shit - fail the request
@@ -1792,8 +1791,7 @@ EngineFactory getEngineFactory() {
* refresh listeners.
* Otherwise <code>false</code>.
*
* @throws EngineClosedException if the engine is already closed
* @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
* @throws AlreadyClosedException if the engine or internal indexwriter in the engine is already closed
*/
public boolean isRefreshNeeded() {
return getEngine().refreshNeeded() || (refreshListeners != null && refreshListeners.refreshNeeded());
@@ -21,6 +21,7 @@

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
@@ -30,7 +31,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -386,7 +386,7 @@ private void runUnlocked() {
protected void checkIdle(IndexShard shard, long inactiveTimeNS) {
try {
shard.checkIdle(inactiveTimeNS);
} catch (EngineClosedException e) {
} catch (AlreadyClosedException e) {
logger.trace((Supplier<?>) () -> new ParameterizedMessage("ignore exception while checking if shard {} is inactive", shard.shardId()), e);
}
}
@@ -18,6 +18,7 @@
*/
package org.elasticsearch.action.support.replication;

import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.UnavailableShardsException;
@@ -52,7 +53,6 @@
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
@@ -428,12 +428,12 @@ public void testStalePrimaryShardOnReroute() throws InterruptedException {
}
}

private ElasticsearchException randomRetryPrimaryException(ShardId shardId) {
private Exception randomRetryPrimaryException(ShardId shardId) {
return randomFrom(
new ShardNotFoundException(shardId),
new IndexNotFoundException(shardId.getIndex()),
new IndexShardClosedException(shardId),
new EngineClosedException(shardId),
new AlreadyClosedException(shardId + " primary is closed"),
new ReplicationOperation.RetryOnPrimaryException(shardId, "hello")
);
}
@@ -48,7 +48,9 @@
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.InternalEngineTests;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
@@ -247,7 +249,8 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
assertSame(listener, indexService.getIndexOperationListeners().get(1));

Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
ParsedDocument doc = InternalEngineTests.createParsedDoc("1", "test", null);
Engine.Index index = new Engine.Index(new Term("_uid", doc.uid()), doc);
ShardId shardId = new ShardId(new Index("foo", "bar"), 0);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
l.preIndex(shardId, index);
@@ -33,7 +33,6 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.Priority;
@@ -57,9 +56,6 @@
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;