Move flow use cnt to atomic and outside of the flow mutex protection.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent 87345e5c60
commit c26434fef1

@ -180,7 +180,7 @@ int AppLayerHandleMsg(AlpProtoDetectThreadCtx *dp_ctx, StreamMsg *smsg)
} }
/* flow is free again */ /* flow is free again */
smsg->flow->use_cnt--; FlowDecrUsecnt(smsg->flow);
/* return the used message to the queue */ /* return the used message to the queue */
StreamMsgReturnToPool(smsg); StreamMsgReturnToPool(smsg);

@ -853,7 +853,7 @@ static int DeStateTest04(void) {
result = 1; result = 1;
end: end:
CLEAR_FLOW(&f); FLOW_DESTROY(&f);
if (det_ctx != NULL) { if (det_ctx != NULL) {
DetectEngineThreadCtxDeinit(&th_v, (void *)det_ctx); DetectEngineThreadCtxDeinit(&th_v, (void *)det_ctx);

@ -514,8 +514,9 @@ int SigMatchSignatures(ThreadVars *th_v, DetectEngineCtx *de_ctx, DetectEngineTh
/* grab the protocol state we will detect on */ /* grab the protocol state we will detect on */
if (p->flow != NULL) { if (p->flow != NULL) {
FlowIncrUsecnt(p->flow);
SCMutexLock(&p->flow->m); SCMutexLock(&p->flow->m);
p->flow->use_cnt++;
alstate = AppLayerGetProtoStateFromPacket(p); alstate = AppLayerGetProtoStateFromPacket(p);
alproto = AppLayerGetProtoFromPacket(p); alproto = AppLayerGetProtoFromPacket(p);
if (p->flowflags & FLOW_PKT_TOSERVER && p->flow->flags & FLOW_SGH_TOSERVER) { if (p->flowflags & FLOW_PKT_TOSERVER && p->flow->flags & FLOW_SGH_TOSERVER) {
@ -853,8 +854,9 @@ end:
p->flow->sgh_toclient = det_ctx->sgh; p->flow->sgh_toclient = det_ctx->sgh;
p->flow->flags |= FLOW_SGH_TOCLIENT; p->flow->flags |= FLOW_SGH_TOCLIENT;
} }
p->flow->use_cnt--;
SCMutexUnlock(&p->flow->m); SCMutexUnlock(&p->flow->m);
FlowDecrUsecnt(p->flow);
} }
SCReturnInt(fmatch); SCReturnInt(fmatch);

@ -28,7 +28,7 @@
#include "threads.h" #include "threads.h"
#include "decode.h" #include "decode.h"
#include "debug.h" #include "detect-engine-state.h"
#include "flow.h" #include "flow.h"
#include "flow-hash.h" #include "flow-hash.h"
@ -345,11 +345,16 @@ static Flow *FlowGetNew(Packet *p) {
return NULL; return NULL;
} }
/* f is locked */ /* flow is initialized but *unlocked* */
} else { } else {
SCMutexLock(&f->m); FLOW_RECYCLE(f);
/* flow is initialized (recylced) but *unlocked* */
} }
FlowIncrUsecnt(f);
SCMutexLock(&f->m);
return f; return f;
} }
@ -391,16 +396,13 @@ Flow *FlowGetFlowFromHash (Packet *p)
/* flow is locked */ /* flow is locked */
/* these are protected by the bucket lock */
f->hnext = NULL;
f->hprev = NULL;
/* got one, now lock, initialize and return */ /* got one, now lock, initialize and return */
FlowInit(f,p); FlowInit(f,p);
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
f->flags |= FLOW_NEW_LIST; f->flags |= FLOW_NEW_LIST;
f->fb = fb; f->fb = fb;
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
SCSpinUnlock(&fb->s); SCSpinUnlock(&fb->s);
FlowHashCountUpdate; FlowHashCountUpdate;
return f; return f;
@ -429,16 +431,16 @@ Flow *FlowGetFlowFromHash (Packet *p)
/* flow is locked */ /* flow is locked */
f->hnext = NULL;
f->hprev = pf; f->hprev = pf;
/* initialize and return */ /* initialize and return */
FlowInit(f,p); FlowInit(f,p);
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
f->flags |= FLOW_NEW_LIST; f->flags |= FLOW_NEW_LIST;
f->fb = fb; f->fb = fb;
FlowRequeue(f, NULL, &flow_new_q[f->protomap], 1);
SCSpinUnlock(&fb->s); SCSpinUnlock(&fb->s);
FlowHashCountUpdate; FlowHashCountUpdate;
return f; return f;
@ -455,7 +457,8 @@ Flow *FlowGetFlowFromHash (Packet *p)
fb->f->hprev = f; fb->f->hprev = f;
fb->f = f; fb->f = f;
/* found our flow */ /* found our flow, lock & return */
FlowIncrUsecnt(f);
SCMutexLock(&f->m); SCMutexLock(&f->m);
SCSpinUnlock(&fb->s); SCSpinUnlock(&fb->s);
FlowHashCountUpdate; FlowHashCountUpdate;
@ -464,10 +467,9 @@ Flow *FlowGetFlowFromHash (Packet *p)
} }
} }
if (f != NULL) { /* lock & return */
SCMutexLock(&f->m); FlowIncrUsecnt(f);
} SCMutexLock(&f->m);
SCSpinUnlock(&fb->s); SCSpinUnlock(&fb->s);
FlowHashCountUpdate; FlowHashCountUpdate;
return f; return f;

@ -37,7 +37,13 @@
#include "detect.h" #include "detect.h"
#include "detect-engine-state.h" #include "detect-engine-state.h"
/* Allocate a flow */ /** \brief allocate a flow
*
* We check against the memuse counter. If it passes that check we increment
* the counter first, then we try to alloc.
*
* \retval f the flow or NULL on out of memory
*/
Flow *FlowAlloc(void) Flow *FlowAlloc(void)
{ {
Flow *f; Flow *f;
@ -46,27 +52,29 @@ Flow *FlowAlloc(void)
return NULL; return NULL;
} }
SC_ATOMIC_ADD(flow_memuse, sizeof(Flow));
f = SCMalloc(sizeof(Flow)); f = SCMalloc(sizeof(Flow));
if (f == NULL) { if (f == NULL) {
SC_ATOMIC_SUB(flow_memuse, sizeof(Flow));
return NULL; return NULL;
} }
SC_ATOMIC_ADD(flow_memuse, sizeof(Flow));
SCMutexInit(&f->m, NULL); FLOW_INITIALIZE(f);
f->lnext = NULL;
f->lprev = NULL;
f->hnext = NULL;
f->hprev = NULL;
return f; return f;
} }
/** free the memory of a flow */ /**
* \brief cleanup & free the memory of a flow
*
* \param f flow to clear & destroy
*/
void FlowFree(Flow *f) void FlowFree(Flow *f)
{ {
SCMutexDestroy(&f->m); FLOW_DESTROY(f);
SCFree(f); SCFree(f);
SC_ATOMIC_SUB(flow_memuse, sizeof(Flow)); SC_ATOMIC_SUB(flow_memuse, sizeof(Flow));
@ -99,10 +107,6 @@ void FlowInit(Flow *f, Packet *p)
SCEnter(); SCEnter();
SCLogDebug("flow %p", f); SCLogDebug("flow %p", f);
f->flowvar = NULL;
f->de_state = NULL;
CLEAR_FLOW(f);
f->proto = p->proto; f->proto = p->proto;
f->recursion_level = p->recursion_level; f->recursion_level = p->recursion_level;

@ -26,26 +26,60 @@
#define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec) #define COPY_TIMESTAMP(src,dst) ((dst)->tv_sec = (src)->tv_sec, (dst)->tv_usec = (src)->tv_usec)
/* only clear the parts that won't be overwritten #define FLOW_INITIALIZE(f) do { \
* in FlowInit anyway */ SCMutexInit(&(f)->m, NULL); \
#define CLEAR_FLOW(f) { \ (f)->lnext = NULL; \
(f)->sp = 0; \ (f)->lprev = NULL; \
(f)->dp = 0; \ (f)->hnext = NULL; \
(f)->flags = 0; \ (f)->hprev = NULL; \
(f)->todstpktcnt = 0; \ (f)->sp = 0; \
(f)->tosrcpktcnt = 0; \ (f)->dp = 0; \
(f)->bytecnt = 0; \ (f)->flags = 0; \
(f)->lastts.tv_sec = 0; \ (f)->todstpktcnt = 0; \
(f)->lastts.tv_usec = 0; \ (f)->tosrcpktcnt = 0; \
GenericVarFree((f)->flowvar); \ (f)->bytecnt = 0; \
(f)->flowvar = NULL; \ (f)->lastts.tv_sec = 0; \
(f)->protoctx = NULL; \ (f)->lastts.tv_usec = 0; \
(f)->use_cnt = 0; \ (f)->flowvar = NULL; \
DetectEngineStateFree((f)->de_state); \ (f)->protoctx = NULL; \
(f)->de_state = NULL; \ SC_ATOMIC_INIT((f)->use_cnt); \
(f)->sgh_toserver = NULL; \ (f)->de_state = NULL; \
(f)->sgh_toclient = NULL; \ (f)->sgh_toserver = NULL; \
} (f)->sgh_toclient = NULL; \
} while (0)
#define FLOW_RECYCLE(f) do { \
(f)->lnext = NULL; \
(f)->lprev = NULL; \
(f)->hnext = NULL; \
(f)->hprev = NULL; \
(f)->sp = 0; \
(f)->dp = 0; \
(f)->flags = 0; \
(f)->todstpktcnt = 0; \
(f)->tosrcpktcnt = 0; \
(f)->bytecnt = 0; \
(f)->lastts.tv_sec = 0; \
(f)->lastts.tv_usec = 0; \
GenericVarFree((f)->flowvar); \
(f)->flowvar = NULL; \
(f)->protoctx = NULL; \
SC_ATOMIC_RESET((f)->use_cnt); \
DetectEngineStateFree((f)->de_state); \
(f)->de_state = NULL; \
(f)->sgh_toserver = NULL; \
(f)->sgh_toclient = NULL; \
} while(0)
#define FLOW_DESTROY(f) do { \
SCMutexDestroy(&(f)->m); \
GenericVarFree((f)->flowvar); \
(f)->flowvar = NULL; \
(f)->protoctx = NULL; \
SC_ATOMIC_DESTROY((f)->use_cnt); \
DetectEngineStateFree((f)->de_state); \
(f)->de_state = NULL; \
} while(0)
Flow *FlowAlloc(void); Flow *FlowAlloc(void);
Flow *FlowAllocDirect(void); Flow *FlowAllocDirect(void);

@ -228,14 +228,17 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
/** never prune a flow that is used by a packet or stream msg /** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */ * we are currently processing in one of the threads */
if (f->use_cnt > 0) { if (SC_ATOMIC_GET(f->use_cnt) > 0) {
SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", f->use_cnt, f, f->proto); SCLogDebug("timed out but use_cnt > 0: %"PRIu16", %p, proto %"PRIu8"", SC_ATOMIC_GET(f->use_cnt), f, f->proto);
SCSpinUnlock(&f->fb->s); SCSpinUnlock(&f->fb->s);
SCMutexUnlock(&f->m); SCMutexUnlock(&f->m);
SCLogDebug("it is in one of the threads"); SCLogDebug("it is in one of the threads");
return 0; return 0;
} }
/* this should not be possible */
BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
/* remove from the hash */ /* remove from the hash */
if (f->hprev) if (f->hprev)
f->hprev->hnext = f->hnext; f->hprev->hnext = f->hnext;
@ -378,7 +381,7 @@ int FlowKill (FlowQueue *q)
/** never prune a flow that is used by a packet or stream msg /** never prune a flow that is used by a packet or stream msg
* we are currently processing in one of the threads */ * we are currently processing in one of the threads */
if (f->use_cnt > 0) { if (SC_ATOMIC_GET(f->use_cnt) > 0) {
SCSpinUnlock(&f->fb->s); SCSpinUnlock(&f->fb->s);
SCMutexUnlock(&f->m); SCMutexUnlock(&f->m);
f = f->lnext; f = f->lnext;
@ -549,30 +552,27 @@ void FlowSetIPOnlyFlagNoLock(Flow *f, char direction) {
direction ? (f->flags |= FLOW_TOSERVER_IPONLY_SET) : (f->flags |= FLOW_TOCLIENT_IPONLY_SET); direction ? (f->flags |= FLOW_TOSERVER_IPONLY_SET) : (f->flags |= FLOW_TOCLIENT_IPONLY_SET);
} }
/** \brief increase the use cnt of a flow /**
* \param tv thread vars (\todo unused?) * \brief increase the use cnt of a flow
* \param p packet with flow to decrease use cnt for *
* \param f flow to decrease use cnt for
*/ */
void FlowIncrUsecnt(ThreadVars *tv, Packet *p) { void FlowIncrUsecnt(Flow *f) {
if (p == NULL || p->flow == NULL) if (f == NULL)
return; return;
SCMutexLock(&p->flow->m); SC_ATOMIC_ADD(f->use_cnt, 1);
p->flow->use_cnt++;
SCMutexUnlock(&p->flow->m);
} }
/** \brief decrease the use cnt of a flow /**
* \param tv thread vars (\todo unused?) * \brief decrease the use cnt of a flow
* \param p packet with flow to decrease use cnt for *
* \param f flow to decrease use cnt for
*/ */
void FlowDecrUsecnt(ThreadVars *tv, Packet *p) { void FlowDecrUsecnt(Flow *f) {
if (p == NULL || p->flow == NULL) if (f == NULL)
return; return;
SCMutexLock(&p->flow->m); SC_ATOMIC_SUB(f->use_cnt, 1);
if (p->flow->use_cnt > 0)
p->flow->use_cnt--;
SCMutexUnlock(&p->flow->m);
} }
#define TOSERVER 0 #define TOSERVER 0
@ -645,8 +645,6 @@ void FlowHandlePacket (ThreadVars *tv, Packet *p)
if (f == NULL) if (f == NULL)
return; return;
f->use_cnt++;
/* update the last seen timestamp of this flow */ /* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts, &f->lastts); COPY_TIMESTAMP(&p->ts, &f->lastts);
@ -1308,7 +1306,7 @@ static int FlowClearMemory(Flow* f, uint8_t proto_map) {
flow_proto[proto_map].Freefunc(f->protoctx); flow_proto[proto_map].Freefunc(f->protoctx);
} }
CLEAR_FLOW(f); FLOW_DESTROY(f);
SCReturnInt(1); SCReturnInt(1);
} }
@ -1553,6 +1551,7 @@ static int FlowTestPrune(Flow *f, struct timeval *ts) {
goto error; goto error;
} }
SCLogDebug("calling FlowPrune");
FlowPrune(q, ts); FlowPrune(q, ts);
if (q->len != 0) { if (q->len != 0) {
printf("Failed in prunning the flow: "); printf("Failed in prunning the flow: ");
@ -1593,7 +1592,8 @@ static int FlowTest03 (void) {
memset(&fb, 0, sizeof(FlowBucket)); memset(&fb, 0, sizeof(FlowBucket));
SCSpinInit(&fb.s, 0); SCSpinInit(&fb.s, 0);
SCMutexInit(&f.m, NULL);
FLOW_INITIALIZE(&f);
TimeGet(&ts); TimeGet(&ts);
f.lastts.tv_sec = ts.tv_sec - 5000; f.lastts.tv_sec = ts.tv_sec - 5000;

@ -26,6 +26,7 @@
#include "decode.h" #include "decode.h"
#include "util-var.h" #include "util-var.h"
#include "util-atomic.h"
#define FLOW_QUIET TRUE #define FLOW_QUIET TRUE
#define FLOW_VERBOSE FALSE #define FLOW_VERBOSE FALSE
@ -159,8 +160,13 @@ typedef struct Flow_
/** protocol specific data pointer, e.g. for TcpSession */ /** protocol specific data pointer, e.g. for TcpSession */
void *protoctx; void *protoctx;
/** how many pkts and stream msgs are using the flow *right now* */ /** how many pkts and stream msgs are using the flow *right now*. This
uint16_t use_cnt; * variable is atomic so not protected by the Flow mutex "m".
*
* On receiving a packet the counter is incremented while the flow
* bucked is locked, which is also the case on timeout pruning.
*/
SC_ATOMIC_DECLARE(unsigned short, use_cnt);
/** detection engine state */ /** detection engine state */
struct DetectEngineState_ *de_state; struct DetectEngineState_ *de_state;
@ -208,7 +214,10 @@ void FlowPrintQueueInfo (void);
void FlowShutdown(void); void FlowShutdown(void);
void FlowSetIPOnlyFlag(Flow *, char); void FlowSetIPOnlyFlag(Flow *, char);
void FlowSetIPOnlyFlagNoLock(Flow *, char); void FlowSetIPOnlyFlagNoLock(Flow *, char);
void FlowDecrUsecnt(ThreadVars *, Packet *);
void FlowIncrUsecnt(Flow *);
void FlowDecrUsecnt(Flow *);
uint32_t FlowPruneFlowsCnt(struct timeval *, int); uint32_t FlowPruneFlowsCnt(struct timeval *, int);
uint32_t FlowKillFlowsCnt(int); uint32_t FlowKillFlowsCnt(int);

@ -1295,9 +1295,8 @@ static void StreamTcpSetupMsg(TcpSession *ssn, TcpStream *stream, Packet *p,
smsg->data.data_len = 0; smsg->data.data_len = 0;
smsg->flow = p->flow; smsg->flow = p->flow;
if (smsg->flow != NULL) {
smsg->flow->use_cnt++; FlowIncrUsecnt(smsg->flow);
}
SCLogDebug("smsg %p", smsg); SCLogDebug("smsg %p", smsg);
SCReturn; SCReturn;

@ -133,7 +133,7 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p)
SCLogDebug("tunnel stuff done, move on (proot %d)", proot); SCLogDebug("tunnel stuff done, move on (proot %d)", proot);
} }
FlowDecrUsecnt(t,p); FlowDecrUsecnt(p->flow);
/* we're done with the tunnel root now as well */ /* we're done with the tunnel root now as well */
if (proot == 1) { if (proot == 1) {

@ -67,14 +67,25 @@
* \brief Initialize the previously declared atomic variable and it's * \brief Initialize the previously declared atomic variable and it's
* lock. * lock.
*/ */
#define SC_ATOMIC_INIT(name) \ #define SC_ATOMIC_INIT(name) do { \
SCSpinInit(&(name ## _sc_lock__), 0) SCSpinInit(&(name ## _sc_lock__), 0); \
(name ## _sc_atomic__) = 0; \
} while(0)
/**
* \brief Initialize the previously declared atomic variable and it's
* lock.
*/
#define SC_ATOMIC_RESET(name) do { \
(name ## _sc_atomic__) = 0; \
} while(0)
/** /**
* \brief Destroy the lock used to protect this variable * \brief Destroy the lock used to protect this variable
*/ */
#define SC_ATOMIC_DESTROY(name) \ #define SC_ATOMIC_DESTROY(name) do { \
SCSpinDestroy(&(name ## _sc_lock__)) SCSpinDestroy(&(name ## _sc_lock__)); \
} while (0)
/** /**
* \brief add a value to our atomic variable * \brief add a value to our atomic variable
@ -200,6 +211,12 @@
#define SC_ATOMIC_INIT(name) \ #define SC_ATOMIC_INIT(name) \
(name ## _sc_atomic__) = 0 (name ## _sc_atomic__) = 0
/**
* \brief wrapper for reinitializing an atomic variable.
**/
#define SC_ATOMIC_RESET(name) \
(name ## _sc_atomic__) = 0
/** /**
* \brief No-op. * \brief No-op.
*/ */

@ -702,7 +702,7 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) {
} }
FlowHandlePacket(NULL, p); FlowHandlePacket(NULL, p);
if (p->flow != NULL) if (p->flow != NULL)
p->flow->use_cnt = 0; SC_ATOMIC_RESET(p->flow->use_cnt);
/* Now the queues shoul be updated */ /* Now the queues shoul be updated */
UTHFreePacket(p); UTHFreePacket(p);

Loading…
Cancel
Save