From 2e9279dd42840f5e2ba4e6ca969871f9f7b18ede Mon Sep 17 00:00:00 2001 From: cdwakelin Date: Wed, 23 Mar 2016 17:13:55 +0000 Subject: [PATCH] autofp: add "ippair" scheduler Add "ippair" autofp scheduler to split traffic based on source and destination IP only (not ports). - This is useful when using the "xbits" feature to track events that occur between the same hosts but not necessarily the same flow (such as exploit kit landings/expoits/payloads) - The disadvantage is that traffic may be balanced very unevenly between threads if some host pairs are much more frequently seen than others, so it may be only practicable for sandbox or pcap analysis - not tested for IPv6 See https://redmine.openinfosecfoundation.org/issues/1661 --- src/tmqh-flow.c | 54 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index a1ec839bcf..49679b8fb2 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -41,6 +41,7 @@ Packet *TmqhInputFlow(ThreadVars *t); void TmqhOutputFlowHash(ThreadVars *t, Packet *p); +void TmqhOutputFlowIPPair(ThreadVars *t, Packet *p); void TmqhOutputFlowActivePackets(ThreadVars *t, Packet *p); void TmqhOutputFlowRoundRobin(ThreadVars *t, Packet *p); void *TmqhOutputFlowSetupCtx(char *queue_str); @@ -66,6 +67,9 @@ void TmqhFlowRegister(void) } else if (strcasecmp(scheduler, "hash") == 0) { SCLogInfo("AutoFP mode using \"Hash\" flow load balancer"); tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; + } else if (strcasecmp(scheduler, "ippair") == 0) { + SCLogInfo("AutoFP mode using \"ippair\" flow load balancer"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair; } else { SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" " "for autofp-scheduler in conf. Killing engine.", @@ -359,6 +363,56 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) return; } +/** + * \brief select the queue to output based on IP address pair. + * + * \param tv thread vars. + * \param p packet. + */ +void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p) +{ + int16_t qid = 0; + uint32_t addr_hash = 0; + int i; + + TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; + + /* if no flow we use the first queue, + * should be rare */ + if (p->flow != NULL) { + qid = SC_ATOMIC_GET(p->flow->autofp_tmqh_flow_qid); + if (qid == -1) { + if (p->src.family == AF_INET6) { + for (i = 0; i < 4; i++) { + addr_hash += p->src.addr_data32[i] + p->dst.addr_data32[i]; + } + } else { + addr_hash = p->src.addr_data32[0] + p->dst.addr_data32[0]; + } + + /* we don't have to worry about possible overflow, since + * ctx->size will be lesser than 2 ** 31 for sure */ + qid = addr_hash % ctx->size; + (void) SC_ATOMIC_SET(p->flow->autofp_tmqh_flow_qid, qid); + (void) SC_ATOMIC_ADD(ctx->queues[qid].total_flows, 1); + } + } else { + qid = ctx->last++; + + if (ctx->last == ctx->size) + ctx->last = 0; + } + (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); + + PacketQueue *q = ctx->queues[qid].q; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); + + return; +} + #ifdef UNITTESTS static int TmqhOutputFlowSetupCtxTest01(void)