Support flow queue handler schedulers active_flows and active_packets. Support new yaml option autofp-scheduler. Also support printing queue handler stats.

Anoop Saldanha 14 years ago committed by Victor Julien
parent e252048900
commit 4e417b72b5

@@ -25,6 +25,7 @@
#define __FLOW_UTIL_H__

#include "detect-engine-state.h"
+#include "tmqh-flow.h"

#define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
@@ -92,7 +93,10 @@
        (f)->tag_list = NULL; \
        GenericVarFree((f)->flowvar); \
        (f)->flowvar = NULL; \
-        (f)->autofp_tmqh_flow_qid = -1; \
+        if ((f)->autofp_tmqh_flow_qid != -1) { \
+            TmqhFlowUpdateActiveFlows((f)); \
+            (f)->autofp_tmqh_flow_qid = -1; \
+        } \
        RESET_COUNTERS((f)); \
    } while(0)
@@ -107,6 +111,9 @@
        DetectTagDataListFree((f)->tag_list); \
        GenericVarFree((f)->flowvar); \
        SCMutexDestroy(&(f)->de_state_m); \
+        if ((f)->autofp_tmqh_flow_qid != -1) { \
+            TmqhFlowUpdateActiveFlows((f)); \
+        } \
        (f)->tag_list = NULL; \
    } while(0)
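
Why these hooks matter: every scheduler below bumps a per-queue active_flows counter when it pins a flow to a queue, so the recycle/destroy paths have to decrement it again, otherwise the active_flows scheduler would see counts that only grow and stop balancing. A minimal self-contained model of that balance invariant (a plain counter stands in for the SC_ATOMIC_* machinery; names are illustrative, not from the patch):

    #include <assert.h>
    #include <stdint.h>

    typedef struct Queue_ { uint64_t active_flows; } Queue;

    static void PinFlow(Queue *q)     { q->active_flows++; } /* scheduler side */
    static void RecycleFlow(Queue *q) { q->active_flows--; } /* FLOW_RECYCLE side */

    int main(void)
    {
        Queue q = { 0 };
        PinFlow(&q);
        RecycleFlow(&q);
        /* over any flow's lifetime the two calls must pair up */
        assert(q.active_flows == 0);
        return 0;
    }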

@@ -515,5 +515,6 @@ int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx)
        else
            cpu++;
    }

    return 0;
}

@@ -1902,5 +1902,8 @@ int main(int argc, char **argv)
#endif /* OS_WIN32 */

    SC_ATOMIC_DESTROY(engine_stage);

+    if (strcasecmp(RunmodeGetActive(), "autofp") == 0)
+        TmqhFlowPrintStatistics();
+
    exit(engine_retval);
}

@@ -34,42 +34,51 @@
#include "decode.h"
#include "threads.h"
#include "threadvars.h"
+#include "tmqh-flow.h"

#include "tm-queuehandlers.h"

#include "conf.h"
#include "util-unittest.h"

-typedef struct TmqhFlowMode_ {
-    PacketQueue *q;
-    SC_ATOMIC_DECLARE(uint64_t, active_flows);
-    SC_ATOMIC_DECLARE(uint64_t, total_packets);
-} TmqhFlowMode;
-
-/** \brief Ctx for the flow queue handler
- *  \param size number of queues to output to
- *  \param queues array of queue id's this flow handler outputs to */
-typedef struct TmqhFlowCtx_ {
-    uint16_t size;
-    uint16_t last;
-    TmqhFlowMode *queues;
-    uint16_t round_robin_idx;
-} TmqhFlowCtx;
-
Packet *TmqhInputFlow(ThreadVars *t);
+void TmqhOutputFlowActiveFlows(ThreadVars *t, Packet *p);
+void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
void *TmqhOutputFlowSetupCtx(char *queue_str);
void TmqhFlowRegisterTests(void);

+TmqhFlowCtx *tmqh_flow_outctx = NULL;
+
void TmqhFlowRegister (void) {
    tmqh_table[TMQH_FLOW].name = "flow";
    tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
    tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
    tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
    tmqh_table[TMQH_FLOW].OutHandlerCtxFree = NULL;
    tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;

+    char *scheduler = NULL;
+    if (ConfGet("autofp-scheduler", &scheduler) == 1) {
+        if (strcmp(scheduler, "round_robin") == 0) {
+            SCLogInfo("AutoFP mode using \"Round Robin\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+        } else if (strcmp(scheduler, "active_flows") == 0) {
+            SCLogInfo("AutoFP mode using \"Active Flows\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActiveFlows;
+        } else if (strcmp(scheduler, "active_packets") == 0) {
+            SCLogInfo("AutoFP mode using \"Active Packets\" Q Handler");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
+        } else {
+            SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" for "
+                       "autofp-scheduler in conf. Killing engine.", scheduler);
+            exit(EXIT_FAILURE);
+        }
+    } else {
+        SCLogInfo("AutoFP mode using default \"Round Robin\" Q Handler");
+        tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+    }

    return;
}
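
For reference, the option is read once here at registration time, so one setting applies engine-wide. A minimal suricata.yaml entry selecting the least-backlog scheduler would be the following; the value has to match one of the strcmp() strings above exactly, or the engine exits at startup:

    autofp-scheduler: active_packets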
/* same as 'simple' */
@@ -110,7 +119,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
    if (ctx->queues == NULL) {
        ctx->size = 1;
        ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
-        memset(ctx->queues, 0, ctx->size * sizeof(sizeof(TmqhFlowMode)));
+        memset(ctx->queues, 0, ctx->size * sizeof(TmqhFlowMode));
        if (ctx->queues == NULL) {
            return -1;
        }
@@ -120,9 +129,10 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) {
        if (ctx->queues == NULL) {
            return -1;
        }
-        memset(ctx->queues + (ctx->size - 1), 0, sizeof(sizeof(TmqhFlowMode)));
+        memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
    }

    ctx->queues[ctx->size - 1].q = &trans_q[id];
+    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].active_flows);

    return 0;
}
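
The memset fix above deserves a note: sizeof(sizeof(TmqhFlowMode)) evaluates to sizeof(size_t) (typically 8 bytes), not the struct size, so the old code zeroed only the first few bytes of each element. A standalone snippet demonstrating the difference:

    #include <stdio.h>

    /* stand-in struct; TmqhFlowMode itself is several words wide */
    typedef struct Dummy_ {
        void *q;
        unsigned long long a, b, c;
    } Dummy;

    int main(void)
    {
        printf("sizeof(Dummy)         = %zu\n", sizeof(Dummy));         /* e.g. 32 */
        printf("sizeof(sizeof(Dummy)) = %zu\n", sizeof(sizeof(Dummy))); /* sizeof(size_t), e.g. 8 */
        return 0;
    }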
@@ -168,6 +178,8 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) {
        tstr = comma ? (comma + 1) : comma;
    } while (tstr != NULL);

+    tmqh_flow_outctx = ctx;
+
    SCFree(str);
    return (void *)ctx;

error:
@@ -194,7 +206,54 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
        if (qid == -1) {
            p->flow->autofp_tmqh_flow_qid = qid = ctx->round_robin_idx++;
            ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            ctx->queues[qid].total_flows++;
        }
+        ctx->queues[qid].total_packets++;
    } else {
        qid = ctx->last++;

        if (ctx->last == ctx->size)
            ctx->last = 0;
    }

    PacketQueue *q = ctx->queues[qid].q;
    SCMutexLock(&q->mutex_q);
    PacketEnqueue(q, p);
    SCCondSignal(&q->cond_q);
    SCMutexUnlock(&q->mutex_q);
}
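
All three handlers share the pinning contract visible here: only a flow's first packet selects a queue; later packets reuse the qid stored on the flow, so a flow never migrates between detect threads. A reduced sketch of that contract for the round robin case (hypothetical helper, not patch code):

    static int FlowGetQueueId(TmqhFlowCtx *ctx, Flow *f)
    {
        if (f->autofp_tmqh_flow_qid == -1) {
            /* first packet of this flow: pick and remember a queue */
            f->autofp_tmqh_flow_qid = ctx->round_robin_idx++;
            ctx->round_robin_idx = ctx->round_robin_idx % ctx->size;
            SC_ATOMIC_ADD(ctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
        }
        /* every later packet: same queue, same detect thread */
        return f->autofp_tmqh_flow_qid;
    }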
+/** \brief select the queue to output to based on flow
+ * \param tv thread vars
+ * \param p packet
+ */
+void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
+{
+    int32_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* if no flow we use the first queue,
+     * should be rare */
+    if (p->flow != NULL) {
+        qid = p->flow->autofp_tmqh_flow_qid;
+        if (qid == -1) {
+            uint16_t i = 0;
+            int lowest_id = 0;
+            TmqhFlowMode *queues = ctx->queues;
+            uint32_t lowest = queues[i].q->len;
+            for (i = 1; i < ctx->size; i++) {
+                if (queues[i].q->len < lowest) {
+                    lowest = queues[i].q->len;
+                    lowest_id = i;
+                }
+            }
+            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            queues[qid].total_flows++;
+        }
+        ctx->queues[qid].total_packets++;
+    } else {
+        qid = ctx->last++;
@@ -209,6 +268,67 @@ void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
    SCMutexUnlock(&q->mutex_q);
}
+/** \brief select the queue to output to based on flow
+ * \param tv thread vars
+ * \param p packet
+ */
+void TmqhOutputFlowActiveFlows(ThreadVars *tv, Packet *p)
+{
+    int32_t qid = 0;
+
+    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
+
+    /* if no flow we use the first queue,
+     * should be rare */
+    if (p->flow != NULL) {
+        qid = p->flow->autofp_tmqh_flow_qid;
+        if (qid == -1) {
+            uint32_t i = 0;
+            int lowest_id = 0;
+            uint32_t lowest = ctx->queues[i].active_flows_sc_atomic__;
+            for (i = 1; i < ctx->size; i++) {
+                if (ctx->queues[i].active_flows_sc_atomic__ < lowest) {
+                    lowest = ctx->queues[i].active_flows_sc_atomic__;
+                    lowest_id = i;
+                }
+            }
+            p->flow->autofp_tmqh_flow_qid = qid = lowest_id;
+            SC_ATOMIC_ADD(ctx->queues[qid].active_flows, 1);
+            ctx->queues[qid].total_flows++;
+        }
+        ctx->queues[qid].total_packets++;
+    } else {
+        qid = ctx->last++;
+
+        if (ctx->last == ctx->size)
+            ctx->last = 0;
+    }
+
+    PacketQueue *q = ctx->queues[qid].q;
+    SCMutexLock(&q->mutex_q);
+    PacketEnqueue(q, p);
+    SCCondSignal(&q->cond_q);
+    SCMutexUnlock(&q->mutex_q);
+}
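
Both new schedulers run the same linear argmin scan and differ only in the load metric: queue length (q->len) for active_packets, the atomic flow count for active_flows. A generic standalone form of the selection loop (hypothetical helper, not in the patch):

    #include <stdint.h>

    /* return the index of the smallest entry in loads[0..size-1];
     * O(size) per new flow, ties go to the lowest index */
    static uint16_t LowestLoadedQueue(const uint32_t *loads, uint16_t size)
    {
        uint16_t lowest_id = 0;
        uint16_t i;
        for (i = 1; i < size; i++) {
            if (loads[i] < loads[lowest_id])
                lowest_id = i;
        }
        return lowest_id;
    }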
+void TmqhFlowPrintStatistics(void)
+{
+    uint32_t i;
+
+    SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
+              tmqh_flow_outctx->size);
+    for (i = 0; i < tmqh_flow_outctx->size; i++) {
+        SCLogInfo("AutoFP - Total Packets - Queue %"PRIu32" - %"PRIu64, i,
+                  tmqh_flow_outctx->queues[i].total_packets);
+    }
+    for (i = 0; i < tmqh_flow_outctx->size; i++) {
+        SCLogInfo("AutoFP - Total Flows - Queue %"PRIu32" - %"PRIu64, i,
+                  tmqh_flow_outctx->queues[i].total_flows);
+    }
+
+    return;
+}
#ifdef UNITTESTS
#if 0

@@ -24,7 +24,37 @@
#ifndef __TMQH_FLOW_H__
#define __TMQH_FLOW_H__

+typedef struct TmqhFlowMode_ {
+    PacketQueue *q;
+    SC_ATOMIC_DECLARE(uint64_t, active_flows);
+    uint64_t total_packets;
+    uint64_t total_flows;
+} TmqhFlowMode;
+
+/** \brief Ctx for the flow queue handler
+ *  \param size number of queues to output to
+ *  \param queues array of queue id's this flow handler outputs to */
+typedef struct TmqhFlowCtx_ {
+    uint16_t size;
+    uint16_t last;
+    TmqhFlowMode *queues;
+    uint16_t round_robin_idx;
+} TmqhFlowCtx;
+
+extern TmqhFlowCtx *tmqh_flow_outctx;
+
+static inline void TmqhFlowUpdateActiveFlows(Flow *f)
+{
+    SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
+
+    return;
+}
+
void TmqhFlowRegister (void);
void TmqhFlowRegisterTests(void);
+void TmqhFlowPrintStatistics(void);

#endif /* __TMQH_FLOW_H__ */
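
Note that the inline helper indexes queues[] with the stored qid unchecked; the qid != -1 guards in the flow-util.h hunks above are what keep it safe. A hypothetical defensive variant making that contract explicit (the checks are illustrative, not in the patch):

    static inline void TmqhFlowUpdateActiveFlowsChecked(Flow *f)
    {
        BUG_ON(tmqh_flow_outctx == NULL);    /* only valid once autofp set up the ctx */
        BUG_ON(f->autofp_tmqh_flow_qid < 0); /* only valid for pinned flows */
        SC_ATOMIC_SUB(tmqh_flow_outctx->queues[f->autofp_tmqh_flow_qid].active_flows, 1);
    }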

@@ -19,6 +19,14 @@
# to get the runmode custom modes that can be used here for a particular runmode.
#runmode: auto

+# Specifies the kind of queue scheduler used by the flow-pinned autofp mode.
+# Supported schedulers are:
+# round_robin    - Flows are assigned to queues in a round robin fashion.
+# active_packets - Flows are assigned to the queue with the least number of
+#                  unprocessed packets.
+# active_flows   - Flows are assigned to the queue with the least number of
+#                  active flows.
+autofp-scheduler: round_robin

# Default pid file.
# Will use this file if no --pidfile in command options.
#pid-file: /var/run/suricata.pid
