Skip to content

Commit

Permalink
add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dnhatn committed Nov 1, 2019
1 parent 816b9ec commit a56d9ff
Showing 1 changed file with 73 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -3925,6 +3926,78 @@ public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecover
closeShard(shard, false);
}

/**
* Verifies that after closing shard is returned, we should have released the engine, and won't open a new engine.
*/
public void testCloseShardWhileOpeningEngineDuringRecovery() throws Exception {
CountDownLatch readyToCloseLatch = new CountDownLatch(1);
CountDownLatch closeDoneLatch = new CountDownLatch(1);
IndexShard shard = newShard(false, Settings.EMPTY, config -> {
InternalEngine engine = new InternalEngine(config);
readyToCloseLatch.countDown();
try {
closeDoneLatch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return engine;
});

Thread closeShardThread = new Thread(() -> {
try {
readyToCloseLatch.await();
shard.close("testing", false);
// in integration tests, this is done as a listener on IndexService.
MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId);
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
} finally {
closeDoneLatch.countDown();
}
});
closeShardThread.start();
recoveryEmptyReplica(shard, true);
closeShardThread.join();
closeShard(shard, false);
}

/**
* Similar to {@link #testCloseShardWhileOpeningEngineDuringRecovery()} but verifies a scenario where a shard is being reset.
*/
public void testCloseShardWhileOpeningEngineDuringReset() throws Exception {
CountDownLatch readyToClose = new CountDownLatch(2); // close shard after we have created two engines in recovery and reset.
Phaser readyToReturnEngine = new Phaser(1);
IndexShard shard = newStartedShard(false, Settings.EMPTY, config -> {
InternalEngine engine = new InternalEngine(config);
readyToClose.countDown();
readyToReturnEngine.arriveAndAwaitAdvance();
return engine;
});

readyToReturnEngine.register(); // for close thread
Thread closeShardThread = new Thread(() -> {
try {
readyToClose.await();
shard.close("testing", false);
// in integration tests, this is done as a listener on IndexService.
MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId);
} catch (InterruptedException | IOException e) {
throw new AssertionError(e);
} finally {
readyToReturnEngine.arrive();
}
});
closeShardThread.start();
shard.acquireAllReplicaOperationsPermits(shard.getOperationPrimaryTerm(), shard.getLastKnownGlobalCheckpoint(), 0L,
ActionListener.wrap(r -> {
try (r) {
shard.resetEngineToGlobalCheckpoint();
}
}, Assert::assertNotNull), TimeValue.timeValueMinutes(1L));
closeShardThread.join();
closeShard(shard, false);
}

public void testResetEngineWithBrokenTranslog() throws Exception {
IndexShard shard = newStartedShard(false);
updateMappings(shard, IndexMetaData.builder(shard.indexSettings.getIndexMetaData())
Expand Down

0 comments on commit a56d9ff

Please sign in to comment.