diff --git a/src/flow-hash.c b/src/flow-hash.c index 88c215f498..12b21bb8e3 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -37,6 +37,8 @@ #include "util-time.h" #include "util-debug.h" +#define FLOW_DEFAULT_FLOW_PRUNE 5 + #ifdef FLOW_DEBUG_STATS #define FLOW_DEBUG_STATS_PROTO_ALL 0 #define FLOW_DEBUG_STATS_PROTO_TCP 1 @@ -295,6 +297,8 @@ Flow *FlowGetFlowFromHash (Packet *p) { Flow *f = NULL; FlowHashCountInit; + struct timeval ts; + uint32_t not_released = 0; /* get the key to our bucket */ uint32_t key = FlowGetKey(p); @@ -317,14 +321,54 @@ Flow *FlowGetFlowFromHash (Packet *p) /* no, so get a new one */ f = fb->f = FlowDequeue(&flow_spare_q); if (f == NULL) { - flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ - - f = fb->f = FlowAlloc(); + f = fb->f = FlowAllocDirect(); if (f == NULL) { SCSpinUnlock(&fb->s); FlowHashCountUpdate; return NULL; } + + /* If we reached the max memcap, try to clean some flows: + * 1- first by normal timeouts + * 2- by emergency mode timeouts + * 3- by last time seen + */ + if (flow_memuse + sizeof(Flow) > flow_config.memcap) { + /* Get the time */ + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + + SCLogDebug("We need to prune some flows(1)"); + /* Ok, then try to release flow_try_release flows */ + not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* This means that none of the flows was released, so try again + * with more agressive timeout values (emergency mode) */ + + if ( !(flow_flags & FLOW_EMERGENCY)) { + SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine " + "running with FLOW_EMERGENCY bit set " + "(ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", + (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec); + flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ + } + SCLogDebug("We need to prune some flows with emerg bit (2)"); + + not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* Here the engine is on a real stress situation + * Try to kill the last time seen "flow_try_release" flows + * directly, ignoring timeouts */ + SCLogDebug("We need to KILL some flows (3)"); + not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + SCSpinUnlock(&fb->s); + FlowHashCountUpdate; + return NULL; + } + } + } + } } /* these are protected by the bucket lock */ f->hnext = NULL; @@ -333,7 +377,7 @@ Flow *FlowGetFlowFromHash (Packet *p) /* got one, now lock, initialize and return */ SCMutexLock(&f->m); FlowInit(f,p); - FlowRequeue(f, NULL, &flow_new_q[f->protomap]); + FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); f->flags |= FLOW_NEW_LIST; f->fb = fb; @@ -368,14 +412,52 @@ Flow *FlowGetFlowFromHash (Packet *p) /* get us a new one and put it and the list tail */ f = pf->hnext = FlowDequeue(&flow_spare_q); if (f == NULL) { - flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ - f = fb->f = FlowAlloc(); + f = fb->f = FlowAllocDirect(); if (f == NULL) { SCSpinUnlock(&fb->s); FlowHashCountUpdate; return NULL; } + + /* If we reached the max memcap, try to clean some flows: + * 1- first by normal timeouts + * 2- by emergency mode timeouts + * 3- by last time seen + */ + if (flow_memuse + sizeof(Flow) > flow_config.memcap) { + /* Get the time */ + memset(&ts, 0, sizeof(ts)); + TimeGet(&ts); + //SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec); + /* Ok, then try to release flow_try_release flows */ + SCLogDebug("We need to prune some flows(1)"); + not_released = FlowPruneFlowsCnt(&ts, flow_config.flow_try_release); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* This means that none of the flows was released, so try again + * with more agressive timeout values (emergency mode) */ + if ( !(flow_flags & FLOW_EMERGENCY)) { + SCLogWarning(SC_WARN_FLOW_EMERGENCY, "Warning, engine running with FLOW_EMERGENCY bit set (ts.tv_sec: %"PRIuMAX", ts.tv_usec:%"PRIuMAX")", (uintmax_t)ts.tv_sec, (uintmax_t)ts.tv_usec); + flow_flags |= FLOW_EMERGENCY; /* XXX mutex this */ + } + SCLogDebug("We need to prune some flows with emerg bit (2)"); + + not_released = FlowPruneFlowsCnt(&ts, FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + /* Here the engine is on a real stress situation + * Try to kill the last time seen "flow_try_release" flows + * directly, ignoring timeouts */ + SCLogDebug("We need to KILL some flows (3)"); + not_released = FlowKillFlowsCnt(FLOW_DEFAULT_FLOW_PRUNE); + if (not_released == (uint32_t)flow_config.flow_try_release) { + SCSpinUnlock(&fb->s); + FlowHashCountUpdate; + return NULL; + } + } + } + } + } f->hnext = NULL; @@ -384,7 +466,7 @@ Flow *FlowGetFlowFromHash (Packet *p) /* lock, initialize and return */ SCMutexLock(&f->m); FlowInit(f,p); - FlowRequeue(f, NULL, &flow_new_q[f->protomap]); + FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); f->flags |= FLOW_NEW_LIST; f->fb = fb; diff --git a/src/flow-queue.c b/src/flow-queue.c index 71d4e0d1f2..ecd1aaaac2 100644 --- a/src/flow-queue.c +++ b/src/flow-queue.c @@ -108,12 +108,13 @@ Flow *FlowDequeue (FlowQueue *q) { return f; } -void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq) +void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq, uint8_t need_srclock) { if (srcq != NULL) { - SCMutexLock(&srcq->mutex_q); - + if (need_srclock == 1) { + SCMutexLock(&srcq->mutex_q); + } /* remove from old queue */ if (srcq->top == f) srcq->top = f->lnext; /* remove from queue top */ @@ -130,7 +131,7 @@ void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq) f->lprev = NULL; /* don't unlock if src and dst are the same */ - if (srcq != dstq) SCMutexUnlock(&srcq->mutex_q); + if (srcq != dstq && need_srclock == 1) SCMutexUnlock(&srcq->mutex_q); } /* now put it in dst */ @@ -154,3 +155,4 @@ void FlowRequeue(Flow *f, FlowQueue *srcq, FlowQueue *dstq) SCMutexUnlock(&dstq->mutex_q); } + diff --git a/src/flow-queue.h b/src/flow-queue.h index 4b954edf7c..7222809f17 100644 --- a/src/flow-queue.h +++ b/src/flow-queue.h @@ -47,7 +47,7 @@ void FlowQueueDestroy (FlowQueue *); void FlowEnqueue (FlowQueue *, Flow *); Flow *FlowDequeue (FlowQueue *); -void FlowRequeue(Flow *, FlowQueue *, FlowQueue *); +void FlowRequeue(Flow *, FlowQueue *, FlowQueue *, uint8_t); #endif /* __FLOW_QUEUE_H__ */ diff --git a/src/flow-util.c b/src/flow-util.c index 353f9de05d..f8d2d47f54 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -36,6 +36,38 @@ #include "detect.h" #include "detect-engine-state.h" +/* Counter of flows that reached memcap capability */ +uint32_t directflows = 0; + +/* Allocate a flow without checking memcap */ +Flow *FlowAllocDirect(void) +{ + Flow *f; + + SCMutexLock(&flow_memuse_mutex); + /* We allow to alloc even if memcap is reached (but later we + * will clean up some flows */ + f = SCMalloc(sizeof(Flow)); + if (f == NULL) { + SCMutexUnlock(&flow_memuse_mutex); + return NULL; + } + directflows++; + + flow_memuse += sizeof(Flow); + SCMutexUnlock(&flow_memuse_mutex); + FlowPrintQueueInfo(); + + SCMutexInit(&f->m, NULL); + f->lnext = NULL; + f->lprev = NULL; + f->hnext = NULL; + f->hprev = NULL; + + f->flowvar = NULL; + + return f; +} /* Allocate a flow */ Flow *FlowAlloc(void) @@ -67,6 +99,7 @@ Flow *FlowAlloc(void) return f; } + /** free the memory of a flow */ void FlowFree(Flow *f) { diff --git a/src/flow-util.h b/src/flow-util.h index 6b9f8f40b7..b5ba4b098a 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -46,6 +46,7 @@ } Flow *FlowAlloc(void); +Flow *FlowAllocDirect(void); void FlowFree(Flow *); uint8_t FlowGetProtoMapping(uint8_t); void FlowInit(Flow *, Packet *); diff --git a/src/flow.c b/src/flow.c index 5c121f910c..044b279733 100644 --- a/src/flow.c +++ b/src/flow.c @@ -45,6 +45,7 @@ #include "flow-private.h" #include "util-unittest.h" +#include "util-unittest-helper.h" #include "util-byte.h" #include "util-debug.h" @@ -52,6 +53,8 @@ #include "detect.h" #include "detect-engine-state.h" +#define FLOW_DEFAULT_EMERGENCY_RECOVERY 30 +#define FLOW_DEFAULT_FLOW_PRUNE 5 //#define FLOW_DEFAULT_HASHSIZE 262144 #define FLOW_DEFAULT_HASHSIZE 65536 @@ -68,6 +71,8 @@ int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t); static int FlowClearMemory(Flow *,uint8_t ); int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *)); int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *)); +static uint32_t FlowPruneFlowQueueCnt(FlowQueue *, struct timeval *, int); +int FlowKill (FlowQueue *); /* Run mode selected at suricata.c */ extern int run_mode; @@ -84,12 +89,12 @@ void FlowUpdateQueue(Flow *f) /* in the new list -- we consider a flow no longer * new if we have seen at least 2 pkts in both ways. */ if (f->todstpktcnt && f->tosrcpktcnt) { - FlowRequeue(f, &flow_new_q[f->protomap], &flow_est_q[f->protomap]); + FlowRequeue(f, &flow_new_q[f->protomap], &flow_est_q[f->protomap], 1); f->flags |= FLOW_EST_LIST; /* transition */ f->flags &= ~FLOW_NEW_LIST; } else { - FlowRequeue(f, &flow_new_q[f->protomap], &flow_new_q[f->protomap]); + FlowRequeue(f, &flow_new_q[f->protomap], &flow_new_q[f->protomap], 1); } } else if (f->flags & FLOW_EST_LIST) { if (flow_proto[f->protomap].GetProtoState != NULL) { @@ -99,21 +104,21 @@ void FlowUpdateQueue(Flow *f) f->flags &=~ FLOW_EST_LIST; SCLogDebug("flow %p was put into closing queue ts %"PRIuMAX"", f, (uintmax_t)f->lastts.tv_sec); - FlowRequeue(f, &flow_est_q[f->protomap], &flow_close_q[f->protomap]); + FlowRequeue(f, &flow_est_q[f->protomap], &flow_close_q[f->protomap], 1); } else { /* Pull and put back -- this way the flows on * top of the list are least recently used. */ - FlowRequeue(f, &flow_est_q[f->protomap], &flow_est_q[f->protomap]); + FlowRequeue(f, &flow_est_q[f->protomap], &flow_est_q[f->protomap], 1); } } else { /* Pull and put back -- this way the flows on * top of the list are least recently used. */ - FlowRequeue(f, &flow_est_q[f->protomap], &flow_est_q[f->protomap]); + FlowRequeue(f, &flow_est_q[f->protomap], &flow_est_q[f->protomap], 1); } } else if (f->flags & FLOW_CLOSED_LIST){ /* Pull and put back -- this way the flows on * top of the list are least recently used. */ - FlowRequeue(f, &flow_close_q[f->protomap], &flow_close_q[f->protomap]); + FlowRequeue(f, &flow_close_q[f->protomap], &flow_close_q[f->protomap], 1); } } @@ -134,6 +139,7 @@ void FlowUpdateQueue(Flow *f) */ static int FlowPrune (FlowQueue *q, struct timeval *ts) { + SCEnter(); int mr = SCMutexTrylock(&q->mutex_q); if (mr != 0) { SCLogDebug("trylock failed"); @@ -247,7 +253,7 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts) FlowClearMemory (f, f->protomap); /* move to spare list */ - FlowRequeue(f, q, &flow_spare_q); + FlowRequeue(f, q, &flow_spare_q, 1); SCMutexUnlock(&f->m); return 1; @@ -259,10 +265,219 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts) * \param timeout timeout to consider * \retval cnt number of flows that are timed out */ -static uint32_t FlowPruneFlows(FlowQueue *q, struct timeval *ts) +static uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts) { + SCEnter(); uint32_t cnt = 0; - while(FlowPrune(q, ts)) { cnt++; } + //while(FlowPrune(q, ts)) { cnt++; } + return cnt; +} + +/** \brief Time out flows on new/estabhlished/close queues for each proto until + * we release cnt flows as max + * Called from the FlowManager + * \param ts current time + * \retval cnt number of flows that are timed out + */ +uint32_t FlowPruneFlowsCnt(struct timeval *ts, int cnt) +{ + SCEnter(); + uint32_t nowcnt = 0; + int i = 0; + + for (; i < FLOW_PROTO_MAX; i++) { + /* prune closing list */ + nowcnt = FlowPruneFlowQueueCnt(&flow_close_q[i], ts, cnt); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + + /* prune new list */ + nowcnt = FlowPruneFlowQueueCnt(&flow_new_q[i], ts, cnt); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + + /* prune established list */ + nowcnt = FlowPruneFlowQueueCnt(&flow_est_q[i], ts, cnt); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + } + return cnt; +} + +/** \brief FlowKillFlowQueueCnt It will try to kill try_cnt count of flows + * It will return the number of flows released, and can be 0 or more. + * \param q flow queue to time out flows from + * \param try_cnt try to prune this number of flows + * \retval cnt number of flows that are timed out + */ +static uint32_t FlowKillFlowQueueCnt(FlowQueue *q, int try_cnt, uint8_t mode) +{ + SCEnter(); + uint32_t cnt = 0; + while (try_cnt--) { + cnt += FlowKill(q); + } + SCLogDebug("EMERGENCY mode, Flows killed: %"PRIu32, cnt); + return cnt; +} + +/** FlowKill + * + * Inspect the top flows (last recently used) from the queue + * and see if we can prune any it (this is if it's not in use). + * + * Use trylock here so prevent us from blocking the packet handling. + * + * \param q flow queue to prune + * + * \retval 0 on error, failed block, nothing to prune + * \retval 1 on successfully pruned one + */ +int FlowKill (FlowQueue *q) +{ + SCEnter(); + int mr = SCMutexTrylock(&q->mutex_q); + + if (mr != 0) { + SCLogDebug("trylock failed"); + if (mr == EBUSY) + SCLogDebug("was locked"); + if (mr == EINVAL) + SCLogDebug("bad mutex value"); + return 0; + } + + Flow *f = q->top; + + /* This means that the queue is empty */ + if (f == NULL) { + SCMutexUnlock(&q->mutex_q); + SCLogDebug("top is null"); + return 0; + } + + do { + + if (SCMutexTrylock(&f->m) != 0) { + f = f->lnext; + /* Skip to the next */ + continue; + } + + if (SCSpinTrylock(&f->fb->s) != 0) { + SCMutexUnlock(&f->m); + f = f->lnext; + continue; + } + + /** never prune a flow that is used by a packet or stream msg + * we are currently processing in one of the threads */ + if (f->use_cnt > 0) { + SCSpinUnlock(&f->fb->s); + SCMutexUnlock(&f->m); + f = f->lnext; + continue; + } + + /* remove from the hash */ + if (f->hprev) + f->hprev->hnext = f->hnext; + if (f->hnext) + f->hnext->hprev = f->hprev; + if (f->fb->f == f) + f->fb->f = f->hnext; + + f->hnext = NULL; + f->hprev = NULL; + + SCSpinUnlock(&f->fb->s); + f->fb = NULL; + + FlowClearMemory (f, f->protomap); + + /* move to spare list */ + FlowRequeue(f, q, &flow_spare_q, 0); + + SCMutexUnlock(&f->m); + + /* so.. we did it */ + /* unlock list */ + SCMutexUnlock(&q->mutex_q); + return 1; + break; + + } while (f != NULL); + + /* If we reach this point, then we didn't prune any */ + /* unlock list */ + SCMutexUnlock(&q->mutex_q); + + return 0; +} + + +/** \brief Try to kill cnt flows by last recently seen activity on new/estabhlished/close queues for each proto until + * we release cnt flows as max. Called only on emergency mode. + * \param cnt number of flows to release + * \retval cnt number of flows that are not killed (so 0 if we prune all of them) + */ +uint32_t FlowKillFlowsCnt(int cnt) +{ + SCEnter(); + uint32_t nowcnt = 0; + int i = 0; + + /* Inspect the top of each protocol to select the last recently used */ + for (; i < FLOW_PROTO_MAX; i++) { + /* prune closing list */ + nowcnt = FlowKillFlowQueueCnt(&flow_close_q[i], cnt, 0); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + + /* prune new list */ + nowcnt = FlowKillFlowQueueCnt(&flow_new_q[i], cnt, 0); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + + /* prune established list */ + nowcnt = FlowKillFlowQueueCnt(&flow_est_q[i], cnt, 0); + if (nowcnt) { + cnt -= nowcnt; + } + if (cnt <= 0) break; + } + + return cnt; +} + +/** \brief Time out flows will try to prune try_cnt count of flows + * It will return the number of flows released, and can be 0 or more. + * A more agressive aproach is calling this function with the emergency + * bit set (and there will be another even more agressive, killing + * flows without the criteria of time outs) + * \param q flow queue to time out flows from + * \param ts current time + * \param timeout timeout to consider + * \param try_cnt try to prune this number of flows if they are timed out + * \retval cnt number of flows that are timed out + */ +static uint32_t FlowPruneFlowQueueCnt(FlowQueue *q, struct timeval *ts, int try_cnt) +{ + SCEnter(); + uint32_t cnt = 0; + while (try_cnt--) { + cnt += FlowPrune(q, ts); + } return cnt; } @@ -275,6 +490,7 @@ static uint32_t FlowPruneFlows(FlowQueue *q, struct timeval *ts) * \retval 0 otherwise. */ static int FlowUpdateSpareFlows(void) { + SCEnter(); uint32_t toalloc = 0, tofree = 0, len; SCMutexLock(&flow_spare_q.mutex_q); @@ -499,6 +715,25 @@ void FlowInitConfig (char quiet) /* If we have specific config, overwrite the defaults with them, * otherwise, leave the default values */ + intmax_t val = 0; + if (ConfGetInt("flow.emergency_recovery", &val) == 1) { + if (val <= 100 && val >= 1) { + flow_config.emergency_recovery = (uint8_t)val; + } else { + SCLogError(SC_ERR_INVALID_VALUE, "flow.emergency_recovery must be in the range of 1 and 100 (as percentage)"); + flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY; + } + } else { + SCLogDebug("flow.emergency_recovery, using default value"); + flow_config.emergency_recovery = FLOW_DEFAULT_EMERGENCY_RECOVERY; + } + + if (ConfGetInt("flow.prune_flows", &val) == 1) { + flow_config.flow_try_release = (uint8_t)val; + } else { + SCLogDebug("flow.flow.prune_flows, using default value"); + flow_config.flow_try_release = FLOW_DEFAULT_FLOW_PRUNE; + } /* Check if we have memcap and hash_size defined at config */ char *conf_val; @@ -727,21 +962,21 @@ void *FlowManagerThread(void *td) int i; for (i = 0; i < FLOW_PROTO_MAX; i++) { /* prune closing list */ - nowcnt = FlowPruneFlows(&flow_close_q[i], &ts); + nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts); if (nowcnt) { SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt); closing_cnt += nowcnt; } /* prune new list */ - nowcnt = FlowPruneFlows(&flow_new_q[i], &ts); + nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts); if (nowcnt) { SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt); new_cnt += nowcnt; } /* prune established list */ - nowcnt = FlowPruneFlows(&flow_est_q[i], &ts); + nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts); if (nowcnt) { SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt); established_cnt += nowcnt; @@ -751,11 +986,30 @@ void *FlowManagerThread(void *td) sleeping = 0; /* Don't fear, FlowManagerThread is here... - * clear emergency bit. */ + * clear emergency bit if we have at least xx flows pruned. */ if (emerg == TRUE) { - flow_flags &= ~FLOW_EMERGENCY; - emerg = FALSE; - SCLogDebug("Flow emergency mode over, back to normal..."); + uint32_t len = 0; + + SCMutexLock(&flow_spare_q.mutex_q); + + len = flow_spare_q.len; + + SCMutexUnlock(&flow_spare_q.mutex_q); + + SCLogDebug("flow_sparse_q.len = %"PRIu32" prealloc: %"PRIu32 + "flow_spare_q status: %"PRIu32"%% flows at the queue", + len, flow_config.prealloc, len * 100 / flow_config.prealloc); + /* only if we have pruned this "emergency_recovery" percentage + * of flows, we will unset the emergency bit */ + if (len * 100 / flow_config.prealloc > flow_config.emergency_recovery) { + flow_flags &= ~FLOW_EMERGENCY; + emerg = FALSE; + SCLogInfo("Flow emergency mode over, back to normal... unsetting" + " FLOW_EMERGENCY bit (ts.tv_sec: %"PRIuMAX", " + "ts.tv_usec:%"PRIuMAX") flow_spare_q status(): %"PRIu32 + "%% flows at the queue", (uintmax_t)ts.tv_sec, + (uintmax_t)ts.tv_usec, len * 100 / flow_config.prealloc); + } } } @@ -1495,6 +1749,145 @@ static int FlowTest06 (void) { return 1; } + +/** + * \test Test flow allocations when it reach memcap + * + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowTest07 (void) { + + int result = 0; + + FlowInitConfig(FLOW_QUIET); + FlowConfig backup; + memcpy(&backup, &flow_config, sizeof(FlowConfig)); + + uint32_t ini = 0; + uint32_t end = flow_spare_q.len; + flow_config.memcap = 10000; + flow_config.prealloc = 100; + + /* Let's get the flow_spare_q empty */ + UTHBuildPacketOfFlows(ini, end, 0); + + /* And now let's try to reach the memcap val */ + while (flow_memuse + sizeof(Flow) < flow_config.memcap) { + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + } + + /* should time out normal */ + TimeSetIncrementTime(2000); + ini = end + 1; + end = end + 2;; + UTHBuildPacketOfFlows(ini, end, 0); + + /* This means that the engine released 5 flows by normal timeout */ + if (flow_spare_q.len == 5) + result = 1; + + memcpy(&flow_config, &backup, sizeof(FlowConfig)); + FlowShutdown(); + + return result; +} + +/** + * \test Test flow allocations when it reach memcap + * + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowTest08 (void) { + + int result = 0; + + FlowInitConfig(FLOW_QUIET); + FlowConfig backup; + memcpy(&backup, &flow_config, sizeof(FlowConfig)); + + uint32_t ini = 0; + uint32_t end = flow_spare_q.len; + flow_config.memcap = 10000; + flow_config.prealloc = 100; + + /* Let's get the flow_spare_q empty */ + UTHBuildPacketOfFlows(ini, end, 0); + + /* And now let's try to reach the memcap val */ + while (flow_memuse + sizeof(Flow) < flow_config.memcap) { + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + } + + /* By default we use 30 for timing out new flows. This means + * that the Emergency mode should be set */ + TimeSetIncrementTime(20); + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + + /* This means that the engine released 5 flows by emergency timeout */ + if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY)) + result = 1; + + memcpy(&flow_config, &backup, sizeof(FlowConfig)); + FlowShutdown(); + + return result; +} + +/** + * \test Test flow allocations when it reach memcap + * + * + * \retval On success it returns 1 and on failure 0. + */ + +static int FlowTest09 (void) { + + int result = 0; + + FlowInitConfig(FLOW_QUIET); + FlowConfig backup; + memcpy(&backup, &flow_config, sizeof(FlowConfig)); + + uint32_t ini = 0; + uint32_t end = flow_spare_q.len; + flow_config.memcap = 10000; + flow_config.prealloc = 100; + + /* Let's get the flow_spare_q empty */ + UTHBuildPacketOfFlows(ini, end, 0); + + /* And now let's try to reach the memcap val */ + while (flow_memuse + sizeof(Flow) < flow_config.memcap) { + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + } + + /* No timeout will work */ + TimeSetIncrementTime(5); + ini = end + 1; + end = end + 2; + UTHBuildPacketOfFlows(ini, end, 0); + + /* This means that the engine release 5 flows by killing them */ + if (flow_spare_q.len == 5 && (flow_flags & FLOW_EMERGENCY)) + result = 1; + + memcpy(&flow_config, &backup, sizeof(FlowConfig)); + FlowShutdown(); + + return result; +} #endif /* UNITTESTS */ /** @@ -1508,5 +1901,8 @@ void FlowRegisterTests (void) { UtRegisterTest("FlowTest04 -- Timeout a flow having TcpSession with segments", FlowTest04, 1); UtRegisterTest("FlowTest05 -- Timeout a flow in emergency having fresh TcpSession", FlowTest05, 1); UtRegisterTest("FlowTest06 -- Timeout a flow in emergency having TcpSession with segments", FlowTest06, 1); + UtRegisterTest("FlowTest07 -- Test flow Allocations when it reach memcap", FlowTest07, 1); + UtRegisterTest("FlowTest08 -- Test flow Allocations when it reach memcap", FlowTest08, 1); + UtRegisterTest("FlowTest09 -- Test flow Allocations when it reach memcap", FlowTest09, 1); #endif /* UNITTESTS */ } diff --git a/src/flow.h b/src/flow.h index fbe21866de..322c89fd12 100644 --- a/src/flow.h +++ b/src/flow.h @@ -83,6 +83,8 @@ typedef struct FlowCnf_ uint32_t emerg_timeout_new; uint32_t emerg_timeout_est; + uint32_t flow_try_release; + uint32_t emergency_recovery; } FlowConfig; @@ -172,6 +174,8 @@ void FlowPrintQueueInfo (void); void FlowShutdown(void); void FlowSetIPOnlyFlag(Flow *, char); void FlowDecrUsecnt(ThreadVars *, Packet *); +uint32_t FlowPruneFlowsCnt(struct timeval *, int); +uint32_t FlowKillFlowsCnt(int); void *FlowManagerThread(void *td); diff --git a/src/util-error.c b/src/util-error.c index bc451a5285..a987762bd8 100644 --- a/src/util-error.c +++ b/src/util-error.c @@ -179,6 +179,7 @@ const char * SCErrorToString(SCError err) CASE_CODE (SC_ERR_CHANGING_CAPS_FAILED); CASE_CODE (SC_ERR_LIBCAP_NG_REQUIRED); CASE_CODE (SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG); + CASE_CODE (SC_WARN_FLOW_EMERGENCY); default: return "UNKNOWN_ERROR"; diff --git a/src/util-error.h b/src/util-error.h index cb332a9397..19765cae72 100644 --- a/src/util-error.h +++ b/src/util-error.h @@ -187,6 +187,7 @@ typedef enum { SC_ERR_CHANGING_CAPS_FAILED, SC_ERR_LIBCAP_NG_REQUIRED, SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG, + SC_WARN_FLOW_EMERGENCY, SC_ERR_FATAL, } SCError; diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index 84d19de9cd..aef38d0f23 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -29,11 +29,14 @@ #include "detect.h" #include "detect-parse.h" #include "util-debug.h" +#include "util-time.h" #include "util-error.h" #include "util-unittest.h" #include "util-unittest-helper.h" #include #include "detect-engine.h" +#include "flow-private.h" +#include "flow-util.h" /** * \brief UTHBuildPacketReal is a function that create tcp/udp packets for unittests @@ -59,6 +62,8 @@ Packet *UTHBuildPacketIPV6Real(uint8_t *payload, uint16_t payload_len, return NULL; memset(p, 0, sizeof(Packet)); + TimeSet(&p->ts); + p->src.family = AF_INET6; p->dst.family = AF_INET6; p->payload = payload; @@ -82,9 +87,11 @@ Packet *UTHBuildPacketIPV6Real(uint8_t *payload, uint16_t payload_len, p->ip6h = SCMalloc(sizeof(IPV6Hdr)); if (p->ip6h == NULL) return NULL; + memset(p->ip6h, 0, sizeof(IPV6Hdr)); p->tcph = SCMalloc(sizeof(TCPHdr)); if (p->tcph == NULL) return NULL; + memset(p->tcph, 0, sizeof(TCPHdr)); p->tcph->th_sport = sport; p->tcph->th_dport = dport; return p; @@ -114,6 +121,10 @@ Packet *UTHBuildPacketReal(uint8_t *payload, uint16_t payload_len, return NULL; memset(p, 0, sizeof(Packet)); + struct timeval tv; + TimeGet(&tv); + COPY_TIMESTAMP(&tv, &p->ts); + p->src.family = AF_INET; p->dst.family = AF_INET; p->payload = payload; @@ -128,31 +139,33 @@ Packet *UTHBuildPacketReal(uint8_t *payload, uint16_t payload_len, p->dst.addr_data32[0] = in.s_addr; p->dp = dport; + p->ip4h = SCMalloc(sizeof(IPV4Hdr)); + if (p->ip4h == NULL) + return NULL; + memset(p->ip4h, 0, sizeof(IPV4Hdr)); + + p->ip4h->ip_src.s_addr = p->src.addr_data32[0]; + p->ip4h->ip_dst.s_addr = p->dst.addr_data32[0]; + switch (ipproto) { case IPPROTO_UDP: - p->ip4h = SCMalloc(sizeof(IPV4Hdr)); - if (p->ip4h == NULL) - return NULL; p->udph = SCMalloc(sizeof(UDPHdr)); if (p->udph == NULL) return NULL; + memset(p->udph, 0, sizeof(UDPHdr)); p->udph->uh_sport = sport; p->udph->uh_dport = dport; break; case IPPROTO_TCP: - p->ip4h = SCMalloc(sizeof(IPV4Hdr)); - if (p->ip4h == NULL) - return NULL; p->tcph = SCMalloc(sizeof(TCPHdr)); if (p->tcph == NULL) return NULL; + memset(p->tcph, 0, sizeof(TCPHdr)); p->tcph->th_sport = sport; p->tcph->th_dport = dport; break; case IPPROTO_ICMP: - p->ip4h = SCMalloc(sizeof(IPV4Hdr)); - if (p->ip4h == NULL) - return NULL; + default: break; /* TODO: Add more protocols */ } @@ -675,6 +688,28 @@ end: return result; } +uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) { + uint32_t i = start; + uint8_t payload[] = "Payload"; + for (; i < end; i++) { + Packet *p = UTHBuildPacket(payload, sizeof(payload), IPPROTO_TCP); + if (dir == 0) { + p->src.addr_data32[0] = i; + p->dst.addr_data32[0] = i + 1; + } else { + p->src.addr_data32[0] = i + 1; + p->dst.addr_data32[0] = i; + } + FlowHandlePacket(NULL, p); + p->flow->use_cnt = 0; + + /* Now the queues shoul be updated */ + UTHFreePacket(p); + } + + return i; +} + #ifdef UNITTESTS /** @@ -781,6 +816,27 @@ int UTHBuildPacketTest02(void) { return ret; } +/** + * \brief UTHBuildPacketOfFlowsTest01 wrapper to check packets for unittests + */ +int UTHBuildPacketOfFlowsTest01(void) { + int result = 0; + + FlowInitConfig(FLOW_QUIET); + uint32_t flow_spare_q_len = flow_spare_q.len; + + UTHBuildPacketOfFlows(0, 100, 0); + + if (flow_spare_q.len != flow_spare_q_len - 100) + result = 0; + else + result = 1; + FlowShutdown(); + + return result; +} + + /** * \brief UTHBuildPacketSrcDstTest01 wrapper to check packets for unittests */ @@ -853,6 +909,7 @@ void UTHRegisterTests(void) { UtRegisterTest("UTHBuildPacketSrcDstTest02", UTHBuildPacketSrcDstTest02, 1); UtRegisterTest("UTHBuildPacketSrcDstPortsTest01", UTHBuildPacketSrcDstPortsTest01, 1); UtRegisterTest("UTHBuildPacketSrcDstPortsTest02", UTHBuildPacketSrcDstPortsTest02, 1); + UtRegisterTest("UTHBuildPacketOfFlowsTest01", UTHBuildPacketOfFlowsTest01, 1); #endif /* UNITTESTS */ } diff --git a/src/util-unittest-helper.h b/src/util-unittest-helper.h index 060dd18fe3..12edb31132 100644 --- a/src/util-unittest-helper.h +++ b/src/util-unittest-helper.h @@ -47,6 +47,8 @@ int UTHCheckPacketMatchResults(Packet *, uint32_t *, uint32_t *, int); int UTHMatchPacketsWithResults(DetectEngineCtx *, Packet **, int, uint32_t *, uint32_t *, int); int UTHGenericTest(Packet **, int, char **, uint32_t *, uint32_t *, int); +uint32_t UTHBuildPacketOfFlows(uint32_t, uint32_t, uint8_t); + void UTHRegisterTests(void); #endif /* __UTIL_UNITTEST_HELPER__ */