diff --git a/src/flow-manager.c b/src/flow-manager.c index b68eb22ffc..5b5a29cd0b 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -738,54 +738,43 @@ static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) return TM_ECODE_OK; } -static uint32_t FlowTimeoutsMin(void) -{ - FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts); - uint32_t m = -1; - for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) { - m = MIN(m, t[i].new_timeout); - m = MIN(m, t[i].est_timeout); - - if (i == FLOW_PROTO_TCP) { - m = MIN(m, t[i].closed_timeout); - } - if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) { - m = MIN(m, t[i].bypassed_timeout); - } - } - return m; -} - -static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp, - const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec) +/** \internal + * \brief calculate number of rows to scan and how much time to sleep + * based on the busy score `mp` (0 idle, 100 max busy). + * + * We try to to make sure we scan the hash once a second. The number size + * of the slice of the hash scanned is determined by our busy score 'mp'. + * We sleep for the remainder of the second after processing the slice, + * or at least an approximation of it. + * A minimum busy score of 10 is assumed to avoid a longer than 10 second + * full hash pass. This is to avoid burstiness in scanning when there is + * a rapid increase of the busy score, which could lead to the flow manager + * suddenly scanning a much larger slice of the hash leading to a burst + * in scan/eviction work. + */ +static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency, + uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec) { if (emergency) { *wu_rows = rows; *wu_sleep = 250; return; } - - uint32_t full_pass_in_ms = pass_in_sec * 1000; - const float perc = MIN((((float)(100 - mp) / (float)100)), 1.0); - full_pass_in_ms *= perc; - full_pass_in_ms = MAX(full_pass_in_ms, 333); - - uint32_t work_unit_ms = 999 * perc; - work_unit_ms = MAX(work_unit_ms, 250); - - const uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms; - - const uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass); - const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row - - int32_t sleep_per_wu = work_unit_ms - rows_process_cost; - sleep_per_wu = MAX(sleep_per_wu, 10); - - const float passes_sec = 1000.0 / (float)full_pass_in_ms; - - *wu_sleep = sleep_per_wu; - *wu_rows = rows_per_wu; - *rows_sec = (uint32_t)((float)rows * passes_sec); + /* minimum busy score is 10 */ + const uint32_t emp = MAX(mp, 10); + const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100)); + /* calc how much time we estimate the work will take, in ms. We assume + * each row takes an average of 1usec. Maxing out at 1sec. */ + const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000); + /* calc how much time we need to sleep to get to the per second cadence + * but sleeping for at least 250ms. */ + const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit); + SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec, + sleep_per_unit); + + *wu_sleep = sleep_per_unit; + *wu_rows = rows_per_sec; + *rows_sec = rows_per_sec; } /** \brief Thread that manages the flow table and times out flows. @@ -798,8 +787,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) { FlowManagerThreadData *ftd = thread_data; const uint32_t rows = ftd->max - ftd->min; - const uint32_t min_timeout = FlowTimeoutsMin(); - const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60; const bool time_is_live = TimeModeIsLive(); uint32_t emerg_over_cnt = 0; @@ -820,15 +807,12 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) return TM_ECODE_OK; } - SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name, - ftd->instance, min_timeout, pass_in_sec); - uint32_t mp = MemcapsGetPressure() * 100; if (ftd->instance == 0) { StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); } - GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); + GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); while (1) @@ -927,7 +911,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); } - GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); + GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); if (pmp != mp) { StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); }