diff --git a/src/flow-manager.c b/src/flow-manager.c
index 892a84f43d..cdf6944896 100644
--- a/src/flow-manager.c
+++ b/src/flow-manager.c
@@ -71,6 +71,8 @@
 #include "output-flow.h"
 #include "util-validate.h"
 
+#include "runmode-unix-socket.h"
+
 /* Run mode selected at suricata.c */
 extern int run_mode;
 
@@ -481,22 +483,34 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td,
     return cnt;
 }
 
-static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td,
-        struct timeval *ts,
-        const uint32_t hash_min, const uint32_t hash_max,
-        FlowTimeoutCounters *counters, uint32_t iter, const uint32_t chunks)
+/** \internal
+ *  \brief handle timeout for a slice of hash rows
+ *         If we wrap around we call FlowTimeoutHash twice */
+static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, struct timeval *ts,
+        const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
+        const uint32_t rows, uint32_t *pos)
 {
-    const uint32_t rows = hash_max - hash_min;
-    const uint32_t chunk_size = rows / chunks;
-
-    const uint32_t min = iter * chunk_size + hash_min;
-    uint32_t max = min + chunk_size;
-    /* we start at beginning of hash at next iteration so let's check
-     * hash till the end */
-    if (iter + 1 == chunks) {
-        max = hash_max;
+    uint32_t start = 0;
+    uint32_t end = 0;
+    uint32_t cnt = 0;
+    uint32_t rows_left = rows;
+
+again:
+    start = hash_min + (*pos);
+    if (start >= hash_max) {
+        start = hash_min;
+    }
+    end = start + rows_left;
+    if (end > hash_max) {
+        end = hash_max;
+    }
+    *pos = (end == hash_max) ? hash_min : end;
+    rows_left = rows_left - (end - start);
+
+    cnt += FlowTimeoutHash(td, ts, start, end, counters);
+    if (rows_left) {
+        goto again;
     }
-    const uint32_t cnt = FlowTimeoutHash(td, ts, min, max, counters);
     return cnt;
 }
 
@@ -614,6 +628,9 @@ typedef struct FlowCounters_ {
     uint16_t flow_bypassed_cnt_clo;
     uint16_t flow_bypassed_pkts;
     uint16_t flow_bypassed_bytes;
+
+    uint16_t memcap_pressure;
+    uint16_t memcap_pressure_max;
 } FlowCounters;
 
 typedef struct FlowManagerThreadData_ {
@@ -648,6 +665,9 @@ static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
     fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
     fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
     fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);
+
+    fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
+    fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
 }
 
 static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
@@ -710,6 +730,42 @@ static uint32_t FlowTimeoutsMin(void)
 
 //#define FM_PROFILE
 
+static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
+        const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
+{
+    if (emergency) {
+        *wu_rows = rows;
+        *wu_sleep = 250;
+        return;
+    }
+
+    uint32_t full_pass_in_ms = pass_in_sec * 1000;
+    float perc = MIN((((float)(100 - mp) / (float)100)), 1);
+    full_pass_in_ms *= perc;
+    full_pass_in_ms = MAX(full_pass_in_ms, 333);
+
+    uint32_t work_unit_ms = 999 * perc;
+    work_unit_ms = MAX(work_unit_ms, 250);
+
+    uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
+
+    uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
+    uint32_t rows_process_cost = rows_per_wu / 1000; // est 1usec per row
+
+    int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
+    sleep_per_wu = MAX(sleep_per_wu, 10);
+#if 0
+    float passes_sec = 1000.0/(float)full_pass_in_ms;
+    SCLogNotice("full_pass_in_ms %u perc %f rows %u "
+                "wus_per_full_pass %u rows_per_wu %u work_unit_ms %u sleep_per_wu %u => passes/s %f rows/s %u",
+            full_pass_in_ms, perc, rows,
+            wus_per_full_pass, rows_per_wu, work_unit_ms, (uint32_t)sleep_per_wu,
+            passes_sec, (uint32_t)((float)rows * passes_sec));
+#endif
+    *wu_sleep = sleep_per_wu;
+    *wu_rows = rows_per_wu;
+}
+
 /** \brief Thread that manages the flow table and times out flows.
  *
  *  \param td ThreadVars casted to void ptr
@@ -724,7 +780,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
     bool emerg = false;
     bool prev_emerg = false;
     uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
-    uint32_t flow_last_sec = 0;
     /* VJ leaving disabled for now, as hosts are only used by tags and the numbers
      * are really low. Might confuse ppl
     uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
@@ -732,12 +787,6 @@
     uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
     */
     memset(&ts, 0, sizeof(ts));
-    uint32_t hash_passes = 0;
-#ifdef FM_PROFILE
-    uint32_t hash_row_checks = 0;
-    uint32_t hash_passes_chunks = 0;
-#endif
-    uint32_t hash_full_passes = 0;
 
     const uint32_t min_timeout = FlowTimeoutsMin();
     const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
@@ -766,9 +815,19 @@
     memset(&startts, 0, sizeof(startts));
     gettimeofday(&startts, NULL);
 
-    uint32_t hash_pass_iter = 0;
     uint32_t emerg_over_cnt = 0;
     uint64_t next_run_ms = 0;
+    const uint32_t rows = ftd->max - ftd->min;
+    uint32_t pos = 0;
+    uint32_t rows_per_wu = 0;
+    uint64_t sleep_per_wu = 0;
+
+    uint32_t mp = MemcapsGetPressure() * 100;
+    if (ftd->instance == 0) {
+        StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+        StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
+    }
+    GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
 
     while (1) {
 
@@ -805,7 +864,6 @@
         TimeGet(&ts);
         SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
         const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
-        const uint32_t rt = (uint32_t)ts.tv_sec;
         const bool emerge_p = (emerg && !prev_emerg);
         if (emerge_p) {
             next_run_ms = 0;
@@ -822,7 +880,6 @@
                 FlowSparePoolUpdate(sq_len);
             }
         }
-            const uint32_t secs_passed = rt - flow_last_sec;
 
             /* try to time out flows */
             FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
@@ -830,34 +887,18 @@
             if (emerg) {
                 /* in emergency mode, do a full pass of the hash table */
                 FlowTimeoutHash(&ftd->timeout, &ts, ftd->min, ftd->max, &counters);
-                hash_passes++;
-                hash_full_passes++;
-                hash_passes++;
-#ifdef FM_PROFILE
-                hash_passes_chunks += 1;
-                hash_row_checks += counters.rows_checked;
-#endif
                 StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
             } else {
-                /* non-emergency mode: scan part of the hash */
-                const uint32_t chunks = MIN(secs_passed, pass_in_sec);
-                for (uint32_t i = 0; i < chunks; i++) {
-                    FlowTimeoutHashInChunks(&ftd->timeout, &ts, ftd->min, ftd->max,
-                            &counters, hash_pass_iter, pass_in_sec);
-                    hash_pass_iter++;
-                    if (hash_pass_iter == pass_in_sec) {
-                        hash_pass_iter = 0;
-                        hash_full_passes++;
-                        StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
-                    }
+                SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
+                        rows_per_wu);
+
+                const uint32_t ppos = pos;
+                FlowTimeoutHashInChunks(
+                        &ftd->timeout, &ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
+                if (ppos > pos) {
+                    StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
                 }
-                hash_passes++;
-#ifdef FM_PROFILE
-                hash_row_checks += counters.rows_checked;
-                hash_passes_chunks += chunks;
-#endif
             }
-        flow_last_sec = rt;
 
         /*
         StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
@@ -891,54 +932,58 @@
             * clear emergency bit if we have at least xx flows pruned. */
            uint32_t len = FlowSpareGetPoolSize();
            StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
+
            if (emerg == true) {
                SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
                        "flow_spare_q status: %"PRIu32"%% flows at the queue",
                        len, flow_config.prealloc, len * 100 / flow_config.prealloc);
 
-                /* only if we have pruned this "emergency_recovery" percentage
-                 * of flows, we will unset the emergency bit */
-                if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
-                    emerg_over_cnt++;
-                } else {
-                    emerg_over_cnt = 0;
+                /* only if we have pruned this "emergency_recovery" percentage
+                 * of flows, we will unset the emergency bit */
+                if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
+                    emerg_over_cnt++;
+                } else {
+                    emerg_over_cnt = 0;
+                }
+
+                if (emerg_over_cnt >= 30) {
+                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
+                    FlowTimeoutsReset();
+
+                    emerg = false;
+                    prev_emerg = false;
+                    emerg_over_cnt = 0;
+                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
+                                " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
+                                "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
+                                "%% flows at the queue",
+                            (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec,
+                            len * 100 / flow_config.prealloc);
+
+                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
+                }
            }
 
-                if (emerg_over_cnt >= 30) {
-                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
-                    FlowTimeoutsReset();
-
-                    emerg = false;
-                    prev_emerg = false;
-                    emerg_over_cnt = 0;
-                    hash_pass_iter = 0;
-                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
-                        " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
-                        "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
-                        "%% flows at the queue", (uintmax_t)ts.tv_sec,
-                        (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
-
-                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
+            /* update work units */
+            mp = MemcapsGetPressure() * 100;
+            if (ftd->instance == 0) {
+                StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
+                StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
            }
-            }
-            next_run_ms = ts_ms + 667;
-            if (emerg)
-                next_run_ms = ts_ms + 250;
-        }
-        if (flow_last_sec == 0) {
-            flow_last_sec = rt;
-        }
+            GetWorkUnitSizing(pass_in_sec, rows, mp, emerg, &sleep_per_wu, &rows_per_wu);
 
-        if (ftd->instance == 0 &&
-                (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec)) {
-            DefragTimeoutHash(&ts);
-            //uint32_t hosts_pruned =
-            HostTimeoutHash(&ts);
-            IPPairTimeoutHash(&ts);
-            HttpRangeContainersTimeoutHash(&ts);
-            other_last_sec = (uint32_t)ts.tv_sec;
+            next_run_ms = ts_ms + sleep_per_wu;
+        }
+        if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
+            if (ftd->instance == 0) {
+                DefragTimeoutHash(&ts);
+                // uint32_t hosts_pruned =
                HostTimeoutHash(&ts);
+                IPPairTimeoutHash(&ts);
+                HttpRangeContainersTimeoutHash(&ts);
+                other_last_sec = (uint32_t)ts.tv_sec;
+            }
        }
 
-
 #ifdef FM_PROFILE
        struct timeval run_endts;
@@ -981,11 +1026,6 @@
             established_cnt, closing_cnt);
 
 #ifdef FM_PROFILE
-    SCLogNotice("hash passes %u avg chunks %u full %u rows %u (rows/s %u)",
-            hash_passes, hash_passes_chunks / (hash_passes ? hash_passes : 1),
-            hash_full_passes, hash_row_checks,
-            hash_row_checks / ((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
-
     gettimeofday(&endts, NULL);
     struct timeval total_run_time;
     timersub(&endts, &startts, &total_run_time);
diff --git a/src/runmode-unix-socket.c b/src/runmode-unix-socket.c
index 29f2264f58..6fedc5d391 100644
--- a/src/runmode-unix-socket.c
+++ b/src/runmode-unix-socket.c
@@ -82,8 +82,6 @@ const char *RunModeUnixSocketGetDefaultMode(void)
     return "autofp";
 }
 
-#ifdef BUILD_UNIX_SOCKET
-
 #define MEMCAPS_MAX 7
 static MemcapCommand memcaps[MEMCAPS_MAX] = {
     {
@@ -130,6 +128,24 @@ static MemcapCommand memcaps[MEMCAPS_MAX] = {
     },
 };
 
+float MemcapsGetPressure(void)
+{
+    float percent = 0.0;
+    for (int i = 0; i < 4; i++) { // only flow, streams, http
+        uint64_t memcap = memcaps[i].GetFunc();
+        if (memcap) {
+            uint64_t memuse = memcaps[i].GetMemuseFunc();
+            float p = (float)((double)memuse / (double)memcap);
+            // SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%",
+            //        memcaps[i].name, memuse, memcap, (p * 100));
+            percent = MAX(p, percent);
+        }
+    }
+    return percent;
+}
+
+#ifdef BUILD_UNIX_SOCKET
+
 static int RunModeUnixSocketMaster(void);
 static int unix_manager_pcap_task_running = 0;
 static int unix_manager_pcap_task_failed = 0;
diff --git a/src/runmode-unix-socket.h b/src/runmode-unix-socket.h
index fc8ff8a3c8..21134eb845 100644
--- a/src/runmode-unix-socket.h
+++ b/src/runmode-unix-socket.h
@@ -30,6 +30,8 @@ int RunModeUnixSocketIsActive(void);
 
 TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed);
 
+float MemcapsGetPressure(void);
+
 #ifdef BUILD_UNIX_SOCKET
 TmEcode UnixSocketDatasetAdd(json_t *cmd, json_t* answer, void *data);
 TmEcode UnixSocketDatasetRemove(json_t *cmd, json_t* answer, void *data);
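For reference, below is a minimal standalone sketch (not part of the patch) of the work-unit sizing math introduced above. It mirrors the body of GetWorkUnitSizing() from the diff and drives it with a hypothetical main(); the driver values (65536 hash rows, a 60 second full-pass budget, and the sampled pressure points) are illustrative assumptions, not taken from the patch.

/* sketch.c - illustrative only; mirrors GetWorkUnitSizing() from the patch */
#include <stdio.h>
#include <stdint.h>
#include <stdbool.h>

#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#define MAX(a, b) (((a) > (b)) ? (a) : (b))

/* mp is memcap pressure in percent (0-100); higher pressure shrinks both the
 * full-pass budget and the per-work-unit budget, so eviction speeds up */
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
        const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
{
    if (emergency) {
        *wu_rows = rows;  /* whole table per work unit */
        *wu_sleep = 250;  /* short fixed sleep */
        return;
    }
    uint32_t full_pass_in_ms = pass_in_sec * 1000;
    float perc = MIN((((float)(100 - mp) / (float)100)), 1);
    full_pass_in_ms *= perc;
    full_pass_in_ms = MAX(full_pass_in_ms, 333);

    uint32_t work_unit_ms = 999 * perc;
    work_unit_ms = MAX(work_unit_ms, 250);

    uint32_t wus_per_full_pass = full_pass_in_ms / work_unit_ms;
    uint32_t rows_per_wu = MAX(1, rows / wus_per_full_pass);
    uint32_t rows_process_cost = rows_per_wu / 1000; /* est 1usec per row */

    int32_t sleep_per_wu = work_unit_ms - rows_process_cost;
    sleep_per_wu = MAX(sleep_per_wu, 10);

    *wu_sleep = sleep_per_wu;
    *wu_rows = rows_per_wu;
}

int main(void)
{
    const uint32_t pressures[] = { 0, 50, 90 }; /* memcap pressure in percent */
    for (int i = 0; i < 3; i++) {
        uint64_t sleep_ms = 0;
        uint32_t rows_per_wu = 0;
        GetWorkUnitSizing(60, 65536, pressures[i], false, &sleep_ms, &rows_per_wu);
        printf("pressure %3u%%: %u rows/work unit, sleep %u ms\n", pressures[i], rows_per_wu,
                (uint32_t)sleep_ms);
    }
    return 0;
}

With these assumed inputs the sketch prints roughly 1092 rows per work unit with a ~998 ms sleep at 0% pressure, the same rows with a ~498 ms sleep at 50%, and about 2730 rows with a ~248 ms sleep at 90%, which is the behaviour the reworked FlowManager loop relies on when it recomputes mp from MemcapsGetPressure() and feeds it back into GetWorkUnitSizing() each pass.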