From 88edc8630c6dd3ff659d23cc7e21f361bec0ce72 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 11 Nov 2021 13:30:46 +0100 Subject: [PATCH] flow/manager: add flow.mgr.rows_sec counter --- src/flow-manager.c | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/flow-manager.c b/src/flow-manager.c index 171feb8d2c..2faabc018d 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -617,6 +617,8 @@ typedef struct FlowQueueTimeoutCounters { typedef struct FlowCounters_ { uint16_t flow_mgr_full_pass; + uint16_t flow_mgr_rows_sec; + uint16_t flow_mgr_cnt_clo; uint16_t flow_mgr_cnt_new; uint16_t flow_mgr_cnt_est; @@ -655,6 +657,8 @@ typedef struct FlowManagerThreadData_ { static void FlowCountersInit(ThreadVars *t, FlowCounters *fc) { fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t); + fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t); + fc->flow_mgr_cnt_clo = StatsRegisterCounter("flow.mgr.closed_pruned", t); fc->flow_mgr_cnt_new = StatsRegisterCounter("flow.mgr.new_pruned", t); fc->flow_mgr_cnt_est = StatsRegisterCounter("flow.mgr.est_pruned", t); @@ -763,7 +767,7 @@ static uint32_t FlowTimeoutsMin(void) } static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp, - const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows) + const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec) { if (emergency) { *wu_rows = rows; @@ -772,30 +776,26 @@ static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, c } uint32_t full_pass_in_ms = pass_in_sec * 1000; - float perc = MIN((((float)(100 - mp) / (float)100)), 1); + const float perc = MIN((((float)(100 - mp) / (float)100)), 1.0); full_pass_in_ms *= perc; full_pass_in_ms = MAX(full_pass_in_ms, 333); uint32_t work_unit_ms = 999 * perc; work_unit_ms = MAX(work_unit_ms, 250); - uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms; + const uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms; - uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass); - uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row + const uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass); + const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row int32_t sleep_per_wu = work_unit_ms - rows_process_cost; sleep_per_wu = MAX(sleep_per_wu, 10); -#if 0 - float passes_sec = 1000.0/(float)full_pass_in_ms; - SCLogNotice("full_pass_in_ms %u perc %f rows %u " - "wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u", - full_pass_in_ms, perc, rows, - wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu, - passes_sec, (uint32_t)((float)rows * passes_sec)); -#endif + + const float passes_sec = 1000.0 / (float)full_pass_in_ms; + *wu_sleep = sleep_per_wu; *wu_rows = rows_per_wu; + *rows_sec = (uint32_t)((float)rows * passes_sec); } /** \brief Thread that manages the flow table and times out flows. @@ -815,6 +815,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) uint32_t emerg_over_cnt = 0; uint64_t next_run_ms = 0; uint32_t pos = 0; + uint32_t rows_sec = 0; uint32_t rows_per_wu = 0; uint64_t sleep_per_wu = 0; bool emerg = false; @@ -837,7 +838,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); } - GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu); + GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); + StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); while (1) { @@ -929,12 +931,16 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) } /* update work units */ + const uint32_t pmp = mp; mp = MemcapsGetPressure() * 100; if (ftd->instance == 0) { StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); } - GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu); + GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); + if (pmp != mp) { + StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); + } next_run_ms = ts_ms + sleep_per_wu; }