Skip to content

Commit

Permalink
threading: fix shutdown race condition
Browse files Browse the repository at this point in the history
A BUG_ON statement would seemingly randomly trigger during the threading
shutdown logic. After a packet thread reached the THV_RUNNING_DONE state,
it would sometimes still receive flow timeout packets which would then
remain unprocessed.

1 main:   TmThreadDisableReceiveThreads(); <- stop capturing packets
2 worker: -> TmThreadTimeoutLoop (THV_FLOW_LOOP) phase starts
3 main:   FlowForceReassembly();           <- inject packets from flow engine
4 main:   TmThreadDisablePacketThreads();  <- then disable packet threads
5 main:   -> checks if 'worker' is ready processing packets
6 main:   -> sends THV_KILL to worker
7 worker: breaks out of TmThreadTimeoutLoop and changes to THV_RUNNING_DONE.

Part of the problem was with (5) above. When checking if the worker was
already done with its work, TmThreadDisablePacketThreads would not consider
the injected flow timeout packets. The second part of the problem was with (7),
where the worker checked if it was ready with the TmThreadTimeoutLoop in a
thread unsafe way.

As a result TmThreadDisablePacketThreads would not wait long enough for the
worker(s) to finish its work and move the threads to the THV_RUNNING_DONE
phase by issuing the THV_KILL command.

When waiting for packet processing threads to process all in-flight packets,
also consider the 'stream_pq'. This will have received the flow timeout
packets.

Bug #1871.
  • Loading branch information
victorjulien authored and pull[bot] committed Jul 27, 2023
1 parent 0a785a6 commit 3090528
Showing 1 changed file with 41 additions and 53 deletions.
94 changes: 41 additions & 53 deletions src/tm-threads.c
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,36 @@ void TmThreadRemove(ThreadVars *tv, int type)
return;
}

static bool ThreadStillHasPackets(ThreadVars *tv)
{
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
SCMutexLock(&q->mutex_q);
uint32_t len = q->len;
SCMutexUnlock(&q->mutex_q);
if (len != 0) {
return true;
}
}
}

if (tv->stream_pq != NULL) {
SCMutexLock(&tv->stream_pq->mutex_q);
uint32_t len = tv->stream_pq->len;
SCMutexUnlock(&tv->stream_pq->mutex_q);

if (len != 0) {
return true;
}
}
return false;
}

/**
* \brief Kill a thread.
*
Expand All @@ -1442,19 +1472,6 @@ static int TmThreadKillThread(ThreadVars *tv)
return 1;
}

if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
if (q->len != 0) {
return 0;
}
}
}

/* set the thread flag informing the thread that it needs to be
* terminated */
TmThreadsSetFlag(tv, THV_KILL);
Expand Down Expand Up @@ -1499,7 +1516,7 @@ static int TmThreadKillThread(ThreadVars *tv)
/** \internal
*
* \brief make sure that all packet threads are done processing their
* in-flight packets
* in-flight packets, including 'injected' flow packets.
*/
static void TmThreadDrainPacketThreads(void)
{
Expand All @@ -1521,23 +1538,16 @@ static void TmThreadDrainPacketThreads(void)
/* all receive threads are part of packet processing threads */
tv = tv_root[TVT_PPT];
while (tv) {
if (tv->inq != NULL) {
if (ThreadStillHasPackets(tv)) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
if (q->len != 0) {
SCMutexUnlock(&tv_root_lock);
SCMutexUnlock(&tv_root_lock);

/* sleep outside lock */
SleepMsec(1);
goto again;
}
}
/* sleep outside lock */
SleepMsec(1);
goto again;
}

tv = tv->next;
}

Expand Down Expand Up @@ -1593,20 +1603,14 @@ void TmThreadDisableReceiveThreads(void)
}

if (disable) {
if (tv->inq != NULL) {
if (ThreadStillHasPackets(tv)) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
if (q->len != 0) {
SCMutexUnlock(&tv_root_lock);
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
}
}
SCMutexUnlock(&tv_root_lock);
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
}

/* we found a receive TV. Send it a KILL_PKTACQ signal. */
Expand Down Expand Up @@ -1676,22 +1680,6 @@ void TmThreadDisablePacketThreads(void)
* receive TM amongst the slots in a tv, it indicates we are done
* with all receive threads */
while (tv) {
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
if (q->len != 0) {
SCMutexUnlock(&tv_root_lock);
/* don't sleep while holding a lock */
SleepMsec(1);
goto again;
}
}
}

/* we found our receive TV. Send it a KILL signal. This is all
* we need to do to kill receive threads */
TmThreadsSetFlag(tv, THV_KILL);
Expand Down

0 comments on commit 3090528

Please sign in to comment.