threading: optimize and unify post_pq checks

TmThreadsSlotProcessPkt did not need to look all 'slots' as only the first
slots post_pq can have been used.

Unify post_pq cleanup handling.
pull/4531/head
Victor Julien 5 years ago
parent 2a1ed3ba1b
commit b55f617c2f

@ -110,7 +110,7 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint32_t flag)
/**
* \brief Separate run function so we can call it recursively.
*
* \todo Deal with post_pq for slots beyond the first.
* \note post_pq if only used for first slot
*/
TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
{
@ -602,37 +602,9 @@ static void *TmThreadsSlotVar(void *td)
/* output the packet */
tv->tmqh_out(tv, p);
} /* if (p != NULL) */
/* now handle the post_pq packets */
TmSlot *slot;
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->slot_post_pq.top != NULL) {
while (1) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
}
/* output the packet */
tv->tmqh_out(tv, extra_p);
} /* while */
} /* if */
} /* for */
/* now handle the post_pq packets */
TmThreadsSlotHandlePostPQs(tv, s);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;

@ -159,6 +159,34 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet
TmThreadsSetFlag(tv, THV_FAILED);
}
/**
* \brief Handle timeout from the capture layer. Checks
* post-pq which may have been filled by the flow
* manager.
*/
static inline void TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
{
/* post process pq: only the first slot will possible have used it */
if (s->slot_post_pq.top != NULL) {
while (1) {
SCMutexLock(&s->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&s->slot_post_pq);
SCMutexUnlock(&s->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
if (s->slot_next != NULL) {
TmEcode r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
if (r == TM_ECODE_FAILED) {
TmThreadsSlotProcessPktFail(tv, s, extra_p);
break;
}
}
tv->tmqh_out(tv, extra_p);
}
}
}
/**
* \brief Process the rest of the functions (if any) and queue.
*/
@ -177,65 +205,8 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
tv->tmqh_out(tv, p);
/* post process pq */
for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->slot_post_pq.top != NULL) {
while (1) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
TmThreadsSlotProcessPktFail(tv, slot, extra_p);
break;
}
}
tv->tmqh_out(tv, extra_p);
}
} /* if (slot->slot_post_pq.top != NULL) */
}
return TM_ECODE_OK;
}
/**
* \brief Handle timeout from the capture layer. Checks
* post-pq which may have been filled by the flow
* manager.
*/
static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
{
/* post process pq */
for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->slot_post_pq.top != NULL) {
while (1) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
TmThreadsSlotHandlePostPQs(tv, s);
if (slot->slot_next != NULL) {
TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
return TM_ECODE_FAILED;
}
}
tv->tmqh_out(tv, extra_p);
}
}
}
return TM_ECODE_OK;
}

Loading…
Cancel
Save