You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
suricata/src/flow-manager.c

1190 lines
34 KiB
C

/* Copyright (C) 2007-2013 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Anoop Saldanha <anoopsaldanha@gmail.com>
* \author Victor Julien <victor@inliniac.net>
*/
#include "suricata-common.h"
#include "suricata.h"
#include "decode.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"
#include "util-random.h"
#include "util-time.h"
#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"
#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-byte.h"
#include "util-debug.h"
#include "util-privs.h"
#include "util-signal.h"
#include "threads.h"
#include "detect.h"
#include "detect-engine-state.h"
#include "stream.h"
#include "app-layer-parser.h"
#include "host-timeout.h"
#include "defrag-timeout.h"
#include "output-flow.h"
/* Run mode selected at suricata.c */
extern int run_mode;
/* multi flow mananger support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);
/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
/* 1 seconds */
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
/* 0.1 seconds */
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 100000
#define NEW_FLOW_COUNT_COND 10
typedef struct FlowTimeoutCounters_ {
uint32_t new;
uint32_t est;
uint32_t clo;
} FlowTimeoutCounters;
/**
* \brief Used to kill flow manager thread(s).
*
* \todo Kinda hackish since it uses the tv name to identify flow manager
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowManagerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
/* wake up threads */
uint32_t u;
for (u = 0; u < flowmgr_number; u++)
SCCtrlCondSignal(&flow_manager_ctrl_cond);
SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
}
tv = tv->next;
}
/* wake up threads, another try */
for (u = 0; u < flowmgr_number; u++)
SCCtrlCondSignal(&flow_manager_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */
SC_ATOMIC_SET(flowmgr_cnt, 0);
return;
}
/** \internal
* \brief get timeout for flow
*
* \param f flow
* \param state flow state
* \param emergency bool indicating emergency mode 1 yes, 0 no
*
* \retval timeout timeout in seconds
*/
static inline uint32_t FlowGetFlowTimeout(Flow *f, int state, int emergency) {
uint32_t timeout;
if (emergency) {
switch(state) {
default:
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].emerg_new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].emerg_est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].emerg_closed_timeout;
break;
}
} else { /* implies no emergency */
switch(state) {
default:
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].closed_timeout;
break;
}
}
return timeout;
}
/** \internal
* \brief check if a flow is timed out
*
* \param f flow
* \param ts timestamp
* \param emergency bool indicating emergency mode
*
* \retval 0 not timed out
* \retval 1 timed out
*/
static int FlowManagerFlowTimeout(Flow *f, int state, struct timeval *ts, int emergency) {
/* set the timeout value according to the flow operating mode,
* flow's state and protocol.*/
uint32_t timeout = FlowGetFlowTimeout(f, state, emergency);
/* do the timeout check */
if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) {
return 0;
}
return 1;
}
/** \internal
* \brief See if we can really discard this flow. Check use_cnt reference
* counter and force reassembly if necessary.
*
* \param f flow
* \param ts timestamp
* \param emergency bool indicating emergency mode
*
* \retval 0 not timed out just yet
* \retval 1 fully timed out, lets kill it
*/
static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts) {
/** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */
if (SC_ATOMIC_GET(f->use_cnt) > 0) {
return 0;
}
int server = 0, client = 0;
if (!(f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) &&
FlowForceReassemblyNeedReassembly(f, &server, &client) == 1) {
FlowForceReassemblyForFlowV2(f, server, client);
return 0;
}
#ifdef DEBUG
/* this should not be possible */
BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
#endif
return 1;
}
/**
* \internal
*
* \brief check all flows in a hash row for timing out
*
* \param f last flow in the hash row
* \param ts timestamp
* \param emergency bool indicating emergency mode
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt timed out flows
*/
static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
int emergency, FlowTimeoutCounters *counters)
{
uint32_t cnt = 0;
do {
if (FLOWLOCK_TRYWRLOCK(f) != 0) {
f = f->hprev;
continue;
}
Flow *next_flow = f->hprev;
int state = FlowGetFlowState(f);
/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
FLOWLOCK_UNLOCK(f);
f = f->hprev;
continue;
}
/* check if the flow is fully timed out and
* ready to be discarded. */
if (FlowManagerFlowTimedOut(f, ts) == 1) {
/* remove from the hash */
if (f->hprev != NULL)
f->hprev->hnext = f->hnext;
if (f->hnext != NULL)
f->hnext->hprev = f->hprev;
if (f->fb->head == f)
f->fb->head = f->hnext;
if (f->fb->tail == f)
f->fb->tail = f->hprev;
f->hnext = NULL;
f->hprev = NULL;
if (state == FLOW_STATE_NEW)
f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW;
else if (state == FLOW_STATE_ESTABLISHED)
f->flow_end_flags |= FLOW_END_FLAG_STATE_ESTABLISHED;
else if (state == FLOW_STATE_CLOSED)
f->flow_end_flags |= FLOW_END_FLAG_STATE_CLOSED;
if (emergency)
f->flow_end_flags |= FLOW_END_FLAG_EMERGENCY;
f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;
// FlowClearMemory (f, f->protomap);
/* no one is referring to this flow, use_cnt 0, removed from hash
* so we can unlock it and move it back to the spare queue. */
FLOWLOCK_UNLOCK(f);
FlowEnqueue(&flow_recycle_q, f);
/* move to spare list */
// FlowMoveToSpare(f);
cnt++;
switch (state) {
case FLOW_STATE_NEW:
default:
counters->new++;
break;
case FLOW_STATE_ESTABLISHED:
counters->est++;
break;
case FLOW_STATE_CLOSED:
counters->clo++;
break;
}
} else {
FLOWLOCK_UNLOCK(f);
}
f = next_flow;
} while (f != NULL);
return cnt;
}
/**
* \brief time out flows from the hash
*
* \param ts timestamp
* \param try_cnt number of flows to time out max (0 is unlimited)
* \param hash_min min hash index to consider
* \param hash_max max hash index to consider
* \param counters ptr to FlowTimeoutCounters structure
*
* \retval cnt number of timed out flow
*/
static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
uint32_t hash_min, uint32_t hash_max,
FlowTimeoutCounters *counters)
{
uint32_t idx = 0;
uint32_t cnt = 0;
int emergency = 0;
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)
emergency = 1;
for (idx = hash_min; idx < hash_max; idx++) {
FlowBucket *fb = &flow_hash[idx];
if (FBLOCK_TRYLOCK(fb) != 0)
continue;
/* flow hash bucket is now locked */
if (fb->tail == NULL)
goto next;
/* we have a flow, or more than one */
cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
next:
FBLOCK_UNLOCK(fb);
if (try_cnt > 0 && cnt >= try_cnt)
break;
}
return cnt;
}
extern int g_detect_disabled;
typedef struct FlowManagerThreadData_ {
uint32_t instance;
uint32_t min;
uint32_t max;
uint16_t flow_mgr_cnt_clo;
uint16_t flow_mgr_cnt_new;
uint16_t flow_mgr_cnt_est;
uint16_t flow_mgr_memuse;
uint16_t flow_mgr_spare;
uint16_t flow_emerg_mode_enter;
uint16_t flow_emerg_mode_over;
} FlowManagerThreadData;
static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
{
FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
SCLogDebug("flow manager instance %u", ftd->instance);
/* set the min and max value used for hash row walking
* each thread has it's own section of the flow hash */
uint32_t range = flow_config.hash_size / flowmgr_number;
if (ftd->instance == 1)
ftd->max = range;
else if (ftd->instance == flowmgr_number) {
ftd->min = (range * (ftd->instance - 1));
ftd->max = flow_config.hash_size;
} else {
ftd->min = (range * (ftd->instance - 1));
ftd->max = (range * ftd->instance);
}
BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);
SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);
/* pass thread data back to caller */
*data = ftd;
ftd->flow_mgr_cnt_clo = SCPerfTVRegisterCounter("flow_mgr.closed_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_cnt_new = SCPerfTVRegisterCounter("flow_mgr.new_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_cnt_est = SCPerfTVRegisterCounter("flow_mgr.est_pruned", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_memuse = SCPerfTVRegisterCounter("flow.memuse", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_mgr_spare = SCPerfTVRegisterCounter("flow.spare", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_emerg_mode_enter = SCPerfTVRegisterCounter("flow.emerg_mode_entered", t,
SC_PERF_TYPE_UINT64, "NULL");
ftd->flow_emerg_mode_over = SCPerfTVRegisterCounter("flow.emerg_mode_over", t,
SC_PERF_TYPE_UINT64, "NULL");
PacketPoolInit();
return TM_ECODE_OK;
}
static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
SCFree(data);
return TM_ECODE_OK;
}
/** \brief Thread that manages the flow table and times out flows.
*
* \param td ThreadVars casted to void ptr
*
* Keeps an eye on the spare list, alloc flows if needed...
*/
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr1 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
FlowManagerThreadData *ftd = thread_data;
struct timeval ts;
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
int emerg = FALSE;
int prev_emerg = FALSE;
uint32_t last_sec = 0;
struct timespec cond_time;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
* are really low. Might confuse ppl
uint16_t flow_mgr_host_prune = SCPerfTVRegisterCounter("hosts.pruned", th_v,
SC_PERF_TYPE_UINT64,
"NULL");
uint16_t flow_mgr_host_active = SCPerfTVRegisterCounter("hosts.active", th_v,
SC_PERF_TYPE_Q_NORMAL,
"NULL");
uint16_t flow_mgr_host_spare = SCPerfTVRegisterCounter("hosts.spare", th_v,
SC_PERF_TYPE_Q_NORMAL,
"NULL");
*/
memset(&ts, 0, sizeof(ts));
FlowHashDebugInit();
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
emerg = TRUE;
if (emerg == TRUE && prev_emerg == FALSE) {
prev_emerg = TRUE;
SCLogDebug("Flow emergency mode entered...");
SCPerfCounterIncr(ftd->flow_emerg_mode_enter, th_v->sc_perf_pca);
}
}
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
if (((uint32_t)ts.tv_sec - last_sec) > 600) {
FlowHashDebugPrint((uint32_t)ts.tv_sec);
last_sec = (uint32_t)ts.tv_sec;
}
/* see if we still have enough spare flows */
if (ftd->instance == 1)
FlowUpdateSpareFlows();
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
if (ftd->instance == 1) {
DefragTimeoutHash(&ts);
//uint32_t hosts_pruned =
HostTimeoutHash(&ts);
}
/*
SCPerfCounterAddUI64(flow_mgr_host_prune, th_v->sc_perf_pca, (uint64_t)hosts_pruned);
uint32_t hosts_active = HostGetActiveCount();
SCPerfCounterSetUI64(flow_mgr_host_active, th_v->sc_perf_pca, (uint64_t)hosts_active);
uint32_t hosts_spare = HostGetSpareCount();
SCPerfCounterSetUI64(flow_mgr_host_spare, th_v->sc_perf_pca, (uint64_t)hosts_spare);
*/
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_clo, th_v->sc_perf_pca, (uint64_t)counters.clo);
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_new, th_v->sc_perf_pca, (uint64_t)counters.new);
SCPerfCounterAddUI64(ftd->flow_mgr_cnt_est, th_v->sc_perf_pca, (uint64_t)counters.est);
long long unsigned int flow_memuse = SC_ATOMIC_GET(flow_memuse);
SCPerfCounterSetUI64(ftd->flow_mgr_memuse, th_v->sc_perf_pca, (uint64_t)flow_memuse);
uint32_t len = 0;
FQLOCK_LOCK(&flow_spare_q);
len = flow_spare_q.len;
FQLOCK_UNLOCK(&flow_spare_q);
SCPerfCounterSetUI64(ftd->flow_mgr_spare, th_v->sc_perf_pca, (uint64_t)len);
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
if (emerg == TRUE) {
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
emerg = FALSE;
prev_emerg = FALSE;
flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
SCLogInfo("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
"ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
SCPerfCounterIncr(ftd->flow_emerg_mode_over, th_v->sc_perf_pca);
} else {
flow_update_delay_sec = FLOW_EMERG_MODE_UPDATE_DELAY_SEC;
flow_update_delay_nsec = FLOW_EMERG_MODE_UPDATE_DELAY_NSEC;
}
}
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SCPerfSyncCounters(th_v);
break;
}
cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
cond_time.tv_nsec = flow_update_delay_nsec;
SCCtrlMutexLock(&flow_manager_ctrl_mutex);
SCCtrlCondTimedwait(&flow_manager_ctrl_cond, &flow_manager_ctrl_mutex,
&cond_time);
SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
SCPerfSyncCountersIfSignalled(th_v);
}
FlowHashDebugDeinit();
SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
return TM_ECODE_OK;
}
/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
intmax_t setting = 1;
(void)ConfGetInt("flow.managers", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid flow.managers setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
flowmgr_number = (uint32_t)setting;
SCLogInfo("using %u flow manager threads", flowmgr_number);
FlowForceReassemblySetup(g_detect_disabled);
SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
uint32_t u;
for (u = 0; u < flowmgr_number; u++) {
ThreadVars *tv_flowmgr = NULL;
char name[32] = "";
snprintf(name, sizeof(name), "FlowManagerThread%02u", u+1);
tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowManagerThread",
"FlowManager", 0);
BUG_ON(tv_flowmgr == NULL);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
typedef struct FlowRecyclerThreadData_ {
void *output_thread_data;
} FlowRecyclerThreadData;
static TmEcode FlowRecyclerThreadInit(ThreadVars *t, void *initdata, void **data)
{
FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
SCLogError(SC_ERR_THREAD_INIT, "initializing flow log API for thread failed");
SCFree(ftd);
return TM_ECODE_FAILED;
}
SCLogDebug("output_thread_data %p", ftd->output_thread_data);
*data = ftd;
return TM_ECODE_OK;
}
static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
if (ftd->output_thread_data != NULL)
OutputFlowLogThreadDeinit(t, ftd->output_thread_data);
SCFree(data);
return TM_ECODE_OK;
}
/** \brief Thread that manages timed out flows.
*
* \param td ThreadVars casted to void ptr
*/
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
struct timeval ts;
struct timespec cond_time;
int flow_update_delay_sec = FLOW_NORMAL_MODE_UPDATE_DELAY_SEC;
int flow_update_delay_nsec = FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC;
uint64_t recycled_cnt = 0;
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
BUG_ON(ftd == NULL);
memset(&ts, 0, sizeof(ts));
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
uint32_t len = 0;
FQLOCK_LOCK(&flow_recycle_q);
len = flow_recycle_q.len;
FQLOCK_UNLOCK(&flow_recycle_q);
/* Loop through the queue and clean up all flows in it */
if (len) {
Flow *f;
while ((f = FlowDequeue(&flow_recycle_q)) != NULL) {
FLOWLOCK_WRLOCK(f);
(void)OutputFlowLog(th_v, ftd->output_thread_data, f);
FlowClearMemory (f, f->protomap);
FLOWLOCK_UNLOCK(f);
FlowMoveToSpare(f);
recycled_cnt++;
}
}
SCLogDebug("%u flows to recycle", len);
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SCPerfSyncCounters(th_v);
break;
}
cond_time.tv_sec = time(NULL) + flow_update_delay_sec;
cond_time.tv_nsec = flow_update_delay_nsec;
SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
SCCtrlCondTimedwait(&flow_recycler_ctrl_cond,
&flow_recycler_ctrl_mutex, &cond_time);
SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
SCLogDebug("woke up...");
SCPerfSyncCountersIfSignalled(th_v);
}
SCLogInfo("%"PRIu64" flows processed", recycled_cnt);
return TM_ECODE_OK;
}
int FlowRecyclerReadyToShutdown(void)
{
uint32_t len = 0;
FQLOCK_LOCK(&flow_recycle_q);
len = flow_recycle_q.len;
FQLOCK_UNLOCK(&flow_recycle_q);
return ((len == 0));
}
/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn()
{
intmax_t setting = 1;
(void)ConfGetInt("flow.recyclers", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid flow.recyclers setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
flowrec_number = (uint32_t)setting;
SCLogInfo("using %u flow recycler threads", flowrec_number);
SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);
uint32_t u;
for (u = 0; u < flowrec_number; u++) {
ThreadVars *tv_flowmgr = NULL;
char name[32] = "";
snprintf(name, sizeof(name), "FlowRecyclerThread%02u", u+1);
tv_flowmgr = TmThreadCreateMgmtThreadByName("FlowRecyclerThread",
"FlowRecycler", 0);
BUG_ON(tv_flowmgr == NULL);
TmThreadSetCPU(tv_flowmgr, MANAGEMENT_CPU_SET);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
/**
* \brief Used to kill flow recycler thread(s).
*
* \note this should only be called when the flow manager is already gone
*
* \todo Kinda hackish since it uses the tv name to identify flow recycler
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowRecyclerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
/* make sure all flows are processed */
do {
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
usleep(10);
} while (FlowRecyclerReadyToShutdown() == 0);
/* wake up threads */
uint32_t u;
for (u = 0; u < flowrec_number; u++)
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
SCMutexLock(&tv_root_lock);
/* flow recycler thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
}
tv = tv->next;
}
/* wake up threads, another try */
for (u = 0; u < flowrec_number; u++)
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */
SC_ATOMIC_SET(flowrec_cnt, 0);
return;
}
void TmModuleFlowManagerRegister (void)
{
tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
// tmm_modules[TMM_FLOWMANAGER].RegisterTests = FlowManagerRegisterTests;
tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);
}
void TmModuleFlowRecyclerRegister (void)
{
tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
// tmm_modules[TMM_FLOWRECYCLER].RegisterTests = FlowRecyclerRegisterTests;
tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);
}
#ifdef UNITTESTS
/**
* \test Test the timing out of a flow with a fresh TcpSession
* (just initialized, no data segments) in normal mode.
*
* \retval On success it returns 1 and on failure 0.
*/
static int FlowMgrTest01 (void) {
TcpSession ssn;
Flow f;
FlowBucket fb;
struct timeval ts;
FlowQueueInit(&flow_spare_q);
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
memset(&ts, 0, sizeof(ts));
memset(&fb, 0, sizeof(FlowBucket));
FBLOCK_INIT(&fb);
FLOW_INITIALIZE(&f);
f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
TimeGet(&ts);
f.lastts.tv_sec = ts.tv_sec - 5000;
f.protoctx = &ssn;
f.fb = &fb;
f.proto = IPPROTO_TCP;
int state = FlowGetFlowState(&f);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 1;
}
/**
* \test Test the timing out of a flow with a TcpSession
* (with data segments) in normal mode.
*
* \retval On success it returns 1 and on failure 0.
*/
static int FlowMgrTest02 (void) {
TcpSession ssn;
Flow f;
FlowBucket fb;
struct timeval ts;
TcpSegment seg;
TcpStream client;
uint8_t payload[3] = {0x41, 0x41, 0x41};
FlowQueueInit(&flow_spare_q);
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
memset(&fb, 0, sizeof(FlowBucket));
memset(&ts, 0, sizeof(ts));
memset(&seg, 0, sizeof(TcpSegment));
memset(&client, 0, sizeof(TcpSegment));
FBLOCK_INIT(&fb);
FLOW_INITIALIZE(&f);
f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
TimeGet(&ts);
seg.payload = payload;
seg.payload_len = 3;
seg.next = NULL;
seg.prev = NULL;
client.seg_list = &seg;
ssn.client = client;
ssn.server = client;
ssn.state = TCP_ESTABLISHED;
f.lastts.tv_sec = ts.tv_sec - 5000;
f.protoctx = &ssn;
f.fb = &fb;
f.proto = IPPROTO_TCP;
int state = FlowGetFlowState(&f);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 1;
}
/**
* \test Test the timing out of a flow with a fresh TcpSession
* (just initialized, no data segments) in emergency mode.
*
* \retval On success it returns 1 and on failure 0.
*/
static int FlowMgrTest03 (void) {
TcpSession ssn;
Flow f;
FlowBucket fb;
struct timeval ts;
FlowQueueInit(&flow_spare_q);
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
memset(&ts, 0, sizeof(ts));
memset(&fb, 0, sizeof(FlowBucket));
FBLOCK_INIT(&fb);
FLOW_INITIALIZE(&f);
f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
TimeGet(&ts);
ssn.state = TCP_SYN_SENT;
f.lastts.tv_sec = ts.tv_sec - 300;
f.protoctx = &ssn;
f.fb = &fb;
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
int state = FlowGetFlowState(&f);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 1;
}
/**
* \test Test the timing out of a flow with a TcpSession
* (with data segments) in emergency mode.
*
* \retval On success it returns 1 and on failure 0.
*/
static int FlowMgrTest04 (void) {
TcpSession ssn;
Flow f;
FlowBucket fb;
struct timeval ts;
TcpSegment seg;
TcpStream client;
uint8_t payload[3] = {0x41, 0x41, 0x41};
FlowQueueInit(&flow_spare_q);
memset(&ssn, 0, sizeof(TcpSession));
memset(&f, 0, sizeof(Flow));
memset(&fb, 0, sizeof(FlowBucket));
memset(&ts, 0, sizeof(ts));
memset(&seg, 0, sizeof(TcpSegment));
memset(&client, 0, sizeof(TcpSegment));
FBLOCK_INIT(&fb);
FLOW_INITIALIZE(&f);
f.flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
TimeGet(&ts);
seg.payload = payload;
seg.payload_len = 3;
seg.next = NULL;
seg.prev = NULL;
client.seg_list = &seg;
ssn.client = client;
ssn.server = client;
ssn.state = TCP_ESTABLISHED;
f.lastts.tv_sec = ts.tv_sec - 5000;
f.protoctx = &ssn;
f.fb = &fb;
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
int state = FlowGetFlowState(&f);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 0;
}
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
FlowQueueDestroy(&flow_spare_q);
return 1;
}
/**
* \test Test flow allocations when it reach memcap
*
*
* \retval On success it returns 1 and on failure 0.
*/
static int FlowMgrTest05 (void) {
int result = 0;
FlowInitConfig(FLOW_QUIET);
FlowConfig backup;
memcpy(&backup, &flow_config, sizeof(FlowConfig));
uint32_t ini = 0;
uint32_t end = flow_spare_q.len;
flow_config.memcap = 10000;
flow_config.prealloc = 100;
/* Let's get the flow_spare_q empty */
UTHBuildPacketOfFlows(ini, end, 0);
/* And now let's try to reach the memcap val */
while (FLOW_CHECK_MEMCAP(sizeof(Flow))) {
ini = end + 1;
end = end + 2;
UTHBuildPacketOfFlows(ini, end, 0);
}
/* should time out normal */
TimeSetIncrementTime(2000);
ini = end + 1;
end = end + 2;;
UTHBuildPacketOfFlows(ini, end, 0);
struct timeval ts;
TimeGet(&ts);
/* try to time out flows */
FlowTimeoutCounters counters = { 0, 0, 0, };
FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);
if (flow_recycle_q.len > 0) {
result = 1;
}
memcpy(&flow_config, &backup, sizeof(FlowConfig));
FlowShutdown();
return result;
}
#endif /* UNITTESTS */
/**
* \brief Function to register the Flow Unitests.
*/
void FlowMgrRegisterTests (void) {
#ifdef UNITTESTS
UtRegisterTest("FlowMgrTest01 -- Timeout a flow having fresh TcpSession", FlowMgrTest01, 1);
UtRegisterTest("FlowMgrTest02 -- Timeout a flow having TcpSession with segments", FlowMgrTest02, 1);
UtRegisterTest("FlowMgrTest03 -- Timeout a flow in emergency having fresh TcpSession", FlowMgrTest03, 1);
UtRegisterTest("FlowMgrTest04 -- Timeout a flow in emergency having TcpSession with segments", FlowMgrTest04, 1);
UtRegisterTest("FlowMgrTest05 -- Test flow Allocations when it reach memcap", FlowMgrTest05, 1);
#endif /* UNITTESTS */
}