flow-recycler: support multiple instances

Use new management API to run the flow recycler.

Make number of threads configurable:

flow:
  memcap: 64mb
  hash-size: 65536
  prealloc: 10000
  emergency-recovery: 30
  managers: 2
  recyclers: 2

This sets up 2 flow recyclers.
pull/1058/head
Victor Julien 11 years ago
parent e0841218f0
commit 0ac94ef777

@ -74,6 +74,11 @@ static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */ /* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_EXTERN(unsigned int, flow_flags); SC_ATOMIC_EXTERN(unsigned int, flow_flags);
/* 1 seconds */ /* 1 seconds */
@ -654,52 +659,56 @@ void FlowManagerThreadSpawn()
return; return;
} }
typedef struct FlowRecyclerThreadData_ {
void *output_thread_data;
} FlowRecyclerThreadData;
static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data)
{
FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
SCFree(ftd);
return TM_ECODE_FAILED;
}
SCLogDebug("output_thread_data %p", ftd->output_thread_data);
*data = ftd;
return TM_ECODE_OK;
}
static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
if (ftd->output_thread_data != NULL)
OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
SCFree(data);
return TM_ECODE_OK;
}
/** \brief Thread that manages timed out flows. /** \brief Thread that manages timed out flows.
* *
* \param td ThreadVars casted to void ptr * \param td ThreadVars casted to void ptr
*/ */
void *FlowRecyclerThread(void *td) static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{ {
/* block usr2. usr2 to be handled by the main thread only */ /* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2); UtilSignalBlock(SIGUSR2);
ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts; struct timeval ts;
struct timespec cond_time; struct timespec cond_time;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC; int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC; int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
uint64_t recycled_cnt = 0; uint64_t recycled_cnt = 0;
void *output_thread_data = NULL; FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
BUG_ON(ftd == NULL);
if (th_v->thread_setup_flags != 0)
TmThreadSetupOptions(th_v);
memset(&ts, 0, sizeof(ts)); memset(&ts, 0, sizeof(ts));
/* set the thread name */
if (SCSetThreadName(th_v->name) < 0) {
SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
} else {
SCLogDebug("%s started...", th_v->name);
}
/* Set the threads capability */
th_v->cap_flags = 0;
SCDropCaps(th_v);
if (OutputFlowLogThreadInit(th_v, NULL, &output_thread_data) != TM_ECODE_OK) {
SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
/* failure */
TmThreadsSetFlag(th_v, THV_RUNNING_DONE);
TmThreadWaitForFlag(th_v, THV_DEINIT);
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
}
SCLogDebug("output_thread_data %p", output_thread_data);
TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1) while (1)
{ {
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
@ -725,7 +734,7 @@ void *FlowRecyclerThread(void *td)
while ((f = FlowDequeue(&flow_recycle_q)) != NULL) { while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
FLOWLOCK_WRLOCK(f); FLOWLOCK_WRLOCK(f);
(void)OutputFlowLog(th_v, output_thread_data, f); (void)OutputFlowLog(th_v, ftd->output_thread_data, f);
FlowClearMemory (f, f->protomap); FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f); FLOWLOCK_UNLOCK(f);
@ -753,17 +762,9 @@ void *FlowRecyclerThread(void *td)
SCPerfSyncCountersIfSignalled(th_v); SCPerfSyncCountersIfSignalled(th_v);
} }
if (output_thread_data != NULL)
OutputFlowLogThreadDeinit(th_v, output_thread_data);
SCLogInfo("%"PRIu64" flows processed", recycled_cnt); SCLogInfo("%"PRIu64" flows processed", recycled_cnt);
TmThreadsSetFlag(th_v, THV_RUNNING_DONE); return TM_ECODE_OK;
TmThreadWaitForFlag(th_v, THV_DEINIT);
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
} }
int FlowRecyclerReadyToShutdown(void) int FlowRecyclerReadyToShutdown(void)
@ -779,25 +780,44 @@ int FlowRecyclerReadyToShutdown(void)
/** \brief spawn the flow recycler thread */ /** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn() void FlowRecyclerThreadSpawn()
{ {
ThreadVars *tv_flowmgr = NULL; intmax_t setting = 1;
(void)ConfGetInt("flow.recyclers", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid flow.recyclers setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
flowrec_number = (uint32_t)setting;
SCLogInfo("using %u flow recycler threads", flowrec_number);
SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
tv_flowmgr = TmThreadCreateMgmtThread("FlowRecyclerThread",
FlowRecyclerThread, 0);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET); uint32_t u;
for (u = 0; u < flowrec_number; u++) {
ThreadVars *tv_flowmgr = NULL;
if (tv_flowmgr == NULL) { char name[32] = "";
printf("ERROR: TmThreadsCreate failed\n"); snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1);
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread",
"FlowRecycler", 0);
BUG_ON(tv_flowmgr == NULL);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return; return;
} }
@ -820,27 +840,41 @@ void FlowKillFlowRecyclerThread(void)
usleep(10); usleep(10);
} while (FlowRecyclerReadyToShutdown() == 0); } while (FlowRecyclerReadyToShutdown() == 0);
/* wake up threads */
uint32_t u;
for (u = 0; u < flowrec_number; u++)
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
SCMutexLock(&tv_root_lock); SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */ /* flow recycler thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT]; tv = tv_root[TVT_MGMT];
while (tv != NULL) { while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) { if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL); TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT); TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
}
tv = tv->next;
}
SCCtrlCondSignal(&flow_recycler_ctrl_cond); /* wake up threads, another try */
for (u = 0; u < flowrec_number; u++)
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
/* be sure it has shut down */ /* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) { while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100); usleep(100);
} }
cnt++;
} }
tv = tv->next; tv = tv->next;
} }
/* not possible, unless someone decides to rename FlowManagerThread */ /* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) { if (cnt == 0) {
SCMutexUnlock(&tv_root_lock); SCMutexUnlock(&tv_root_lock);
@ -848,6 +882,9 @@ void FlowKillFlowRecyclerThread(void)
} }
SCMutexUnlock(&tv_root_lock); SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */
SC_ATOMIC_SET(flowrec_cnt, 0);
return; return;
} }
@ -863,6 +900,18 @@ void TmModuleFlowManagerRegister (void)
SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
} }
void TmModuleFlowRecyclerRegister (void)
{
tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
// tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
}
#ifdef UNITTESTS #ifdef UNITTESTS
/** /**

@ -43,5 +43,6 @@ void FlowRecyclerThreadSpawn(void);
void FlowKillFlowRecyclerThread(void); void FlowKillFlowRecyclerThread(void);
void TmModuleFlowManagerRegister (void); void TmModuleFlowManagerRegister (void);
void TmModuleFlowRecyclerRegister (void);
#endif /* __FLOW_MANAGER_H__ */ #endif /* __FLOW_MANAGER_H__ */

@ -795,6 +795,7 @@ void RegisterAllModules()
{ {
/* managers */ /* managers */
TmModuleFlowManagerRegister(); TmModuleFlowManagerRegister();
TmModuleFlowRecyclerRegister();
/* nfq */ /* nfq */
TmModuleReceiveNFQRegister(); TmModuleReceiveNFQRegister();
TmModuleVerdictNFQRegister(); TmModuleVerdictNFQRegister();

@ -624,6 +624,7 @@ flow:
prealloc: 10000 prealloc: 10000
emergency-recovery: 30 emergency-recovery: 30
#managers: 1 # default to one flow manager #managers: 1 # default to one flow manager
#recyclers: 1 # default to one flow recycler thread
# This option controls the use of vlan ids in the flow (and defrag) # This option controls the use of vlan ids in the flow (and defrag)
# hashing. Normally this should be enabled, but in some (broken) # hashing. Normally this should be enabled, but in some (broken)

Loading…
Cancel
Save