Skip to content

Commit

Permalink
be more aggressive about cleaning up full queues
Browse files Browse the repository at this point in the history
  • Loading branch information
alandekok committed Jan 5, 2025
1 parent 128d2fb commit b26d88e
Showing 1 changed file with 88 additions and 23 deletions.
111 changes: 88 additions & 23 deletions src/main/threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ extern void request_free(REQUEST *request);
extern void request_done(REQUEST *request, int original);

#ifndef HAVE_STDATOMIC_H
static int request_discard_lower_priority(int priority);
static int request_fifo_discard(int priority, bool lower, time_t now);
#endif

/*
Expand Down Expand Up @@ -372,16 +372,29 @@ static void reap_children(void)
int request_enqueue(REQUEST *request)
{
bool managed = false;
time_t now;

rad_assert(pool_initialized == true);

/*
* Proxied packets get requeued when they time out, or
* when they get a reply. We want to use "now" as the
* time, and not the time 5-10 seconds in the past when
* we originally received the request.
*/
if (request->priority == RAD_LISTEN_PROXY) {
now = time(NULL);
} else {
now = request->timestamp;
}

/*
* If we haven't checked the number of child threads
* in a while, OR if the thread pool appears to be full,
* go manage it.
*/
if (last_cleaned < request->timestamp) {
thread_pool_manage(request->timestamp);
if (last_cleaned < now) {
thread_pool_manage(now);
managed = true;
}

Expand All @@ -398,14 +411,14 @@ int request_enqueue(REQUEST *request)

num = load(thread_pool.active_threads);
if (num == thread_pool.total_threads) {
thread_pool_manage(request->timestamp);
thread_pool_manage(now);
managed = true;
}

if (!managed) {
num = load(thread_pool.exited_threads);
if (num > 0) {
thread_pool_manage(request->timestamp);
thread_pool_manage(now);
}
}
}
Expand All @@ -424,15 +437,15 @@ int request_enqueue(REQUEST *request)
if (!managed &&
((thread_pool.active_threads == thread_pool.total_threads) ||
(thread_pool.exited_threads > 0))) {
thread_pool_manage(request->timestamp);
thread_pool_manage(now);
}

pthread_mutex_lock(&thread_pool.queue_mutex);

#ifdef WITH_STATS
#ifdef WITH_ACCOUNTING
if (thread_pool.auto_limit_acct) {
struct timeval now;
struct timeval when;

/*
* Throw away accounting requests if we're too
Expand Down Expand Up @@ -492,7 +505,7 @@ int request_enqueue(REQUEST *request)
}
}

gettimeofday(&now, NULL);
gettimeofday(&when, NULL);

/*
* Calculate the instantaneous arrival rate into
Expand All @@ -501,7 +514,7 @@ int request_enqueue(REQUEST *request)
thread_pool.pps_in.pps = rad_pps(&thread_pool.pps_in.pps_old,
&thread_pool.pps_in.pps_now,
&thread_pool.pps_in.time_old,
&now);
&when);

thread_pool.pps_in.pps_now++;
}
Expand All @@ -511,10 +524,14 @@ int request_enqueue(REQUEST *request)
thread_pool.request_count++;

/*
* If there are too many packets _overall_, then try to delete a lower priority one.
* If there are too many packets _overall_, OR the
* destinatio fifo is full, then try to delete a lower
* priority one.
*/
if ((thread_pool.num_queued >= thread_pool.max_queue_size) &&
(request_discard_lower_priority(request->priority) == 0)) {
if (((thread_pool.num_queued >= thread_pool.max_queue_size) &&
(request_fifo_discard(request->priority, true, now) == 0)) ||
(fr_fifo_full(thread_pool.fifo[request->priority]) &&
(request_fifo_discard(request->priority, false, now) == 0))) {
pthread_mutex_unlock(&thread_pool.queue_mutex);
RATE_LIMIT(ERROR("Something is blocking the server. There are %d packets in the queue, "
"waiting to be processed. Ignoring the new request.", thread_pool.num_queued));
Expand Down Expand Up @@ -554,24 +571,72 @@ int request_enqueue(REQUEST *request)
/*
* Try to free up requests by discarding requests of lower priority.
*/
static int request_discard_lower_priority(int priority)
static int request_fifo_discard(int priority, bool lower, time_t now)
{
int i;
int i, rcode;
REQUEST *request;

if (priority == 0) return 0;
if (lower) {
for (i = NUM_FIFOS - 1; i < priority; i--) {
request = fr_fifo_pop(thread_pool.fifo[i]);
if (!request) continue;

for (i = NUM_FIFOS - 1; i < priority; i--) {
request = fr_fifo_pop(thread_pool.fifo[i]);
if (!request) continue;
fr_assert(request->child_state == REQUEST_QUEUED);
request->child_state = REQUEST_DONE;
request_done(request, FR_ACTION_DONE);
return 1;
}

fr_assert(request->child_state == REQUEST_QUEUED);
request->child_state = REQUEST_DONE;
request_done(request, FR_ACTION_DONE);
return 1;
/*
* We didn't discard a lower priority entry.
* Maybe we need to discard one of the current
* priority, which has been in the queue for a
* while.
*/
}

return 0;
/*
* The time stamp for proxied requests may be 5-10
* seconds in the past, because the home server hasn't
* responded. We therefore can't discard "old" requests,
* as they have just received a reply, or they have just
* timed out.
*/
if (priority <= RAD_LISTEN_PROXY) return 0;

/*
* Peek at the first entry in the fifo. Note that there
* is not always a first entry. This is because we're
* called if there are too many _total_ requests.
*/
rcode = 0;

retry:
request = fr_fifo_peek(thread_pool.fifo[priority]);
if (!request) return rcode;

/*
* This request expires in the future. We can't do anything.
*/
if ((request->timestamp + main_config.max_request_time) > now) return rcode;

request = fr_fifo_pop(thread_pool.fifo[priority]);
rad_assert(request != NULL);
VERIFY_REQUEST(request);

request->child_state = REQUEST_DONE;
if (request->master_state == REQUEST_TO_FREE) {
request_free(request);
} else {
request_done(request, REQUEST_DONE);
}
thread_pool.num_queued--;

/*
* We might as well delete as many old requests as possible.
*/
rcode = 1;
goto retry;
}
#endif

Expand Down

0 comments on commit b26d88e

Please sign in to comment.