Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x 4105 4121 4122 queued thread pool #4146

Merged
merged 6 commits into from
Oct 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ public class QueuedThreadPool extends ContainerLifeCycle implements SizedThreadP
/**
* Encodes thread counts:
* <dl>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size</dd>
* <dt>Hi</dt><dd>Total thread count or Integer.MIN_VALUE if the pool is stopping</dd>
* <dt>Lo</dt><dd>Net idle threads == idle threads - job queue size. Essentially if positive,
* this represents the effective number of idle threads, and if negative it represents the
* demand for more threads</dd>
* </dl>
*/
private final AtomicBiInteger _counts = new AtomicBiInteger(Integer.MIN_VALUE, 0);
Expand Down Expand Up @@ -159,6 +161,8 @@ protected void doStart() throws Exception
}
addBean(_tryExecutor);

_lastShrink.set(System.nanoTime());

super.doStart();
// The threads count set to MIN_VALUE is used to signal to Runners that the pool is stopped.
_counts.set(0, 0); // threads, idle
Expand Down Expand Up @@ -290,6 +294,9 @@ public void setDaemon(boolean daemon)
public void setIdleTimeout(int idleTimeout)
{
_idleTimeout = idleTimeout;
ReservedThreadExecutor reserved = getBean(ReservedThreadExecutor.class);
if (reserved != null)
reserved.setIdleTimeout(idleTimeout, TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -443,7 +450,9 @@ public int getThreadsPriority()
@ManagedAttribute("size of the job queue")
public int getQueueSize()
{
return _jobs.size();
// The idle counter encodes demand, which is the effective queue size
int idle = _counts.getLo();
return Math.max(0, -idle);
}

/**
Expand Down Expand Up @@ -631,9 +640,6 @@ protected void startThread()
try
{
Thread thread = newThread(_runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
if (LOG.isDebugEnabled())
LOG.debug("Starting {}", thread);
_threads.add(thread);
Expand Down Expand Up @@ -665,7 +671,11 @@ private boolean addCounts(int deltaThreads, int deltaIdle)

protected Thread newThread(Runnable runnable)
{
return new Thread(_threadGroup, runnable);
Thread thread = new Thread(_threadGroup, runnable);
thread.setDaemon(isDaemon());
thread.setPriority(getThreadsPriority());
thread.setName(_name + "-" + thread.getId());
return thread;
}

protected void removeThread(Thread thread)
Expand Down Expand Up @@ -857,17 +867,19 @@ public void run()
if (LOG.isDebugEnabled())
LOG.debug("Runner started for {}", QueuedThreadPool.this);

Runnable job = null;
boolean idle = true;
try
{
Runnable job = null;
while (true)
{
// If we had a job, signal that we are idle again
// If we had a job,
if (job != null)
{
// signal that we are idle again
if (!addCounts(0, 1))
break;
job = null;
idle = true;
}
// else check we are still running
else if (_counts.getHi() == Integer.MIN_VALUE)
Expand All @@ -881,42 +893,37 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
job = _jobs.poll();
if (job == null)
{
// Wait for a job
// No job immediately available maybe we should shrink?
long idleTimeout = getIdleTimeout();
if (idleTimeout > 0 && getThreads() > _minThreads)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if ((now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout) && _lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}

// Wait for a job, only after we have checked if we should shrink
job = idleJobPoll(idleTimeout);

// If still no job?
if (job == null)
{
// maybe we should shrink
if (getThreads() > _minThreads && idleTimeout > 0)
{
long last = _lastShrink.get();
long now = System.nanoTime();
if (last == 0 || (now - last) > TimeUnit.MILLISECONDS.toNanos(idleTimeout))
{
if (_lastShrink.compareAndSet(last, now))
{
if (LOG.isDebugEnabled())
LOG.debug("shrinking {}", QueuedThreadPool.this);
break;
}
}
}
// continue to try again
continue;
}
}

idle = false;

// run job
if (LOG.isDebugEnabled())
LOG.debug("run {} in {}", job, QueuedThreadPool.this);
runJob(job);
if (LOG.isDebugEnabled())
LOG.debug("ran {} in {}", job, QueuedThreadPool.this);

// Clear any interrupted status
Thread.interrupted();
}
catch (InterruptedException e)
{
Expand All @@ -928,6 +935,11 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
{
LOG.warn(e);
}
finally
{
// Clear any interrupted status
Thread.interrupted();
}
}
}
finally
Expand All @@ -936,7 +948,7 @@ else if (_counts.getHi() == Integer.MIN_VALUE)
removeThread(thread);

// Decrement the total thread count and the idle count if we had no job
addCounts(-1, job == null ? -1 : 0);
addCounts(-1, idle ? -1 : 0);
if (LOG.isDebugEnabled())
LOG.debug("{} exited for {}", thread, QueuedThreadPool.this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.log.StacklessLogging;
import org.eclipse.jetty.util.thread.ThreadPool.SizedThreadPool;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -188,7 +187,7 @@ public void testThreadPool() throws Exception
QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(4);
tp.setIdleTimeout(900);
tp.setIdleTimeout(1000);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);

tp.start();
Expand All @@ -199,44 +198,49 @@ public void testThreadPool() throws Exception

// Doesn't shrink to less than min threads
Thread.sleep(3 * tp.getIdleTimeout() / 2);
waitForThreads(tp, 2);
waitForIdle(tp, 2);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(2));

// Run job0
RunningJob job0 = new RunningJob("JOB0");
tp.execute(job0);
assertTrue(job0._run.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(1));

// Run job1
RunningJob job1 = new RunningJob("JOB1");
tp.execute(job1);
assertTrue(job1._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 2);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(2));
assertThat(tp.getIdleThreads(), is(0));

// Run job2
RunningJob job2 = new RunningJob("JOB2");
tp.execute(job2);
assertTrue(job2._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 3);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(3));
assertThat(tp.getIdleThreads(), is(0));

// Run job3
RunningJob job3 = new RunningJob("JOB3");
tp.execute(job3);
assertTrue(job3._run.await(10, TimeUnit.SECONDS));
waitForThreads(tp, 4);
waitForIdle(tp, 0);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));

// Check no short term change
Thread.sleep(100);
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));

// Run job4. will be queued
RunningJob job4 = new RunningJob("JOB4");
tp.execute(job4);
assertFalse(job4._run.await(1, TimeUnit.SECONDS));
assertFalse(job4._run.await(250, TimeUnit.MILLISECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(1));

// finish job 0
job0._stopping.countDown();
Expand All @@ -246,12 +250,13 @@ public void testThreadPool() throws Exception
assertTrue(job4._run.await(10, TimeUnit.SECONDS));
assertThat(tp.getThreads(), is(4));
assertThat(tp.getIdleThreads(), is(0));
assertThat(tp.getQueueSize(), is(0));

// finish job 1
// finish job 1, and it's thread will become idle
job1._stopping.countDown();
assertTrue(job1._stopped.await(10, TimeUnit.SECONDS));
waitForIdle(tp, 1);
assertThat(tp.getThreads(), is(4));
waitForThreads(tp, 4);

// finish job 2,3,4
job2._stopping.countDown();
Expand All @@ -261,15 +266,9 @@ public void testThreadPool() throws Exception
assertTrue(job3._stopped.await(10, TimeUnit.SECONDS));
assertTrue(job4._stopped.await(10, TimeUnit.SECONDS));

waitForIdle(tp, 4);
assertThat(tp.getThreads(), is(4));

long duration = System.nanoTime();
waitForThreads(tp, 3);
assertThat(tp.getIdleThreads(), is(3));
duration = System.nanoTime() - duration;
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.greaterThan(tp.getIdleTimeout() / 2L));
assertThat(TimeUnit.NANOSECONDS.toMillis(duration), Matchers.lessThan(tp.getIdleTimeout() * 2L));
// Eventually all will have 3 idle threads
waitForIdle(tp, 3);
assertThat(tp.getThreads(), is(3));

tp.stop();
}
Expand Down Expand Up @@ -505,6 +504,58 @@ public void testShrink() throws Exception
tp.stop();
}

@Test
public void testSteadyShrink() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
Runnable job = () ->
{
try
{
latch.await();
}
catch(InterruptedException e)
{
e.printStackTrace();
}
};

QueuedThreadPool tp = new QueuedThreadPool();
tp.setMinThreads(2);
tp.setMaxThreads(10);
int timeout = 500;
tp.setIdleTimeout(timeout);
tp.setThreadsPriority(Thread.NORM_PRIORITY - 1);

tp.start();
waitForIdle(tp, 2);
waitForThreads(tp, 2);

for (int i = 0; i < 10; i++)
tp.execute(job);

waitForThreads(tp, 10);
int threads = tp.getThreads();
// let the jobs run
latch.countDown();

for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}

// Assert that steady rate of jobs doesn't prevent some idling out
assertThat(tp.getThreads(), lessThan(threads));
threads = tp.getThreads();
for (int i = 5; i-- > 0; )
{
Thread.sleep(timeout / 2);
tp.execute(job);
}
assertThat(tp.getThreads(), lessThan(threads));
}

@Test
public void testEnsureThreads() throws Exception
{
Expand Down Expand Up @@ -605,7 +656,7 @@ private void waitForIdle(QueuedThreadPool tp, int idle)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(idle, tp.getIdleThreads());
assertThat(tp.getIdleThreads(), is(idle));
}

private void waitForReserved(QueuedThreadPool tp, int reserved)
Expand All @@ -624,7 +675,7 @@ private void waitForReserved(QueuedThreadPool tp, int reserved)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(reserved, reservedThreadExecutor.getAvailable());
assertThat(reservedThreadExecutor.getAvailable(), is(reserved));
}

private void waitForThreads(QueuedThreadPool tp, int threads)
Expand All @@ -642,7 +693,7 @@ private void waitForThreads(QueuedThreadPool tp, int threads)
}
now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
}
assertEquals(threads, tp.getThreads());
assertThat(tp.getThreads(), is(threads));
}

@Test
Expand Down