flow: process evicted flows on low/no traffic

In a scenario where there was suddenly no more traffic flowing, flows
in a threads `flow_queue` would not be processed. The easiest way to
see this would be in a traffic replay scenario. After the replay is done
no more packets come in and these evicted flows got stuck.

In workers mode, the capture part handles timeout this was updated to
take the `ThreadVars::flow_queue` into account.

The autofp mode the logic that puts a flow into a threads `flow_queue`
would already wake a thread up, but the `flow_queue` was then ignored.
This has been updated to take the `flow_queue` into account.

In both cases a "capture timeout" packet is pushed through the pipeline
to "flush" the queues.

Bug: #4722.
pull/6438/head
Victor Julien 4 years ago
parent 31977170a8
commit b788d3345c

@ -447,6 +447,17 @@ static void *TmThreadsSlotVar(void *td)
/* input a packet */
p = tv->tmqh_in(tv);
/* if we didn't get a packet see if we need to do some housekeeping */
if (unlikely(p == NULL)) {
if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
p = PacketGetFromQueueOrAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
}
}
}
if (p != NULL) {
/* run the thread module(s) */
r = TmThreadsSlotVarRun(tv, p, s);

@ -159,7 +159,7 @@ static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet
* manager.
* \param s pipeline to run on these packets.
*/
static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
{
PacketQueue *pq = tv->stream_pq_local;
if (pq && pq->len > 0) {
@ -176,6 +176,9 @@ static inline void TmThreadsHandleInjectedPackets(ThreadVars *tv)
}
tv->tmqh_out(tv, extra_p);
}
return true;
} else {
return false;
}
}
@ -221,18 +224,34 @@ static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
}
}
/** \brief handle capture timeout
* When a capture method times out we check for house keeping
* tasks in the capture thread.
*
* \param p packet. Capture method may have taken a packet from
* the pool prior to the timing out call. We will then
* use that packet. Otherwise we can get our own.
*/
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
{
if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
TmThreadsCaptureInjectPacket(tv, p);
} else {
TmThreadsHandleInjectedPackets(tv);
TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
return;
/* packet could have been passed to us that we won't use
* return it to the pool. */
if (p != NULL)
tv->tmqh_out(tv, p);
} else {
if (TmThreadsHandleInjectedPackets(tv) == false) {
/* see if we have to do some house keeping */
if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
return;
}
}
}
/* packet could have been passed to us that we won't use
* return it to the pool. */
if (p != NULL)
tv->tmqh_out(tv, p);
}
void TmThreadsListThreads(void);

Loading…
Cancel
Save