flow: change flow state logic

A flow has 3 states: NEW, ESTABLISHED and CLOSED.

For all protocols except TCP, a flow is in state NEW as long as just one
side of the conversation has been seen. When both sides have been
observed the state is moved to ESTABLISHED.

TCP has a different logic, controlled by the stream engine. Here the TCP
state is leading.

Until now, when parts of the engine needed to know the flow state, it
would invoke a per protocol callback 'GetProtoState'. For TCP this would
return the state based on the TcpSession.

This patch changes this logic. It introduces an atomic variable in the
flow 'flow_state'. It defaults to NEW and is set to ESTABLISHED for non-
TCP protocols when we've seen both sides of the conversation.

For TCP, the state is updated from the TCP engine directly.

The goal is to allow for access to the state without holding the Flow's
main mutex lock. This will later allow the Flow Manager(s) to evaluate
the Flow w/o interupting it.
pull/1315/head
Victor Julien 11 years ago
parent 9327b08ab1
commit a0732d3db2

@ -666,7 +666,7 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
f->fb = NULL;
FBLOCK_UNLOCK(fb);
int state = FlowGetFlowState(f);
int state = SC_ATOMIC_GET(f->flow_state);
if (state == FLOW_STATE_NEW)
f->flow_end_flags |= FLOW_END_FLAG_STATE_NEW;
else if (state == FLOW_STATE_ESTABLISHED)

@ -280,7 +280,7 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
Flow *next_flow = f->hprev;
int state = FlowGetFlowState(f);
int state = SC_ATOMIC_GET(f->flow_state);
/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, state, ts, emergency) == 0) {
@ -413,7 +413,7 @@ static uint32_t FlowManagerHashRowCleanup(Flow *f)
Flow *next_flow = f->hprev;
int state = FlowGetFlowState(f);
int state = SC_ATOMIC_GET(f->flow_state);
/* remove from the hash */
if (f->hprev != NULL)
@ -1034,7 +1034,7 @@ static int FlowMgrTest01 (void)
f.proto = IPPROTO_TCP;
int state = FlowGetFlowState(&f);
int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
@ -1093,7 +1093,7 @@ static int FlowMgrTest02 (void)
f.fb = &fb;
f.proto = IPPROTO_TCP;
int state = FlowGetFlowState(&f);
int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
@ -1140,7 +1140,7 @@ static int FlowMgrTest03 (void)
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
int state = FlowGetFlowState(&f);
int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);
@ -1200,7 +1200,7 @@ static int FlowMgrTest04 (void)
f.proto = IPPROTO_TCP;
f.flags |= FLOW_EMERGENCY;
int state = FlowGetFlowState(&f);
int state = SC_ATOMIC_GET(f.flow_state);
if (FlowManagerFlowTimeout(&f, state, &ts, 0) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
FBLOCK_DESTROY(&fb);
FLOW_DESTROY(&f);

@ -96,24 +96,5 @@ uint32_t flowbits_removed;
SCMutex flowbits_mutex;
#endif /* FLOWBITS_STATS */
/** \internal
* \brief Get the flow's state
*
* \param f flow
*
* \retval state either FLOW_STATE_NEW, FLOW_STATE_ESTABLISHED or FLOW_STATE_CLOSED
*/
static inline int FlowGetFlowState(Flow *f)
{
if (flow_proto[f->protomap].GetProtoState != NULL) {
return flow_proto[f->protomap].GetProtoState(f->protoctx);
} else {
if ((f->flags & FLOW_TO_SRC_SEEN) && (f->flags & FLOW_TO_DST_SEEN))
return FLOW_STATE_ESTABLISHED;
else
return FLOW_STATE_NEW;
}
}
#endif /* __FLOW_PRIVATE_H__ */

@ -40,6 +40,7 @@
(f)->sp = 0; \
(f)->dp = 0; \
(f)->proto = 0; \
SC_ATOMIC_INIT((f)->flow_state); \
SC_ATOMIC_INIT((f)->use_cnt); \
(f)->probing_parser_toserver_alproto_masks = 0; \
(f)->probing_parser_toclient_alproto_masks = 0; \
@ -82,6 +83,7 @@
(f)->sp = 0; \
(f)->dp = 0; \
(f)->proto = 0; \
SC_ATOMIC_RESET((f)->flow_state); \
SC_ATOMIC_RESET((f)->use_cnt); \
(f)->probing_parser_toserver_alproto_masks = 0; \
(f)->probing_parser_toclient_alproto_masks = 0; \
@ -116,6 +118,7 @@
#define FLOW_DESTROY(f) do { \
FlowCleanupAppLayer((f)); \
SC_ATOMIC_DESTROY((f)->flow_state); \
SC_ATOMIC_DESTROY((f)->use_cnt); \
\
FLOWLOCK_DESTROY((f)); \

@ -87,7 +87,6 @@ void FlowInitFlowProto();
int FlowSetProtoTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t , uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoFreeFunc(uint8_t, void (*Free)(void *));
int FlowSetFlowStateFunc(uint8_t , int (*GetProtoState)(void *));
/* Run mode selected at suricata.c */
extern int run_mode;
@ -270,6 +269,10 @@ void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
if ((f->flags & FLOW_TO_DST_SEEN) && (f->flags & FLOW_TO_SRC_SEEN)) {
SCLogDebug("pkt %p FLOW_PKT_ESTABLISHED", p);
p->flowflags |= FLOW_PKT_ESTABLISHED;
if (f->proto != IPPROTO_TCP) {
SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
}
}
/*set the detection bypass flags*/
@ -497,7 +500,6 @@ void FlowInitFlowProto(void)
flow_proto[FLOW_PROTO_DEFAULT].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_DEFAULT].Freefunc = NULL;
flow_proto[FLOW_PROTO_DEFAULT].GetProtoState = NULL;
/*TCP*/
flow_proto[FLOW_PROTO_TCP].new_timeout = FLOW_IPPROTO_TCP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_TCP].est_timeout = FLOW_IPPROTO_TCP_EST_TIMEOUT;
@ -509,7 +511,6 @@ void FlowInitFlowProto(void)
flow_proto[FLOW_PROTO_TCP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_TCP].Freefunc = NULL;
flow_proto[FLOW_PROTO_TCP].GetProtoState = NULL;
/*UDP*/
flow_proto[FLOW_PROTO_UDP].new_timeout = FLOW_IPPROTO_UDP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_UDP].est_timeout = FLOW_IPPROTO_UDP_EST_TIMEOUT;
@ -521,7 +522,6 @@ void FlowInitFlowProto(void)
flow_proto[FLOW_PROTO_UDP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_UDP].Freefunc = NULL;
flow_proto[FLOW_PROTO_UDP].GetProtoState = NULL;
/*ICMP*/
flow_proto[FLOW_PROTO_ICMP].new_timeout = FLOW_IPPROTO_ICMP_NEW_TIMEOUT;
flow_proto[FLOW_PROTO_ICMP].est_timeout = FLOW_IPPROTO_ICMP_EST_TIMEOUT;
@ -533,7 +533,6 @@ void FlowInitFlowProto(void)
flow_proto[FLOW_PROTO_ICMP].emerg_closed_timeout =
FLOW_DEFAULT_EMERG_CLOSED_TIMEOUT;
flow_proto[FLOW_PROTO_ICMP].Freefunc = NULL;
flow_proto[FLOW_PROTO_ICMP].GetProtoState = NULL;
/* Let's see if we have custom timeouts defined from config */
const char *new = NULL;
@ -764,22 +763,6 @@ int FlowSetProtoFreeFunc (uint8_t proto, void (*Free)(void *))
return 1;
}
/**
* \brief Function to set the function to get protocol specific flow state.
*
* \param proto protocol of which function is needed to be set.
* \param GetFlowState Function pointer which will be called to get state.
*/
int FlowSetFlowStateFunc (uint8_t proto, int (*GetProtoState)(void *))
{
uint8_t proto_map;
proto_map = FlowGetProtoMapping(proto);
flow_proto[proto_map].GetProtoState = GetProtoState;
return 1;
}
/**
* \brief Function to set the timeout values for the specified protocol.
*

@ -272,6 +272,13 @@ typedef unsigned int FlowRefCount;
typedef unsigned short FlowRefCount;
#endif
#ifdef __tile__
/* Atomic Ints performance better on Tile. */
typedef unsigned int FlowStateType;
#else
typedef unsigned short FlowStateType;
#endif
/** Local Thread ID */
typedef uint16_t FlowThreadId;
@ -312,6 +319,8 @@ typedef struct Flow_
/* end of flow "header" */
SC_ATOMIC_DECLARE(FlowStateType, flow_state);
/** how many pkts and stream msgs are using the flow *right now*. This
* variable is atomic so not protected by the Flow mutex "m".
*
@ -414,7 +423,6 @@ typedef struct FlowProto_ {
uint32_t emerg_est_timeout;
uint32_t emerg_closed_timeout;
void (*Freefunc)(void *);
int (*GetProtoState)(void *);
} FlowProto;
void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
@ -428,7 +436,6 @@ void FlowRegisterTests (void);
int FlowSetProtoTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoEmergencyTimeout(uint8_t ,uint32_t ,uint32_t ,uint32_t);
int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *));
int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *));
void FlowUpdateQueue(Flow *);
struct FlowQueue_;

@ -596,7 +596,6 @@ void StreamTcpInitConfig(char quiet)
/* set the default free function and flow state function
* values. */
FlowSetProtoFreeFunc(IPPROTO_TCP, StreamTcpSessionClear);
FlowSetFlowStateFunc(IPPROTO_TCP, StreamTcpGetFlowState);
#ifdef UNITTESTS
if (RunmodeIsUnittests()) {
@ -679,6 +678,22 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
return;
ssn->state = state;
/* update the flow state */
switch(ssn->state) {
case TCP_ESTABLISHED:
case TCP_FIN_WAIT1:
case TCP_FIN_WAIT2:
case TCP_CLOSING:
case TCP_CLOSE_WAIT:
SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
break;
case TCP_LAST_ACK:
case TCP_TIME_WAIT:
case TCP_CLOSED:
SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
break;
}
}
/**
@ -5038,45 +5053,6 @@ static int StreamTcpValidateRst(TcpSession *ssn, Packet *p)
return 0;
}
/**
* \brief Function to return the FLOW state depending upon the TCP session state.
*
* \param s TCP session of which the state has to be returned
* \retval state The FLOW_STATE_ depends upon the TCP sesison state, default is
* FLOW_STATE_CLOSED
*/
int StreamTcpGetFlowState(void *s)
{
SCEnter();
TcpSession *ssn = (TcpSession *)s;
if (unlikely(ssn == NULL)) {
SCReturnInt(FLOW_STATE_CLOSED);
}
/* sorted most likely to least likely */
switch(ssn->state) {
case TCP_ESTABLISHED:
case TCP_FIN_WAIT1:
case TCP_FIN_WAIT2:
case TCP_CLOSING:
case TCP_CLOSE_WAIT:
SCReturnInt(FLOW_STATE_ESTABLISHED);
case TCP_NONE:
case TCP_SYN_SENT:
case TCP_SYN_RECV:
case TCP_LISTEN:
SCReturnInt(FLOW_STATE_NEW);
case TCP_LAST_ACK:
case TCP_TIME_WAIT:
case TCP_CLOSED:
SCReturnInt(FLOW_STATE_CLOSED);
}
SCReturnInt(FLOW_STATE_CLOSED);
}
/**
* \brief Function to check the validity of the received timestamp based on
* the target OS of the given stream.

Loading…
Cancel
Save