neaten flow q handler code

remotes/origin/master
Anoop Saldanha 14 years ago committed by Victor Julien
parent 0fa14292c0
commit d01589c9d8

@ -19,14 +19,12 @@
* \file * \file
* *
* \author Victor Julien <victor@inliniac.net> * \author Victor Julien <victor@inliniac.net>
* \author Anoop Saldanha <anoopsaldanha@gmail.com>
* *
* Simple output queue handler that makes sure all packets of the same flow * Simple output queue handler that makes sure all packets of the same flow
* are sent to the same queue. This is done by simply hashing the flow's * are sent to the same queue. We support different kind of q handlers. Have
* memory address as thats readable from a packet without the need to lock * a look at "autofp-scheduler" conf to further undertsand the various q
* the flow itself. * handlers we provide.
*
* \todo we can also think about a queue handler that takes queue load into
* account.
*/ */
#include "suricata.h" #include "suricata.h"
@ -50,7 +48,8 @@ void TmqhFlowRegisterTests(void);
TmqhFlowCtx *tmqh_flow_outctx = NULL; TmqhFlowCtx *tmqh_flow_outctx = NULL;
void TmqhFlowRegister (void) { void TmqhFlowRegister(void)
{
tmqh_table[TMQH_FLOW].name = "flow"; tmqh_table[TMQH_FLOW].name = "flow";
tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow; tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx; tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
@ -69,8 +68,9 @@ void TmqhFlowRegister (void) {
SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler"); SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
} else { } else {
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for " SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
"autofp-scheduler in conf. Killing engine.", scheduler); "for autofp-scheduler in conf. Killing engine.",
scheduler);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} else { } else {
@ -105,7 +105,8 @@ Packet *TmqhInputFlow(ThreadVars *tv)
} }
} }
static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
{
Tmq *tmq = TmqGetQueueByName(name); Tmq *tmq = TmqGetQueueByName(name);
if (tmq == NULL) { if (tmq == NULL) {
tmq = TmqCreateQueue(SCStrdup(name)); tmq = TmqCreateQueue(SCStrdup(name));
@ -137,15 +138,18 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
return 0; return 0;
} }
/** \brief setup the queue handlers ctx /**
* \brief setup the queue handlers ctx
* *
* Parses a comma separated string "queuename1,queuename2,etc" * Parses a comma separated string "queuename1,queuename2,etc"
* and sets the ctx up to devide flows over these queue's. * and sets the ctx up to devide flows over these queue's.
* *
* \param queue_str comma separated string with output queue names * \param queue_str comma separated string with output queue names
* \retval ctx queues handlers ctx or NULL in error *
* \retval ctx queues handlers ctx or NULL in error
*/ */
void *TmqhOutputFlowSetupCtx(char *queue_str) { void *TmqhOutputFlowSetupCtx(char *queue_str)
{
if (queue_str == NULL || strlen(queue_str) == 0) if (queue_str == NULL || strlen(queue_str) == 0)
return NULL; return NULL;
@ -182,6 +186,7 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) {
SCFree(str); SCFree(str);
return (void *)ctx; return (void *)ctx;
error: error:
SCFree(ctx); SCFree(ctx);
if (str != NULL) if (str != NULL)
@ -189,9 +194,11 @@ error:
return NULL; return NULL;
} }
/** \brief select the queue to output to based on flow /**
* \param tv thread vars * \brief select the queue to output in a round robin fashion.
* \param p packet *
* \param tv thread vars
* \param p packet
*/ */
void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
{ {
@ -222,11 +229,15 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
PacketEnqueue(q, p); PacketEnqueue(q, p);
SCCondSignal(&q->cond_q); SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q); SCMutexUnlock(&q->mutex_q);
return;
} }
/** \brief select the queue to output to based on flow /**
* \param tv thread vars * \brief select the queue to output to based on queue lengths.
* \param p packet *
* \param tv thread vars
* \param p packet
*/ */
void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
{ {
@ -266,11 +277,15 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
PacketEnqueue(q, p); PacketEnqueue(q, p);
SCCondSignal(&q->cond_q); SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q); SCMutexUnlock(&q->mutex_q);
return;
} }
/** \brief select the queue to output to based on flow /**
* \param tv thread vars * \brief select the queue to output to based on active flows.
* \param p packet *
* \param tv thread vars
* \param p packet
*/ */
void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p) void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
{ {
@ -309,8 +324,15 @@ void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
PacketEnqueue(q, p); PacketEnqueue(q, p);
SCCondSignal(&q->cond_q); SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q); SCMutexUnlock(&q->mutex_q);
return;
} }
/**
* \brief Prints flow q handler statistics.
*
* Requires engine to be dead when this function's called.
*/
void TmqhFlowPrintStatistics(void) void TmqhFlowPrintStatistics(void)
{ {
uint32_t i; uint32_t i;
@ -331,7 +353,8 @@ void TmqhFlowPrintStatistics(void)
#ifdef UNITTESTS #ifdef UNITTESTS
static int TmqhOutputFlowSetupCtxTest01(void) { static int TmqhOutputFlowSetupCtxTest01(void)
{
int retval = 0; int retval = 0;
Tmq *tmq = NULL; Tmq *tmq = NULL;
@ -379,7 +402,8 @@ end:
return retval; return retval;
} }
static int TmqhOutputFlowSetupCtxTest02(void) { static int TmqhOutputFlowSetupCtxTest02(void)
{
int retval = 0; int retval = 0;
Tmq *tmq = NULL; Tmq *tmq = NULL;
@ -421,7 +445,8 @@ end:
return retval; return retval;
} }
static int TmqhOutputFlowSetupCtxTest03(void) { static int TmqhOutputFlowSetupCtxTest03(void)
{
int retval = 0; int retval = 0;
TmqResetQueues(); TmqResetQueues();
@ -457,11 +482,13 @@ end:
#endif /* UNITTESTS */ #endif /* UNITTESTS */
void TmqhFlowRegisterTests(void) { void TmqhFlowRegisterTests(void)
{
#ifdef UNITTESTS #ifdef UNITTESTS
UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest01", TmqhOutputFlowSetupCtxTest01, 1);
UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest02", TmqhOutputFlowSetupCtxTest02, 1);
UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03, 1); UtRegisterTest("TmqhOutputFlowSetupCtxTest03", TmqhOutputFlowSetupCtxTest03, 1);
#endif #endif
}
return;
}

Loading…
Cancel
Save