flow/manager: adaptive hash eviction timing

The flow manager scans the hash table in chunks based on the flow timeout
settings. In the default config this will lead to a full hash pass every
240 seconds. Under pressure, this will lead to a large amount of memory
still in use by flows waiting to be evicted, or evicted flows waiting to
be freed.

This patch implements new adaptive logic for the timing and amount of
work that is done by the flow manager. It takes the memcap budgets and
calculates the proportion of each budget that is in use. It then takes the
highest in-use percentage and adapts the flow manager behavior based on that.

The memcaps considered are:
    flow, stream, stream-reassembly and app-layer-http

The percentage in use is inversely applied to the time the flow manager
takes for a full hash pass. In addition, it is also applied to the chunk
size and the sleep time.

Example: tcp.reassembly_memuse is at 90% of the memcap and normal flow
hash pass is 240s. Hash pass time will be:

    240 * (100 - 90) / 100 = 24s

Chunk size and sleep time will automatically be updated for this.

Adds various counters.

Bug: #4650.
Bug: #4808.
Victor Julien 3 years ago committed by Victor Julien
parent f50af12068
commit e9d2417e0f

@ -71,6 +71,8 @@
#include "output-flow.h"
#include "util-validate.h"
#include "runmode-unix-socket.h"
/* Run mode selected at suricata.c */
extern int run_mode;
@ -481,22 +483,34 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
return cnt;
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
struct timeval *ts,
const uint32_t hash_min, const uint32_t hash_max,
FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
/** \internal
* \brief handle timeout for a slice of hash rows
* If we wrap around we call FlowTimeoutHash twice */
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts,
const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
const uint32_t rows, uint32_t *pos)
const uint32_t rows = hash_max - hash_min;
const uint32_t chunk_size = rows / chunks;
const uint32_t min = iter * chunk_size + hash_min;
uint32_t max = min + chunk_size;
/* we start at beginning of hash at next iteration so let's check
* hash till the end */
if (iter + 1 == chunks) {
max = hash_max;
uint32_t start = 0;
uint32_t end = 0;
uint32_t cnt = 0;
uint32_t rows_left = rows;
start = hash_min + (*pos);
if (start >= hash_max) {
start = hash_min;
end = start + rows_left;
if (end > hash_max) {
end = hash_max;
*pos = (end == hash_max) ? hash_min : end;
rows_left = rows_left - (end - start);
cnt += FlowTimeoutHash(td, ts, start, end, counters);
if (rows_left) {
goto again;
const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
return cnt;
@ -614,6 +628,9 @@ typedef struct FlowCounters_ {
uint16_t flow_bypassed_cnt_clo;
uint16_t flow_bypassed_pkts;
uint16_t flow_bypassed_bytes;
uint16_t memcap_pressure;
uint16_t memcap_pressure_max;
} FlowCounters;
typedef struct FlowManagerThreadData_ {
@ -648,6 +665,9 @@ static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
@ -710,6 +730,42 @@ static uint32_t FlowTimeoutsMin(void)
//#define FM_PROFILE
/** \internal
 *  \brief Derive the size of one flow manager work unit from memcap pressure
 *
 *  The time budget for a full hash pass and the duration of a single work
 *  unit are both scaled by the fraction of the memcap that is still free, so
 *  higher pressure means a shorter full pass and a shorter sleep.
 *
 *  \param pass_in_sec time in seconds for a full hash pass without pressure
 *  \param rows number of hash rows to cover in a full pass
 *  \param mp memcap pressure as a percentage; values above 100 are treated
 *            as 100
 *  \param emergency true if emergency mode is active
 *  \param wu_sleep [out] time in ms to wait before the next work unit
 *  \param wu_rows [out] number of hash rows to process per work unit
 */
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
        const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
{
    if (emergency) {
        /* emergency mode: cover all rows in every work unit, fixed sleep */
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }

    /* mp is unsigned: without this clamp a value above 100 would make
     * (100 - mp) wrap around and be handled as "no pressure at all". */
    const uint32_t pressure = (mp > 100) ? 100 : mp;
    const float perc = (float)(100 - pressure) / (float)100;

    uint32_t full_pass_in_ms = pass_in_sec * 1000;
    full_pass_in_ms = (uint32_t)((float)full_pass_in_ms * perc);
    if (full_pass_in_ms < 333) {
        full_pass_in_ms = 333;
    }

    uint32_t work_unit_ms = (uint32_t)(999 * perc);
    if (work_unit_ms < 250) {
        work_unit_ms = 250;
    }

    /* a full pass is made of at least one work unit; this also keeps the
     * division below from dividing by zero when the pass time is shorter
     * than a single work unit */
    uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
    if (wus_per_full_pass == 0) {
        wus_per_full_pass = 1;
    }

    uint32_t rows_per_wu = rows / wus_per_full_pass;
    if (rows_per_wu < 1) {
        rows_per_wu = 1;
    }
    const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row

    /* signed and wide, so a cost larger than the work unit can't wrap */
    int64_t sleep_per_wu = (int64_t)work_unit_ms - (int64_t)rows_process_cost;
    if (sleep_per_wu < 10) {
        sleep_per_wu = 10;
    }
#if 0
    float passes_sec = 1000.0/(float)full_pass_in_ms;
    SCLogNotice("full_pass_in_ms %u perc %f rows %u "
            "wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u",
            full_pass_in_ms, perc, rows,
            wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu,
            passes_sec, (uint32_t)((float)rows * passes_sec));
#endif
    *wu_sleep = (uint64_t)sleep_per_wu;
    *wu_rows = rows_per_wu;
}
/** \brief Thread that manages the flow table and times out flows.
* \param td ThreadVars casted to void ptr
@ -724,7 +780,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
bool emerg = false;
bool prev_emerg = false;
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
uint32_t flow_last_sec = 0;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
* are really low. Might confuse ppl
uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
@ -732,12 +787,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
memset(&ts, 0, sizeof(ts));
uint32_t hash_passes = 0;
uint32_t hash_row_checks = 0;
uint32_t hash_passes_chunks = 0;
uint32_t hash_full_passes = 0;
const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
@ -766,9 +815,19 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
memset(&startts, 0, sizeof(startts));
gettimeofday(&startts, NULL);
uint32_t hash_pass_iter = 0;
uint32_t emerg_over_cnt = 0;
uint64_t next_run_ms = 0;
const uint32_t rows = ftd->max - ftd->min;
uint32_t pos = 0;
uint32_t rows_per_wu = 0;
uint64_t sleep_per_wu = 0;
uint32_t mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
while (1)
@ -805,7 +864,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
const uint32_t rt = (uint32_t)ts.tv_sec;
const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) {
next_run_ms = 0;
@ -822,7 +880,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
const uint32_t secs_passed = rt - flow_last_sec;
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
@ -830,34 +887,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
if (emerg) {
/* in emergency mode, do a full pass of the hash table */
FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
hash_passes_chunks += 1;
hash_row_checks += counters.rows_checked;
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
} else {
/* non-emergency mode: scan part of the hash */
const uint32_t chunks = MIN(secs_passed, pass_in_sec);
for (uint32_t i = 0; i < chunks; i++) {
FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
&counters, hash_pass_iter, pass_in_sec);
if (hash_pass_iter == pass_in_sec) {
hash_pass_iter = 0;
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
const uint32_t ppos = pos;
&ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
if (ppos > pos) {
StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
hash_row_checks += counters.rows_checked;
hash_passes_chunks += chunks;
flow_last_sec = rt;
StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
@ -891,54 +932,58 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
* clear emergency bit if we have at least xx flows pruned. */
uint32_t len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
if (emerg == true) {
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
} else {
emerg_over_cnt = 0;
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
} else {
emerg_over_cnt = 0;
if (emerg_over_cnt >= 30) {
emerg = false;
prev_emerg = false;
emerg_over_cnt = 0;
SCLogNotice("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
"ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
"%% flows at the queue",
(uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec,
len * 100 / flow_config.prealloc);
StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
if (emerg_over_cnt >= 30) {
emerg = false;
prev_emerg = false;
emerg_over_cnt = 0;
hash_pass_iter = 0;
SCLogNotice("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
"ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
/* update work units */
mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
next_run_ms = ts_ms + 667;
if (emerg)
next_run_ms = ts_ms + 250;
if (flow_last_sec == 0) {
flow_last_sec = rt;
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
if (ftd->instance == 0 &&
(other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
//uint32_t hosts_pruned =
other_last_sec = (uint32_t)ts.tv_sec;
next_run_ms = ts_ms + sleep_per_wu;
if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
if (ftd->instance == 0) {
// uint32_t hosts_pruned =
other_last_sec = (uint32_t)ts.tv_sec;
struct timeval run_endts;
@ -981,11 +1026,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
established_cnt, closing_cnt);
SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
hash_full_passes, hash_row_checks,
hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
gettimeofday(&endts, NULL);
struct timeval total_run_time;
timersub(&endts, &startts, &total_run_time);

@ -82,8 +82,6 @@ const char *RunModeUnixSocketGetDefaultMode(void)
return "autofp";
#define MEMCAPS_MAX 7
static MemcapCommand memcaps[MEMCAPS_MAX] = {
@ -130,6 +128,24 @@ static MemcapCommand memcaps[MEMCAPS_MAX] = {
/** \brief Get the highest memuse/memcap ratio among the first four memcaps
 *
 *  For each of the first four entries of memcaps[] that reports a non-zero
 *  memcap, the current memuse is divided by that memcap. The largest ratio
 *  seen is returned.
 *
 *  NOTE(review): which subsystems are covered depends on the order of the
 *  memcaps[] table, which is not visible here -- confirm that entries 0-3
 *  are the ones named in the loop comment.
 *
 *  \retval percent highest ratio found as a fraction (1.0 means memuse is
 *          equal to memcap), or 0.0 if no checked entry has a memcap set
 */
float MemcapsGetPressure(void)
float percent = 0.0;
for (int i = 0; i < 4; i++) { // only flow, streams, http
uint64_t memcap = memcaps[i].GetFunc();
/* entries reporting a memcap of 0 are skipped, so the division below never
 * has a zero divisor */
if (memcap) {
uint64_t memuse = memcaps[i].GetMemuseFunc();
float p = (float)((double)memuse / (double)memcap);
// SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%",
// memcaps[i].name, memuse, memcap, (p * 100));
/* keep the highest ratio seen so far */
percent = MAX(p, percent);
return percent;
static int RunModeUnixSocketMaster(void);
static int unix_manager_pcap_task_running = 0;
static int unix_manager_pcap_task_failed = 0;

@ -30,6 +30,8 @@ int RunModeUnixSocketIsActive(void);
TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed);
float MemcapsGetPressure(void);
TmEcode UnixSocketDatasetAdd(json_t *cmd, json_t* answer, void *data);
TmEcode UnixSocketDatasetRemove(json_t *cmd, json_t* answer, void *data);
