flow/manager: add flow.mgr.rows_sec counter

pull/7533/head
Victor Julien 3 years ago committed by Victor Julien
parent f271fb4575
commit 88edc8630c

@ -617,6 +617,8 @@ typedef struct FlowQueueTimeoutCounters {
typedef struct FlowCounters_ {
uint16_t flow_mgr_full_pass;
uint16_t flow_mgr_rows_sec;
uint16_t flow_mgr_cnt_clo;
uint16_t flow_mgr_cnt_new;
uint16_t flow_mgr_cnt_est;
@ -655,6 +657,8 @@ typedef struct FlowManagerThreadData_ {
static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);
fc->flow_mgr_cnt_clo = StatsRegisterCounter("flow.mgr.closed_pruned", t);
fc->flow_mgr_cnt_new = StatsRegisterCounter("flow.mgr.new_pruned", t);
fc->flow_mgr_cnt_est = StatsRegisterCounter("flow.mgr.est_pruned", t);
@ -763,7 +767,7 @@ static uint32_t FlowTimeoutsMin(void)
}
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
if (emergency) {
*wu_rows = rows;
@ -772,30 +776,26 @@ static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, c
}
uint32_t full_pass_in_ms = pass_in_sec * 1000;
float perc = MIN((((float)(100 - mp) / (float)100)), 1);
const float perc = MIN((((float)(100 - mp) / (float)100)), 1.0);
full_pass_in_ms *= perc;
full_pass_in_ms = MAX(full_pass_in_ms, 333);
uint32_t work_unit_ms = 999 * perc;
work_unit_ms = MAX(work_unit_ms, 250);
uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
const uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row
const uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row
int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
sleep_per_wu = MAX(sleep_per_wu, 10);
#if 0
float passes_sec = 1000.0/(float)full_pass_in_ms;
SCLogNotice("full_pass_in_ms %u perc %f rows %u "
"wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u",
full_pass_in_ms, perc, rows,
wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu,
passes_sec, (uint32_t)((float)rows * passes_sec));
#endif
const float passes_sec = 1000.0 / (float)full_pass_in_ms;
*wu_sleep = sleep_per_wu;
*wu_rows = rows_per_wu;
*rows_sec = (uint32_t)((float)rows * passes_sec);
}
/** \brief Thread that manages the flow table and times out flows.
@ -815,6 +815,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0;
uint32_t pos = 0;
uint32_t rows_sec = 0;
uint32_t rows_per_wu = 0;
uint64_t sleep_per_wu = 0;
bool emerg = false;
@ -837,7 +838,8 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
while (1)
{
@ -929,12 +931,16 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
}
/* update work units */
const uint32_t pmp = mp;
mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
if (pmp != mp) {
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
}
next_run_ms = ts_ms + sleep_per_wu;
}

Loading…
Cancel
Save