@ -45,6 +45,7 @@
# include "flow-private.h"
# include "util-unittest.h"
# include "util-unittest-helper.h"
# include "util-byte.h"
# include "util-debug.h"
@ -52,6 +53,8 @@
# include "detect.h"
# include "detect-engine-state.h"
# define FLOW_DEFAULT_EMERGENCY_RECOVERY 30
# define FLOW_DEFAULT_FLOW_PRUNE 5
//#define FLOW_DEFAULT_HASHSIZE 262144
# define FLOW_DEFAULT_HASHSIZE 65536
@ -68,6 +71,8 @@ int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
static int FlowClearMemory ( Flow * , uint8_t ) ;
int FlowSetProtoFreeFunc ( uint8_t , void ( * Free ) ( void * ) ) ;
int FlowSetFlowStateFunc ( uint8_t , int ( * GetProtoState ) ( void * ) ) ;
static uint32_t FlowPruneFlowQueueCnt ( FlowQueue * , struct timeval * , int ) ;
int FlowKill ( FlowQueue * ) ;
/* Run mode selected at suricata.c */
extern int run_mode ;
@ -84,12 +89,12 @@ void FlowUpdateQueue(Flow *f)
/* in the new list -- we consider a flow no longer
* new if we have seen at least 2 pkts in both ways . */
if ( f - > todstpktcnt & & f - > tosrcpktcnt ) {
FlowRequeue ( f , & flow_new_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] );
FlowRequeue ( f , & flow_new_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] , 1 );
f - > flags | = FLOW_EST_LIST ; /* transition */
f - > flags & = ~ FLOW_NEW_LIST ;
} else {
FlowRequeue ( f , & flow_new_q [ f - > protomap ] , & flow_new_q [ f - > protomap ] );
FlowRequeue ( f , & flow_new_q [ f - > protomap ] , & flow_new_q [ f - > protomap ] , 1 );
}
} else if ( f - > flags & FLOW_EST_LIST ) {
if ( flow_proto [ f - > protomap ] . GetProtoState ! = NULL ) {
@ -99,21 +104,21 @@ void FlowUpdateQueue(Flow *f)
f - > flags & = ~ FLOW_EST_LIST ;
SCLogDebug ( " flow %p was put into closing queue ts % " PRIuMAX " " , f , ( uintmax_t ) f - > lastts . tv_sec ) ;
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_close_q [ f - > protomap ] );
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_close_q [ f - > protomap ] , 1 );
} else {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] );
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] , 1 );
}
} else {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] );
FlowRequeue ( f , & flow_est_q [ f - > protomap ] , & flow_est_q [ f - > protomap ] , 1 );
}
} else if ( f - > flags & FLOW_CLOSED_LIST ) {
/* Pull and put back -- this way the flows on
* top of the list are least recently used . */
FlowRequeue ( f , & flow_close_q [ f - > protomap ] , & flow_close_q [ f - > protomap ] );
FlowRequeue ( f , & flow_close_q [ f - > protomap ] , & flow_close_q [ f - > protomap ] , 1 );
}
}
@ -134,6 +139,7 @@ void FlowUpdateQueue(Flow *f)
*/
static int FlowPrune ( FlowQueue * q , struct timeval * ts )
{
SCEnter ( ) ;
int mr = SCMutexTrylock ( & q - > mutex_q ) ;
if ( mr ! = 0 ) {
SCLogDebug ( " trylock failed " ) ;
@ -247,7 +253,7 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
FlowClearMemory ( f , f - > protomap ) ;
/* move to spare list */
FlowRequeue ( f , q , & flow_spare_q );
FlowRequeue ( f , q , & flow_spare_q , 1 );
SCMutexUnlock ( & f - > m ) ;
return 1 ;
@ -259,10 +265,219 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
* \ param timeout timeout to consider
* \ retval cnt number of flows that are timed out
*/
static uint32_t FlowPruneFlow s ( FlowQueue * q , struct timeval * ts )
static uint32_t FlowPruneFlow Queue ( FlowQueue * q , struct timeval * ts )
{
SCEnter ( ) ;
uint32_t cnt = 0 ;
while ( FlowPrune ( q , ts ) ) { cnt + + ; }
//while(FlowPrune(q, ts)) { cnt++; }
return cnt ;
}
/** \brief Time out flows on new/estabhlished/close queues for each proto until
* we release cnt flows as max
* Called from the FlowManager
* \ param ts current time
* \ retval cnt number of flows that are timed out
*/
uint32_t FlowPruneFlowsCnt ( struct timeval * ts , int cnt )
{
SCEnter ( ) ;
uint32_t nowcnt = 0 ;
int i = 0 ;
for ( ; i < FLOW_PROTO_MAX ; i + + ) {
/* prune closing list */
nowcnt = FlowPruneFlowQueueCnt ( & flow_close_q [ i ] , ts , cnt ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
/* prune new list */
nowcnt = FlowPruneFlowQueueCnt ( & flow_new_q [ i ] , ts , cnt ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
/* prune established list */
nowcnt = FlowPruneFlowQueueCnt ( & flow_est_q [ i ] , ts , cnt ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
}
return cnt ;
}
/** \brief FlowKillFlowQueueCnt It will try to kill try_cnt count of flows
* It will return the number of flows released , and can be 0 or more .
* \ param q flow queue to time out flows from
* \ param try_cnt try to prune this number of flows
* \ retval cnt number of flows that are timed out
*/
static uint32_t FlowKillFlowQueueCnt ( FlowQueue * q , int try_cnt , uint8_t mode )
{
SCEnter ( ) ;
uint32_t cnt = 0 ;
while ( try_cnt - - ) {
cnt + = FlowKill ( q ) ;
}
SCLogDebug ( " EMERGENCY mode, Flows killed: % " PRIu32 , cnt ) ;
return cnt ;
}
/** FlowKill
*
* Inspect the top flows ( last recently used ) from the queue
* and see if we can prune any it ( this is if it ' s not in use ) .
*
* Use trylock here so prevent us from blocking the packet handling .
*
* \ param q flow queue to prune
*
* \ retval 0 on error , failed block , nothing to prune
* \ retval 1 on successfully pruned one
*/
int FlowKill ( FlowQueue * q )
{
SCEnter ( ) ;
int mr = SCMutexTrylock ( & q - > mutex_q ) ;
if ( mr ! = 0 ) {
SCLogDebug ( " trylock failed " ) ;
if ( mr = = EBUSY )
SCLogDebug ( " was locked " ) ;
if ( mr = = EINVAL )
SCLogDebug ( " bad mutex value " ) ;
return 0 ;
}
Flow * f = q - > top ;
/* This means that the queue is empty */
if ( f = = NULL ) {
SCMutexUnlock ( & q - > mutex_q ) ;
SCLogDebug ( " top is null " ) ;
return 0 ;
}
do {
if ( SCMutexTrylock ( & f - > m ) ! = 0 ) {
f = f - > lnext ;
/* Skip to the next */
continue ;
}
if ( SCSpinTrylock ( & f - > fb - > s ) ! = 0 ) {
SCMutexUnlock ( & f - > m ) ;
f = f - > lnext ;
continue ;
}
/** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */
if ( f - > use_cnt > 0 ) {
SCSpinUnlock ( & f - > fb - > s ) ;
SCMutexUnlock ( & f - > m ) ;
f = f - > lnext ;
continue ;
}
/* remove from the hash */
if ( f - > hprev )
f - > hprev - > hnext = f - > hnext ;
if ( f - > hnext )
f - > hnext - > hprev = f - > hprev ;
if ( f - > fb - > f = = f )
f - > fb - > f = f - > hnext ;
f - > hnext = NULL ;
f - > hprev = NULL ;
SCSpinUnlock ( & f - > fb - > s ) ;
f - > fb = NULL ;
FlowClearMemory ( f , f - > protomap ) ;
/* move to spare list */
FlowRequeue ( f , q , & flow_spare_q , 0 ) ;
SCMutexUnlock ( & f - > m ) ;
/* so.. we did it */
/* unlock list */
SCMutexUnlock ( & q - > mutex_q ) ;
return 1 ;
break ;
} while ( f ! = NULL ) ;
/* If we reach this point, then we didn't prune any */
/* unlock list */
SCMutexUnlock ( & q - > mutex_q ) ;
return 0 ;
}
/** \brief Try to kill cnt flows by last recently seen activity on new/estabhlished/close queues for each proto until
* we release cnt flows as max . Called only on emergency mode .
* \ param cnt number of flows to release
* \ retval cnt number of flows that are not killed ( so 0 if we prune all of them )
*/
uint32_t FlowKillFlowsCnt ( int cnt )
{
SCEnter ( ) ;
uint32_t nowcnt = 0 ;
int i = 0 ;
/* Inspect the top of each protocol to select the last recently used */
for ( ; i < FLOW_PROTO_MAX ; i + + ) {
/* prune closing list */
nowcnt = FlowKillFlowQueueCnt ( & flow_close_q [ i ] , cnt , 0 ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
/* prune new list */
nowcnt = FlowKillFlowQueueCnt ( & flow_new_q [ i ] , cnt , 0 ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
/* prune established list */
nowcnt = FlowKillFlowQueueCnt ( & flow_est_q [ i ] , cnt , 0 ) ;
if ( nowcnt ) {
cnt - = nowcnt ;
}
if ( cnt < = 0 ) break ;
}
return cnt ;
}
/** \brief Time out flows will try to prune try_cnt count of flows
* It will return the number of flows released , and can be 0 or more .
* A more agressive aproach is calling this function with the emergency
* bit set ( and there will be another even more agressive , killing
* flows without the criteria of time outs )
* \ param q flow queue to time out flows from
* \ param ts current time
* \ param timeout timeout to consider
* \ param try_cnt try to prune this number of flows if they are timed out
* \ retval cnt number of flows that are timed out
*/
static uint32_t FlowPruneFlowQueueCnt ( FlowQueue * q , struct timeval * ts , int try_cnt )
{
SCEnter ( ) ;
uint32_t cnt = 0 ;
while ( try_cnt - - ) {
cnt + = FlowPrune ( q , ts ) ;
}
return cnt ;
}
@ -275,6 +490,7 @@ static uint32_t FlowPruneFlows(FlowQueue *q, struct timeval *ts)
* \ retval 0 otherwise .
*/
static int FlowUpdateSpareFlows ( void ) {
SCEnter ( ) ;
uint32_t toalloc = 0 , tofree = 0 , len ;
SCMutexLock ( & flow_spare_q . mutex_q ) ;
@ -499,6 +715,25 @@ void FlowInitConfig (char quiet)
/* If we have specific config, overwrite the defaults with them,
* otherwise , leave the default values */
intmax_t val = 0 ;
if ( ConfGetInt ( " flow.emergency_recovery " , & val ) = = 1 ) {
if ( val < = 100 & & val > = 1 ) {
flow_config . emergency_recovery = ( uint8_t ) val ;
} else {
SCLogError ( SC_ERR_INVALID_VALUE , " flow.emergency_recovery must be in the range of 1 and 100 (as percentage) " ) ;
flow_config . emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY ;
}
} else {
SCLogDebug ( " flow.emergency_recovery, using default value " ) ;
flow_config . emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY ;
}
if ( ConfGetInt ( " flow.prune_flows " , & val ) = = 1 ) {
flow_config . flow_try_release = ( uint8_t ) val ;
} else {
SCLogDebug ( " flow.flow.prune_flows, using default value " ) ;
flow_config . flow_try_release = FLOW_DEFAULT_FLOW_PRUNE ;
}
/* Check if we have memcap and hash_size defined at config */
char * conf_val ;
@ -727,21 +962,21 @@ void *FlowManagerThread(void *td)
int i ;
for ( i = 0 ; i < FLOW_PROTO_MAX ; i + + ) {
/* prune closing list */
nowcnt = FlowPruneFlow s ( & flow_close_q [ i ] , & ts ) ;
nowcnt = FlowPruneFlow Queue ( & flow_close_q [ i ] , & ts ) ;
if ( nowcnt ) {
SCLogDebug ( " Pruned % " PRIu32 " closing flows... " , nowcnt ) ;
closing_cnt + = nowcnt ;
}
/* prune new list */
nowcnt = FlowPruneFlow s ( & flow_new_q [ i ] , & ts ) ;
nowcnt = FlowPruneFlow Queue ( & flow_new_q [ i ] , & ts ) ;
if ( nowcnt ) {
SCLogDebug ( " Pruned % " PRIu32 " new flows... " , nowcnt ) ;
new_cnt + = nowcnt ;
}
/* prune established list */
nowcnt = FlowPruneFlow s ( & flow_est_q [ i ] , & ts ) ;
nowcnt = FlowPruneFlow Queue ( & flow_est_q [ i ] , & ts ) ;
if ( nowcnt ) {
SCLogDebug ( " Pruned % " PRIu32 " established flows... " , nowcnt ) ;
established_cnt + = nowcnt ;
@ -751,11 +986,30 @@ void *FlowManagerThread(void *td)
sleeping = 0 ;
/* Don't fear, FlowManagerThread is here...
* clear emergency bit . */
* clear emergency bit if we have at least xx flows pruned . */
if ( emerg = = TRUE ) {
flow_flags & = ~ FLOW_EMERGENCY ;
emerg = FALSE ;
SCLogDebug ( " Flow emergency mode over, back to normal... " ) ;
uint32_t len = 0 ;
SCMutexLock ( & flow_spare_q . mutex_q ) ;
len = flow_spare_q . len ;
SCMutexUnlock ( & flow_spare_q . mutex_q ) ;
SCLogDebug ( " flow_sparse_q.len = % " PRIu32 " prealloc: % " PRIu32
" flow_spare_q status: % " PRIu32 " %% flows at the queue " ,
len , flow_config . prealloc , len * 100 / flow_config . prealloc ) ;
/* only if we have pruned this "emergency_recovery" percentage
* of flows , we will unset the emergency bit */
if ( len * 100 / flow_config . prealloc > flow_config . emergency_recovery ) {
flow_flags & = ~ FLOW_EMERGENCY ;
emerg = FALSE ;
SCLogInfo ( " Flow emergency mode over, back to normal... unsetting "
" FLOW_EMERGENCY bit (ts.tv_sec: % " PRIuMAX " , "
" ts.tv_usec:% " PRIuMAX " ) flow_spare_q status(): % " PRIu32
" %% flows at the queue " , ( uintmax_t ) ts . tv_sec ,
( uintmax_t ) ts . tv_usec , len * 100 / flow_config . prealloc ) ;
}
}
}
@ -1495,6 +1749,145 @@ static int FlowTest06 (void) {
return 1 ;
}
/**
* \ test Test flow allocations when it reach memcap
*
*
* \ retval On success it returns 1 and on failure 0.
*/
static int FlowTest07 ( void ) {
int result = 0 ;
FlowInitConfig ( FLOW_QUIET ) ;
FlowConfig backup ;
memcpy ( & backup , & flow_config , sizeof ( FlowConfig ) ) ;
uint32_t ini = 0 ;
uint32_t end = flow_spare_q . len ;
flow_config . memcap = 10000 ;
flow_config . prealloc = 100 ;
/* Let's get the flow_spare_q empty */
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* And now let's try to reach the memcap val */
while ( flow_memuse + sizeof ( Flow ) < flow_config . memcap ) {
ini = end + 1 ;
end = end + 2 ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
}
/* should time out normal */
TimeSetIncrementTime ( 2000 ) ;
ini = end + 1 ;
end = end + 2 ; ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* This means that the engine released 5 flows by normal timeout */
if ( flow_spare_q . len = = 5 )
result = 1 ;
memcpy ( & flow_config , & backup , sizeof ( FlowConfig ) ) ;
FlowShutdown ( ) ;
return result ;
}
/**
* \ test Test flow allocations when it reach memcap
*
*
* \ retval On success it returns 1 and on failure 0.
*/
static int FlowTest08 ( void ) {
int result = 0 ;
FlowInitConfig ( FLOW_QUIET ) ;
FlowConfig backup ;
memcpy ( & backup , & flow_config , sizeof ( FlowConfig ) ) ;
uint32_t ini = 0 ;
uint32_t end = flow_spare_q . len ;
flow_config . memcap = 10000 ;
flow_config . prealloc = 100 ;
/* Let's get the flow_spare_q empty */
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* And now let's try to reach the memcap val */
while ( flow_memuse + sizeof ( Flow ) < flow_config . memcap ) {
ini = end + 1 ;
end = end + 2 ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
}
/* By default we use 30 for timing out new flows. This means
* that the Emergency mode should be set */
TimeSetIncrementTime ( 20 ) ;
ini = end + 1 ;
end = end + 2 ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* This means that the engine released 5 flows by emergency timeout */
if ( flow_spare_q . len = = 5 & & ( flow_flags & FLOW_EMERGENCY ) )
result = 1 ;
memcpy ( & flow_config , & backup , sizeof ( FlowConfig ) ) ;
FlowShutdown ( ) ;
return result ;
}
/**
* \ test Test flow allocations when it reach memcap
*
*
* \ retval On success it returns 1 and on failure 0.
*/
static int FlowTest09 ( void ) {
int result = 0 ;
FlowInitConfig ( FLOW_QUIET ) ;
FlowConfig backup ;
memcpy ( & backup , & flow_config , sizeof ( FlowConfig ) ) ;
uint32_t ini = 0 ;
uint32_t end = flow_spare_q . len ;
flow_config . memcap = 10000 ;
flow_config . prealloc = 100 ;
/* Let's get the flow_spare_q empty */
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* And now let's try to reach the memcap val */
while ( flow_memuse + sizeof ( Flow ) < flow_config . memcap ) {
ini = end + 1 ;
end = end + 2 ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
}
/* No timeout will work */
TimeSetIncrementTime ( 5 ) ;
ini = end + 1 ;
end = end + 2 ;
UTHBuildPacketOfFlows ( ini , end , 0 ) ;
/* This means that the engine release 5 flows by killing them */
if ( flow_spare_q . len = = 5 & & ( flow_flags & FLOW_EMERGENCY ) )
result = 1 ;
memcpy ( & flow_config , & backup , sizeof ( FlowConfig ) ) ;
FlowShutdown ( ) ;
return result ;
}
# endif /* UNITTESTS */
/**
@ -1508,5 +1901,8 @@ void FlowRegisterTests (void) {
UtRegisterTest ( " FlowTest04 -- Timeout a flow having TcpSession with segments " , FlowTest04 , 1 ) ;
UtRegisterTest ( " FlowTest05 -- Timeout a flow in emergency having fresh TcpSession " , FlowTest05 , 1 ) ;
UtRegisterTest ( " FlowTest06 -- Timeout a flow in emergency having TcpSession with segments " , FlowTest06 , 1 ) ;
UtRegisterTest ( " FlowTest07 -- Test flow Allocations when it reach memcap " , FlowTest07 , 1 ) ;
UtRegisterTest ( " FlowTest08 -- Test flow Allocations when it reach memcap " , FlowTest08 , 1 ) ;
UtRegisterTest ( " FlowTest09 -- Test flow Allocations when it reach memcap " , FlowTest09 , 1 ) ;
# endif /* UNITTESTS */
}