threading: force break loop on flow inject

Track availability of break loop callback to avoid overhead.
pull/6438/head
Victor Julien 3 years ago
parent b788d3345c
commit ff97d7c15d

@ -132,6 +132,7 @@ typedef struct ThreadVars_ {
SCCtrlCondT *ctrl_cond;
struct FlowQueue_ *flow_queue;
bool break_loop;
} ThreadVars;

@ -668,6 +668,9 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, const void *data)
slot->SlotFunc = tm->Func;
} else if (tm->PktAcqLoop) {
slot->PktAcqLoop = tm->PktAcqLoop;
if (tm->PktAcqBreakLoop) {
tv->break_loop = true;
}
} else if (tm->Management) {
slot->Management = tm->Management;
}
@ -2319,6 +2322,23 @@ uint16_t TmThreadsGetWorkerThreadMax()
return thread_max;
}
static inline void ThreadBreakLoop(ThreadVars *tv)
{
if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
return;
}
/* find the correct slot */
TmSlot *s = tv->tm_slots;
TmModule *tm = TmModuleGetById(s->tm_id);
if (tm->flags & TM_FLAG_RECEIVE_TM) {
/* if the method supports it, BreakLoop. Otherwise we rely on
* the capture method's recv timeout */
if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
}
}
}
/**
* \retval r 1 if packet was accepted, 0 otherwise
* \note if packet was not accepted, it's still the responsibility
@ -2347,6 +2367,8 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id)
/* wake up listening thread(s) if necessary */
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
ThreadBreakLoop(tv);
}
return 1;
}
@ -2369,5 +2391,7 @@ void TmThreadsInjectFlowById(Flow *f, const int id)
/* wake up listening thread(s) if necessary */
if (tv->inq != NULL) {
SCCondSignal(&tv->inq->pq->cond_q);
} else if (tv->break_loop) {
ThreadBreakLoop(tv);
}
}

Loading…
Cancel
Save