From f2bcf9ea2c1a211b4c18fd79a33b55a1d1a4be6b Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Tue, 13 Sep 2011 16:49:31 +0530 Subject: [PATCH] modify post_pq packet handling. - Lock the q just once, once we have detected the presence of packet(s) in the queue. Unlock it when we consume all packets from the q. --- src/tm-threads.c | 59 +++++++++++++++++++++++++++--------------------- src/tm-threads.h | 32 ++++++++++++++------------ 2 files changed, 50 insertions(+), 41 deletions(-) diff --git a/src/tm-threads.c b/src/tm-threads.c index fac3e9384a..6bf69bd47a 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -161,12 +161,14 @@ void *TmThreadsSlot1NoIn(void *td) tv->tmqh_out(tv, p); /* handle post queue */ - while (s->slot_post_pq.top != NULL) { + if (s->slot_post_pq.top != NULL) { SCMutexLock(&s->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&s->slot_post_pq); + while (s->slot_post_pq.top != NULL) { + Packet *extra_p = PacketDequeue(&s->slot_post_pq); + if (extra_p != NULL) + tv->tmqh_out(tv, extra_p); + } SCMutexUnlock(&s->slot_post_pq.mutex_q); - if (extra_p != NULL) - tv->tmqh_out(tv, extra_p); } if (TmThreadsCheckFlag(tv, THV_KILL)) { @@ -404,14 +406,16 @@ void *TmThreadsSlot1(void *td) /* output the packet */ tv->tmqh_out(tv, p); } - while (s->slot_post_pq.top != NULL) { - /* handle new packets from this func */ + if (s->slot_post_pq.top != NULL) { SCMutexLock(&s->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - if (extra_p != NULL) { - tv->tmqh_out(tv, extra_p); + while (s->slot_post_pq.top != NULL) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s->slot_post_pq); + if (extra_p != NULL) { + tv->tmqh_out(tv, extra_p); + } } + SCMutexUnlock(&s->slot_post_pq.mutex_q); } if (TmThreadsCheckFlag(tv, THV_KILL)) { @@ -672,25 +676,28 @@ void *TmThreadsSlotVar(void *td) /* now handle the post_pq packets */ TmSlot *slot; for (slot = s; slot != NULL; slot = slot->slot_next) { - while (slot->slot_post_pq.top != NULL) { + if (slot->slot_post_pq.top != NULL) { SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - if (extra_p == NULL) - break; - - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); + while (slot->slot_post_pq.top != NULL) { + Packet *extra_p = PacketDequeue(&slot->slot_post_pq); + if (extra_p == NULL) break; + + if (slot->slot_next != NULL) { + r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); + if (r == TM_ECODE_FAILED) { + TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); + TmqhOutputPacketpool(tv, extra_p); + TmThreadsSetFlag(tv, THV_FAILED); + break; + } } - } - /* output the packet */ - tv->tmqh_out(tv, extra_p); - } /* while (slot->slot_post_pq.top != NULL) */ - } /* for (slot = s; slot != NULL; slot = slot->slot_next) */ + /* output the packet */ + tv->tmqh_out(tv, extra_p); + } /* while */ + SCMutexUnlock(&slot->slot_post_pq.mutex_q); + } /* if */ + } /* for */ if (TmThreadsCheckFlag(tv, THV_KILL)) { run = 0; diff --git a/src/tm-threads.h b/src/tm-threads.h index 115fbf74a9..474f7c1837 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -152,25 +152,27 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet /* post process pq */ TmSlot *slot = s; while (slot != NULL) { - while (slot->slot_post_pq.top != NULL) { + if (slot->slot_post_pq.top != NULL) { SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - if (extra_p != NULL) { - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - break; + while (slot->slot_post_pq.top != NULL) { + Packet *extra_p = PacketDequeue(&slot->slot_post_pq); + if (extra_p != NULL) { + if (slot->slot_next != NULL) { + r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); + if (r == TM_ECODE_FAILED) { + TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); + TmqhOutputPacketpool(tv, extra_p); + TmThreadsSetFlag(tv, THV_FAILED); + break; + } } + tv->tmqh_out(tv, extra_p); } - tv->tmqh_out(tv, extra_p); - } - } + } /* while (slot->slot_post_pq.top != NULL) */ + SCMutexUnlock(&slot->slot_post_pq.mutex_q); + } /* if (slot->slot_post_pq.top != NULL) */ slot = slot->slot_next; - } + } /* while (slot != NULL) */ } return r;