flow-manager: support multiple instances

Use new management API to run the flow manager.

Support multiple flow managers, where each of them works with it's
own part of the flow hash.

Make number of threads configurable:

flow:
  memcap: 64mb
  hash-size: 65536
  prealloc: 10000
  emergency-recovery: 30
  managers: 2

This sets up 2 flow managers.

Handle misc tasks only in instance 1: Handle defrag hash timeout
handing, host hash timeout handling and flow spare queue updating
only from the first instance.
pull/1058/head
Victor Julien 11 years ago
parent 46cee88ef8
commit e0841218f0

@ -69,6 +69,11 @@
/* Run mode selected at suricata.c */
extern int run_mode;
/* multi flow mananger support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
/* 1 seconds */
@ -96,7 +101,10 @@ void FlowKillFlowManagerThread(void)
ThreadVars *tv = NULL;
int cnt = 0;
SCCtrlCondSignal(&flow_manager_ctrl_cond);
/* wake up threads */
uint32_t u;
for (u = 0; u < flowmgr_number; u++)
SCCtrlCondSignal(&flow_manager_ctrl_cond);
SCMutexLock(&tv_root_lock);
@ -107,16 +115,27 @@ void FlowKillFlowManagerThread(void)
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
}
tv = tv->next;
}
/* wake up threads, another try */
for (u = 0; u < flowmgr_number; u++)
SCCtrlCondSignal(&flow_manager_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
cnt++;
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
@ -124,6 +143,9 @@ void FlowKillFlowManagerThread(void)
}
SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */
SC_ATOMIC_SET(flowmgr_cnt, 0);
return;
}
@ -324,11 +346,16 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
*
* \param ts timestamp
* \param try_cnt number of flows to time out max (0 is unlimited)
* \param hash_min min hash index to consider
* \param hash_max max hash index to consider
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt number of timed out flow
*/
uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounters *counters) {
static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
uint32_t hash_min, uint32_t hash_max,
FlowTimeoutCounters *counters)
{
uint32_t idx = 0;
uint32_t cnt = 0;
int emergency = 0;
@ -336,7 +363,7 @@ uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, FlowTimeoutCounte
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
emergency = 1;
for (idx = 0; idx < flow_config.hash_size; idx++) {
for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
if (FBLOCK_TRYLOCK(fb) != 0)
@ -362,18 +389,86 @@ next:
extern int g_detect_disabled;
typedef struct FlowManagerThreadData_ {
uint32_t instance;
uint32_t min;
uint32_t max;
uint16_t flow_mgr_cnt_clo;
uint16_t flow_mgr_cnt_new;
uint16_t flow_mgr_cnt_est;
uint16_t flow_mgr_memuse;
uint16_t flow_mgr_spare;
uint16_t flow_emerg_mode_enter;
uint16_t flow_emerg_mode_over;
} FlowManagerThreadData;
static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
{
FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
SCLogDebug("flow manager instance %u", ftd->instance);
/* set the min and max value used for hash row walking
* each thread has it's own section of the flow hash */
uint32_t range = flow_config.hash_size / flowmgr_number;
if (ftd->instance == 1)
ftd->max = range;
else if (ftd->instance == flowmgr_number) {
ftd->min = (range * (ftd->instance - 1));
ftd->max = flow_config.hash_size;
} else {
ftd->min = (range * (ftd->instance - 1));
ftd->max = (range * ftd->instance);
}
BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
/* pass thread data back to caller */
*data = ftd;
ftd->flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", t,
SC_PERF_TYPE_UINT64, "NULL");
PacketPoolInit();
return TM_ECODE_OK;
}
static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
SCFree(data);
return TM_ECODE_OK;
}
/** \brief Thread that manages the flow table and times out flows.
*
* \param td ThreadVars casted to void ptr
*
* Keeps an eye on the spare list, alloc flows if needed...
*/
void *FlowManagerThread(void *td)
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
/* block usr1. usr1 to be handled by the main thread only */
/* block usr2. usr1 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
ThreadVars *th_v = (ThreadVars *)td;
FlowManagerThreadData *ftd = thread_data;
struct timeval ts;
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
int emerg = FALSE;
@ -394,53 +489,10 @@ void *FlowManagerThread(void *td)
SC_PERF_TYPE_Q_NORMAL,
"NULL");
*/
uint16_t flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
if (th_v->thread_setup_flags != 0)
TmThreadSetupOptions(th_v);
memset(&ts, 0, sizeof(ts));
FlowForceReassemblySetup(g_detect_disabled);
/* set the thread name */
if (SCSetThreadName(th_v->name) < 0) {
SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
} else {
SCLogDebug("%s started...", th_v->name);
}
th_v->sc_perf_pca = SCPerfGetAllCountersArray(&th_v->sc_perf_pctx);
SCPerfAddToClubbedTMTable(th_v->name, &th_v->sc_perf_pctx);
/* Set the threads capability */
th_v->cap_flags = 0;
SCDropCaps(th_v);
PacketPoolInit();
FlowHashDebugInit();
TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@ -457,7 +509,7 @@ void *FlowManagerThread(void *td)
SCLogDebug("Flow emergency mode entered...");
SCPerfCounterIncr(flow_emerg_mode_enter, th_v->sc_perf_pca);
SCPerfCounterIncr(ftd->flow_emerg_mode_enter, th_v->sc_perf_pca);
}
}
@ -472,16 +524,19 @@ void *FlowManagerThread(void *td)
}
/* see if we still have enough spare flows */
FlowUpdateSpareFlows();
if (ftd->instance == 1)
FlowUpdateSpareFlows();
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, &counters);
FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
DefragTimeoutHash(&ts);
//uint32_t hosts_pruned =
HostTimeoutHash(&ts);
if (ftd->instance == 1) {
DefragTimeoutHash(&ts);
//uint32_t hosts_pruned =
HostTimeoutHash(&ts);
}
/*
SCPerfCounterAddUI64(flow_mgr_host_prune, th_v->sc_perf_pca, (uint64_t)hosts_pruned);
uint32_t hosts_active = HostGetActiveCount();
@ -489,17 +544,17 @@ void *FlowManagerThread(void *td)
uint32_t hosts_spare = HostGetSpareCount();
SCPerfCounterSetUI64(flow_mgr_host_spare, th_v->sc_perf_pca, (uint64_t)hosts_spare);
*/
SCPerfCounterAddUI64(flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
SCPerfCounterAddUI64(flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
SCPerfCounterAddUI64(flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
SCPerfCounterSetUI64(flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
SCPerfCounterSetUI64(ftd->flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
SCPerfCounterSetUI64(flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
SCPerfCounterSetUI64(ftd->flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
@ -523,7 +578,7 @@ void *FlowManagerThread(void *td)
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
SCPerfCounterIncr(flow_emerg_mode_over, th_v->sc_perf_pca);
SCPerfCounterIncr(ftd->flow_emerg_mode_over, th_v->sc_perf_pca);
} else {
flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
@ -547,42 +602,55 @@ void *FlowManagerThread(void *td)
SCPerfSyncCountersIfSignalled(th_v);
}
TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
TmThreadWaitForFlag(th_v, THV_DEINIT);
FlowHashDebugDeinit();
SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
return TM_ECODE_OK;
}
/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
ThreadVars *tv_flowmgr = NULL;
intmax_t setting = 1;
(void)ConfGetInt("flow.managers", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid flow.managers setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
flowmgr_number = (uint32_t)setting;
SCLogInfo("using %u flow manager threads", flowmgr_number);
FlowForceReassemblySetup(g_detect_disabled);
SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread",
FlowManagerThread, 0);
uint32_t u;
for (u = 0; u < flowmgr_number; u++) {
ThreadVars *tv_flowmgr = NULL;
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
char name[32] = "";
snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread",
"FlowManager", 0);
BUG_ON(tv_flowmgr == NULL);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
@ -783,6 +851,18 @@ void FlowKillFlowRecyclerThread(void)
return;
}
void TmModuleFlowManagerRegister (void)
{
tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
// tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
}
#ifdef UNITTESTS
/**
@ -1033,7 +1113,7 @@ static int FlowMgrTest05 (void) {
TimeGet(&ts);
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, &counters);
FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
if (flow_recycle_q.len > 0) {
result = 1;

@ -42,4 +42,6 @@ SCCtrlMutex flow_recycler_ctrl_mutex;
void FlowRecyclerThreadSpawn(void);
void FlowKillFlowRecyclerThread(void);
void TmModuleFlowManagerRegister (void);
#endif /* __FLOW_MANAGER_H__ */

@ -793,6 +793,8 @@ int g_ut_covered;
void RegisterAllModules()
{
/* managers */
TmModuleFlowManagerRegister();
/* nfq */
TmModuleReceiveNFQRegister();
TmModuleVerdictNFQRegister();

@ -255,6 +255,8 @@ const char * TmModuleTmmIdToString(TmmId id)
CASE_CODE (TMM_JSONSSHLOG);
CASE_CODE (TMM_JSONTLSLOG);
CASE_CODE (TMM_OUTPUTJSON);
CASE_CODE (TMM_FLOWMANAGER);
CASE_CODE (TMM_FLOWRECYCLER);
CASE_CODE (TMM_SIZE);
}

@ -93,6 +93,10 @@ typedef enum {
TMM_DECODENFLOG,
TMM_JSONFLOWLOG,
TMM_JSONNETFLOWLOG,
TMM_FLOWMANAGER,
TMM_FLOWRECYCLER,
TMM_SIZE,
} TmmId;

@ -623,6 +623,7 @@ flow:
hash-size: 65536
prealloc: 10000
emergency-recovery: 30
#managers: 1 # default to one flow manager
# This option controls the use of vlan ids in the flow (and defrag)
# hashing. Normally this should be enabled, but in some (broken)

Loading…
Cancel
Save