@ -41,6 +41,7 @@
Packet * TmqhInputFlow ( ThreadVars * t ) ;
Packet * TmqhInputFlow ( ThreadVars * t ) ;
void TmqhOutputFlowHash ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowHash ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowIPPair ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowActivePackets ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowActivePackets ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowRoundRobin ( ThreadVars * t , Packet * p ) ;
void TmqhOutputFlowRoundRobin ( ThreadVars * t , Packet * p ) ;
void * TmqhOutputFlowSetupCtx ( char * queue_str ) ;
void * TmqhOutputFlowSetupCtx ( char * queue_str ) ;
@ -66,6 +67,9 @@ void TmqhFlowRegister(void)
} else if ( strcasecmp ( scheduler , " hash " ) = = 0 ) {
} else if ( strcasecmp ( scheduler , " hash " ) = = 0 ) {
SCLogInfo ( " AutoFP mode using \" Hash \" flow load balancer " ) ;
SCLogInfo ( " AutoFP mode using \" Hash \" flow load balancer " ) ;
tmqh_table [ TMQH_FLOW ] . OutHandler = TmqhOutputFlowHash ;
tmqh_table [ TMQH_FLOW ] . OutHandler = TmqhOutputFlowHash ;
} else if ( strcasecmp ( scheduler , " ippair " ) = = 0 ) {
SCLogInfo ( " AutoFP mode using \" ippair \" flow load balancer " ) ;
tmqh_table [ TMQH_FLOW ] . OutHandler = TmqhOutputFlowIPPair ;
} else {
} else {
SCLogError ( SC_ERR_INVALID_YAML_CONF_ENTRY , " Invalid entry \" %s \" "
SCLogError ( SC_ERR_INVALID_YAML_CONF_ENTRY , " Invalid entry \" %s \" "
" for autofp-scheduler in conf. Killing engine. " ,
" for autofp-scheduler in conf. Killing engine. " ,
@ -359,6 +363,56 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
return ;
return ;
}
}
/**
* \ brief select the queue to output based on IP address pair .
*
* \ param tv thread vars .
* \ param p packet .
*/
void TmqhOutputFlowIPPair ( ThreadVars * tv , Packet * p )
{
int16_t qid = 0 ;
uint32_t addr_hash = 0 ;
int i ;
TmqhFlowCtx * ctx = ( TmqhFlowCtx * ) tv - > outctx ;
/* if no flow we use the first queue,
* should be rare */
if ( p - > flow ! = NULL ) {
qid = SC_ATOMIC_GET ( p - > flow - > autofp_tmqh_flow_qid ) ;
if ( qid = = - 1 ) {
if ( p - > src . family = = AF_INET6 ) {
for ( i = 0 ; i < 4 ; i + + ) {
addr_hash + = p - > src . addr_data32 [ i ] + p - > dst . addr_data32 [ i ] ;
}
} else {
addr_hash = p - > src . addr_data32 [ 0 ] + p - > dst . addr_data32 [ 0 ] ;
}
/* we don't have to worry about possible overflow, since
* ctx - > size will be lesser than 2 * * 31 for sure */
qid = addr_hash % ctx - > size ;
( void ) SC_ATOMIC_SET ( p - > flow - > autofp_tmqh_flow_qid , qid ) ;
( void ) SC_ATOMIC_ADD ( ctx - > queues [ qid ] . total_flows , 1 ) ;
}
} else {
qid = ctx - > last + + ;
if ( ctx - > last = = ctx - > size )
ctx - > last = 0 ;
}
( void ) SC_ATOMIC_ADD ( ctx - > queues [ qid ] . total_packets , 1 ) ;
PacketQueue * q = ctx - > queues [ qid ] . q ;
SCMutexLock ( & q - > mutex_q ) ;
PacketEnqueue ( q , p ) ;
SCCondSignal ( & q - > cond_q ) ;
SCMutexUnlock ( & q - > mutex_q ) ;
return ;
}
# ifdef UNITTESTS
# ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01 ( void )
static int TmqhOutputFlowSetupCtxTest01 ( void )