From e9d2417e0ff34f72f824a11f9d840adeeb534256 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sat, 6 Nov 2021 20:29:01 +0100 Subject: [PATCH] flow/manager: adaptive hash eviction timing The flow manager scans the hash table in chunks based on the flow timeout settings. In the default config this will lead to a full hash pass every 240 seconds. Under pressure, this will lead to a large amount of memory still in use by flows waiting to be evicted, or evicted flows waiting to be freed. This patch implements a new adaptive logic to the timing and amount of work that is done by the flow manager. It takes the memcap budgets and calculates the proportion of the memcap budgets in use. It takes the max in-use percentage, and adapts the flow manager behavior based on that. The memcaps considered are: flow, stream, stream-reassembly and app-layer-http The percentage in use, is inversely applies to the time the flow manager takes for a full hash pass. In addition, it is also applied to the chunk size and the sleep time. Example: tcp.reassembly_memuse is at 90% of the memcap and normal flow hash pass is 240s. Hash pass time will be: 240 * (100 - 90) / 100 = 24s Chunk size and sleep time will automatically be updated for this. Adds various counters. Bug: #4650. Bug: #4808. --- src/flow-manager.c | 222 ++++++++++++++++++++++---------------- src/runmode-unix-socket.c | 20 +++- src/runmode-unix-socket.h | 2 + 3 files changed, 151 insertions(+), 93 deletions(-) diff --git a/src/flow-manager.c b/src/flow-manager.c index 892a84f43d..cdf6944896 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -71,6 +71,8 @@ #include "output-flow.h" #include "util-validate.h" +#include "runmode-unix-socket.h" + /* Run mode selected at suricata.c */ extern int run_mode; @@ -481,22 +483,34 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, return cnt; } -static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, - struct timeval *ts, - const uint32_t hash_min, const uint32_t hash_max, - FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks) +/** \internal + * \brief handle timeout for a slice of hash rows + * If we wrap around we call FlowTimeoutHash twice */ +static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts, + const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters, + const uint32_t rows, uint32_t *pos) { - const uint32_t rows = hash_max - hash_min; - const uint32_t chunk_size = rows / chunks; - - const uint32_t min = iter * chunk_size + hash_min; - uint32_t max = min + chunk_size; - /* we start at beginning of hash at next iteration so let's check - * hash till the end */ - if (iter + 1 == chunks) { - max = hash_max; + uint32_t start = 0; + uint32_t end = 0; + uint32_t cnt = 0; + uint32_t rows_left = rows; + +again: + start = hash_min + (*pos); + if (start >= hash_max) { + start = hash_min; + } + end = start + rows_left; + if (end > hash_max) { + end = hash_max; + } + *pos = (end == hash_max) ? hash_min : end; + rows_left = rows_left - (end - start); + + cnt += FlowTimeoutHash(td, ts, start, end, counters); + if (rows_left) { + goto again; } - const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters); return cnt; } @@ -614,6 +628,9 @@ typedef struct FlowCounters_ { uint16_t flow_bypassed_cnt_clo; uint16_t flow_bypassed_pkts; uint16_t flow_bypassed_bytes; + + uint16_t memcap_pressure; + uint16_t memcap_pressure_max; } FlowCounters; typedef struct FlowManagerThreadData_ { @@ -648,6 +665,9 @@ static void FlowCountersInit(ThreadVars *t, FlowCounters *fc) fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t); fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t); fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t); + + fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t); + fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t); } static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data) @@ -710,6 +730,42 @@ static uint32_t FlowTimeoutsMin(void) //#define FM_PROFILE +static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp, + const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows) +{ + if (emergency) { + *wu_rows = rows; + *wu_sleep = 250; + return; + } + + uint32_t full_pass_in_ms = pass_in_sec * 1000; + float perc = MIN((((float)(100 - mp) / (float)100)), 1); + full_pass_in_ms *= perc; + full_pass_in_ms = MAX(full_pass_in_ms, 333); + + uint32_t work_unit_ms = 999 * perc; + work_unit_ms = MAX(work_unit_ms, 250); + + uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms; + + uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass); + uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row + + int32_t sleep_per_wu = work_unit_ms - rows_process_cost; + sleep_per_wu = MAX(sleep_per_wu, 10); +#if 0 + float passes_sec = 1000.0/(float)full_pass_in_ms; + SCLogNotice("full_pass_in_ms %u perc %f rows %u " + "wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u", + full_pass_in_ms, perc, rows, + wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu, + passes_sec, (uint32_t)((float)rows * passes_sec)); +#endif + *wu_sleep = sleep_per_wu; + *wu_rows = rows_per_wu; +} + /** \brief Thread that manages the flow table and times out flows. * * \param td ThreadVars casted to void ptr @@ -724,7 +780,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) bool emerg = false; bool prev_emerg = false; uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */ - uint32_t flow_last_sec = 0; /* VJ leaving disabled for now, as hosts are only used by tags and the numbers * are really low. Might confuse ppl uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v); @@ -732,12 +787,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v); */ memset(&ts, 0, sizeof(ts)); - uint32_t hash_passes = 0; -#ifdef FM_PROFILE - uint32_t hash_row_checks = 0; - uint32_t hash_passes_chunks = 0; -#endif - uint32_t hash_full_passes = 0; const uint32_t min_timeout = FlowTimeoutsMin(); const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60; @@ -766,9 +815,19 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) memset(&startts, 0, sizeof(startts)); gettimeofday(&startts, NULL); - uint32_t hash_pass_iter = 0; uint32_t emerg_over_cnt = 0; uint64_t next_run_ms = 0; + const uint32_t rows = ftd->max - ftd->min; + uint32_t pos = 0; + uint32_t rows_per_wu = 0; + uint64_t sleep_per_wu = 0; + + uint32_t mp = MemcapsGetPressure() * 100; + if (ftd->instance == 0) { + StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); + StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); + } + GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu); while (1) { @@ -805,7 +864,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) TimeGet(&ts); SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000; - const uint32_t rt = (uint32_t)ts.tv_sec; const bool emerge_p = (emerg && !prev_emerg); if (emerge_p) { next_run_ms = 0; @@ -822,7 +880,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) FlowSparePoolUpdate(sq_len); } } - const uint32_t secs_passed = rt - flow_last_sec; /* try to time out flows */ FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }; @@ -830,34 +887,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) if (emerg) { /* in emergency mode, do a full pass of the hash table */ FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters); - hash_passes++; - hash_full_passes++; - hash_passes++; -#ifdef FM_PROFILE - hash_passes_chunks += 1; - hash_row_checks += counters.rows_checked; -#endif StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); } else { - /* non-emergency mode: scan part of the hash */ - const uint32_t chunks = MIN(secs_passed, pass_in_sec); - for (uint32_t i = 0; i < chunks; i++) { - FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max, - &counters, hash_pass_iter, pass_in_sec); - hash_pass_iter++; - if (hash_pass_iter == pass_in_sec) { - hash_pass_iter = 0; - hash_full_passes++; - StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); - } + SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos, + rows_per_wu); + + const uint32_t ppos = pos; + FlowTimeoutHashInChunks( + &ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos); + if (ppos > pos) { + StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); } - hash_passes++; -#ifdef FM_PROFILE - hash_row_checks += counters.rows_checked; - hash_passes_chunks += chunks; -#endif } - flow_last_sec = rt; /* StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned); @@ -891,54 +932,58 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) * clear emergency bit if we have at least xx flows pruned. */ uint32_t len = FlowSpareGetPoolSize(); StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len); + if (emerg == true) { SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32 "flow_spare_q status: %"PRIu32"%% flows at the queue", len, flow_config.prealloc, len * 100 / flow_config.prealloc); - /* only if we have pruned this "emergency_recovery" percentage - * of flows, we will unset the emergency bit */ - if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) { - emerg_over_cnt++; - } else { - emerg_over_cnt = 0; + /* only if we have pruned this "emergency_recovery" percentage + * of flows, we will unset the emergency bit */ + if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) { + emerg_over_cnt++; + } else { + emerg_over_cnt = 0; + } + + if (emerg_over_cnt >= 30) { + SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); + FlowTimeoutsReset(); + + emerg = false; + prev_emerg = false; + emerg_over_cnt = 0; + SCLogNotice("Flow emergency mode over, back to normal... unsetting" + " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", " + "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32 + "%% flows at the queue", + (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec, + len * 100 / flow_config.prealloc); + + StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over); + } } - if (emerg_over_cnt >= 30) { - SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); - FlowTimeoutsReset(); - - emerg = false; - prev_emerg = false; - emerg_over_cnt = 0; - hash_pass_iter = 0; - SCLogNotice("Flow emergency mode over, back to normal... unsetting" - " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", " - "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32 - "%% flows at the queue", (uintmax_t)ts.tv_sec, - (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc); - - StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over); + /* update work units */ + mp = MemcapsGetPressure() * 100; + if (ftd->instance == 0) { + StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); + StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); } - } - next_run_ms = ts_ms + 667; - if (emerg) - next_run_ms = ts_ms + 250; - } - if (flow_last_sec == 0) { - flow_last_sec = rt; - } + GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu); - if (ftd->instance == 0 && - (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) { - DefragTimeoutHash(&ts); - //uint32_t hosts_pruned = - HostTimeoutHash(&ts); - IPPairTimeoutHash(&ts); - HttpRangeContainersTimeoutHash(&ts); - other_last_sec = (uint32_t)ts.tv_sec; + next_run_ms = ts_ms + sleep_per_wu; + } + if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) { + if (ftd->instance == 0) { + DefragTimeoutHash(&ts); + // uint32_t hosts_pruned = + HostTimeoutHash(&ts); + IPPairTimeoutHash(&ts); + HttpRangeContainersTimeoutHash(&ts); + other_last_sec = (uint32_t)ts.tv_sec; + } } - #ifdef FM_PROFILE struct timeval run_endts; @@ -981,11 +1026,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) established_cnt, closing_cnt); #ifdef FM_PROFILE - SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)", - hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1), - hash_full_passes, hash_row_checks, - hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1)); - gettimeofday(&endts, NULL); struct timeval total_run_time; timersub(&endts, &startts, &total_run_time); diff --git a/src/runmode-unix-socket.c b/src/runmode-unix-socket.c index 29f2264f58..6fedc5d391 100644 --- a/src/runmode-unix-socket.c +++ b/src/runmode-unix-socket.c @@ -82,8 +82,6 @@ const char *RunModeUnixSocketGetDefaultMode(void) return "autofp"; } -#ifdef BUILD_UNIX_SOCKET - #define MEMCAPS_MAX 7 static MemcapCommand memcaps[MEMCAPS_MAX] = { { @@ -130,6 +128,24 @@ static MemcapCommand memcaps[MEMCAPS_MAX] = { }, }; +float MemcapsGetPressure(void) +{ + float percent = 0.0; + for (int i = 0; i < 4; i++) { // only flow, streams, http + uint64_t memcap = memcaps[i].GetFunc(); + if (memcap) { + uint64_t memuse = memcaps[i].GetMemuseFunc(); + float p = (float)((double)memuse / (double)memcap); + // SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%", + // memcaps[i].name, memuse, memcap, (p * 100)); + percent = MAX(p, percent); + } + } + return percent; +} + +#ifdef BUILD_UNIX_SOCKET + static int RunModeUnixSocketMaster(void); static int unix_manager_pcap_task_running = 0; static int unix_manager_pcap_task_failed = 0; diff --git a/src/runmode-unix-socket.h b/src/runmode-unix-socket.h index fc8ff8a3c8..21134eb845 100644 --- a/src/runmode-unix-socket.h +++ b/src/runmode-unix-socket.h @@ -30,6 +30,8 @@ int RunModeUnixSocketIsActive(void); TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed); +float MemcapsGetPressure(void); + #ifdef BUILD_UNIX_SOCKET TmEcode UnixSocketDatasetAdd(json_t *cmd, json_t* answer, void *data); TmEcode UnixSocketDatasetRemove(json_t *cmd, json_t* answer, void *data);