flow-manager: optimize hash walking

Until now the flow manager would walk the entire flow hash table on an
interval. It would thus touch all flows, leading to a lot of memory
and cache pressure. In scenarios where the number of tracked flows runs
into the hundreds of thousands, and the memory used can run into many
hundreds of megabytes or even gigabytes, this would lead to serious
performance degradation.

This patch introduces a new approach. A timestamp per flow bucket
(hash row) is maintained by the flow manager. It holds the timestamp
of the earliest possible timeout of a flow in the list. The hash walk
skips rows with timestamps beyond the current time.
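
The core of that fast path, as added to FlowTimeoutHash in the diff
below:

    /* check if any flow in this row could possibly have timed out;
     * if not, skip the row without taking the bucket lock */
    int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
    if (check_ts > (int32_t)ts->tv_sec) {
        counters->rows_skipped++;
        continue;
    }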

As the timestamp depends on the flows in the hash row's list, and on
the 'state' of each flow in the list, any addition of a flow or
changing of a flow's state invalidates the timestamp. The flow manager
then has to walk the list again to set a new timestamp.

A utility function FlowUpdateState is introduced to change Flow states,
taking care of the bucket timestamp invalidation while at it.
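
The helper, as added in the diff below:

    void FlowUpdateState(Flow *f, enum FlowState s)
    {
        /* set the state */
        SC_ATOMIC_SET(f->flow_state, s);
        if (f->fb) {
            /* and reset the flow bucket's next_ts value so that the
             * flow manager has to revisit this row */
            SC_ATOMIC_SET(f->fb->next_ts, 0);
        }
    }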

Empty flow buckets use a special value so that we don't have to take
the flow bucket lock to find out the bucket is empty.
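
That special value is INT_MAX; the flow manager sets it under the row
lock when it finds the row empty:

    if (fb->tail == NULL) {
        /* the row stays skippable until a worker adds a flow
         * and resets next_ts to 0 */
        SC_ATOMIC_SET(fb->next_ts, INT_MAX);
        counters->rows_empty++;
        goto next;
    }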

This patch also adds more performance counters:

flow_mgr.flows_checked         | Total    | 929
flow_mgr.flows_notimeout       | Total    | 391
flow_mgr.flows_timeout         | Total    | 538
flow_mgr.flows_removed         | Total    | 277
flow_mgr.flows_timeout_inuse   | Total    | 261
flow_mgr.rows_checked          | Total    | 1000000
flow_mgr.rows_skipped          | Total    | 998835
flow_mgr.rows_empty            | Total    | 290
flow_mgr.rows_maxlen           | Total    | 2

flow_mgr.flows_checked: number of flows checked for timeout in the
                        last pass
flow_mgr.flows_notimeout: number of flows out of flow_mgr.flows_checked
                        that didn't time out
flow_mgr.flows_timeout: number of flows out of flow_mgr.flows_checked
                        that did reach the timeout
flow_mgr.flows_removed: number of flows out of flow_mgr.flows_timeout
                        that were really removed
flow_mgr.flows_timeout_inuse: number of flows out of flow_mgr.flows_timeout
                        that were still in use or needed work

flow_mgr.rows_checked: hash table rows checked
flow_mgr.rows_skipped: hash table rows skipped because none of the flows
                        would time out anyway

The counters below only relate to rows that were not skipped.

flow_mgr.rows_empty:   empty hash rows
flow_mgr.rows_maxlen:  max number of flows per hash row. Best to keep low,
                        so increase hash-size if needed.
flow_mgr.rows_busy:    row skipped because it was locked by another thread

@@ -487,6 +487,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
         FlowInit(f, p);
         f->flow_hash = hash;
         f->fb = fb;
+        FlowUpdateState(f, FLOW_STATE_NEW);

         /* update the last seen timestamp of this flow */
         COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -523,6 +524,7 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
         FlowInit(f, p);
         f->flow_hash = hash;
         f->fb = fb;
+        FlowUpdateState(f, FLOW_STATE_NEW);

         /* update the last seen timestamp of this flow */
         COPY_TIMESTAMP(&p->ts,&f->lastts);
@@ -649,6 +651,7 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         f->hnext = NULL;
         f->hprev = NULL;
         f->fb = NULL;
+        SC_ATOMIC_SET(fb->next_ts, 0);
         FBLOCK_UNLOCK(fb);

         int state = SC_ATOMIC_GET(f->flow_state);
@@ -670,6 +673,8 @@ static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv)
         FlowClearMemory(f, f->protomap);

+        FlowUpdateState(f, FLOW_STATE_NEW);
+
         FLOWLOCK_UNLOCK(f);

         (void) SC_ATOMIC_ADD(flow_prune_idx, (flow_config.hash_size - cnt));

@@ -48,6 +48,12 @@ typedef struct FlowBucket_ {
 #else
     #error Enable FBLOCK_SPIN or FBLOCK_MUTEX
 #endif
+    /** timestamp in seconds of the earliest possible moment a flow
+     *  will time out in this row. Set by the flow manager. Cleared
+     *  to 0 by workers, either when new flows are added or when a
+     *  flow state changes. The flow manager sets this to INT_MAX for
+     *  empty buckets. */
+    SC_ATOMIC_DECLARE(int32_t, next_ts);
 } __attribute__((aligned(CLS))) FlowBucket;

 #ifdef FBLOCK_SPIN

@@ -109,6 +109,18 @@ typedef struct FlowTimeoutCounters_ {
     uint32_t est;
     uint32_t clo;
     uint32_t tcp_reuse;
+
+    uint32_t flows_checked;
+    uint32_t flows_notimeout;
+    uint32_t flows_timeout;
+    uint32_t flows_timeout_inuse;
+    uint32_t flows_removed;
+
+    uint32_t rows_checked;
+    uint32_t rows_skipped;
+    uint32_t rows_empty;
+    uint32_t rows_busy;
+    uint32_t rows_maxlen;
 } FlowTimeoutCounters;

 /**
@@ -181,7 +193,7 @@ void FlowDisableFlowManagerThread(void)
  *
  *  \retval timeout timeout in seconds
  */
-static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
+static inline uint32_t FlowGetFlowTimeout(const Flow *f, enum FlowState state)
 {
     uint32_t timeout;
     FlowProtoTimeoutPtr flow_timeouts = SC_ATOMIC_GET(flow_timeouts);
@@ -209,14 +221,18 @@ static inline uint32_t FlowGetFlowTimeout(const Flow *f, int state)
  *  \retval 0 not timed out
  *  \retval 1 timed out
  */
-static int FlowManagerFlowTimeout(const Flow *f, int state, struct timeval *ts)
+static int FlowManagerFlowTimeout(const Flow *f, enum FlowState state, struct timeval *ts, int32_t *next_ts)
 {
     /* set the timeout value according to the flow operating mode,
      * flow's state and protocol.*/
     uint32_t timeout = FlowGetFlowTimeout(f, state);

+    int32_t flow_times_out_at = (int32_t)(f->lastts.tv_sec + timeout);
+    if (*next_ts == 0 || flow_times_out_at < *next_ts)
+        *next_ts = flow_times_out_at;
+
     /* do the timeout check */
-    if ((int32_t)(f->lastts.tv_sec + timeout) >= ts->tv_sec) {
+    if (flow_times_out_at >= ts->tv_sec) {
         return 0;
     }
@@ -268,20 +284,26 @@ static int FlowManagerFlowTimedOut(Flow *f, struct timeval *ts)
  *  \retval cnt timed out flows
  */
 static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
-        int emergency, FlowTimeoutCounters *counters)
+        int emergency, FlowTimeoutCounters *counters, int32_t *next_ts)
 {
     uint32_t cnt = 0;
+    uint32_t checked = 0;

     do {
+        checked++;
+
         /* check flow timeout based on lastts and state. Both can be
          * accessed w/o Flow lock as we do have the hash row lock (so flow
          * can't disappear) and flow_state is atomic. lastts can only
          * be modified when we have both the flow and hash row lock */
-        int state = SC_ATOMIC_GET(f->flow_state);
+        enum FlowState state = SC_ATOMIC_GET(f->flow_state);

         /* timeout logic goes here */
-        if (FlowManagerFlowTimeout(f, state, ts) == 0) {
+        if (FlowManagerFlowTimeout(f, state, ts, next_ts) == 0) {
+            counters->flows_notimeout++;
+
             f = f->hprev;
             continue;
         }
@@ -294,6 +316,8 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
         Flow *next_flow = f->hprev;

+        counters->flows_timeout++;
+
         /* check if the flow is fully timed out and
          * ready to be discarded. */
         if (FlowManagerFlowTimedOut(f, ts) == 1) {
@@ -347,13 +371,19 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts,
                     counters->clo++;
                     break;
             }
+
+            counters->flows_removed++;
         } else {
+            counters->flows_timeout_inuse++;
             FLOWLOCK_UNLOCK(f);
         }

         f = next_flow;
     } while (f != NULL);

+    counters->flows_checked += checked;
+    if (checked > counters->rows_maxlen)
+        counters->rows_maxlen = checked;
+
     return cnt;
 }
@@ -382,20 +412,37 @@ static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt,
     for (idx = hash_min; idx < hash_max; idx++) {
         FlowBucket *fb = &flow_hash[idx];

+        counters->rows_checked++;
+
+        int32_t check_ts = SC_ATOMIC_GET(fb->next_ts);
+        if (check_ts > (int32_t)ts->tv_sec) {
+            counters->rows_skipped++;
+            continue;
+        }
+
         /* before grabbing the row lock, make sure we have at least
          * 9 packets in the pool */
         PacketPoolWaitForN(9);
-        if (FBLOCK_TRYLOCK(fb) != 0)
+        if (FBLOCK_TRYLOCK(fb) != 0) {
+            counters->rows_busy++;
             continue;
+        }

         /* flow hash bucket is now locked */
-        if (fb->tail == NULL)
+        if (fb->tail == NULL) {
+            SC_ATOMIC_SET(fb->next_ts, INT_MAX);
+            counters->rows_empty++;
             goto next;
+        }

+        int32_t next_ts = 0;
+
         /* we have a flow, or more than one */
-        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters);
+        cnt += FlowManagerHashRowTimeout(fb->tail, ts, emergency, counters, &next_ts);
+        SC_ATOMIC_SET(fb->next_ts, next_ts);

 next:
         FBLOCK_UNLOCK(fb);
@@ -502,6 +549,19 @@ typedef struct FlowManagerThreadData_ {
     uint16_t flow_emerg_mode_enter;
     uint16_t flow_emerg_mode_over;
     uint16_t flow_tcp_reuse;
+
+    uint16_t flow_mgr_flows_checked;
+    uint16_t flow_mgr_flows_notimeout;
+    uint16_t flow_mgr_flows_timeout;
+    uint16_t flow_mgr_flows_timeout_inuse;
+    uint16_t flow_mgr_flows_removed;
+
+    uint16_t flow_mgr_rows_checked;
+    uint16_t flow_mgr_rows_skipped;
+    uint16_t flow_mgr_rows_empty;
+    uint16_t flow_mgr_rows_busy;
+    uint16_t flow_mgr_rows_maxlen;
 } FlowManagerThreadData;

 static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
@@ -540,6 +600,18 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, void *initdata, void **data)
     ftd->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);
     ftd->flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", t);

+    ftd->flow_mgr_flows_checked = StatsRegisterCounter("flow_mgr.flows_checked", t);
+    ftd->flow_mgr_flows_notimeout = StatsRegisterCounter("flow_mgr.flows_notimeout", t);
+    ftd->flow_mgr_flows_timeout = StatsRegisterCounter("flow_mgr.flows_timeout", t);
+    ftd->flow_mgr_flows_timeout_inuse = StatsRegisterCounter("flow_mgr.flows_timeout_inuse", t);
+    ftd->flow_mgr_flows_removed = StatsRegisterCounter("flow_mgr.flows_removed", t);
+
+    ftd->flow_mgr_rows_checked = StatsRegisterCounter("flow_mgr.rows_checked", t);
+    ftd->flow_mgr_rows_skipped = StatsRegisterCounter("flow_mgr.rows_skipped", t);
+    ftd->flow_mgr_rows_empty = StatsRegisterCounter("flow_mgr.rows_empty", t);
+    ftd->flow_mgr_rows_busy = StatsRegisterCounter("flow_mgr.rows_busy", t);
+    ftd->flow_mgr_rows_maxlen = StatsRegisterCounter("flow_mgr.rows_maxlen", t);
+
     PacketPoolInit();
     return TM_ECODE_OK;
 }
@@ -609,7 +681,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
             FlowUpdateSpareFlows();

         /* try to time out flows */
-        FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+        FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
         FlowTimeoutHash(&ts, 0 /* check all */, ftd->min, ftd->max, &counters);
@@ -631,6 +703,18 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
         StatsAddUI64(th_v, ftd->flow_mgr_cnt_est, (uint64_t)counters.est);
         StatsAddUI64(th_v, ftd->flow_tcp_reuse, (uint64_t)counters.tcp_reuse);

+        StatsSetUI64(th_v, ftd->flow_mgr_flows_checked, (uint64_t)counters.flows_checked);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
+        StatsSetUI64(th_v, ftd->flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
+
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_checked, (uint64_t)counters.rows_checked);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_skipped, (uint64_t)counters.rows_skipped);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_busy, (uint64_t)counters.rows_busy);
+        StatsSetUI64(th_v, ftd->flow_mgr_rows_empty, (uint64_t)counters.rows_empty);
+
         uint32_t len = 0;
         FQLOCK_LOCK(&flow_spare_q);
         len = flow_spare_q.len;
@@ -1046,8 +1130,9 @@ static int FlowMgrTest01 (void)
     f.proto = IPPROTO_TCP;

+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1105,8 +1190,9 @@ static int FlowMgrTest02 (void)
     f.fb = &fb;
     f.proto = IPPROTO_TCP;

+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1152,8 +1238,9 @@ static int FlowMgrTest03 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;

+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1212,8 +1299,9 @@ static int FlowMgrTest04 (void)
     f.proto = IPPROTO_TCP;
     f.flags |= FLOW_EMERGENCY;

+    int32_t next_ts = 0;
     int state = SC_ATOMIC_GET(f.flow_state);
-    if (FlowManagerFlowTimeout(&f, state, &ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
+    if (FlowManagerFlowTimeout(&f, state, &ts, &next_ts) != 1 && FlowManagerFlowTimedOut(&f, &ts) != 1) {
         FBLOCK_DESTROY(&fb);
         FLOW_DESTROY(&f);
         FlowQueueDestroy(&flow_spare_q);
@@ -1265,7 +1353,7 @@ static int FlowMgrTest05 (void)
     struct timeval ts;
     TimeGet(&ts);

     /* try to time out flows */
-    FlowTimeoutCounters counters = { 0, 0, 0, 0, };
+    FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
     FlowTimeoutHash(&ts, 0 /* check all */, 0, flow_config.hash_size, &counters);

     if (flow_recycle_q.len > 0) {

@@ -251,7 +251,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p)
             p->flowflags |= FLOW_PKT_ESTABLISHED;

             if (f->proto != IPPROTO_TCP) {
-                SC_ATOMIC_SET(f->flow_state, FLOW_STATE_ESTABLISHED);
+                FlowUpdateState(f, FLOW_STATE_ESTABLISHED);
             }
         }
@@ -378,6 +378,7 @@ void FlowInitConfig(char quiet)
     uint32_t i = 0;
     for (i = 0; i < flow_config.hash_size; i++) {
         FBLOCK_INIT(&flow_hash[i]);
+        SC_ATOMIC_INIT(flow_hash[i].next_ts);
     }
     (void) SC_ATOMIC_ADD(flow_memuse, (flow_config.hash_size * sizeof(FlowBucket)));
@@ -460,6 +461,7 @@ void FlowShutdown(void)
         }
         FBLOCK_DESTROY(&flow_hash[u]);
+        SC_ATOMIC_DESTROY(flow_hash[u].next_ts);
     }
     SCFreeAligned(flow_hash);
     flow_hash = NULL;
@@ -787,6 +789,18 @@ uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags)
     return newflags;
 }

+void FlowUpdateState(Flow *f, enum FlowState s)
+{
+    /* set the state */
+    SC_ATOMIC_SET(f->flow_state, s);
+    if (f->fb) {
+        /* and reset the flow bucket's next_ts value so that the flow
+         * manager has to revisit this row */
+        SC_ATOMIC_SET(f->fb->next_ts, 0);
+    }
+}
+
 /************************************Unittests*******************************/
 #ifdef UNITTESTS

@@ -421,7 +421,7 @@ typedef struct Flow_
     uint64_t tosrcbytecnt;
 } Flow;

-enum {
+enum FlowState {
     FLOW_STATE_NEW = 0,
     FLOW_STATE_ESTABLISHED,
     FLOW_STATE_CLOSED,
@@ -465,6 +465,10 @@ int FlowGetPacketDirection(const Flow *, const Packet *);
 void FlowCleanupAppLayer(Flow *);

+void FlowUpdateState(Flow *f, enum FlowState s);
+
 /** ----- Inline functions ----- */

 /** \brief Set the No Packet Inspection Flag without locking the flow.
  *
  *  \param f Flow to set the flag in

@@ -662,12 +662,12 @@ static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
         case TCP_FIN_WAIT2:
         case TCP_CLOSING:
         case TCP_CLOSE_WAIT:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_ESTABLISHED);
+            FlowUpdateState(p->flow, FLOW_STATE_ESTABLISHED);
             break;
         case TCP_LAST_ACK:
         case TCP_TIME_WAIT:
         case TCP_CLOSED:
-            SC_ATOMIC_SET(p->flow->flow_state, FLOW_STATE_CLOSED);
+            FlowUpdateState(p->flow, FLOW_STATE_CLOSED);
             break;
     }
 }
