Rearrange flow manager functions into flow-manager.[ch]. Some other minor changes/updates

remotes/origin/master-1.1.x
Anoop Saldanha 14 years ago committed by Victor Julien
parent 7c729d2d53
commit b6ba944e6d

@ -43,6 +43,7 @@ decode-udp.c decode-udp.h \
decode-sctp.c decode-sctp.h \
flow.c flow.h \
flow-timeout.c flow-timeout.h \
flow-manager.c flow-manager.h \
flow-queue.c flow-queue.h \
flow-hash.c flow-hash.h \
flow-util.c flow-util.h \

@ -0,0 +1,293 @@
/* Copyright (C) 2007-2011 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Anoop Saldanha <poonaatsoc@gmail.com>
*/
#include "suricata-common.h"
#include "suricata.h"
#include "decode.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"
#include "util-random.h"
#include "util-time.h"
#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-var.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"
#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-byte.h"
#include "util-debug.h"
#include "util-privs.h"
#include "threads.h"
#include "detect.h"
#include "detect-engine-state.h"
#include "stream.h"
#include "app-layer-parser.h"
/* Run mode selected at suricata.c */
extern int run_mode;
/**
* \brief Used to kill flow manager thread(s).
*
* \todo Kinda hackish since it uses the tv name to identify flow manager
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowManagerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
cnt++;
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
return;
}
/** \brief Thread that manages the various queue's and removes timed out flows.
* \param td ThreadVars casted to void ptr
*
* IDEAS/TODO
* Create a 'emergency mode' in which flow handling threads can indicate
* we are/seem to be under attack..... maybe this thread should check
* key indicators for that like:
* - number of flows created in the last x time
* - avg number of pkts per flow (how?)
* - avg flow age
*
* Keep an eye on the spare list, alloc flows if needed...
*/
void *FlowManagerThread(void *td)
{
ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts;
struct timeval tsdiff;
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0, nowcnt;
uint32_t sleeping = 0;
uint8_t emerg = FALSE;
uint32_t last_sec = 0;
memset(&ts, 0, sizeof(ts));
FlowForceReassemblySetup();
/* set the thread name */
SCSetThreadName(th_v->name);
SCLogDebug("%s started...", th_v->name);
/* Set the threads capability */
th_v->cap_flags = 0;
SCDropCaps(th_v);
FlowHashDebugInit();
TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
TmThreadTestThreadUnPaused(th_v);
if (sleeping >= 100 || flow_flags & FLOW_EMERGENCY)
{
if (flow_flags & FLOW_EMERGENCY) {
emerg = TRUE;
SCLogDebug("Flow emergency mode entered...");
}
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
if (((uint32_t)ts.tv_sec - last_sec) > 600) {
FlowHashDebugPrint((uint32_t)ts.tv_sec);
last_sec = (uint32_t)ts.tv_sec;
}
/* see if we still have enough spare flows */
FlowUpdateSpareFlows();
int i;
for (i = 0; i < FLOW_PROTO_MAX; i++) {
/* prune closing list */
nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt);
closing_cnt += nowcnt;
}
/* prune new list */
nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt);
new_cnt += nowcnt;
}
/* prune established list */
nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt);
established_cnt += nowcnt;
}
}
sleeping = 0;
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
if (emerg == TRUE) {
uint32_t len = 0;
SCMutexLock(&flow_spare_q.mutex_q);
len = flow_spare_q.len;
SCMutexUnlock(&flow_spare_q.mutex_q);
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
flow_flags &= ~FLOW_EMERGENCY;
emerg = FALSE;
SCLogInfo("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
"ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
}
}
}
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SCPerfUpdateCounterArray(th_v->sc_perf_pca, &th_v->sc_perf_pctx, 0);
break;
}
if (run_mode != RUNMODE_PCAP_FILE) {
usleep(10);
sleeping += 10;
} else {
/* If we are reading a pcap, how long the pcap timestamps
* says that has passed */
memset(&tsdiff, 0, sizeof(tsdiff));
TimeGet(&tsdiff);
if (tsdiff.tv_sec == ts.tv_sec &&
tsdiff.tv_usec > ts.tv_usec &&
tsdiff.tv_usec - ts.tv_usec < 10) {
/* if it has passed less than 10 usec, sleep that usecs */
sleeping += tsdiff.tv_usec - ts.tv_usec;
usleep(tsdiff.tv_usec - ts.tv_usec);
} else {
/* Else update the sleeping var but don't sleep so long */
if (tsdiff.tv_sec == ts.tv_sec && tsdiff.tv_usec > ts.tv_usec)
sleeping += tsdiff.tv_usec - ts.tv_usec;
else if (tsdiff.tv_sec == ts.tv_sec + 1)
sleeping += tsdiff.tv_usec + (1000000 - ts.tv_usec);
else
sleeping += 100;
usleep(1);
}
}
}
TmThreadWaitForFlag(th_v, THV_DEINIT);
FlowHashDebugDeinit();
SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
#ifdef FLOW_PRUNE_DEBUG
SCLogInfo("prune_queue_lock %"PRIu64, prune_queue_lock);
SCLogInfo("prune_queue_empty %"PRIu64, prune_queue_empty);
SCLogInfo("prune_flow_lock %"PRIu64, prune_flow_lock);
SCLogInfo("prune_bucket_lock %"PRIu64, prune_bucket_lock);
SCLogInfo("prune_no_timeout %"PRIu64, prune_no_timeout);
SCLogInfo("prune_usecnt %"PRIu64, prune_usecnt);
#endif
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
}
/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
ThreadVars *tv_flowmgr = NULL;
tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread",
FlowManagerThread, 0);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
return;
}

@ -0,0 +1,30 @@
/* Copyright (C) 2007-2011 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Anoop Saldanha <poonaatsoc@gmail.com>
*/
#ifndef __FLOW_MANAGER_H__
#define __FLOW_MANAGER_H__
void FlowManagerThreadSpawn(void);
void FlowKillFlowManagerThread(void);
#endif /* __FLOW_MANAGER_H__ */

@ -38,6 +38,7 @@
#include "flow-util.h"
#include "flow-var.h"
#include "flow-private.h"
#include "flow-manager.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
@ -66,46 +67,6 @@ static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL;
static TmSlot *stream_pseudo_pkt_decode_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_decode_TV = NULL;
/**
* \brief Used to kill flow manager thread(s).
*
* \todo Kinda hackish since it uses the tv name to identify flow manager
* thread. We need an all weather identification scheme.
*/
static inline void FlowKillFlowManagerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
cnt++;
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
return;
}
/**
* \internal
* \brief Pseudo packet setup for flow forced reassembly.

@ -44,6 +44,7 @@
#include "flow-var.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
@ -74,7 +75,6 @@
void FlowRegisterTests(void);
void FlowInitFlowProto();
static int FlowUpdateSpareFlows(void);
int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
static int FlowClearMemory(Flow *,uint8_t );
@ -388,7 +388,7 @@ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
* \param timeout timeout to consider
* \retval cnt number of flows that are timed out
*/
static uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts)
uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts)
{
SCEnter();
return FlowPrune(q, ts, 0);
@ -611,7 +611,7 @@ static uint32_t FlowPruneFlowQueueCnt(FlowQueue *q, struct timeval *ts, int try_
* \retval 1 if the queue was properly updated (or if it already was in good shape)
* \retval 0 otherwise.
*/
static int FlowUpdateSpareFlows(void)
int FlowUpdateSpareFlows(void)
{
SCEnter();
uint32_t toalloc = 0, tofree = 0, len;
@ -1058,196 +1058,6 @@ void FlowShutdown(void)
return;
}
/** \brief Thread that manages the various queue's and removes timed out flows.
* \param td ThreadVars casted to void ptr
*
* IDEAS/TODO
* Create a 'emergency mode' in which flow handling threads can indicate
* we are/seem to be under attack..... maybe this thread should check
* key indicators for that like:
* - number of flows created in the last x time
* - avg number of pkts per flow (how?)
* - avg flow age
*
* Keep an eye on the spare list, alloc flows if needed...
*/
void *FlowManagerThread(void *td)
{
ThreadVars *th_v = (ThreadVars *)td;
struct timeval ts;
struct timeval tsdiff;
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0, nowcnt;
uint32_t sleeping = 0;
uint8_t emerg = FALSE;
uint32_t last_sec = 0;
memset(&ts, 0, sizeof(ts));
FlowForceReassemblySetup();
/* set the thread name */
SCSetThreadName(th_v->name);
SCLogDebug("%s started...", th_v->name);
/* Set the threads capability */
th_v->cap_flags = 0;
SCDropCaps(th_v);
FlowHashDebugInit();
TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1)
{
TmThreadTestThreadUnPaused(th_v);
if (sleeping >= 100 || flow_flags & FLOW_EMERGENCY)
{
if (flow_flags & FLOW_EMERGENCY) {
emerg = TRUE;
SCLogDebug("Flow emergency mode entered...");
}
/* Get the time */
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
if (((uint32_t)ts.tv_sec - last_sec) > 600) {
FlowHashDebugPrint((uint32_t)ts.tv_sec);
last_sec = (uint32_t)ts.tv_sec;
}
/* see if we still have enough spare flows */
FlowUpdateSpareFlows();
int i;
for (i = 0; i < FLOW_PROTO_MAX; i++) {
/* prune closing list */
nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt);
closing_cnt += nowcnt;
}
/* prune new list */
nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt);
new_cnt += nowcnt;
}
/* prune established list */
nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts);
if (nowcnt) {
SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt);
established_cnt += nowcnt;
}
}
sleeping = 0;
/* Don't fear, FlowManagerThread is here...
* clear emergency bit if we have at least xx flows pruned. */
if (emerg == TRUE) {
uint32_t len = 0;
SCMutexLock(&flow_spare_q.mutex_q);
len = flow_spare_q.len;
SCMutexUnlock(&flow_spare_q.mutex_q);
SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32
"flow_spare_q status: %"PRIu32"%% flows at the queue",
len, flow_config.prealloc, len * 100 / flow_config.prealloc);
/* only if we have pruned this "emergency_recovery" percentage
* of flows, we will unset the emergency bit */
if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) {
flow_flags &= ~FLOW_EMERGENCY;
emerg = FALSE;
SCLogInfo("Flow emergency mode over, back to normal... unsetting"
" FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", "
"ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32
"%% flows at the queue", (uintmax_t)ts.tv_sec,
(uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc);
}
}
}
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
SCPerfUpdateCounterArray(th_v->sc_perf_pca, &th_v->sc_perf_pctx, 0);
break;
}
if (run_mode != RUNMODE_PCAP_FILE) {
usleep(10);
sleeping += 10;
} else {
/* If we are reading a pcap, how long the pcap timestamps
* says that has passed */
memset(&tsdiff, 0, sizeof(tsdiff));
TimeGet(&tsdiff);
if (tsdiff.tv_sec == ts.tv_sec &&
tsdiff.tv_usec > ts.tv_usec &&
tsdiff.tv_usec - ts.tv_usec < 10) {
/* if it has passed less than 10 usec, sleep that usecs */
sleeping += tsdiff.tv_usec - ts.tv_usec;
usleep(tsdiff.tv_usec - ts.tv_usec);
} else {
/* Else update the sleeping var but don't sleep so long */
if (tsdiff.tv_sec == ts.tv_sec && tsdiff.tv_usec > ts.tv_usec)
sleeping += tsdiff.tv_usec - ts.tv_usec;
else if (tsdiff.tv_sec == ts.tv_sec + 1)
sleeping += tsdiff.tv_usec + (1000000 - ts.tv_usec);
else
sleeping += 100;
usleep(1);
}
}
}
TmThreadWaitForFlag(th_v, THV_DEINIT);
FlowHashDebugDeinit();
SCLogInfo("%" PRIu32 " new flows, %" PRIu32 " established flows were "
"timed out, %"PRIu32" flows in closed state", new_cnt,
established_cnt, closing_cnt);
#ifdef FLOW_PRUNE_DEBUG
SCLogInfo("prune_queue_lock %"PRIu64, prune_queue_lock);
SCLogInfo("prune_queue_empty %"PRIu64, prune_queue_empty);
SCLogInfo("prune_flow_lock %"PRIu64, prune_flow_lock);
SCLogInfo("prune_bucket_lock %"PRIu64, prune_bucket_lock);
SCLogInfo("prune_no_timeout %"PRIu64, prune_no_timeout);
SCLogInfo("prune_usecnt %"PRIu64, prune_usecnt);
#endif
TmThreadsSetFlag(th_v, THV_CLOSED);
pthread_exit((void *) 0);
}
/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn()
{
ThreadVars *tv_flowmgr = NULL;
tv_flowmgr = TmThreadCreateMgmtThread("FlowManagerThread",
FlowManagerThread, 0);
if (tv_flowmgr == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
return;
}
/**
* \brief Function to set the default timeout, free function and flow state
* function for all supported flow_proto.

@ -258,9 +258,6 @@ void FlowDecrUsecnt(Flow *);
uint32_t FlowPruneFlowsCnt(struct timeval *, int);
uint32_t FlowKillFlowsCnt(int);
void *FlowManagerThread(void *td);
void FlowManagerThreadSpawn(void);
void FlowRegisterTests (void);
int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
@ -268,7 +265,10 @@ int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *));
int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *));
void FlowUpdateQueue(Flow *);
void FlowForceReassembly(void);
struct FlowQueue_;
int FlowUpdateSpareFlows(void);
uint32_t FlowPruneFlowQueue(struct FlowQueue_ *, struct timeval *);
static inline void FlowLockSetNoPacketInspectionFlag(Flow *);
static inline void FlowSetNoPacketInspectionFlag(Flow *);

@ -121,7 +121,6 @@ int RunModeIdsPfringAuto(DetectEngineCtx *de_ctx)
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_receivepfring, tm_module, NULL);
TmThreadMSRegisterSyncPt(tv_receivepfring, "ReceiveTMBeforeDeInit");
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receivepfring, 0);
@ -372,7 +371,6 @@ int RunModeIdsPfringAutoFp(DetectEngineCtx *de_ctx)
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_receive, tm_module, NULL);
TmThreadMSRegisterSyncPt(tv_receive, "ReceiveTMBeforeDeInit");
tm_module = TmModuleGetByName("DecodePfring");
if (tm_module == NULL) {

@ -107,6 +107,8 @@
#include "respond-reject.h"
#include "flow.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "flow-var.h"
#include "flow-bit.h"
#include "flow-alert-sid.h"

Loading…
Cancel
Save