diff --git a/src/flow-hash.c b/src/flow-hash.c index c7418aa294..466b246cc5 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -415,7 +415,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv, old_f->flags |= FLOW_TCP_REUSED; /* get some settings that we move over to the new flow */ FlowThreadId thread_id = old_f->thread_id; - int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); /* since fb lock is still held this flow won't be found until we are done */ FLOWLOCK_UNLOCK(old_f); @@ -439,9 +438,6 @@ static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv, f->fb = fb; f->thread_id = thread_id; - if (autofp_tmqh_flow_qid != -1) { - SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); - } return f; } diff --git a/src/flow-util.h b/src/flow-util.h index ca6a49cccb..7ab08f6c4f 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -70,8 +70,6 @@ (f)->hprev = NULL; \ (f)->lnext = NULL; \ (f)->lprev = NULL; \ - SC_ATOMIC_INIT((f)->autofp_tmqh_flow_qid); \ - (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \ RESET_COUNTERS((f)); \ } while (0) @@ -113,9 +111,6 @@ (f)->sgh_toclient = NULL; \ GenericVarFree((f)->flowvar); \ (f)->flowvar = NULL; \ - if (SC_ATOMIC_GET((f)->autofp_tmqh_flow_qid) != -1) { \ - (void) SC_ATOMIC_SET((f)->autofp_tmqh_flow_qid, -1); \ - } \ RESET_COUNTERS((f)); \ } while(0) @@ -129,7 +124,6 @@ DetectEngineStateFlowFree((f)->de_state); \ } \ GenericVarFree((f)->flowvar); \ - SC_ATOMIC_DESTROY((f)->autofp_tmqh_flow_qid); \ } while(0) /** \brief check if a memory alloc would fit in the memcap diff --git a/src/flow.h b/src/flow.h index 95f3333a65..0400250e23 100644 --- a/src/flow.h +++ b/src/flow.h @@ -334,9 +334,6 @@ typedef struct Flow_ */ SC_ATOMIC_DECLARE(FlowRefCount, use_cnt); - /** flow queue id, used with autofp */ - SC_ATOMIC_DECLARE(int16_t, autofp_tmqh_flow_qid); - /** flow tenant id, used to setup flow timeout and stream pseudo * packets with the correct tenant id set */ uint32_t tenant_id; diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 881c5985ca..480e909c32 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -42,8 +42,6 @@ Packet *TmqhInputFlow(ThreadVars *t); void TmqhOutputFlowHash(ThreadVars *t, Packet *p); void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p); -void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p); -void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p); void *TmqhOutputFlowSetupCtx(char *queue_str); void TmqhOutputFlowFreeCtx(void *ctx); void TmqhFlowRegisterTests(void); @@ -59,10 +57,10 @@ void TmqhFlowRegister(void) char *scheduler = NULL; if (ConfGet("autofp-scheduler", &scheduler) == 1) { if (strcasecmp(scheduler, "round-robin") == 0) { - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; + SCLogNotice("using flow hash instead of round robin"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "active-packets") == 0) { - //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; - SCLogNotice("FIXME: using flow hash instead of active packets"); + SCLogNotice("using flow hash instead of active packets"); tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "hash") == 0) { tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; @@ -75,7 +73,6 @@ void TmqhFlowRegister(void) exit(EXIT_FAILURE); } } else { - //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } @@ -88,8 +85,6 @@ void TmqhFlowPrintAutofpHandler(void) if (tmqh_table[TMQH_FLOW].OutHandler == (f)) \ SCLogInfo("AutoFP mode using \"%s\" flow load balancer", (msg)) - PRINT_IF_FUNC(TmqhOutputFlowRoundRobin, "Round Robin"); - PRINT_IF_FUNC(TmqhOutputFlowActivePackets, "Active Packets"); PRINT_IF_FUNC(TmqhOutputFlowHash, "Hash"); PRINT_IF_FUNC(TmqhOutputFlowIPPair, "IPPair"); @@ -153,8 +148,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name) memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode)); } ctx->queues[ctx->size - 1].q = &trans_q[id]; - SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_packets); - SC_ATOMIC_INIT(ctx->queues[ctx->size - 1].total_flows); return 0; } @@ -205,8 +198,6 @@ void *TmqhOutputFlowSetupCtx(char *queue_str) tstr = comma ? (comma + 1) : comma; } while (tstr != NULL); - SC_ATOMIC_INIT(ctx->round_robin_idx); - SCFree(str); return (void *)ctx; @@ -219,161 +210,16 @@ error: void TmqhOutputFlowFreeCtx(void *ctx) { - int i; TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx; SCLogInfo("AutoFP - Total flow handler queues - %" PRIu16, fctx->size); - for (i = 0; i < fctx->size; i++) { - SCLogInfo("AutoFP - Queue %-2"PRIu32 " - pkts: %-12"PRIu64" flows: %-12"PRIu64, i, - SC_ATOMIC_GET(fctx->queues[i].total_packets), - SC_ATOMIC_GET(fctx->queues[i].total_flows)); - SC_ATOMIC_DESTROY(fctx->queues[i].total_packets); - SC_ATOMIC_DESTROY(fctx->queues[i].total_flows); - } - SCFree(fctx->queues); SCFree(fctx); return; } -/** - * \brief select the queue to output in a round robin fashion. - * - * \param tv thread vars - * \param p packet - */ -void TmqhOutputFlowRoundRobin(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - qid = SC_ATOMIC_ADD(ctx->round_robin_idx, 1); - if (qid >= ctx->size) { - SC_ATOMIC_RESET(ctx->round_robin_idx); - qid = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - -/** - * \brief select the queue to output to based on queue lengths. - * - * \param tv thread vars - * \param p packet - */ -void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we round robin the packets over the queues */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - int16_t i = 0; - int16_t lowest_id = 0; - TmqhFlowMode *queues = ctx->queues; - uint32_t lowest = queues[i].q->len; - for (i = 1; i < ctx->size; i++) { - if (queues[i].q->len < lowest) { - lowest = queues[i].q->len; - lowest_id = i; - } - } - qid = lowest_id; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, lowest_id); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - -/** - * \brief select the queue to output based on address hash. - * - * \param tv thread vars. - * \param p packet. - */ -void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p) -{ - int16_t qid = 0; - - TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { -#if __WORDSIZE == 64 - uint64_t addr = (uint64_t)p->flow; -#else - uint32_t addr = (uint32_t)p->flow; -#endif - addr >>= 7; - - /* we don't have to worry about possible overflow, since - * ctx->size will be less than 2 ** 15 for sure */ - qid = addr % ctx->size; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); - } - } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; - } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); - - PacketQueue *q = ctx->queues[qid].q; - SCMutexLock(&q->mutex_q); - PacketEnqueue(q, p); - SCCondSignal(&q->cond_q); - SCMutexUnlock(&q->mutex_q); - - return; -} - void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) { int16_t qid = 0; @@ -389,7 +235,6 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) if (ctx->last == ctx->size) ctx->last = 0; } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); PacketQueue *q = ctx->queues[qid].q; SCMutexLock(&q->mutex_q); @@ -399,6 +244,7 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) return; } + /** * \brief select the queue to output based on IP address pair. * @@ -413,32 +259,17 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p) TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; - /* if no flow we use the first queue, - * should be rare */ - if (p->flow != NULL) { - qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); - if (qid == -1) { - if (p->src.family == AF_INET6) { - for (i = 0; i < 4; i++) { - addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i]; - } - } else { - addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0]; - } - - /* we don't have to worry about possible overflow, since - * ctx->size will be lesser than 2 ** 31 for sure */ - qid = addr_hash % ctx->size; - (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); + if (p->src.family == AF_INET6) { + for (i = 0; i < 4; i++) { + addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i]; } } else { - qid = ctx->last++; - - if (ctx->last == ctx->size) - ctx->last = 0; + addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0]; } - (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); + + /* we don't have to worry about possible overflow, since + * ctx->size will be lesser than 2 ** 31 for sure */ + qid = addr_hash % ctx->size; PacketQueue *q = ctx->queues[qid].q; SCMutexLock(&q->mutex_q); diff --git a/src/tmqh-flow.h b/src/tmqh-flow.h index e710ffeca4..3cec7a165f 100644 --- a/src/tmqh-flow.h +++ b/src/tmqh-flow.h @@ -26,8 +26,6 @@ typedef struct TmqhFlowMode_ { PacketQueue *q; - SC_ATOMIC_DECLARE(uint64_t, total_packets); - SC_ATOMIC_DECLARE(uint64_t, total_flows); } TmqhFlowMode; /** \brief Ctx for the flow queue handler @@ -38,8 +36,6 @@ typedef struct TmqhFlowCtx_ { uint16_t last; TmqhFlowMode *queues; - - SC_ATOMIC_DECLARE(int16_t, round_robin_idx); } TmqhFlowCtx; void TmqhFlowRegister (void);