From 4e417b72b54ab7bd3a59d92db1cf529a745909e6 Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Thu, 12 Jan 2012 00:03:13 +0530 Subject: [PATCH] support flow q handler schedulers active_flows and active_packets. Support new yaml option autofp_scheduler. Support for printing q handler stats as well --- src/flow-util.h | 9 ++- src/runmode-pcap-file.c | 1 + src/suricata.c | 3 + src/tmqh-flow.c | 164 ++++++++++++++++++++++++++++++++++------ src/tmqh-flow.h | 30 ++++++++ suricata.yaml.in | 8 ++ 6 files changed, 192 insertions(+), 23 deletions(-) diff --git a/src/flow-util.h b/src/flow-util.h index 751687dd61..e90d9c44db 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -25,6 +25,7 @@ #define __FLOW_UTIL_H__ #include "detect-engine-state.h" +#include "tmqh-flow.h" #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) @@ -92,7 +93,10 @@ (f)->tag_list = NULL; \ GenericVarFree((f)->flowvar); \ (f)->flowvar = NULL; \ - (f)->autofp_tmqh_flow_qid = -1; \ + if ((f)->autofp_tmqh_flow_qid != -1) { \ + TmqhFlowUpdateActiveFlows((f)); \ + (f)->autofp_tmqh_flow_qid = -1; \ + } \ RESET_COUNTERS((f)); \ } while(0) @@ -107,6 +111,9 @@ DetectTagDataListFree((f)->tag_list); \ GenericVarFree((f)->flowvar); \ SCMutexDestroy(&(f)->de_state_m); \ + if ((f)->autofp_tmqh_flow_qid != -1) { \ + TmqhFlowUpdateActiveFlows((f)); \ + } \ (f)->tag_list = NULL; \ } while(0) diff --git a/src/runmode-pcap-file.c b/src/runmode-pcap-file.c index 27c8edd7fc..0bfe01ec12 100644 --- a/src/runmode-pcap-file.c +++ b/src/runmode-pcap-file.c @@ -515,5 +515,6 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx) else cpu++; } + return 0; } diff --git a/src/suricata.c b/src/suricata.c index 40c5951282..cf1636d286 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -1902,5 +1902,8 @@ int main(int argc, char **argv) #endif /* OS_WIN32 */ SC_ATOMIC_DESTROY(engine_stage); + if (strcasecmp(RunmodeGetActive(), "autofp") == 0) + TmqhFlowPrintStatistics(); + exit(engine_retval); } diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 91d662016c..92cb70c528 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -34,42 +34,51 @@ #include "decode.h" #include "threads.h" #include "threadvars.h" +#include "tmqh-flow.h" #include "tm-queuehandlers.h" +#include "conf.h" #include "util-unittest.h" -typedef struct TmqhFlowMode_ { - PacketQueue *q; - - SC_ATOMIC_DECLARE(uint64_t, active_flows); - SC_ATOMIC_DECLARE(uint64_t, total_packets); -} TmqhFlowMode; - -/** \brief Ctx for the flow queue handler - * \param size number of queues to output to - * \param queues array of queue id's this flow handler outputs to */ -typedef struct TmqhFlowCtx_ { - uint16_t size; - uint16_t last; - - TmqhFlowMode *queues; - - uint16_t round_robin_idx; -} TmqhFlowCtx; - Packet *TmqhInputFlow(ThreadVars *t); +void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p); +void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p); void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p); void *TmqhOutputFlowSetupCtx(char *queue_str); void TmqhFlowRegisterTests(void); +TmqhFlowCtx *tmqh_flow_outctx = NULL; + void TmqhFlowRegister (void) { tmqh_table[TMQH_FLOW].name = "flow"; tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow; - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx; tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL; tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests; + + char *scheduler = NULL; + if (ConfGet("autofp-scheduler", &scheduler) == 1) { + if (strcmp(scheduler, "round_robin") == 0) { + SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; + } else if (strcmp(scheduler, "active_flows") == 0) { + SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows; + } else if (strcmp(scheduler, "active_packets") == 0) { + SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; + } else { + SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for " + "autofp-scheduler in conf. Killing engine.", scheduler); + exit(EXIT_FAILURE); + } + } else { + SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; + } + + return; } /* same as 'simple' */ @@ -110,7 +119,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { if (ctx->queues == NULL) { ctx->size = 1; ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode)); - memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode))); + memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode)); if (ctx->queues == NULL) { return -1; } @@ -120,9 +129,10 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) { if (ctx->queues == NULL) { return -1; } - memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode))); + memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode)); } ctx->queues[ctx->size - 1].q = &trans_q[id]; + SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows); return 0; } @@ -168,6 +178,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) { tstr = comma ? (comma + 1) : comma; } while (tstr != NULL); + tmqh_flow_outctx = ctx; + SCFree(str); return (void *)ctx; error: @@ -194,7 +206,54 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) if (qid == -1) { p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++; ctx->round_robin_idx = ctx->round_robin_idx % ctx->size; + SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1); + ctx->queues[qid].total_flows++; + } + ctx->queues[qid].total_packets++; + } else { + qid = ctx->last++; + + if (ctx->last == ctx->size) + ctx->last = 0; + } + + PacketQueue *q = ctx->queues[qid].q; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); +} + +/** \brief select the queue to output to based on flow + * \param tv thread vars + * \param p packet + */ +void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) +{ + int32_t qid = 0; + + TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; + + /* if no flow we use the first queue, + * should be rare */ + if (p->flow != NULL) { + qid = p->flow->autofp_tmqh_flow_qid; + if (qid == -1) { + uint16_t i = 0; + int lowest_id = 0; + TmqhFlowMode *queues = ctx->queues; + uint32_t lowest = queues[i].q->len; + for (i = 1; i < ctx->size; i++) { + if (queues[i].q->len < lowest) { + lowest = queues[i].q->len; + lowest_id = i; + } + } + p->flow->autofp_tmqh_flow_qid = qid = lowest_id; + SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1); + queues[qid].total_flows++; } + ctx->queues[qid].total_packets++; } else { qid = ctx->last++; @@ -209,6 +268,67 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) SCMutexUnlock(&q->mutex_q); } +/** \brief select the queue to output to based on flow + * \param tv thread vars + * \param p packet + */ +void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p) +{ + int32_t qid = 0; + + TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; + + /* if no flow we use the first queue, + * should be rare */ + if (p->flow != NULL) { + qid = p->flow->autofp_tmqh_flow_qid; + if (qid == -1) { + uint32_t i = 0; + int lowest_id = 0; + uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__; + for (i = 1; i < ctx->size; i++) { + if (ctx->queues[i].active_flows_sc_atomic__ < lowest) { + lowest = ctx->queues[i].active_flows_sc_atomic__; + lowest_id = i; + } + } + p->flow->autofp_tmqh_flow_qid = qid = lowest_id; + SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1); + ctx->queues[qid].total_flows++; + } + ctx->queues[qid].total_packets++; + } else { + qid = ctx->last++; + + if (ctx->last == ctx->size) + ctx->last = 0; + } + + PacketQueue *q = ctx->queues[qid].q; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); +} + +void TmqhFlowPrintStatistics(void) +{ + uint32_t i; + + SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16, + tmqh_flow_outctx->size); + for (i = 0; i < tmqh_flow_outctx->size; i++) { + SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32 " - %"PRIu64 , i, + tmqh_flow_outctx->queues[i].total_packets); + } + for (i = 0; i < tmqh_flow_outctx->size; i++) { + SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32 " - %"PRIu64 , i, + tmqh_flow_outctx->queues[i].total_flows); + } + + return; +} + #ifdef UNITTESTS #if 0 diff --git a/src/tmqh-flow.h b/src/tmqh-flow.h index cdaacda624..84ea3bcdf7 100644 --- a/src/tmqh-flow.h +++ b/src/tmqh-flow.h @@ -24,7 +24,37 @@ #ifndef __TMQH_FLOW_H__ #define __TMQH_FLOW_H__ +typedef struct TmqhFlowMode_ { + PacketQueue *q; + + SC_ATOMIC_DECLARE(uint64_t, active_flows); + uint64_t total_packets; + uint64_t total_flows; +} TmqhFlowMode; + +/** \brief Ctx for the flow queue handler + * \param size number of queues to output to + * \param queues array of queue id's this flow handler outputs to */ +typedef struct TmqhFlowCtx_ { + uint16_t size; + uint16_t last; + + TmqhFlowMode *queues; + + uint16_t round_robin_idx; +} TmqhFlowCtx; + +extern TmqhFlowCtx *tmqh_flow_outctx; + +static inline void TmqhFlowUpdateActiveFlows(Flow *f) +{ + SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1); + + return; +} + void TmqhFlowRegister (void); void TmqhFlowRegisterTests(void); +void TmqhFlowPrintStatistics(void); #endif /* __TMQH_FLOW_H__ */ diff --git a/suricata.yaml.in b/suricata.yaml.in index f16aac4614..cde848a957 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -19,6 +19,14 @@ # to get the runmode custom modes that can be used here for a particular runmode. #runmode: auto +# Specifies the kind of q scheduler used by flow pinned autofp mode. +# Supported scheduler are : +# round_robin - Flow alloted to queue in a round robin fashion. +# active-packets - Flow alloted to queue that has the least no of +# unprocessed packets. +# active-flows - Flow alloted to queue that has least no of active flows. +autofp-scheduler: round-robin + # Default pid file. # Will use this file if no --pidfile in command options. #pid-file: /var/run/suricata.pid