diff --git a/src/decode.c b/src/decode.c index e837502651..3f6a3a674b 100644 --- a/src/decode.c +++ b/src/decode.c @@ -560,6 +560,9 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv) dtv->counter_nsh = StatsRegisterMaxCounter("decoder.nsh", tv); dtv->counter_flow_memcap = StatsRegisterCounter("flow.memcap", tv); + dtv->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv); + dtv->counter_flow_total = StatsRegisterCounter("flow.total", tv); + dtv->counter_flow_active = StatsRegisterCounter("flow.active", tv); dtv->counter_flow_tcp = StatsRegisterCounter("flow.tcp", tv); dtv->counter_flow_udp = StatsRegisterCounter("flow.udp", tv); dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv); diff --git a/src/decode.h b/src/decode.h index fb4f58647a..010c3ad629 100644 --- a/src/decode.h +++ b/src/decode.h @@ -724,6 +724,9 @@ typedef struct DecodeThreadVars_ uint16_t counter_flow_memcap; + uint16_t counter_tcp_active_sessions; + uint16_t counter_flow_total; + uint16_t counter_flow_active; uint16_t counter_flow_tcp; uint16_t counter_flow_udp; uint16_t counter_flow_icmp4; diff --git a/src/flow-hash.c b/src/flow-hash.c index 9421d7ee73..f16bac3ef8 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -486,6 +486,8 @@ static inline void FlowUpdateCounter(ThreadVars *tv, DecodeThreadVars *dtv, #ifdef UNITTESTS if (tv && dtv) { #endif + StatsIncr(tv, dtv->counter_flow_total); + StatsIncr(tv, dtv->counter_flow_active); switch (proto){ case IPPROTO_UDP: StatsIncr(tv, dtv->counter_flow_udp); diff --git a/src/flow-manager.c b/src/flow-manager.c index 2faabc018d..d65505eb82 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -599,17 +599,6 @@ static uint32_t FlowCleanupHash(void) return cnt; } -static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f) -{ - FLOWLOCK_WRLOCK(f); - - (void)OutputFlowLog(tv, output_thread_data, f); - - FlowClearMemory (f, f->protomap); - FLOWLOCK_UNLOCK(f); - FlowSparePoolReturnFlow(f); -} - typedef struct FlowQueueTimeoutCounters { uint32_t flows_removed; uint32_t flows_timeout; @@ -1028,6 +1017,14 @@ void FlowManagerThreadSpawn() typedef struct FlowRecyclerThreadData_ { void *output_thread_data; + + uint16_t counter_flows; + uint16_t counter_queue_avg; + uint16_t counter_queue_max; + + uint16_t counter_flow_active; + uint16_t counter_tcp_active_sessions; + FlowEndCounters fec; } FlowRecyclerThreadData; static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data) @@ -1042,6 +1039,15 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void } SCLogDebug("output_thread_data %p", ftd->output_thread_data); + ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t); + ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t); + ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t); + + ftd->counter_flow_active = StatsRegisterCounter("flow.active", t); + ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t); + + FlowEndCountersRegister(t, &ftd->fec); + *data = ftd; return TM_ECODE_OK; } @@ -1056,6 +1062,23 @@ static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) return TM_ECODE_OK; } +static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f) +{ + FLOWLOCK_WRLOCK(f); + + (void)OutputFlowLog(tv, ftd->output_thread_data, f); + + FlowEndCountersUpdate(tv, &ftd->fec, f); + if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { + StatsDecr(tv, ftd->counter_tcp_active_sessions); + } + StatsDecr(tv, ftd->counter_flow_active); + + FlowClearMemory(f, f->protomap); + FLOWLOCK_UNLOCK(f); + FlowSparePoolReturnFlow(f); +} + /** \brief Thread that manages timed out flows. * * \param td ThreadVars casted to void ptr @@ -1079,6 +1102,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) SC_ATOMIC_ADD(flowrec_busy,1); FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q); + StatsAddUI64(th_v, ftd->counter_queue_avg, list.len); + StatsSetUI64(th_v, ftd->counter_queue_max, list.len); + const int bail = (TmThreadsCheckFlag(th_v, THV_KILL)); /* Get the time */ @@ -1088,8 +1114,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) Flow *f; while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) { - Recycler(th_v, ftd->output_thread_data, f); + Recycler(th_v, ftd, f); recycled_cnt++; + StatsIncr(th_v, ftd->counter_flows); } SC_ATOMIC_SUB(flowrec_busy,1); diff --git a/src/flow-util.c b/src/flow-util.c index a9f94c70fd..289caa3e66 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -240,3 +240,70 @@ void RegisterFlowBypassInfo(void) g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *), NULL, FlowBypassFree); } + +void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec) +{ + for (int i = 0; i < FLOW_STATE_SIZE; i++) { + const char *name = NULL; + if (i == FLOW_STATE_NEW) { + name = "flow.end.state.new"; + } else if (i == FLOW_STATE_ESTABLISHED) { + name = "flow.end.state.established"; + } else if (i == FLOW_STATE_CLOSED) { + name = "flow.end.state.closed"; + } else if (i == FLOW_STATE_LOCAL_BYPASSED) { + name = "flow.end.state.local_bypassed"; +#ifdef CAPTURE_OFFLOAD + } else if (i == FLOW_STATE_CAPTURE_BYPASSED) { + name = "flow.end.state.capture_bypassed"; +#endif + } + if (name) { + fec->flow_state[i] = StatsRegisterCounter(name, t); + } + } + + for (enum TcpState i = TCP_NONE; i <= TCP_CLOSED; i++) { + const char *name; + switch (i) { + case TCP_NONE: + name = "flow.end.tcp_state.none"; + break; + case TCP_LISTEN: + name = "flow.end.tcp_state.listen"; + break; + case TCP_SYN_SENT: + name = "flow.end.tcp_state.syn_sent"; + break; + case TCP_SYN_RECV: + name = "flow.end.tcp_state.syn_recv"; + break; + case TCP_ESTABLISHED: + name = "flow.end.tcp_state.established"; + break; + case TCP_FIN_WAIT1: + name = "flow.end.tcp_state.fin_wait1"; + break; + case TCP_FIN_WAIT2: + name = "flow.end.tcp_state.fin_wait2"; + break; + case TCP_TIME_WAIT: + name = "flow.end.tcp_state.time_wait"; + break; + case TCP_LAST_ACK: + name = "flow.end.tcp_state.last_ack"; + break; + case TCP_CLOSE_WAIT: + name = "flow.end.tcp_state.close_wait"; + break; + case TCP_CLOSING: + name = "flow.end.tcp_state.closing"; + break; + case TCP_CLOSED: + name = "flow.end.tcp_state.closed"; + break; + } + + fec->flow_tcp_state[i] = StatsRegisterCounter(name, t); + } +} diff --git a/src/flow-util.h b/src/flow-util.h index ff7a7bce53..9e11693a0d 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -26,6 +26,7 @@ #include "detect-engine-state.h" #include "tmqh-flow.h" +#include "stream-tcp-private.h" #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) @@ -151,5 +152,23 @@ uint8_t FlowGetProtoMapping(uint8_t); void FlowInit(Flow *, const Packet *); uint8_t FlowGetReverseProtoMapping(uint8_t rproto); +/* flow end counter logic */ + +typedef struct FlowEndCounters_ { + uint16_t flow_state[FLOW_STATE_SIZE]; + uint16_t flow_tcp_state[TCP_CLOSED + 1]; +} FlowEndCounters; + +static inline void FlowEndCountersUpdate(ThreadVars *tv, FlowEndCounters *fec, Flow *f) +{ + if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { + TcpSession *ssn = f->protoctx; + StatsIncr(tv, fec->flow_tcp_state[ssn->state]); + } + StatsIncr(tv, fec->flow_state[f->flow_state]); +} + +void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec); + #endif /* __FLOW_UTIL_H__ */ diff --git a/src/flow-worker.c b/src/flow-worker.c index 0a781cca97..97f3f192a7 100644 --- a/src/flow-worker.c +++ b/src/flow-worker.c @@ -83,6 +83,7 @@ typedef struct FlowWorkerThreadData_ { uint16_t flows_aside_needs_work; uint16_t flows_aside_pkt_inject; } cnt; + FlowEndCounters fec; } FlowWorkerThreadData; @@ -189,6 +190,12 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, if (fw->output_thread_flow != NULL) (void)OutputFlowLog(tv, fw->output_thread_flow, f); + FlowEndCountersUpdate(tv, &fw->fec, f); + if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { + StatsDecr(tv, fw->dtv->counter_tcp_active_sessions); + } + StatsDecr(tv, fw->dtv->counter_flow_active); + FlowClearMemory (f, f->protomap); FLOWLOCK_UNLOCK(f); if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size @@ -288,6 +295,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void * DecodeRegisterPerfCounters(fw->dtv, tv); AppLayerRegisterThreadCounters(tv); + FlowEndCountersRegister(tv, &fw->fec); /* setup pq for stream end pkts */ memset(&fw->pq, 0, sizeof(PacketQueueNoLock)); diff --git a/src/flow.h b/src/flow.h index 9887380459..2ec7f531fc 100644 --- a/src/flow.h +++ b/src/flow.h @@ -517,6 +517,11 @@ enum FlowState { FLOW_STATE_CAPTURE_BYPASSED, #endif }; +#ifdef CAPTURE_OFFLOAD +#define FLOW_STATE_SIZE 5 +#else +#define FLOW_STATE_SIZE 4 +#endif typedef struct FlowProtoTimeout_ { uint32_t new_timeout; diff --git a/src/stream-tcp.c b/src/stream-tcp.c index d0d770f39c..f083fe1af1 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -935,6 +935,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, return -1; } StatsIncr(tv, stt->counter_tcp_sessions); + StatsIncr(tv, stt->counter_tcp_active_sessions); StatsIncr(tv, stt->counter_tcp_midstream_pickups); } @@ -1028,6 +1029,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, } StatsIncr(tv, stt->counter_tcp_sessions); + StatsIncr(tv, stt->counter_tcp_active_sessions); } /* set the state */ @@ -1094,6 +1096,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, return -1; } StatsIncr(tv, stt->counter_tcp_sessions); + StatsIncr(tv, stt->counter_tcp_active_sessions); StatsIncr(tv, stt->counter_tcp_midstream_pickups); } /* set the state */ @@ -5371,6 +5374,7 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) *data = (void *)stt; + stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv); stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv); stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv); stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv); diff --git a/src/stream-tcp.h b/src/stream-tcp.h index 58b97c9b2a..15cfee210d 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -78,6 +78,7 @@ typedef struct StreamTcpThread_ { * receiving (valid) RST packets */ PacketQueueNoLock pseudo_queue; + uint16_t counter_tcp_active_sessions; uint16_t counter_tcp_sessions; /** sessions not picked up because memcap was reached */ uint16_t counter_tcp_ssn_memcap;