diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 4f62d89d3a..d62addd9ab 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -481,15 +481,20 @@ int FlowForceReassemblyForFlowV2(Flow *f, int server, int client) } } - SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); - PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p1); - if (p2 != NULL) - PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p2); - if (p3 != NULL) - PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3); - SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); - if (stream_pseudo_pkt_decode_TV->inq != NULL) { - SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q); + /* inject the packet(s) into the appropriate thread */ + int thread_id = (int)f->thread_id; + Packet *packets[4] = { p1, p2 ? p2 : p3, p2 ? p3 : NULL, NULL }; /**< null terminated array of packets */ + if (unlikely(!(TmThreadsInjectPacketsById(packets, thread_id)))) { + FlowDeReference(&p1->flow); + TmqhOutputPacketpool(NULL, p1); + if (p2) { + FlowDeReference(&p2->flow); + TmqhOutputPacketpool(NULL, p2); + } + if (p3) { + FlowDeReference(&p3->flow); + TmqhOutputPacketpool(NULL, p3); + } } /* done, in case of error (no packet) we still tag flow as complete diff --git a/src/tm-threads.c b/src/tm-threads.c index bb8ab9be90..59754c0ae4 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -2074,3 +2074,33 @@ void TmThreadsUnregisterThread(const int id) end: SCMutexUnlock(&thread_store_lock); } + +/** + * \retval r 1 if packet was accepted, 0 otherwise + * \note if packet was not accepted, it's still the responsibility + * of the caller. + */ +int TmThreadsInjectPacketsById(Packet **packets, int id) +{ + if (id < 0 || id >= (int)thread_store.threads_size) + return 0; + + Thread *t = &thread_store.threads[id]; + ThreadVars *tv = t->tv; + + if (tv == NULL || tv->stream_pq == NULL) + return 0; + + SCMutexLock(&tv->stream_pq->mutex_q); + while (*packets != NULL) { + PacketEnqueue(tv->stream_pq, *packets); + packets++; + } + SCMutexUnlock(&tv->stream_pq->mutex_q); + + /* wake up listening thread(s) if necessary */ + if (tv->inq != NULL) { + SCCondSignal(&trans_q[tv->inq->id].cond_q); + } + return 1; +} diff --git a/src/tm-threads.h b/src/tm-threads.h index 0684d28b02..4d864d6ebe 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -199,5 +199,6 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet void TmThreadsListThreads(void); int TmThreadsRegisterThread(ThreadVars *tv, const int type); void TmThreadsUnregisterThread(const int id); +int TmThreadsInjectPacketsById(Packet **, int id); #endif /* __TM_THREADS_H__ */