From de034f186745ee6ef1b53cf34e28a9a8c0403ac6 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Fri, 9 May 2014 14:37:07 +0200 Subject: [PATCH] flow: prepare flow forced reuse logging Most flows are marked for clean up by the flow manager, which then passes them to the recycler. The recycler logs and cleans up. However, under resource stress conditions, the packet threads can recycle existing flow directly. So here the recycler has no role to play, as the flow is immediately used. For this reason, the packet threads need to be able to invoke the flow logger directly. The flow logging thread ctx will stored in the DecodeThreadVars stucture. Therefore, this patch makes the DecodeThreadVars an argument to FlowHandlePacket. --- src/decode-icmpv4.c | 2 +- src/decode-icmpv6.c | 2 +- src/decode-sctp.c | 2 +- src/decode-tcp.c | 2 +- src/decode-udp.c | 4 ++-- src/decode.h | 3 +++ src/flow-hash.c | 35 ++++++++++++++++++++++++++--------- src/flow-hash.h | 2 +- src/flow.c | 5 +++-- src/flow.h | 2 +- src/util-unittest-helper.c | 2 +- 11 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/decode-icmpv4.c b/src/decode-icmpv4.c index 8b4402e6ff..6493550d99 100644 --- a/src/decode-icmpv4.c +++ b/src/decode-icmpv4.c @@ -192,7 +192,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, /* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */ if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); } } } diff --git a/src/decode-icmpv6.c b/src/decode-icmpv6.c index 7819f25498..0a58db5488 100644 --- a/src/decode-icmpv6.c +++ b/src/decode-icmpv6.c @@ -337,7 +337,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-sctp.c b/src/decode-sctp.c index 178ed8bd69..6fd8be8d61 100644 --- a/src/decode-sctp.c +++ b/src/decode-sctp.c @@ -74,7 +74,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-tcp.c b/src/decode-tcp.c index 11ba600f33..47c066160d 100644 --- a/src/decode-tcp.c +++ b/src/decode-tcp.c @@ -203,7 +203,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui #endif /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } diff --git a/src/decode-udp.c b/src/decode-udp.c index e1d52082e3..5b83768f9e 100644 --- a/src/decode-udp.c +++ b/src/decode-udp.c @@ -85,12 +85,12 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) { /* Here we have a Teredo packet and don't need to handle app * layer */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); return TM_ECODE_OK; } /* Flow is an integral part of us */ - FlowHandlePacket(tv, p); + FlowHandlePacket(tv, dtv, p); /* handle the app layer part of the UDP packet payload */ if (unlikely(p->flow != NULL)) { diff --git a/src/decode.h b/src/decode.h index fa48f4ac89..dd60e78876 100644 --- a/src/decode.h +++ b/src/decode.h @@ -573,6 +573,9 @@ typedef struct DecodeThreadVars_ int vlan_disabled; + /* thread data for flow logging api */ + void *output_flow_thread_data; + /** stats/counters */ uint16_t counter_pkts; uint16_t counter_bytes; diff --git a/src/flow-hash.c b/src/flow-hash.c index 52bbd8ebcd..607e00c532 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -42,12 +42,16 @@ #include "util-hash-lookup3.h" +#include "conf.h" +#include "output.h" +#include "output-flow.h" + #define FLOW_DEFAULT_FLOW_PRUNE 5 SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx); SC_ATOMIC_EXTERN(unsigned int, flow_flags); -static Flow *FlowGetUsedFlow(void); +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv); #ifdef FLOW_DEBUG_STATS #define FLOW_DEBUG_STATS_PROTO_ALL 0 @@ -422,9 +426,12 @@ static inline int FlowCreateCheck(const Packet *p) * Get a new flow. We're checking memcap first and will try to make room * if the memcap is reached. * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * * \retval f *LOCKED* flow on succes, NULL on error. */ -static Flow *FlowGetNew(const Packet *p) +static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) { Flow *f = NULL; @@ -447,7 +454,7 @@ static Flow *FlowGetNew(const Packet *p) FlowWakeupFlowManagerThread(); } - f = FlowGetUsedFlow(); + f = FlowGetUsedFlow(tv, dtv); if (f == NULL) { /* very rare, but we can fail. Just giving up */ return NULL; @@ -473,7 +480,7 @@ static Flow *FlowGetNew(const Packet *p) return f; } -/* FlowGetFlowFromHash +/** \brief Get Flow for packet * * Hash retrieval function for flows. Looks up the hash bucket containing the * flow pointer. Then compares the packet with the found flow to see if it is @@ -485,9 +492,12 @@ static Flow *FlowGetNew(const Packet *p) * * The p->flow pointer is updated to point to the flow. * - * returns a *LOCKED* flow or NULL + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * + * \retval f *LOCKED* flow or NULL */ -Flow *FlowGetFlowFromHash(const Packet *p) +Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) { Flow *f = NULL; FlowHashCountInit; @@ -504,7 +514,7 @@ Flow *FlowGetFlowFromHash(const Packet *p) /* see if the bucket already has a flow */ if (fb->head == NULL) { - f = FlowGetNew(p); + f = FlowGetNew(tv, dtv, p); if (f == NULL) { FBLOCK_UNLOCK(fb); FlowHashCountUpdate; @@ -538,7 +548,7 @@ Flow *FlowGetFlowFromHash(const Packet *p) f = f->hnext; if (f == NULL) { - f = pf->hnext = FlowGetNew(p); + f = pf->hnext = FlowGetNew(tv, dtv, p); if (f == NULL) { FBLOCK_UNLOCK(fb); FlowHashCountUpdate; @@ -603,9 +613,12 @@ Flow *FlowGetFlowFromHash(const Packet *p) * top each time since that would clear the top of the hash leading to longer * and longer search times under high pressure (observed). * + * \param tv thread vars + * \param dtv decode thread vars (for flow log api thread data) + * * \retval f flow or NULL */ -static Flow *FlowGetUsedFlow(void) +static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv) { uint32_t idx = SC_ATOMIC_GET(flow_prune_idx) % flow_config.hash_size; uint32_t cnt = flow_config.hash_size; @@ -653,6 +666,10 @@ static Flow *FlowGetUsedFlow(void) f->fb = NULL; FBLOCK_UNLOCK(fb); + /* invoke flow log api */ + if (dtv && dtv->output_flow_thread_data) + (void)OutputFlowLog(tv, dtv->output_flow_thread_data, f); + FlowClearMemory(f, f->protomap); FLOWLOCK_UNLOCK(f); diff --git a/src/flow-hash.h b/src/flow-hash.h index 0de1a64346..a5635b06ce 100644 --- a/src/flow-hash.h +++ b/src/flow-hash.h @@ -68,7 +68,7 @@ typedef struct FlowBucket_ { /* prototypes */ -Flow *FlowGetFlowFromHash(const Packet *); +Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *); /** enable to print stats on hash lookups in flow-debug.log */ //#define FLOW_DEBUG_STATS diff --git a/src/flow.c b/src/flow.c index 3ba14d7464..516e684e94 100644 --- a/src/flow.c +++ b/src/flow.c @@ -232,14 +232,15 @@ static inline int FlowUpdateSeenFlag(const Packet *p) * This is called for every packet. * * \param tv threadvars + * \param dtv decode thread vars (for flow output api thread data) * \param p packet to handle flow for */ -void FlowHandlePacket(ThreadVars *tv, Packet *p) +void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p) { /* Get this packet's flow from the hash. FlowHandlePacket() will setup * a new flow if nescesary. If we get NULL, we're out of flow memory. * The returned flow is locked. */ - Flow *f = FlowGetFlowFromHash(p); + Flow *f = FlowGetFlowFromHash(tv, dtv, p); if (f == NULL) return; diff --git a/src/flow.h b/src/flow.h index d1896d2019..1cec317603 100644 --- a/src/flow.h +++ b/src/flow.h @@ -401,7 +401,7 @@ typedef struct FlowProto_ { int (*GetProtoState)(void *); } FlowProto; -void FlowHandlePacket (ThreadVars *, Packet *); +void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *); void FlowInitConfig (char); void FlowPrintQueueInfo (void); void FlowShutdown(void); diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index bac2083e21..f2c96b08e3 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -831,7 +831,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) { p->src.addr_data32[0] = i + 1; p->dst.addr_data32[0] = i; } - FlowHandlePacket(NULL, p); + FlowHandlePacket(NULL, NULL, p); if (p->flow != NULL) SC_ATOMIC_RESET(p->flow->use_cnt);