capture: check for flow packets on capture timeout

The capture threads can receive packets from the flow manager in their
Threadvars::stream_pq packet queue. This mechanism makes sure the packets
the flow manager injects into the engine are processed by the correct
worker thread.

If the capture thread(s) would not receive packets for a long time, the
Threadvars::stream_pq would not be checked and processed. This could
lead to packet pool depletion in the flow manager. It would also lead
to flows not being timed out/logged until either packets started flowing
again or until the engine was shut down.

The scenario is more likely to happen in a test (e.g. replay) but could
also delay logging on low traffic sensors.
pull/3898/head
Victor Julien 6 years ago
parent 952cbb563c
commit ce71bf1fff

@ -1607,8 +1607,8 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
AFPDumpCounters(ptv);
last_dump = current_time;
}
/* poll timed out, lets see if we need to inject a fake packet */
TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL);
/* poll timed out, lets see handle our timeout path */
TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL);
} else if ((r < 0) && (errno != EINTR)) {
SCLogError(SC_ERR_AFP_READ, "Error reading data from iface '%s': (%d) %s",

@ -639,8 +639,8 @@ static TmEcode ReceiveNetmapLoop(ThreadVars *tv, void *data, void *slot)
NetmapDumpCounters(ntv);
StatsSyncCountersIfSignalled(tv);
/* poll timed out, lets see if we need to inject a fake packet */
TmThreadsCaptureInjectPacket(tv, ntv->slot, NULL);
/* poll timed out, lets handle the timeout */
TmThreadsCaptureHandleTimeout(tv, ntv->slot, NULL);
continue;
}

@ -998,8 +998,8 @@ static void NFQRecvPkt(NFQQueueVars *t, NFQThreadVars *tv)
if (flag)
NFQVerdictCacheFlush(t);
/* inject a fake packet on timeout */
TmThreadsCaptureInjectPacket(tv->tv, tv->slot, NULL);
/* handle timeout */
TmThreadsCaptureHandleTimeout(tv->tv, tv->slot, NULL);
} else {
#ifdef COUNTERS
NFQMutexLock(t);

@ -288,7 +288,7 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
SCLogError(SC_ERR_PCAP_DISPATCH, "Pcap callback PcapCallbackLoop failed");
SCReturnInt(TM_ECODE_FAILED);
} else if (unlikely(r == 0)) {
TmThreadsCaptureInjectPacket(tv, ptv->slot, NULL);
TmThreadsCaptureHandleTimeout(tv, ptv->slot, NULL);
}
StatsSyncCountersIfSignalled(tv);

@ -435,7 +435,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
}
/* pfring didn't use the packet yet */
TmThreadsCaptureInjectPacket(tv, ptv->slot, p);
TmThreadsCaptureHandleTimeout(tv, ptv->slot, p);
} else {
SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error %" PRId32 "", r);

@ -209,6 +209,43 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
return r;
}
/**
* \brief Handle timeout from the capture layer. Checks
* post-pq which may have been filled by the flow
* manager.
*/
static inline TmEcode TmThreadsSlotHandlePostPQs(ThreadVars *tv, TmSlot *s)
{
/* post process pq */
for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->slot_post_pq.top != NULL) {
while (1) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
if (slot->slot_next != NULL) {
TmEcode r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
SCMutexLock(&slot->slot_post_pq.mutex_q);
TmqhReleasePacketsToPacketPool(&slot->slot_post_pq);
SCMutexUnlock(&slot->slot_post_pq.mutex_q);
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
return TM_ECODE_FAILED;
}
}
tv->tmqh_out(tv, extra_p);
}
}
}
return TM_ECODE_OK;
}
/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set
* Allow caller to supply their own packet
*
@ -216,19 +253,31 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
* to force a packet through the engine to complete a reload */
static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, TmSlot *slot, Packet *p)
{
if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
if (p == NULL)
p = PacketGetFromQueueOrAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(tv, p);
}
TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
if (p == NULL)
p = PacketGetFromQueueOrAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
if (TmThreadsSlotProcessPkt(tv, slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(tv, p);
}
}
}
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, TmSlot *slot, Packet *p)
{
if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
TmThreadsCaptureInjectPacket(tv, slot, p);
} else {
TmThreadsSlotHandlePostPQs(tv, slot);
/* packet could have been passed to us that we won't use
* return it to the pool. */
if (p != NULL)
tv->tmqh_out(tv, p);
}
}
void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);

Loading…
Cancel
Save