Introduce Flow timeout injection api

Add function TmThreadsInjectPacketById that is to be used to inject flow
timeout packets into the threads stream_pq queue.

TmThreadsInjectPacketById will also wake up listening threads if
applicable.

Packets are passed all packets together in an NULL terminated array
to reduce locking overhead.
pull/1267/head
Victor Julien 11 years ago
parent 51a782fd8c
commit 7f80516563

@ -481,15 +481,20 @@ int FlowForceReassemblyForFlowV2(Flow *f, int server, int client)
}
}
SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p1);
if (p2 != NULL)
PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p2);
if (p3 != NULL)
PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3);
SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
if (stream_pseudo_pkt_decode_TV->inq != NULL) {
SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q);
/* inject the packet(s) into the appropriate thread */
int thread_id = (int)f->thread_id;
Packet *packets[4] = { p1, p2 ? p2 : p3, p2 ? p3 : NULL, NULL }; /**< null terminated array of packets */
if (unlikely(!(TmThreadsInjectPacketsById(packets, thread_id)))) {
FlowDeReference(&p1->flow);
TmqhOutputPacketpool(NULL, p1);
if (p2) {
FlowDeReference(&p2->flow);
TmqhOutputPacketpool(NULL, p2);
}
if (p3) {
FlowDeReference(&p3->flow);
TmqhOutputPacketpool(NULL, p3);
}
}
/* done, in case of error (no packet) we still tag flow as complete

@ -2074,3 +2074,33 @@ void TmThreadsUnregisterThread(const int id)
end:
SCMutexUnlock(&thread_store_lock);
}
/**
* \retval r 1 if packet was accepted, 0 otherwise
* \note if packet was not accepted, it's still the responsibility
* of the caller.
*/
int TmThreadsInjectPacketsById(Packet **packets, int id)
{
if (id < 0 || id >= (int)thread_store.threads_size)
return 0;
Thread *t = &thread_store.threads[id];
ThreadVars *tv = t->tv;
if (tv == NULL || tv->stream_pq == NULL)
return 0;
SCMutexLock(&tv->stream_pq->mutex_q);
while (*packets != NULL) {
PacketEnqueue(tv->stream_pq, *packets);
packets++;
}
SCMutexUnlock(&tv->stream_pq->mutex_q);
/* wake up listening thread(s) if necessary */
if (tv->inq != NULL) {
SCCondSignal(&trans_q[tv->inq->id].cond_q);
}
return 1;
}

@ -199,5 +199,6 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);
int TmThreadsInjectPacketsById(Packet **, int id);
#endif /* __TM_THREADS_H__ */

Loading…
Cancel
Save