diff --git a/src/flow.c b/src/flow.c index 9549792bb6..2f55a21cc6 100644 --- a/src/flow.c +++ b/src/flow.c @@ -461,6 +461,9 @@ static inline int FlowForceReassemblyForFlowV2(ThreadVars *tv, Flow *f) if (p3 != NULL) PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3); SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); + if (stream_pseudo_pkt_decode_TV->inq != NULL) { + SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q); + } return 1; } @@ -1762,6 +1765,14 @@ void *FlowManagerThread(void *td) "retrieve the slot for DECODE TM"); exit(EXIT_FAILURE); } + stream_pseudo_pkt_decode_TV = + TmThreadsGetTVContainingSlot(stream_pseudo_pkt_decode_tm_slot); + if (stream_pseudo_pkt_decode_TV == NULL) { + /* yes, this is fatal! */ + SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to " + "retrieve the TV containing the Decode TM slot"); + exit(EXIT_FAILURE); + } /* set the thread name */ SCSetThreadName(th_v->name); diff --git a/src/tm-threads.c b/src/tm-threads.c index 3dabca835f..dbb56a17e5 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -401,15 +401,14 @@ void *TmThreadsSlot1(void *td) /* output the packet */ tv->tmqh_out(tv, p); - - while (s->slot_post_pq.top != NULL) { - /* handle new packets from this func */ - SCMutexLock(&s->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&s->slot_post_pq); - SCMutexUnlock(&s->slot_post_pq.mutex_q); - if (extra_p != NULL) { - tv->tmqh_out(tv, extra_p); - } + } + while (s->slot_post_pq.top != NULL) { + /* handle new packets from this func */ + SCMutexLock(&s->slot_post_pq.mutex_q); + Packet *extra_p = PacketDequeue(&s->slot_post_pq); + SCMutexUnlock(&s->slot_post_pq.mutex_q); + if (extra_p != NULL) { + tv->tmqh_out(tv, extra_p); } } @@ -663,29 +662,29 @@ void *TmThreadsSlotVar(void *td) /* output the packet */ tv->tmqh_out(tv, p); - /* now handle the post_pq packets */ - TmSlot *slot; - for (slot = s; slot != NULL; slot = slot->slot_next) { - while (slot->slot_post_pq.top != NULL) { - SCMutexLock(&slot->slot_post_pq.mutex_q); - Packet *extra_p = PacketDequeue(&slot->slot_post_pq); - SCMutexUnlock(&slot->slot_post_pq.mutex_q); - if (extra_p == NULL) + } /* if (p != NULL) */ + /* now handle the post_pq packets */ + TmSlot *slot; + for (slot = s; slot != NULL; slot = slot->slot_next) { + while (slot->slot_post_pq.top != NULL) { + SCMutexLock(&slot->slot_post_pq.mutex_q); + Packet *extra_p = PacketDequeue(&slot->slot_post_pq); + SCMutexUnlock(&slot->slot_post_pq.mutex_q); + if (extra_p == NULL) + break; + + if (slot->slot_next != NULL) { + r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); + if (r == TM_ECODE_FAILED) { + TmqhOutputPacketpool(tv, extra_p); + TmThreadsSetFlag(tv, THV_FAILED); break; - - if (slot->slot_next != NULL) { - r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); - if (r == TM_ECODE_FAILED) { - TmqhOutputPacketpool(tv, extra_p); - TmThreadsSetFlag(tv, THV_FAILED); - break; - } } - /* output the packet */ - tv->tmqh_out(tv, extra_p); - } /* while (slot->slot_post_pq.top != NULL) */ - } /* for (slot = s; slot != NULL; slot = slot->slot_next) */ - } /* if (p != NULL) */ + } + /* output the packet */ + tv->tmqh_out(tv, extra_p); + } /* while (slot->slot_post_pq.top != NULL) */ + } /* for (slot = s; slot != NULL; slot = slot->slot_next) */ if (TmThreadsCheckFlag(tv, THV_KILL)) { run = 0;