Skip to content

Commit

Permalink
Remove extra logging from PrioritizedThrottledTaskRunnerTests (elasti…
Browse files Browse the repository at this point in the history
  • Loading branch information
fcofdez authored and mark-vieira committed Feb 2, 2023
1 parent 4a556f1 commit 030d6d9
Showing 1 changed file with 16 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
package org.elasticsearch.common.util.concurrent;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;

Expand Down Expand Up @@ -49,16 +47,12 @@ public void tearDown() throws Exception {
}

static class TestTask extends AbstractRunnable implements Comparable<TestTask> {
private final Logger logger = LogManager.getLogger(TestTask.class);

private final Runnable runnable;
private final int priority;
private final String taskDescription;

TestTask(Runnable runnable, int priority, String taskDescription) {
TestTask(Runnable runnable, int priority) {
this.runnable = runnable;
this.priority = priority;
this.taskDescription = taskDescription;
}

@Override
Expand All @@ -68,9 +62,7 @@ public int compareTo(TestTask o) {

@Override
public void doRun() {
logger.info("--> starting to execute task [{}]", taskDescription);
runnable.run();
logger.info("--> finished task [{}]", taskDescription);
}

@Override
Expand All @@ -96,7 +88,7 @@ public void testMultiThreadedEnqueue() throws Exception {
throw new AssertionError(e);
}
executedCountDown.countDown();
}, getRandomPriority(), "testMultiThreadedEnqueue-" + taskId));
}, getRandomPriority()));
assertThat(taskRunner.runningTasks(), lessThanOrEqualTo(maxTasks));
}).start();
}
Expand All @@ -117,7 +109,7 @@ public void testTasksRunInOrder() throws Exception {
taskRunner.enqueueTask(new TestTask(() -> {
awaitBarrier(blockBarrier); // notify main thread that the runner is blocked
awaitBarrier(blockBarrier); // wait for main thread to finish enqueuing tasks
}, getRandomPriority(), "blocking task"));
}, getRandomPriority()));

blockBarrier.await(10, TimeUnit.SECONDS); // wait for blocking task to start executing

Expand All @@ -135,7 +127,7 @@ public void testTasksRunInOrder() throws Exception {
taskRunner.enqueueTask(new TestTask(() -> {
executedPriorities.add(priority);
executedCountDown.countDown();
}, priority, "concurrent enqueued tasks - " + taskId));
}, priority));
awaitBarrier(enqueuedBarrier); // notify main thread that the task is enqueued
}).start();
}
Expand Down Expand Up @@ -171,7 +163,7 @@ public void testEnqueueSpawnsNewTasksUpToMax() throws Exception {
throw new RuntimeException(e);
}
executedCountDown.countDown();
}, getRandomPriority(), "testEnqueueSpawnsNewTasksUpToMax-" + taskId));
}, getRandomPriority()));
assertThat(taskRunner.runningTasks(), equalTo(i + 1));
}
// Enqueueing one or more new tasks would create only one new running task
Expand All @@ -184,7 +176,7 @@ public void testEnqueueSpawnsNewTasksUpToMax() throws Exception {
throw new RuntimeException(e);
}
executedCountDown.countDown();
}, getRandomPriority(), "testEnqueueSpawnsNewTasksUpToMax-" + taskId));
}, getRandomPriority()));
assertThat(taskRunner.runningTasks(), equalTo(maxTasks));
}
assertThat(taskRunner.queueSize(), equalTo(newTasks - 1));
Expand All @@ -209,19 +201,17 @@ public void testFailsTasksOnRejectionOrShutdown() throws Exception {
try {
while (true) {
assertTrue(permits.tryAcquire(10, TimeUnit.SECONDS));
taskRunner.enqueueTask(
new TestTask(taskCompleted::countDown, getRandomPriority(), "testFailsTasksOnRejectionOrShutdown") {
@Override
public void onRejection(Exception e) {
rejectionCountDown.countDown();
}
taskRunner.enqueueTask(new TestTask(taskCompleted::countDown, getRandomPriority()) {
@Override
public void onRejection(Exception e) {
rejectionCountDown.countDown();
}

@Override
public void onAfter() {
permits.release();
}
@Override
public void onAfter() {
permits.release();
}
);
});
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -244,14 +234,9 @@ private int getRandomPriority() {
}

private void assertNoRunningTasks(PrioritizedThrottledTaskRunner<TestTask> taskRunner) {
logger.info("--> ensure that there are no running tasks in the executor. Max number of threads [{}]", maxThreads);
final var barrier = new CyclicBarrier(maxThreads + 1);
for (int i = 0; i < maxThreads; i++) {
executor.execute(() -> {
logger.info("--> await until barrier is released");
awaitBarrier(barrier);
logger.info("--> the barrier is released");
});
executor.execute(() -> { awaitBarrier(barrier); });
}
awaitBarrier(barrier);
assertThat(taskRunner.runningTasks(), equalTo(0));
Expand Down

0 comments on commit 030d6d9

Please sign in to comment.