autofp: update queue handlers

Now that the flow lookup is done in the worker threads, the flow
queue handlers running after the capture thread(s) no longer have
access to the flow. This limits the options for how flow balancing
can be done.

This patch removes all the code that is now useless. The only two
methods that still make sense are 'hash' and 'ippair'.
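To make the consequence concrete, here is a minimal sketch of the kind of
queue selection that still works when the handler cannot see the flow:
derive the queue id from packet address fields alone, as the 'ippair'
handler in this patch does. The types below are simplified placeholders,
not the real Suricata structs.

    #include <stdint.h>
    #include <sys/socket.h>

    /* Placeholder for the address fields a capture-side handler can
     * still read from the packet itself. */
    typedef struct {
        int family;              /* AF_INET or AF_INET6 */
        uint32_t src[4], dst[4]; /* address words */
    } PacketAddrs;

    /* Pick a queue from the address pair only: no flow lookup needed,
     * and both directions of a connection land on the same queue
     * because the per-word sum is commutative. */
    static uint16_t PickQueueByIPPair(const PacketAddrs *p, uint16_t nqueues)
    {
        uint32_t hash = 0;
        int words = (p->family == AF_INET6) ? 4 : 1;
        for (int i = 0; i < words; i++)
            hash += p->src[i] + p->dst[i];
        /* nqueues is far below 2**31, so the modulo is safe */
        return (uint16_t)(hash % nqueues);
    }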
pull/2089/head
Victor Julien, 9 years ago
parent 61ce05e7ed
commit 78ecfe8780

@@ -415,7 +415,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
     old_f->flags |= FLOW_TCP_REUSED;
     /* get some settings that we move over to the new flow */
     FlowThreadId thread_id = old_f->thread_id;
-    int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
     /* since fb lock is still held this flow won't be found until we are done */
     FLOWLOCK_UNLOCK(old_f);
@@ -439,9 +438,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
     f->fb = fb;
     f->thread_id = thread_id;
-    if (autofp_tmqh_flow_qid != -1) {
-        SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
-    }
     return f;
 }

@@ -70,8 +70,6 @@
     (f)->hprev = NULL; \
     (f)->lnext = NULL; \
     (f)->lprev = NULL; \
-    SC_ATOMIC_INIT((f)->autofp_tmqh_flow_qid); \
-    (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
     RESET_COUNTERS((f)); \
 } while (0)
@@ -113,9 +111,6 @@
     (f)->sgh_toclient = NULL; \
     GenericVarFree((f)->flowvar); \
     (f)->flowvar = NULL; \
-    if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \
-        (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \
-    } \
     RESET_COUNTERS((f)); \
 } while(0)
@@ -129,7 +124,6 @@
         DetectEngineStateFlowFree((f)->de_state); \
     } \
     GenericVarFree((f)->flowvar); \
-    SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid); \
 } while(0)
 /** \brief check if a memory alloc would fit in the memcap

@@ -334,9 +334,6 @@ typedef struct Flow_
      */
     SC_ATOMIC_DECLARE(FlowRefCount, use_cnt);
-    /** flow queue id, used with autofp */
-    SC_ATOMIC_DECLARE(int16_t, autofp_tmqh_flow_qid);
     /** flow tenant id, used to setup flow timeout and stream pseudo
      * packets with the correct tenant id set */
     uint32_t tenant_id;

@@ -42,8 +42,6 @@
 Packet *TmqhInputFlow(ThreadVars *t);
 void TmqhOutputFlowHash(ThreadVars *t, Packet *p);
 void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p);
-void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p);
-void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p);
 void *TmqhOutputFlowSetupCtx(char *queue_str);
 void TmqhOutputFlowFreeCtx(void *ctx);
 void TmqhFlowRegisterTests(void);
@@ -59,10 +57,10 @@ void TmqhFlowRegister(void)
     char *scheduler = NULL;
     if (ConfGet("autofp-scheduler", &scheduler) == 1) {
         if (strcasecmp(scheduler, "round-robin") == 0) {
-            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
+            SCLogNotice("using flow hash instead of round robin");
+            tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "active-packets") == 0) {
-            //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
-            SCLogNotice("FIXME: using flow hash instead of active packets");
+            SCLogNotice("using flow hash instead of active packets");
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
         } else if (strcasecmp(scheduler, "hash") == 0) {
             tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
@@ -75,7 +73,6 @@ void TmqhFlowRegister(void)
             exit(EXIT_FAILURE);
         }
     } else {
-        //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
         tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
     }
@@ -88,8 +85,6 @@ void TmqhFlowPrintAutofpHandler(void)
     if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \
         SCLogInfo("AutoFP mode using \"%s\" flow load balancer", (msg))
-    PRINT_IF_FUNC(TmqhOutputFlowRoundRobin, "Round Robin");
-    PRINT_IF_FUNC(TmqhOutputFlowActivePackets, "Active Packets");
     PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash");
     PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair");
@@ -153,8 +148,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
         memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
     }
     ctx->queues[ctx->size - 1].q = &trans_q[id];
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets);
-    SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows);
     return 0;
 }
@@ -205,8 +198,6 @@ void *TmqhOutputFlowSetupCtx(char *queue_str)
         tstr = comma ? (comma + 1) : comma;
     } while (tstr != NULL);
-    SC_ATOMIC_INIT(ctx->round_robin_idx);
     SCFree(str);
     return (void *)ctx;
@@ -219,161 +210,16 @@ error:
 void TmqhOutputFlowFreeCtx(void *ctx)
 {
-    int i;
     TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
     SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16,
               fctx->size);
-    for (i = 0; i < fctx->size; i++) {
-        SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i,
-                  SC_ATOMIC_GET(fctx->queues[i].total_packets),
-                  SC_ATOMIC_GET(fctx->queues[i].total_flows));
-        SC_ATOMIC_DESTROY(fctx->queues[i].total_packets);
-        SC_ATOMIC_DESTROY(fctx->queues[i].total_flows);
-    }
     SCFree(fctx->queues);
     SCFree(fctx);
     return;
 }
-
-/**
- * \brief select the queue to output in a round robin fashion.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1);
-            if (qid >= ctx->size) {
-                SC_ATOMIC_RESET(ctx->round_robin_idx);
-                qid = 0;
-            }
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-        }
-    } else {
-        qid = ctx->last++;
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-    return;
-}
-
-/**
- * \brief select the queue to output to based on queue lengths.
- *
- * \param tv thread vars
- * \param p packet
- */
-void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-    /* if no flow we round robin the packets over the queues */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            int16_t i = 0;
-            int16_t lowest_id = 0;
-            TmqhFlowMode *queues = ctx->queues;
-            uint32_t lowest = queues[i].q->len;
-            for (i = 1; i < ctx->size; i++) {
-                if (queues[i].q->len < lowest) {
-                    lowest = queues[i].q->len;
-                    lowest_id = i;
-                }
-            }
-            qid = lowest_id;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-        }
-    } else {
-        qid = ctx->last++;
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-    return;
-}
-
 /**
  * \brief select the queue to output based on address hash.
  *
  * \param tv thread vars.
  * \param p packet.
  */
-void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
-{
-    int16_t qid = 0;
-    TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-#if __WORDSIZE == 64
-            uint64_t addr = (uint64_t)p->flow;
-#else
-            uint32_t addr = (uint32_t)p->flow;
-#endif
-            addr >>= 7;
-            /* we don't have to worry about possible overflow, since
-             * ctx->size will be less than 2 ** 15 for sure */
-            qid = addr % ctx->size;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-        }
-    } else {
-        qid = ctx->last++;
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
-    PacketQueue *q = ctx->queues[qid].q;
-    SCMutexLock(&q->mutex_q);
-    PacketEnqueue(q, p);
-    SCCondSignal(&q->cond_q);
-    SCMutexUnlock(&q->mutex_q);
-    return;
-}
 void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
 {
     int16_t qid = 0;
@@ -389,7 +235,6 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
         if (ctx->last == ctx->size)
             ctx->last = 0;
     }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
@@ -399,6 +244,7 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
     return;
 }
+
 /**
  * \brief select the queue to output based on IP address pair.
  *
@@ -413,32 +259,17 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
     TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
-    /* if no flow we use the first queue,
-     * should be rare */
-    if (p->flow != NULL) {
-        qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid);
-        if (qid == -1) {
-            if (p->src.family == AF_INET6) {
-                for (i = 0; i < 4; i++) {
-                    addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
-                }
-            } else {
-                addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
-            }
-            /* we don't have to worry about possible overflow, since
-             * ctx->size will be lesser than 2 ** 31 for sure */
-            qid = addr_hash % ctx->size;
-            (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid);
-            (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1);
-        }
-    } else {
-        qid = ctx->last++;
-        if (ctx->last == ctx->size)
-            ctx->last = 0;
-    }
-    (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
+    if (p->src.family == AF_INET6) {
+        for (i = 0; i < 4; i++) {
+            addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i];
+        }
+    } else {
+        addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0];
+    }
+    /* we don't have to worry about possible overflow, since
+     * ctx->size will be lesser than 2 ** 31 for sure */
+    qid = addr_hash % ctx->size;
     PacketQueue *q = ctx->queues[qid].q;
     SCMutexLock(&q->mutex_q);
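A note on the design choice visible in this hunk: the old handlers cached
the chosen queue in the flow itself (autofp_tmqh_flow_qid), which is what
allowed even round robin to keep a flow on one queue. With no flow pointer
available there is nowhere to cache that decision, so the per-packet choice
has to be deterministic for a flow on its own; hashing the address pair
gives exactly that. A quick hypothetical check (made-up addresses,
simplified to the IPv4 single-word case):

    #include <assert.h>
    #include <stdint.h>

    int main(void)
    {
        uint16_t nqueues = 4;
        /* hypothetical IPv4 address words: 10.0.0.1 <-> 192.168.0.1 */
        uint32_t a = 0x0a000001, b = 0xc0a80001;

        /* the address-pair sum is commutative, so both directions of
         * a connection map to the same queue */
        assert((a + b) % nqueues == (b + a) % nqueues);

        /* a stateless round robin has no such guarantee: consecutive
         * packets of one flow would land on different queues */
        uint16_t rr = 0;
        uint16_t q1 = rr++ % nqueues;
        uint16_t q2 = rr++ % nqueues;
        assert(q1 != q2);
        return 0;
    }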

@@ -26,8 +26,6 @@
 typedef struct TmqhFlowMode_ {
     PacketQueue *q;
-    SC_ATOMIC_DECLARE(uint64_t, total_packets);
-    SC_ATOMIC_DECLARE(uint64_t, total_flows);
 } TmqhFlowMode;
 /** \brief Ctx for the flow queue handler
@@ -38,8 +36,6 @@ typedef struct TmqhFlowCtx_ {
     uint16_t last;
     TmqhFlowMode *queues;
-    SC_ATOMIC_DECLARE(int16_t, round_robin_idx);
 } TmqhFlowCtx;
 void TmqhFlowRegister (void);
