flow-manager: reduce burstiness in adaptive timing

Previous adaptive model would have a large time range when scanning the
hash when not so busy. In the default case it would take up to 4 minutes
for a full hash scan. In case of sudden increase in business, where the
hash would fill up rapidily during a few seconds, the flow manager would
be forced to suddenly consider a much larger slice of the hash leading
to a burst of work. This burst would increase pressure on the rest of the
system leading to packet loss as the worker threads would be overloaded
with flow housekeeping tasks.

This patch reduces the max scan time to 10 seconds, and ramps up quickly
to increase the slice of the hash scanned.
pull/7774/head
Victor Julien 3 years ago
parent 58ef3cde7a
commit 50f8779128

@ -738,54 +738,43 @@ static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
return TM_ECODE_OK;
}
static uint32_t FlowTimeoutsMin(void)
{
FlowProtoTimeoutPtr t = SC_ATOMIC_GET(flow_timeouts);
uint32_t m = -1;
for (unsigned int i = 0; i < FLOW_PROTO_MAX; i++) {
m = MIN(m, t[i].new_timeout);
m = MIN(m, t[i].est_timeout);
if (i == FLOW_PROTO_TCP) {
m = MIN(m, t[i].closed_timeout);
}
if (i == FLOW_PROTO_TCP || i == FLOW_PROTO_UDP) {
m = MIN(m, t[i].bypassed_timeout);
}
}
return m;
}
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
/** \internal
* \brief calculate number of rows to scan and how much time to sleep
* based on the busy score `mp` (0 idle, 100 max busy).
*
* We try to to make sure we scan the hash once a second. The number size
* of the slice of the hash scanned is determined by our busy score 'mp'.
* We sleep for the remainder of the second after processing the slice,
* or at least an approximation of it.
* A minimum busy score of 10 is assumed to avoid a longer than 10 second
* full hash pass. This is to avoid burstiness in scanning when there is
* a rapid increase of the busy score, which could lead to the flow manager
* suddenly scanning a much larger slice of the hash leading to a burst
* in scan/eviction work.
*/
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
if (emergency) {
*wu_rows = rows;
*wu_sleep = 250;
return;
}
/* minimum busy score is 10 */
const uint32_t emp = MAX(mp, 10);
const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
/* calc how much time we estimate the work will take, in ms. We assume
* each row takes an average of 1usec. Maxing out at 1sec. */
const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
/* calc how much time we need to sleep to get to the per second cadence
* but sleeping for at least 250ms. */
const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
sleep_per_unit);
uint32_t full_pass_in_ms = pass_in_sec * 1000;
const float perc = MIN((((float)(100 - mp) / (float)100)), 1.0);
full_pass_in_ms *= perc;
full_pass_in_ms = MAX(full_pass_in_ms, 333);
uint32_t work_unit_ms = 999 * perc;
work_unit_ms = MAX(work_unit_ms, 250);
const uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
const uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
const uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row
int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
sleep_per_wu = MAX(sleep_per_wu, 10);
const float passes_sec = 1000.0 / (float)full_pass_in_ms;
*wu_sleep = sleep_per_wu;
*wu_rows = rows_per_wu;
*rows_sec = (uint32_t)((float)rows * passes_sec);
*wu_sleep = sleep_per_unit;
*wu_rows = rows_per_sec;
*rows_sec = rows_per_sec;
}
/** \brief Thread that manages the flow table and times out flows.
@ -798,8 +787,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
FlowManagerThreadData *ftd = thread_data;
const uint32_t rows = ftd->max - ftd->min;
const uint32_t min_timeout = FlowTimeoutsMin();
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
const bool time_is_live = TimeModeIsLive();
uint32_t emerg_over_cnt = 0;
@ -820,15 +807,12 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
return TM_ECODE_OK;
}
SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
ftd->instance, min_timeout, pass_in_sec);
uint32_t mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
while (1)
@ -927,7 +911,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
if (pmp != mp) {
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
}

Loading…
Cancel
Save