From 78ecfe8780ca3d06bba9318c42775f51e132e45f Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 17 May 2016 18:34:55 +0200 Subject: [PATCH] autofp: update queue handlers Now that the flow lookup is done in the worker threads the flow queue handlers running after the capture thread(s) no longer have access to the flow. This limits the options of how flow balancing can be done. This patch removes all code that is now useless. The only 2 methods that still make sense are 'hash' and 'ippair'. --- src/flow-hash.c | 4 - src/flow-util.h | 6 -- src/flow.h | 3 - src/tmqh-flow.c | 193 +++--------------------------------------------- src/tmqh-flow.h | 4 - 5 files changed, 12 insertions(+), 198 deletions(-) diff --git a/src/flow-hash.c b/src/flow-hash.c index c7418aa294..466b246cc5 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -415,7 +415,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv, old_f->flags |= FLOW_TCP_REUSED; /* get some settings that we move over to the new flow */ FlowThreadId thread_id = old_f->thread_id; - int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); /* since fb lock is still held this flow won't be found until we are done */ FLOWLOCK_UNLOCK(old_f); @@ -439,9 +438,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv, f->fb = fb; f->thread_id = thread_id; - if (autofp_tmqh_flow_qid != -1) { - SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); - } return f; } diff --git a/src/flow-util.h b/src/flow-util.h index ca6a49cccb..7ab08f6c4f 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -70,8 +70,6 @@ (f)->hprev = NULL; \ (f)->lnext = NULL; \ (f)->lprev = NULL; \ - SC_ATOMIC_INIT((f)->autofp_tmqh_flow_qid); \ - (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \ RESET_COUNTERS((f)); \ } while (0) @@ -113,9 +111,6 @@ (f)->sgh_toclient = NULL; \ GenericVarFree((f)->flowvar); \ (f)->flowvar = NULL; \ - if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \ - (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \ - } \ RESET_COUNTERS((f)); \ } while(0) @@ -129,7 +124,6 @@ DetectEngineStateFlowFree((f)->de_state); \ } \ GenericVarFree((f)->flowvar); \ - SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid); \ } while(0) /** \brief check if a memory alloc would fit in the memcap diff --git a/src/flow.h b/src/flow.h index 95f3333a65..0400250e23 100644 --- a/src/flow.h +++ b/src/flow.h @@ -334,9 +334,6 @@ typedef struct Flow_ */ SC_ATOMIC_DECLARE(FlowRefCount, use_cnt); - /** flow queue id, used with autofp */ - SC_ATOMIC_DECLARE(int16_t, autofp_tmqh_flow_qid); - /** flow tenant id, used to setup flow timeout and stream pseudo * packets with the correct tenant id set */ uint32_t tenant_id; diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 881c5985ca..480e909c32 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -42,8 +42,6 @@ Packet *TmqhInputFlow(ThreadVars *t); void TmqhOutputFlowHash(ThreadVars *t, Packet *p); void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p); -void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p); -void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p); void *TmqhOutputFlowSetupCtx(char *queue_str); void TmqhOutputFlowFreeCtx(void *ctx); void TmqhFlowRegisterTests(void); @@ -59,10 +57,10 @@ void TmqhFlowRegister(void) char *scheduler = NULL; if (ConfGet("autofp-scheduler", &scheduler) == 1) { if (strcasecmp(scheduler, "round-robin") == 0) { - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; + SCLogNotice("using flow hash instead of round robin"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "active-packets") == 0) { - //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; - SCLogNotice("FIXME: using flow hash instead of active packets"); + SCLogNotice("using flow hash instead of active packets"); tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "hash") == 0) { tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; @@ -75,7 +73,6 @@ void TmqhFlowRegister(void) exit(EXIT_FAILURE); } } else { - //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } @@ -88,8 +85,6 @@ void TmqhFlowPrintAutofpHandler(void) if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \ SCLogInfo("AutoFP mode using \"%s\" flow load balancer", (msg)) - PRINT_IF_FUNC(TmqhOutputFlowRoundRobin, "Round Robin"); - PRINT_IF_FUNC(TmqhOutputFlowActivePackets, "Active Packets"); PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash"); PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair"); @@ -153,8 +148,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode)); } ctx->queues[ctx->size - 1].q = &trans_q[id]; - SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets); - SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows); return 0; } @@ -205,8 +198,6 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) tstr = comma ? (comma + 1) : comma; } while (tstr != NULL); - SC_ATOMIC_INIT(ctx->round_robin_idx); - SCFree(str); return (void *)ctx; @@ -219,161 +210,16 @@ error: void TmqhOutputFlowFreeCtx(void *ctx) { - int i; TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx; SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16, fctx->size); - for (i = 0; i < fctx->size; i++) { - SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i, - SC_ATOMIC_GET(fctx->queues[i].total_packets), - SC_ATOMIC_GET(fctx->queues[i].total_flows)); - SC_ATOMIC_DESTROY(fctx->queues[i].total_packets); - SC_ATOMIC_DESTROY(fctx->queues[i].total_flows); - } - SCFree(fctx->queues); SCFree(fctx); return; } -/** - * \brief select the queue to output in a round robin fashion. - * - * \param tv thread vars - * \param p packet - */ -void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1); - if (qid >= ctx->size) { - SC_ATOMIC_RESET(ctx->round_robin_idx); - qid = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - -/** - * \brief select the queue to output to based on queue lengths. - * - * \param tv thread vars - * \param p packet - */ -void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we round robin the packets over the queues */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - int16_t i = 0; - int16_t lowest_id = 0; - TmqhFlowMode *queues = ctx->queues; - uint32_t lowest = queues[i].q->len; - for (i = 1; i < ctx->size; i++) { - if (queues[i].q->len < lowest) { - lowest = queues[i].q->len; - lowest_id = i; - } - } - qid = lowest_id; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - -/** - * \brief select the queue to output based on address hash. - * - * \param tv thread vars. - * \param p packet. - */ -void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { -#if __WORDSIZE == 64 - uint64_t addr = (uint64_t)p->flow; -#else - uint32_t addr = (uint32_t)p->flow; -#endif - addr >>= 7; - - /* we don't have to worry about possible overflow, since - * ctx->size will be less than 2 ** 15 for sure */ - qid = addr % ctx->size; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) { int16_t qid = 0; @@ -389,7 +235,6 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) if (ctx->last == ctx->size) ctx->last = 0; } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); PacketQueue *q = ctx->queues[qid].q; SCMutexLock(&q->mutex_q); @@ -399,6 +244,7 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) return; } + /** * \brief select the queue to output based on IP address pair. * @@ -413,32 +259,17 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p) TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - if (p->src.family == AF_INET6) { - for (i = 0; i < 4; i++) { - addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i]; - } - } else { - addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0]; - } - - /* we don't have to worry about possible overflow, since - * ctx->size will be lesser than 2 ** 31 for sure */ - qid = addr_hash % ctx->size; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); + if (p->src.family == AF_INET6) { + for (i = 0; i < 4; i++) { + addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i]; } } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; + addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0]; } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); + + /* we don't have to worry about possible overflow, since + * ctx->size will be lesser than 2 ** 31 for sure */ + qid = addr_hash % ctx->size; PacketQueue *q = ctx->queues[qid].q; SCMutexLock(&q->mutex_q); diff --git a/src/tmqh-flow.h b/src/tmqh-flow.h index e710ffeca4..3cec7a165f 100644 --- a/src/tmqh-flow.h +++ b/src/tmqh-flow.h @@ -26,8 +26,6 @@ typedef struct TmqhFlowMode_ { PacketQueue *q; - SC_ATOMIC_DECLARE(uint64_t, total_packets); - SC_ATOMIC_DECLARE(uint64_t, total_flows); } TmqhFlowMode; /** \brief Ctx for the flow queue handler @@ -38,8 +36,6 @@ typedef struct TmqhFlowCtx_ { uint16_t last; TmqhFlowMode *queues; - - SC_ATOMIC_DECLARE(int16_t, round_robin_idx); } TmqhFlowCtx; void TmqhFlowRegister (void);