flow: Refactor how FlowPrune deals with forced timeouts, improving locking logic.

remotes/origin/master
Victor Julien 14 years ago
parent 372ab9c433
commit bfb3f1b7cf

@ -247,24 +247,19 @@ static inline Packet *FlowForceReassemblyPseudoPacketGet(int direction,
return FlowForceReassemblyPseudoPacketSetup(p, direction, f, ssn, dummy);
}
/**
* \internal
* \brief Forces reassembly for flow if it needs it.
* \brief Check if a flow needs forced reassembly
*
* The function requires flow to be locked beforehand.
* \param f *LOCKED* flow
* \param server ptr to int that should be set to 1 or 2 if we return 1
* \param client ptr to int that should be set to 1 or 2 if we return 1
*
* \param f Pointer to the flow.
*
* \retval 0 This flow doesn't need any reassembly processing; 1 otherwise.
* \retval 0 no
* \retval 1 yes
*/
int FlowForceReassemblyForFlowV2(Flow *f)
{
int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client) {
TcpSession *ssn;
int client_ok = 0;
int server_ok = 0;
/* looks like we have no flows in this queue */
if (f == NULL || f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) {
return 0;
@ -272,32 +267,57 @@ int FlowForceReassemblyForFlowV2(Flow *f)
/* Get the tcp session for the flow */
ssn = (TcpSession *)f->protoctx;
/* \todo Also skip flows that shouldn't be inspected */
if (ssn == NULL) {
return 0;
}
client_ok = StreamHasUnprocessedSegments(ssn, 0);
server_ok = StreamHasUnprocessedSegments(ssn, 1);
*client = StreamHasUnprocessedSegments(ssn, 0);
*server = StreamHasUnprocessedSegments(ssn, 1);
/* if state is not fully closed we assume that we haven't fully
* inspected the app layer state yet */
if (ssn->state >= TCP_ESTABLISHED && ssn->state != TCP_CLOSED) {
if (client_ok != 1)
client_ok = 2;
if (server_ok != 1)
server_ok = 2;
if (*client != 1)
*client = 2;
if (*server != 1)
*server = 2;
}
/* nothing to do */
if (client_ok == 0 && server_ok == 0) {
if (*client == 0 && *server == 0) {
return 0;
}
/* move this unlock after the stream reassemble call */
SCSpinUnlock(&f->fb->s);
return 1;
}
/**
* \internal
* \brief Forces reassembly for flow if it needs it.
*
* The function requires flow to be locked beforehand.
*
* \param f Pointer to the flow.
* \param server action required for server: 1 or 2
* \param client action required for client: 1 or 2
*
* \retval 0 This flow doesn't need any reassembly processing; 1 otherwise.
*/
int FlowForceReassemblyForFlowV2(Flow *f, int server, int client)
{
Packet *p1 = NULL, *p2 = NULL, *p3 = NULL;
TcpSession *ssn;
/* looks like we have no flows in this queue */
if (f == NULL) {
return 0;
}
/* Get the tcp session for the flow */
ssn = (TcpSession *)f->protoctx;
if (ssn == NULL) {
return 0;
}
/* The packets we use are based on what segments in what direction are
* unprocessed.
@ -308,13 +328,13 @@ int FlowForceReassemblyForFlowV2(Flow *f)
* toclient which is now dummy since all we need it for is detection */
/* insert a pseudo packet in the toserver direction */
if (client_ok == 1) {
if (client == 1) {
p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 0);
if (p1 == NULL) {
return 1;
}
if (server_ok == 1) {
if (server == 1) {
p2 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0);
if (p2 == NULL) {
TmqhOutputPacketpool(NULL,p1);
@ -335,8 +355,8 @@ int FlowForceReassemblyForFlowV2(Flow *f)
}
}
} else if (client_ok == 2) {
if (server_ok == 1) {
} else if (client == 2) {
if (server == 1) {
p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0);
if (p1 == NULL) {
return 1;
@ -353,7 +373,7 @@ int FlowForceReassemblyForFlowV2(Flow *f)
return 1;
}
if (server_ok == 2) {
if (server == 2) {
p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
if (p2 == NULL) {
TmqhOutputPacketpool(NULL, p1);
@ -363,7 +383,7 @@ int FlowForceReassemblyForFlowV2(Flow *f)
}
} else {
if (server_ok == 1) {
if (server == 1) {
p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn, 0);
if (p1 == NULL) {
return 1;
@ -374,7 +394,7 @@ int FlowForceReassemblyForFlowV2(Flow *f)
TmqhOutputPacketpool(NULL, p1);
return 1;
}
} else if (server_ok == 2) {
} else if (server == 2) {
p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn, 1);
if (p1 == NULL) {
return 1;

@ -24,7 +24,9 @@
#ifndef __FLOW_TIMEOUT_H__
#define __FLOW_TIMEOUT_H__
int FlowForceReassemblyForFlowV2(Flow *);
int FlowForceReassemblyForFlowV2(Flow *f, int server, int client);
int FlowForceReassemblyNeedReassmbly(Flow *f, int *server, int *client);
//int FlowForceReassemblyForFlowV2(Flow *);
void FlowForceReassembly(void);
void FlowForceReassemblySetup(void);

@ -170,6 +170,62 @@ static uint64_t prune_no_timeout = 0;
static uint64_t prune_usecnt = 0;
#endif
/** \internal
* \brief get timeout for flow
*
* \param f flow
* \param emergency bool indicating emergency mode 1 yes, 0 no
*
* \retval timeout timeout in seconds
*/
static inline uint32_t FlowPruneGetFlowTimeout(Flow *f, int emergency) {
uint32_t timeout;
if (emergency) {
if (flow_proto[f->protomap].GetProtoState != NULL) {
switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
default:
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].emerg_new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].emerg_est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].emerg_closed_timeout;
break;
}
} else {
if (f->flags & FLOW_EST_LIST)
timeout = flow_proto[f->protomap].emerg_est_timeout;
else
timeout = flow_proto[f->protomap].emerg_new_timeout;
}
} else { /* implies no emergency */
if (flow_proto[f->protomap].GetProtoState != NULL) {
switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
default:
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].closed_timeout;
break;
}
} else {
if (f->flags & FLOW_EST_LIST)
timeout = flow_proto[f->protomap].est_timeout;
else
timeout = flow_proto[f->protomap].new_timeout;
}
}
return timeout;
}
/** FlowPrune
*
* Inspect top (last recently used) flow from the queue and see if
@ -179,11 +235,10 @@ static uint64_t prune_usecnt = 0;
*
* \param q Flow queue to prune
* \param ts Current time
* \param timeout Timeout to enforce
* \param try_cnt Tries to prune the first try_cnt no of flows in the q
*
* \retval 0 on error, failed block, nothing to prune
* \retval 1 on successfully pruned one
* \retval cnt on successfully pruned, cnt flows were pruned
*/
static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
{
@ -248,47 +303,7 @@ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
/*set the timeout value according to the flow operating mode, flow's state
and protocol.*/
uint32_t timeout = 0;
if (flow_flags & FLOW_EMERGENCY) {
if (flow_proto[f->protomap].GetProtoState != NULL) {
switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].emerg_new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].emerg_est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].emerg_closed_timeout;
break;
}
} else {
if (f->flags & FLOW_EST_LIST)
timeout = flow_proto[f->protomap].emerg_est_timeout;
else
timeout = flow_proto[f->protomap].emerg_new_timeout;
}
} else { /* implies no emergency */
if (flow_proto[f->protomap].GetProtoState != NULL) {
switch(flow_proto[f->protomap].GetProtoState(f->protoctx)) {
case FLOW_STATE_NEW:
timeout = flow_proto[f->protomap].new_timeout;
break;
case FLOW_STATE_ESTABLISHED:
timeout = flow_proto[f->protomap].est_timeout;
break;
case FLOW_STATE_CLOSED:
timeout = flow_proto[f->protomap].closed_timeout;
break;
}
} else {
if (f->flags & FLOW_EST_LIST)
timeout = flow_proto[f->protomap].est_timeout;
else
timeout = flow_proto[f->protomap].new_timeout;
}
}
uint32_t timeout = FlowPruneGetFlowTimeout(f, flow_flags & FLOW_EMERGENCY ? 1 : 0);
SCLogDebug("got lock, now check: %" PRIdMAX "+%" PRIu32 "=(%" PRIdMAX ") < "
"%" PRIdMAX "", (intmax_t)f->lastts_sec,
@ -317,18 +332,23 @@ static int FlowPrune(FlowQueue *q, struct timeval *ts, int try_cnt)
#ifdef FLOW_PRUNE_DEBUG
prune_usecnt++;
#endif
Flow *prev_f = f;
SCSpinUnlock(&f->fb->s);
SCMutexUnlock(&f->m);
f = f->lnext;
SCSpinUnlock(&prev_f->fb->s);
SCMutexUnlock(&prev_f->m);
continue;
}
if (FlowForceReassemblyForFlowV2(f) == 1) {
Flow *prev_f = f;
int server = 0, client = 0;
if (FlowForceReassemblyNeedReassmbly(f, &server, &client) == 1) {
/* we no longer need the fb lock. We know this flow won't be timed
* out just yet. So an incoming pkt is allowed to pick up this
* flow. */
SCSpinUnlock(&f->fb->s);
FlowForceReassemblyForFlowV2(f, server, client);
SCMutexUnlock(&f->m);
f = f->lnext;
SCSpinUnlock(&prev_f->fb->s);
SCMutexUnlock(&prev_f->m);
continue;
}
#ifdef DEBUG

@ -302,12 +302,12 @@ typedef struct Flow_
SCMutex de_state_m; /**< mutex lock for the de_state object */
/* list flow ptrs
* NOTE!!! These are NOT protected by the
* above mutex, but by the FlowQ's */
/** hash list pointers, protected by fb->s */
struct Flow_ *hnext; /* hash list */
struct Flow_ *hprev;
struct FlowBucket_ *fb;
/** queue list pointers, protected by queue mutex */
struct Flow_ *lnext; /* list */
struct Flow_ *lprev;

Loading…
Cancel
Save