From c26434fef12b6850762e35d0155ed90a56472ebb Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Sun, 6 Jun 2010 17:38:26 +0200 Subject: [PATCH] Move flow use cnt to atomic and outside of the flow mutex protection. --- src/app-layer.c | 2 +- src/detect-engine-state.c | 2 +- src/detect.c | 6 ++- src/flow-hash.c | 32 ++++++++-------- src/flow-util.c | 30 ++++++++------- src/flow-util.h | 74 +++++++++++++++++++++++++++---------- src/flow.c | 48 ++++++++++++------------ src/flow.h | 15 ++++++-- src/stream-tcp-reassemble.c | 5 +-- src/tmqh-packetpool.c | 2 +- src/util-atomic.h | 25 +++++++++++-- src/util-unittest-helper.c | 2 +- 12 files changed, 155 insertions(+), 88 deletions(-) diff --git a/src/app-layer.c b/src/app-layer.c index 90f90a0d16..09ada9b14b 100644 --- a/src/app-layer.c +++ b/src/app-layer.c @@ -180,7 +180,7 @@ int AppLayerHandleMsg(AlpProtoDetectThreadCtx *dp_ctx, StreamMsg *smsg) } /* flow is free again */ - smsg->flow->use_cnt--; + FlowDecrUsecnt(smsg->flow); /* return the used message to the queue */ StreamMsgReturnToPool(smsg); diff --git a/src/detect-engine-state.c b/src/detect-engine-state.c index f5cca92f6a..76e8d456c0 100644 --- a/src/detect-engine-state.c +++ b/src/detect-engine-state.c @@ -853,7 +853,7 @@ static int DeStateTest04(void) { result = 1; end: - CLEAR_FLOW(&f); + FLOW_DESTROY(&f); if (det_ctx != NULL) { DetectEngineThreadCtxDeinit(&th_v, (void *)det_ctx); diff --git a/src/detect.c b/src/detect.c index 61d5947d91..22d79e3e28 100644 --- a/src/detect.c +++ b/src/detect.c @@ -514,8 +514,9 @@ int SigMatchSignatures(ThreadVars *th_v, DetectEngineCtx *de_ctx, DetectEngineTh /* grab the protocol state we will detect on */ if (p->flow != NULL) { + FlowIncrUsecnt(p->flow); + SCMutexLock(&p->flow->m); - p->flow->use_cnt++; alstate = AppLayerGetProtoStateFromPacket(p); alproto = AppLayerGetProtoFromPacket(p); if (p->flowflags & FLOW_PKT_TOSERVER && p->flow->flags & FLOW_SGH_TOSERVER) { @@ -853,8 +854,9 @@ end: p->flow->sgh_toclient = det_ctx->sgh; p->flow->flags |= FLOW_SGH_TOCLIENT; } - p->flow->use_cnt--; SCMutexUnlock(&p->flow->m); + + FlowDecrUsecnt(p->flow); } SCReturnInt(fmatch); diff --git a/src/flow-hash.c b/src/flow-hash.c index 54612e3114..99ab81f36a 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -28,7 +28,7 @@ #include "threads.h" #include "decode.h" -#include "debug.h" +#include "detect-engine-state.h" #include "flow.h" #include "flow-hash.h" @@ -345,11 +345,16 @@ static Flow *FlowGetNew(Packet *p) { return NULL; } - /* f is locked */ + /* flow is initialized but *unlocked* */ } else { - SCMutexLock(&f->m); + FLOW_RECYCLE(f); + + /* flow is initialized (recylced) but *unlocked* */ } + FlowIncrUsecnt(f); + + SCMutexLock(&f->m); return f; } @@ -391,16 +396,13 @@ Flow *FlowGetFlowFromHash (Packet *p) /* flow is locked */ - /* these are protected by the bucket lock */ - f->hnext = NULL; - f->hprev = NULL; - /* got one, now lock, initialize and return */ FlowInit(f,p); - FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); f->flags |= FLOW_NEW_LIST; f->fb = fb; + FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); + SCSpinUnlock(&fb->s); FlowHashCountUpdate; return f; @@ -429,16 +431,16 @@ Flow *FlowGetFlowFromHash (Packet *p) /* flow is locked */ - f->hnext = NULL; f->hprev = pf; /* initialize and return */ FlowInit(f,p); - FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); f->flags |= FLOW_NEW_LIST; f->fb = fb; + FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1); + SCSpinUnlock(&fb->s); FlowHashCountUpdate; return f; @@ -455,7 +457,8 @@ Flow *FlowGetFlowFromHash (Packet *p) fb->f->hprev = f; fb->f = f; - /* found our flow */ + /* found our flow, lock & return */ + FlowIncrUsecnt(f); SCMutexLock(&f->m); SCSpinUnlock(&fb->s); FlowHashCountUpdate; @@ -464,10 +467,9 @@ Flow *FlowGetFlowFromHash (Packet *p) } } - if (f != NULL) { - SCMutexLock(&f->m); - } - + /* lock & return */ + FlowIncrUsecnt(f); + SCMutexLock(&f->m); SCSpinUnlock(&fb->s); FlowHashCountUpdate; return f; diff --git a/src/flow-util.c b/src/flow-util.c index 0099b6c9e2..2d50e1ea1c 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -37,7 +37,13 @@ #include "detect.h" #include "detect-engine-state.h" -/* Allocate a flow */ +/** \brief allocate a flow + * + * We check against the memuse counter. If it passes that check we increment + * the counter first, then we try to alloc. + * + * \retval f the flow or NULL on out of memory + */ Flow *FlowAlloc(void) { Flow *f; @@ -46,27 +52,29 @@ Flow *FlowAlloc(void) return NULL; } + SC_ATOMIC_ADD(flow_memuse, sizeof(Flow)); + f = SCMalloc(sizeof(Flow)); if (f == NULL) { + SC_ATOMIC_SUB(flow_memuse, sizeof(Flow)); return NULL; } - SC_ATOMIC_ADD(flow_memuse, sizeof(Flow)); - SCMutexInit(&f->m, NULL); - f->lnext = NULL; - f->lprev = NULL; - f->hnext = NULL; - f->hprev = NULL; + FLOW_INITIALIZE(f); return f; } -/** free the memory of a flow */ +/** + * \brief cleanup & free the memory of a flow + * + * \param f flow to clear & destroy + */ void FlowFree(Flow *f) { - SCMutexDestroy(&f->m); + FLOW_DESTROY(f); SCFree(f); SC_ATOMIC_SUB(flow_memuse, sizeof(Flow)); @@ -99,10 +107,6 @@ void FlowInit(Flow *f, Packet *p) SCEnter(); SCLogDebug("flow %p", f); - f->flowvar = NULL; - f->de_state = NULL; - CLEAR_FLOW(f); - f->proto = p->proto; f->recursion_level = p->recursion_level; diff --git a/src/flow-util.h b/src/flow-util.h index 219227e570..7eea4d20c9 100644 --- a/src/flow-util.h +++ b/src/flow-util.h @@ -26,26 +26,60 @@ #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) -/* only clear the parts that won't be overwritten - * in FlowInit anyway */ -#define CLEAR_FLOW(f) { \ - (f)->sp = 0; \ - (f)->dp = 0; \ - (f)->flags = 0; \ - (f)->todstpktcnt = 0; \ - (f)->tosrcpktcnt = 0; \ - (f)->bytecnt = 0; \ - (f)->lastts.tv_sec = 0; \ - (f)->lastts.tv_usec = 0; \ - GenericVarFree((f)->flowvar); \ - (f)->flowvar = NULL; \ - (f)->protoctx = NULL; \ - (f)->use_cnt = 0; \ - DetectEngineStateFree((f)->de_state); \ - (f)->de_state = NULL; \ - (f)->sgh_toserver = NULL; \ - (f)->sgh_toclient = NULL; \ -} +#define FLOW_INITIALIZE(f) do { \ + SCMutexInit(&(f)->m, NULL); \ + (f)->lnext = NULL; \ + (f)->lprev = NULL; \ + (f)->hnext = NULL; \ + (f)->hprev = NULL; \ + (f)->sp = 0; \ + (f)->dp = 0; \ + (f)->flags = 0; \ + (f)->todstpktcnt = 0; \ + (f)->tosrcpktcnt = 0; \ + (f)->bytecnt = 0; \ + (f)->lastts.tv_sec = 0; \ + (f)->lastts.tv_usec = 0; \ + (f)->flowvar = NULL; \ + (f)->protoctx = NULL; \ + SC_ATOMIC_INIT((f)->use_cnt); \ + (f)->de_state = NULL; \ + (f)->sgh_toserver = NULL; \ + (f)->sgh_toclient = NULL; \ + } while (0) + +#define FLOW_RECYCLE(f) do { \ + (f)->lnext = NULL; \ + (f)->lprev = NULL; \ + (f)->hnext = NULL; \ + (f)->hprev = NULL; \ + (f)->sp = 0; \ + (f)->dp = 0; \ + (f)->flags = 0; \ + (f)->todstpktcnt = 0; \ + (f)->tosrcpktcnt = 0; \ + (f)->bytecnt = 0; \ + (f)->lastts.tv_sec = 0; \ + (f)->lastts.tv_usec = 0; \ + GenericVarFree((f)->flowvar); \ + (f)->flowvar = NULL; \ + (f)->protoctx = NULL; \ + SC_ATOMIC_RESET((f)->use_cnt); \ + DetectEngineStateFree((f)->de_state); \ + (f)->de_state = NULL; \ + (f)->sgh_toserver = NULL; \ + (f)->sgh_toclient = NULL; \ + } while(0) + +#define FLOW_DESTROY(f) do { \ + SCMutexDestroy(&(f)->m); \ + GenericVarFree((f)->flowvar); \ + (f)->flowvar = NULL; \ + (f)->protoctx = NULL; \ + SC_ATOMIC_DESTROY((f)->use_cnt); \ + DetectEngineStateFree((f)->de_state); \ + (f)->de_state = NULL; \ + } while(0) Flow *FlowAlloc(void); Flow *FlowAllocDirect(void); diff --git a/src/flow.c b/src/flow.c index 4b0b90df37..4a711afc71 100644 --- a/src/flow.c +++ b/src/flow.c @@ -228,14 +228,17 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts) /** never prune a flow that is used by a packet or stream msg * we are currently processing in one of the threads */ - if (f->use_cnt > 0) { - SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", f->use_cnt, f, f->proto); + if (SC_ATOMIC_GET(f->use_cnt) > 0) { + SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", SC_ATOMIC_GET(f->use_cnt), f, f->proto); SCSpinUnlock(&f->fb->s); SCMutexUnlock(&f->m); SCLogDebug("it is in one of the threads"); return 0; } + /* this should not be possible */ + BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0); + /* remove from the hash */ if (f->hprev) f->hprev->hnext = f->hnext; @@ -378,7 +381,7 @@ int FlowKill (FlowQueue *q) /** never prune a flow that is used by a packet or stream msg * we are currently processing in one of the threads */ - if (f->use_cnt > 0) { + if (SC_ATOMIC_GET(f->use_cnt) > 0) { SCSpinUnlock(&f->fb->s); SCMutexUnlock(&f->m); f = f->lnext; @@ -549,30 +552,27 @@ void FlowSetIPOnlyFlagNoLock(Flow *f, char direction) { direction ? (f->flags |= FLOW_TOSERVER_IPONLY_SET) : (f->flags |= FLOW_TOCLIENT_IPONLY_SET); } -/** \brief increase the use cnt of a flow - * \param tv thread vars (\todo unused?) - * \param p packet with flow to decrease use cnt for +/** + * \brief increase the use cnt of a flow + * + * \param f flow to decrease use cnt for */ -void FlowIncrUsecnt(ThreadVars *tv, Packet *p) { - if (p == NULL || p->flow == NULL) +void FlowIncrUsecnt(Flow *f) { + if (f == NULL) return; - SCMutexLock(&p->flow->m); - p->flow->use_cnt++; - SCMutexUnlock(&p->flow->m); + SC_ATOMIC_ADD(f->use_cnt, 1); } -/** \brief decrease the use cnt of a flow - * \param tv thread vars (\todo unused?) - * \param p packet with flow to decrease use cnt for +/** + * \brief decrease the use cnt of a flow + * + * \param f flow to decrease use cnt for */ -void FlowDecrUsecnt(ThreadVars *tv, Packet *p) { - if (p == NULL || p->flow == NULL) +void FlowDecrUsecnt(Flow *f) { + if (f == NULL) return; - SCMutexLock(&p->flow->m); - if (p->flow->use_cnt > 0) - p->flow->use_cnt--; - SCMutexUnlock(&p->flow->m); + SC_ATOMIC_SUB(f->use_cnt, 1); } #define TOSERVER 0 @@ -645,8 +645,6 @@ void FlowHandlePacket (ThreadVars *tv, Packet *p) if (f == NULL) return; - f->use_cnt++; - /* update the last seen timestamp of this flow */ COPY_TIMESTAMP(&p->ts, &f->lastts); @@ -1308,7 +1306,7 @@ static int FlowClearMemory(Flow* f, uint8_t proto_map) { flow_proto[proto_map].Freefunc(f->protoctx); } - CLEAR_FLOW(f); + FLOW_DESTROY(f); SCReturnInt(1); } @@ -1553,6 +1551,7 @@ static int FlowTestPrune(Flow *f, struct timeval *ts) { goto error; } + SCLogDebug("calling FlowPrune"); FlowPrune(q, ts); if (q->len != 0) { printf("Failed in prunning the flow: "); @@ -1593,7 +1592,8 @@ static int FlowTest03 (void) { memset(&fb, 0, sizeof(FlowBucket)); SCSpinInit(&fb.s, 0); - SCMutexInit(&f.m, NULL); + + FLOW_INITIALIZE(&f); TimeGet(&ts); f.lastts.tv_sec = ts.tv_sec - 5000; diff --git a/src/flow.h b/src/flow.h index a710e5b009..bac0919a85 100644 --- a/src/flow.h +++ b/src/flow.h @@ -26,6 +26,7 @@ #include "decode.h" #include "util-var.h" +#include "util-atomic.h" #define FLOW_QUIET TRUE #define FLOW_VERBOSE FALSE @@ -159,8 +160,13 @@ typedef struct Flow_ /** protocol specific data pointer, e.g. for TcpSession */ void *protoctx; - /** how many pkts and stream msgs are using the flow *right now* */ - uint16_t use_cnt; + /** how many pkts and stream msgs are using the flow *right now*. This + * variable is atomic so not protected by the Flow mutex "m". + * + * On receiving a packet the counter is incremented while the flow + * bucked is locked, which is also the case on timeout pruning. + */ + SC_ATOMIC_DECLARE(unsigned short, use_cnt); /** detection engine state */ struct DetectEngineState_ *de_state; @@ -208,7 +214,10 @@ void FlowPrintQueueInfo (void); void FlowShutdown(void); void FlowSetIPOnlyFlag(Flow *, char); void FlowSetIPOnlyFlagNoLock(Flow *, char); -void FlowDecrUsecnt(ThreadVars *, Packet *); + +void FlowIncrUsecnt(Flow *); +void FlowDecrUsecnt(Flow *); + uint32_t FlowPruneFlowsCnt(struct timeval *, int); uint32_t FlowKillFlowsCnt(int); diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 480e0b9d8f..620dd0b144 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -1295,9 +1295,8 @@ static void StreamTcpSetupMsg(TcpSession *ssn, TcpStream *stream, Packet *p, smsg->data.data_len = 0; smsg->flow = p->flow; - if (smsg->flow != NULL) { - smsg->flow->use_cnt++; - } + + FlowIncrUsecnt(smsg->flow); SCLogDebug("smsg %p", smsg); SCReturn; diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index 765e0ff328..aecbb1d41c 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -133,7 +133,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) SCLogDebug("tunnel stuff done, move on (proot %d)", proot); } - FlowDecrUsecnt(t,p); + FlowDecrUsecnt(p->flow); /* we're done with the tunnel root now as well */ if (proot == 1) { diff --git a/src/util-atomic.h b/src/util-atomic.h index 1f8b44e17a..ea14b9a877 100644 --- a/src/util-atomic.h +++ b/src/util-atomic.h @@ -67,14 +67,25 @@ * \brief Initialize the previously declared atomic variable and it's * lock. */ -#define SC_ATOMIC_INIT(name) \ - SCSpinInit(&(name ## _sc_lock__), 0) +#define SC_ATOMIC_INIT(name) do { \ + SCSpinInit(&(name ## _sc_lock__), 0); \ + (name ## _sc_atomic__) = 0; \ + } while(0) + +/** + * \brief Initialize the previously declared atomic variable and it's + * lock. + */ +#define SC_ATOMIC_RESET(name) do { \ + (name ## _sc_atomic__) = 0; \ + } while(0) /** * \brief Destroy the lock used to protect this variable */ -#define SC_ATOMIC_DESTROY(name) \ - SCSpinDestroy(&(name ## _sc_lock__)) +#define SC_ATOMIC_DESTROY(name) do { \ + SCSpinDestroy(&(name ## _sc_lock__)); \ + } while (0) /** * \brief add a value to our atomic variable @@ -200,6 +211,12 @@ #define SC_ATOMIC_INIT(name) \ (name ## _sc_atomic__) = 0 +/** + * \brief wrapper for reinitializing an atomic variable. + **/ +#define SC_ATOMIC_RESET(name) \ + (name ## _sc_atomic__) = 0 + /** * \brief No-op. */ diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index 32e2725c04..a64ae3ccd0 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -702,7 +702,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) { } FlowHandlePacket(NULL, p); if (p->flow != NULL) - p->flow->use_cnt = 0; + SC_ATOMIC_RESET(p->flow->use_cnt); /* Now the queues shoul be updated */ UTHFreePacket(p);