Skip to content

Commit

Permalink
simplify flow/stop control in testConcurrentWriteViewsAndSnapshot
Browse files Browse the repository at this point in the history
Thread starvation of the control thread could cause the writer to keep on generating ops and make the test go out of control (OOM).
  • Loading branch information
bleskes committed May 23, 2017
1 parent 6ec485d commit b2ccb6b
Showing 1 changed file with 39 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,9 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

Expand Down Expand Up @@ -671,11 +669,11 @@ public void testVerifyTranslogIsNotDeleted() throws IOException {
* Tests that concurrent readers and writes maintain view and snapshot semantics
*/
public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
final Thread[] writers = new Thread[randomIntBetween(1, 10)];
final Thread[] readers = new Thread[randomIntBetween(1, 10)];
final Thread[] writers = new Thread[randomIntBetween(1, 3)];
final Thread[] readers = new Thread[randomIntBetween(1, 3)];
final int flushEveryOps = randomIntBetween(5, 100);
// used to notify main thread that so many operations have been written so it can simulate a flush
final AtomicReference<CountDownLatch> writtenOpsLatch = new AtomicReference<>(new CountDownLatch(0));
final int maxOps = randomIntBetween(200, 1000);
final Object signalReaderSomeDataWasIndexed = new Object();
final AtomicLong idGenerator = new AtomicLong();
final CyclicBarrier barrier = new CyclicBarrier(writers.length + readers.length + 1);

Expand All @@ -696,7 +694,7 @@ public void testConcurrentWriteViewsAndSnapshot() throws Throwable {
public void doRun() throws BrokenBarrierException, InterruptedException, IOException {
barrier.await();
int counter = 0;
while (run.get()) {
while (run.get() && idGenerator.get() < maxOps) {
long id = idGenerator.incrementAndGet();
final Translog.Operation op;
final Translog.Operation.Type type =
Expand All @@ -723,7 +721,14 @@ public void doRun() throws BrokenBarrierException, InterruptedException, IOExcep
if (id % writers.length == threadId) {
translog.ensureSynced(location);
}
writtenOpsLatch.get().countDown();
if (id % flushEveryOps == 0) {
translog.commit(translog.currentFileGeneration());
}
if (id % 7 == 0) {
synchronized (signalReaderSomeDataWasIndexed) {
signalReaderSomeDataWasIndexed.notifyAll();
}
}
counter++;
}
logger.debug("--> [{}] done. wrote [{}] ops.", threadName, counter);
Expand Down Expand Up @@ -774,7 +779,7 @@ void newView() throws IOException {
protected void doRun() throws Exception {
barrier.await();
int iter = 0;
while (run.get()) {
while (idGenerator.get() < maxOps) {
if (iter++ % 10 == 0) {
newView();
}
Expand Down Expand Up @@ -807,7 +812,11 @@ protected void doRun() throws Exception {
}
}
// slow down things a bit and spread out testing..
writtenOpsLatch.get().await(200, TimeUnit.MILLISECONDS);
synchronized (signalReaderSomeDataWasIndexed) {
if (idGenerator.get() < maxOps){
signalReaderSomeDataWasIndexed.wait();
}
}
}
closeView();
logger.debug("--> [{}] done. tested [{}] snapshots", threadId, iter);
Expand All @@ -817,34 +826,27 @@ protected void doRun() throws Exception {
}

barrier.await();
try {
for (int iterations = scaledRandomIntBetween(10, 200); iterations > 0 && errors.isEmpty(); iterations--) {
writtenOpsLatch.set(new CountDownLatch(flushEveryOps));
while (writtenOpsLatch.get().await(200, TimeUnit.MILLISECONDS) == false) {
if (errors.size() > 0) {
break;
}
}
translog.commit(translog.currentFileGeneration());
}
} finally {
run.set(false);
logger.debug("--> waiting for threads to stop");
for (Thread thread : writers) {
thread.join();
}
for (Thread thread : readers) {
thread.join();
}
if (errors.size() > 0) {
Throwable e = errors.get(0);
for (Throwable suppress : errors.subList(1, errors.size())) {
e.addSuppressed(suppress);
}
throw e;
logger.debug("--> waiting for threads to stop");
for (Thread thread : writers) {
thread.join();
}
logger.debug("--> waiting for readers to stop");
// force stopping, if all writers crashed
synchronized (signalReaderSomeDataWasIndexed) {
idGenerator.set(Long.MAX_VALUE);
signalReaderSomeDataWasIndexed.notifyAll();
}
for (Thread thread : readers) {
thread.join();
}
if (errors.size() > 0) {
Throwable e = errors.get(0);
for (Throwable suppress : errors.subList(1, errors.size())) {
e.addSuppressed(suppress);
}
logger.info("--> test done. total ops written [{}]", writtenOps.size());
throw e;
}
logger.info("--> test done. total ops written [{}]", writtenOps.size());
}

public void testSyncUpTo() throws IOException {
Expand Down Expand Up @@ -1403,7 +1405,7 @@ public void run() {
break;
case DELETE:
op = new Translog.Delete(
"test", threadId + "_" + opCount,
"test", threadId + "_" + opCount,
new Term("_uid", threadId + "_" + opCount),
opCount,
0,
Expand Down

0 comments on commit b2ccb6b

Please sign in to comment.