flow: add various flow counters

Add flow.end state counters

Add active TCP sessions counter

Add flow.active counter

Add flow.total counter

Ticket: #1478.
pull/7533/head
Victor Julien 3 years ago
parent aa31d2193f
commit b0993d6fd8

@ -560,6 +560,9 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv)
dtv->counter_nsh = StatsRegisterMaxCounter("decoder.nsh", tv);
dtv->counter_flow_memcap = StatsRegisterCounter("flow.memcap", tv);
dtv->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
dtv->counter_flow_total = StatsRegisterCounter("flow.total", tv);
dtv->counter_flow_active = StatsRegisterCounter("flow.active", tv);
dtv->counter_flow_tcp = StatsRegisterCounter("flow.tcp", tv);
dtv->counter_flow_udp = StatsRegisterCounter("flow.udp", tv);
dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv);

@ -724,6 +724,9 @@ typedef struct DecodeThreadVars_
uint16_t counter_flow_memcap;
uint16_t counter_tcp_active_sessions;
uint16_t counter_flow_total;
uint16_t counter_flow_active;
uint16_t counter_flow_tcp;
uint16_t counter_flow_udp;
uint16_t counter_flow_icmp4;

@ -486,6 +486,8 @@ static inline void FlowUpdateCounter(ThreadVars *tv, DecodeThreadVars *dtv,
#ifdef UNITTESTS
if (tv && dtv) {
#endif
StatsIncr(tv, dtv->counter_flow_total);
StatsIncr(tv, dtv->counter_flow_active);
switch (proto){
case IPPROTO_UDP:
StatsIncr(tv, dtv->counter_flow_udp);

@ -599,17 +599,6 @@ static uint32_t FlowCleanupHash(void)
return cnt;
}
static void Recycler(ThreadVars *tv, void *output_thread_data, Flow *f)
{
FLOWLOCK_WRLOCK(f);
(void)OutputFlowLog(tv, output_thread_data, f);
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
FlowSparePoolReturnFlow(f);
}
typedef struct FlowQueueTimeoutCounters {
uint32_t flows_removed;
uint32_t flows_timeout;
@ -1028,6 +1017,14 @@ void FlowManagerThreadSpawn()
typedef struct FlowRecyclerThreadData_ {
void *output_thread_data;
uint16_t counter_flows;
uint16_t counter_queue_avg;
uint16_t counter_queue_max;
uint16_t counter_flow_active;
uint16_t counter_tcp_active_sessions;
FlowEndCounters fec;
} FlowRecyclerThreadData;
static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
@ -1042,6 +1039,15 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void
}
SCLogDebug("output_thread_data %p", ftd->output_thread_data);
ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);
ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);
FlowEndCountersRegister(t, &ftd->fec);
*data = ftd;
return TM_ECODE_OK;
}
@ -1056,6 +1062,23 @@ static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
return TM_ECODE_OK;
}
static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
{
FLOWLOCK_WRLOCK(f);
(void)OutputFlowLog(tv, ftd->output_thread_data, f);
FlowEndCountersUpdate(tv, &ftd->fec, f);
if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
StatsDecr(tv, ftd->counter_tcp_active_sessions);
}
StatsDecr(tv, ftd->counter_flow_active);
FlowClearMemory(f, f->protomap);
FLOWLOCK_UNLOCK(f);
FlowSparePoolReturnFlow(f);
}
/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars casted to void ptr
@ -1079,6 +1102,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
SC_ATOMIC_ADD(flowrec_busy,1);
FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
StatsSetUI64(th_v, ftd->counter_queue_max, list.len);
const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));
/* Get the time */
@ -1088,8 +1114,9 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
Flow *f;
while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
Recycler(th_v, ftd->output_thread_data, f);
Recycler(th_v, ftd, f);
recycled_cnt++;
StatsIncr(th_v, ftd->counter_flows);
}
SC_ATOMIC_SUB(flowrec_busy,1);

@ -240,3 +240,70 @@ void RegisterFlowBypassInfo(void)
g_bypass_info_id = FlowStorageRegister("bypass_counters", sizeof(void *),
NULL, FlowBypassFree);
}
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec)
{
for (int i = 0; i < FLOW_STATE_SIZE; i++) {
const char *name = NULL;
if (i == FLOW_STATE_NEW) {
name = "flow.end.state.new";
} else if (i == FLOW_STATE_ESTABLISHED) {
name = "flow.end.state.established";
} else if (i == FLOW_STATE_CLOSED) {
name = "flow.end.state.closed";
} else if (i == FLOW_STATE_LOCAL_BYPASSED) {
name = "flow.end.state.local_bypassed";
#ifdef CAPTURE_OFFLOAD
} else if (i == FLOW_STATE_CAPTURE_BYPASSED) {
name = "flow.end.state.capture_bypassed";
#endif
}
if (name) {
fec->flow_state[i] = StatsRegisterCounter(name, t);
}
}
for (enum TcpState i = TCP_NONE; i <= TCP_CLOSED; i++) {
const char *name;
switch (i) {
case TCP_NONE:
name = "flow.end.tcp_state.none";
break;
case TCP_LISTEN:
name = "flow.end.tcp_state.listen";
break;
case TCP_SYN_SENT:
name = "flow.end.tcp_state.syn_sent";
break;
case TCP_SYN_RECV:
name = "flow.end.tcp_state.syn_recv";
break;
case TCP_ESTABLISHED:
name = "flow.end.tcp_state.established";
break;
case TCP_FIN_WAIT1:
name = "flow.end.tcp_state.fin_wait1";
break;
case TCP_FIN_WAIT2:
name = "flow.end.tcp_state.fin_wait2";
break;
case TCP_TIME_WAIT:
name = "flow.end.tcp_state.time_wait";
break;
case TCP_LAST_ACK:
name = "flow.end.tcp_state.last_ack";
break;
case TCP_CLOSE_WAIT:
name = "flow.end.tcp_state.close_wait";
break;
case TCP_CLOSING:
name = "flow.end.tcp_state.closing";
break;
case TCP_CLOSED:
name = "flow.end.tcp_state.closed";
break;
}
fec->flow_tcp_state[i] = StatsRegisterCounter(name, t);
}
}

@ -26,6 +26,7 @@
#include "detect-engine-state.h"
#include "tmqh-flow.h"
#include "stream-tcp-private.h"
#define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
@ -151,5 +152,23 @@ uint8_t FlowGetProtoMapping(uint8_t);
void FlowInit(Flow *, const Packet *);
uint8_t FlowGetReverseProtoMapping(uint8_t rproto);
/* flow end counter logic */
typedef struct FlowEndCounters_ {
uint16_t flow_state[FLOW_STATE_SIZE];
uint16_t flow_tcp_state[TCP_CLOSED + 1];
} FlowEndCounters;
static inline void FlowEndCountersUpdate(ThreadVars *tv, FlowEndCounters *fec, Flow *f)
{
if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
TcpSession *ssn = f->protoctx;
StatsIncr(tv, fec->flow_tcp_state[ssn->state]);
}
StatsIncr(tv, fec->flow_state[f->flow_state]);
}
void FlowEndCountersRegister(ThreadVars *t, FlowEndCounters *fec);
#endif /* __FLOW_UTIL_H__ */

@ -83,6 +83,7 @@ typedef struct FlowWorkerThreadData_ {
uint16_t flows_aside_needs_work;
uint16_t flows_aside_pkt_inject;
} cnt;
FlowEndCounters fec;
} FlowWorkerThreadData;
@ -189,6 +190,12 @@ static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw,
if (fw->output_thread_flow != NULL)
(void)OutputFlowLog(tv, fw->output_thread_flow, f);
FlowEndCountersUpdate(tv, &fw->fec, f);
if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
StatsDecr(tv, fw->dtv->counter_tcp_active_sessions);
}
StatsDecr(tv, fw->dtv->counter_flow_active);
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
if (fw->fls.spare_queue.len >= 200) { // TODO match to API? 200 = 2 * block size
@ -288,6 +295,7 @@ static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void *
DecodeRegisterPerfCounters(fw->dtv, tv);
AppLayerRegisterThreadCounters(tv);
FlowEndCountersRegister(tv, &fw->fec);
/* setup pq for stream end pkts */
memset(&fw->pq, 0, sizeof(PacketQueueNoLock));

@ -517,6 +517,11 @@ enum FlowState {
FLOW_STATE_CAPTURE_BYPASSED,
#endif
};
#ifdef CAPTURE_OFFLOAD
#define FLOW_STATE_SIZE 5
#else
#define FLOW_STATE_SIZE 4
#endif
typedef struct FlowProtoTimeout_ {
uint32_t new_timeout;

@ -935,6 +935,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
return -1;
}
StatsIncr(tv, stt->counter_tcp_sessions);
StatsIncr(tv, stt->counter_tcp_active_sessions);
StatsIncr(tv, stt->counter_tcp_midstream_pickups);
}
@ -1028,6 +1029,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
}
StatsIncr(tv, stt->counter_tcp_sessions);
StatsIncr(tv, stt->counter_tcp_active_sessions);
}
/* set the state */
@ -1094,6 +1096,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
return -1;
}
StatsIncr(tv, stt->counter_tcp_sessions);
StatsIncr(tv, stt->counter_tcp_active_sessions);
StatsIncr(tv, stt->counter_tcp_midstream_pickups);
}
/* set the state */
@ -5371,6 +5374,7 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
*data = (void *)stt;
stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);

@ -78,6 +78,7 @@ typedef struct StreamTcpThread_ {
* receiving (valid) RST packets */
PacketQueueNoLock pseudo_queue;
uint16_t counter_tcp_active_sessions;
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;

Loading…
Cancel
Save