flow: move flow handling into worker threads

Instead of handling the packet update during flow lookup, handle
it in the stream/detect threads. This lowers the load of the
capture thread(s) in autofp mode.

The decoders now set a flag in the packet if the packet needs a
flow lookup. Then the workers will take care of this. The decoders
also already calculate the raw flow hash value. This is so that
this value can be used in flow balancing in autofp.

Because the flow lookup/creation is now done in the worker threads,
the flow balancing can no longer use the flow. It's not yet
available. Autofp load balancing uses raw hash values instead.

In the same line, move UDP AppLayer out of the DecodeUDP module,
and also into the stream/detect threads.

Handle TCP session reuse inside the flow engine itself. If a looked up
flow matches the packet, but is a TCP stream starter, check if the
ssn needs to be reused. If that is the case handle it within the
lookup function. Simplies the locking and removes potential race
conditions.
pull/2089/head
Victor Julien 10 years ago
parent ae7aae81dc
commit 2f0e0f17db

@ -461,7 +461,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
* If the protocol is yet unknown, the proto detection code is run first.
*
* \param dp_ctx Thread app layer detect context
* \param f unlocked flow
* \param f *locked* flow
* \param p UDP packet
*
* \retval 0 ok
@ -473,8 +473,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
int r = 0;
FLOWLOCK_WRLOCK(f);
uint8_t flags = 0;
if (p->flowflags & FLOW_PKT_TOSERVER) {
flags |= STREAM_TOSERVER;
@ -527,7 +525,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow *
}
}
FLOWLOCK_UNLOCK(f);
PACKET_PROFILING_APP_STORE(tctx, p);
SCReturnInt(r);

@ -194,7 +194,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt,
{
/* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */
if (ICMPV4_DEST_UNREACH_IS_VALID(p)) {
FlowHandlePacket(tv, dtv, p);
FlowSetupPacket(p);
}
}
}

@ -362,8 +362,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p,
SCLogDebug("Unknown Type, ICMPV6_UNKNOWN_TYPE");
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, dtv, p);
FlowSetupPacket(p);
return TM_ECODE_OK;
}

@ -1518,7 +1518,6 @@ int DecodeIPV4DefragTest03(void)
0x80, 0x00, 0xb1, 0xa3, 0x00, 0x00
};
Flow *f = NULL;
Packet *p = PacketGetFromAlloc();
if (unlikely(p == NULL))
return 0;
@ -1542,12 +1541,11 @@ int DecodeIPV4DefragTest03(void)
result = 0;
goto end;
}
if (p->flow == NULL) {
if (!(p->flags & PKT_WANTS_FLOW)) {
printf("packet flow shouldn't be NULL\n");
result = 0;
goto end;
}
f = p->flow;
PACKET_RECYCLE(p);
PacketCopyData(p, pkt1, sizeof(pkt1));
@ -1585,11 +1583,11 @@ int DecodeIPV4DefragTest03(void)
result = 0;
goto end;
}
if (tp->flow == NULL) {
if (!(tp->flags & PKT_WANTS_FLOW)) {
result = 0;
goto end;
}
if (tp->flow != f) {
if (tp->flow_hash != p->flow_hash) {
result = 0;
goto end;
}

@ -73,8 +73,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u
SCTP_GET_SRC_PORT(p), SCTP_GET_DST_PORT(p));
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, dtv, p);
FlowSetupPacket(p);
return TM_ECODE_OK;
}

@ -214,8 +214,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
TCP_HAS_MSS(p) ? "MSS " : "");
#endif
/* Flow is an integral part of us */
FlowHandlePacket(tv, dtv, p);
FlowSetupPacket(p);
return TM_ECODE_OK;
}

@ -85,17 +85,11 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui
if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) {
/* Here we have a Teredo packet and don't need to handle app
* layer */
FlowHandlePacket(tv, dtv, p);
FlowSetupPacket(p);
return TM_ECODE_OK;
}
/* Flow is an integral part of us */
FlowHandlePacket(tv, dtv, p);
/* handle the app layer part of the UDP packet payload */
if (unlikely(p->flow != NULL)) {
AppLayerHandleUdp(tv, dtv->app_tctx, p, p->flow);
}
FlowSetupPacket(p);
return TM_ECODE_OK;
}

@ -399,6 +399,10 @@ typedef struct Packet_
struct Flow_ *flow;
/* raw hash value for looking up the flow, will need to modulated to the
* hash size still */
uint32_t flow_hash;
struct timeval ts;
union {
@ -1049,6 +1053,10 @@ int DecoderParseDataFromFile(char *filename, DecoderFunc Decoder);
#define PKT_IS_INVALID (1<<20)
#define PKT_PROFILE (1<<21)
/** indication by decoder that it feels the packet should be handled by
* flow engine: Packet::flow_hash will be set */
#define PKT_WANTS_FLOW (1<<22)
/** \brief return 1 if the packet is a pseudo packet */
#define PKT_IS_PSEUDOPKT(p) ((p)->flags & PKT_PSEUDO_STREAM_END)

@ -53,12 +53,6 @@ SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv);
static int handle_tcp_reuse = 1;
void FlowDisableTcpReuseHandling(void)
{
handle_tcp_reuse = 0;
}
/** \brief compare two raw ipv6 addrs
*
@ -297,6 +291,12 @@ static inline int FlowCompareICMPv4(Flow *f, const Packet *p)
return 0;
}
void FlowSetupPacket(Packet *p)
{
p->flags |= PKT_WANTS_FLOW;
p->flow_hash = FlowGetHash(p);
}
int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn);
static inline int FlowCompare(Flow *f, const Packet *p)
@ -312,17 +312,6 @@ static inline int FlowCompare(Flow *f, const Packet *p)
if (f->flags & FLOW_TCP_REUSED)
return 0;
if (handle_tcp_reuse == 1) {
/* lets see if we need to consider the existing session reuse */
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
/* okay, we need to setup a new flow for this packet.
* Flag the flow that it's been replaced by a new one */
f->flags |= FLOW_TCP_REUSED;
SCLogDebug("flow obsolete: TCP reuse will use a new flow "
"starting with packet %"PRIu64, p->pcap_cnt);
return 0;
}
}
return 1;
} else {
return CMP_FLOW(f, p);
@ -418,120 +407,41 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p)
return f;
}
Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest)
static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv,
FlowBucket *fb, Flow *old_f,
const uint32_t hash, const Packet *p)
{
Flow *f = NULL;
/* get the key to our bucket */
uint32_t hash = FlowGetHash(p);
/* get our hash bucket and lock it */
FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
FBLOCK_LOCK(fb);
SCLogDebug("fb %p fb->head %p", fb, fb->head);
f = FlowGetNew(NULL, NULL, p);
if (f != NULL) {
/* flow is locked */
if (fb->head == NULL) {
fb->head = f;
fb->tail = f;
} else {
f->hprev = fb->tail;
fb->tail->hnext = f;
fb->tail = f;
}
/* got one, now lock, initialize and return */
FlowInit(f, p);
f->flow_hash = hash;
f->fb = fb;
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
}
FBLOCK_UNLOCK(fb);
return f;
}
/** \brief Lookup flow based on packet
*
* Find the flow belonging to this packet. If not found, no new flow
* is set up.
*
* \param p packet to lookup the flow for
*
* \retval f flow or NULL if not found
*/
Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest)
{
Flow *f = NULL;
/* get the key to our bucket */
uint32_t hash = FlowGetHash(p);
/* get our hash bucket and lock it */
FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
FBLOCK_LOCK(fb);
SCLogDebug("fb %p fb->head %p", fb, fb->head);
/* see if the bucket already has a flow */
if (fb->head == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
/* tag flow as reused so future lookups won't find it */
old_f->flags |= FLOW_TCP_REUSED;
/* get some settings that we move over to the new flow */
FlowThreadId thread_id = old_f->thread_id;
int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
/* ok, we have a flow in the bucket. Let's find out if it is our flow */
f = fb->head;
/* see if this is the flow we are looking for */
if (FlowCompare(f, p) == 0) {
while (f) {
f = f->hnext;
/* since fb lock is still held this flow won't be found until we are done */
FLOWLOCK_UNLOCK(old_f);
/* Get a new flow. It will be either a locked flow or NULL */
Flow *f = FlowGetNew(tv, dtv, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
if (FlowCompare(f, p) != 0) {
/* we found our flow, lets put it on top of the
* hash list -- this rewards active flows */
if (f->hnext) {
f->hnext->hprev = f->hprev;
}
if (f->hprev) {
f->hprev->hnext = f->hnext;
}
if (f == fb->tail) {
fb->tail = f->hprev;
}
/* flow is locked */
/* put at the start of the list */
f->hnext = fb->head;
f->hprev = NULL;
fb->head->hprev = f;
fb->head = f;
/* found our flow, lock & return */
FLOWLOCK_WRLOCK(f);
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
/* initialize and return */
FlowInit(f, p);
f->flow_hash = hash;
f->fb = fb;
FBLOCK_UNLOCK(fb);
return f;
f->thread_id = thread_id;
if (autofp_tmqh_flow_qid != -1) {
SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
}
}
}
/* lock & return */
FLOWLOCK_WRLOCK(f);
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
FBLOCK_UNLOCK(fb);
return f;
}
@ -556,9 +466,8 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
{
Flow *f = NULL;
/* get the key to our bucket */
uint32_t hash = FlowGetHash(p);
/* get our hash bucket and lock it */
const uint32_t hash = p->flow_hash;
FlowBucket *fb = &flow_hash[hash % flow_config.hash_size];
FBLOCK_LOCK(fb);
@ -645,6 +554,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
/* found our flow, lock & return */
FLOWLOCK_WRLOCK(f);
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
}
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);
@ -657,6 +574,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p
/* lock & return */
FLOWLOCK_WRLOCK(f);
if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) {
f = TcpReuseReplace(tv, dtv, fb, f, hash, p);
if (f == NULL) {
FBLOCK_UNLOCK(fb);
return NULL;
}
}
/* update the last seen timestamp of this flow */
COPY_TIMESTAMP(&p->ts,&f->lastts);
FlowReference(dest, f);

@ -226,37 +226,6 @@ static inline int FlowUpdateSeenFlag(const Packet *p)
return 1;
}
/**
*
* Remove packet from flow. This assumes this happens *before* the packet
* is added to the stream engine and other higher state.
*
* \todo we can't restore the lastts
*/
void FlowHandlePacketUpdateRemove(Flow *f, Packet *p)
{
if (p->flowflags & FLOW_PKT_TOSERVER) {
f->todstpktcnt--;
f->todstbytecnt -= GET_PKT_LEN(p);
p->flowflags &= ~(FLOW_PKT_TOSERVER|FLOW_PKT_TOSERVER_FIRST);
} else {
f->tosrcpktcnt--;
f->tosrcbytecnt -= GET_PKT_LEN(p);
p->flowflags &= ~(FLOW_PKT_TOCLIENT|FLOW_PKT_TOCLIENT_FIRST);
}
p->flowflags &= ~FLOW_PKT_ESTABLISHED;
/*set the detection bypass flags*/
if (f->flags & FLOW_NOPACKET_INSPECTION) {
SCLogDebug("unsetting FLOW_NOPACKET_INSPECTION flag on flow %p", f);
DecodeUnsetNoPacketInspectionFlag(p);
}
if (f->flags & FLOW_NOPAYLOAD_INSPECTION) {
SCLogDebug("unsetting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f);
DecodeUnsetNoPayloadInspectionFlag(p);
}
}
/** \brief Update Packet and Flow
*
* Updates packet and flow based on the new packet.
@ -330,10 +299,6 @@ void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p)
if (f == NULL)
return;
FlowHandlePacketUpdate(f, p);
FLOWLOCK_UNLOCK(f);
/* set the flow in the packet */
p->flags |= PKT_HAS_FLOW;
return;

@ -432,6 +432,11 @@ typedef struct FlowProto_ {
void (*Freefunc)(void *);
} FlowProto;
/** \brief prepare packet for a life with flow
* Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup
* and calc the hash value to be used in the lookup and autofp flow
* balancing. */
void FlowSetupPacket(Packet *p);
void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *);
void FlowInitConfig (char);
void FlowPrintQueueInfo (void);
@ -577,11 +582,7 @@ AppProto FlowGetAppProtocol(const Flow *f);
void *FlowGetAppState(const Flow *f);
uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags);
void FlowHandlePacketUpdateRemove(Flow *f, Packet *p);
void FlowHandlePacketUpdate(Flow *f, Packet *p);
Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest);
Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest);
#endif /* __FLOW_H__ */

@ -132,7 +132,6 @@ int RunModeErfFileAutoFp(void)
int thread;
RunModeInitialize();
RunmodeSetFlowStreamAsync();
char *file = NULL;
if (ConfGet("erf-file.file", &file) == 0) {

@ -165,7 +165,6 @@ int RunModeFilePcapAutoFp(void)
int thread;
RunModeInitialize();
RunmodeSetFlowStreamAsync();
char *file = NULL;
if (ConfGet("pcap-file.file", &file) == 0) {

@ -4863,195 +4863,74 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn
return 0;
}
/** \brief Handle TCP reuse of tuple
*
* Logic:
* 1. see if packet could trigger a new session
* 2. see if the flow/ssn is in a state where we want to support the reuse
* 3. disconnect packet from the old flow
* -> at this point new packets can still find the old flow
* -> as the flow's reference count != 0, it can't disappear
* 4. setup a new flow unconditionally
* 5. attach packet to new flow
* 6. tag old flow as FLOW_TCP_REUSED
* -> NEW packets won't find it
* -> existing packets in our queues may still reference it
* 7. dereference the old flow (reference cnt *may* now be 0,
* if no other packets reference it)
*
* The packets that still hold a reference to the old flow are updated
* by HandleFlowReuseApplyToPacket()
*/
static void TcpSessionReuseHandle(Packet *p) {
if (likely(TcpSessionPacketIsStreamStarter(p) == 0))
return;
int reuse = 0;
FLOWLOCK_RDLOCK(p->flow);
reuse = TcpSessionReuseDoneEnough(p, p->flow, p->flow->protoctx);
if (!reuse) {
SCLogDebug("steam starter packet %"PRIu64", but state not "
"ready to be reused", p->pcap_cnt);
FLOWLOCK_UNLOCK(p->flow);
return;
}
SCLogDebug("steam starter packet %"PRIu64", and state "
"ready to be reused", p->pcap_cnt);
/* ok, this packet needs a new flow */
/* first, get a reference to the old flow */
Flow *old_f = NULL;
FlowReference(&old_f, p->flow);
/* get some settings that we move over to the new flow */
FlowThreadId thread_id = old_f->thread_id;
int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid);
/* disconnect the packet from the old flow */
FlowHandlePacketUpdateRemove(p->flow, p);
FLOWLOCK_UNLOCK(p->flow);
FlowDeReference(&p->flow); // < can't disappear while usecnt >0
/* Can't tag flow as reused yet, would be a race condition:
* new packets will not get old flow because of FLOW_TCP_REUSED,
* so new flow may be created. This new flow could be handled in
* a different thread. */
void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p)
{
FlowHandlePacketUpdate(p->flow, p);
/* Get a flow. It will be either a locked flow or NULL */
Flow *new_f = FlowGetFlowFromHashByPacket(p, &p->flow);
if (new_f == NULL) {
FlowDeReference(&old_f); // < can't disappear while usecnt >0
return;
/* handle the app layer part of the UDP packet payload */
if (p->proto == IPPROTO_UDP) {
AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow);
}
/* update flow and packet */
FlowHandlePacketUpdate(new_f, p);
BUG_ON(new_f != p->flow);
/* copy flow balancing settings */
new_f->thread_id = thread_id;
SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid);
FLOWLOCK_UNLOCK(new_f);
/* tag original flow that it's now unused */
FLOWLOCK_WRLOCK(old_f);
SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt);
old_f->flags |= FLOW_TCP_REUSED;
FLOWLOCK_UNLOCK(old_f);
FlowDeReference(&old_f); // < can't disappear while usecnt >0
SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt);
}
/** \brief Handle packets that reference the wrong flow because of TCP reuse
*
* In the case of TCP reuse we can have many packets that were assigned
* a flow by the capture/decode threads before the stream engine decided
* that a new flow was needed for these packets.
* When HandleFlowReuse creates a new flow, the packets already processed
* by the flow engine will still reference the old flow.
*
* This function detects this case and replaces the flow for those packets.
* It's a fairly expensive operation, but it should be rare as it's only
* done for packets that were already in the engine when the TCP reuse
* case was handled. New packets are assigned the correct flow by the
* flow engine.
*/
static void TcpSessionReuseHandleApplyToPacket(Packet *p)
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
int need_flow_replace = 0;
StreamTcpThread *stt = (StreamTcpThread *)data;
TmEcode ret = TM_ECODE_OK;
SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt);
if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) {
FLOWLOCK_WRLOCK(p->flow);
if (p->flow->flags & FLOW_TCP_REUSED) {
SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt);
need_flow_replace = 1;
}
if (likely(need_flow_replace == 0)) {
/* Work around a race condition: if HandleFlowReuse has inserted a new flow,
* it will not have seen both sides of the session yet. The packet we have here
* may be the first that got the flow directly from the hash right after the
* flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */
if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) {
p->flowflags |= FLOW_PKT_ESTABLISHED;
SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
} else {
SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt);
}
SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow);
AppLayerProfilingReset(stt->ra_ctx->app_tctx);
(void)StreamTcpPacket(tv, p, stt, pq);
p->flags |= PKT_IGNORE_CHECKSUM;
stt->pkts++;
FLOWLOCK_UNLOCK(p->flow);
return;
return TM_ECODE_OK;
}
/* disconnect packet from old flow */
FlowHandlePacketUpdateRemove(p->flow, p);
FLOWLOCK_UNLOCK(p->flow);
FlowDeReference(&p->flow); // < can't disappear while usecnt >0
/* find the new flow that does belong to this packet */
Flow *new_f = FlowLookupFlowFromHash(p, &p->flow);
if (new_f == NULL) {
// TODO reset packet flag wrt flow: direction, HAS_FLOW etc
p->flags &= ~PKT_HAS_FLOW;
return;
if (!(p->flags & PKT_WANTS_FLOW)) {
return TM_ECODE_OK;
}
FlowHandlePacketUpdate(new_f, p);
BUG_ON(new_f != p->flow);
FLOWLOCK_UNLOCK(new_f);
SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow);
}
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
StreamTcpThread *stt = (StreamTcpThread *)data;
TmEcode ret = TM_ECODE_OK;
FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars
if (likely(p->flow != NULL)) {
FlowUpdate(tv, stt, p);
}
if (!(PKT_IS_TCP(p)))
return TM_ECODE_OK;
if (!(PKT_IS_TCP(p))) {
goto unlock;
}
if (p->flow == NULL) {
StatsIncr(tv, stt->counter_tcp_no_flow);
return TM_ECODE_OK;
goto unlock;
}
/* only TCP packets with a flow from here */
if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) {
if (StreamTcpValidateChecksum(p) == 0) {
StatsIncr(tv, stt->counter_tcp_invalid_checksum);
return TM_ECODE_OK;
goto unlock;
}
} else {
p->flags |= PKT_IGNORE_CHECKSUM;
}
if (stt->runmode_flow_stream_async) {
/* "autofp" handling of TCP session/flow reuse */
if (!(p->flags & PKT_PSEUDO_STREAM_END)) {
/* apply previous reuses to this packet */
TcpSessionReuseHandleApplyToPacket(p);
if (p->flow == NULL)
return ret;
if (!(p->flowflags & FLOW_PKT_TOSERVER_FIRST)) {
/* after that, check for 'new' reuse */
TcpSessionReuseHandle(p);
if (p->flow == NULL)
return ret;
}
}
}
AppLayerProfilingReset(stt->ra_ctx->app_tctx);
FLOWLOCK_WRLOCK(p->flow);
ret = StreamTcpPacket(tv, p, stt, pq);
FLOWLOCK_UNLOCK(p->flow);
//if (ret)
// return TM_ECODE_FAILED;
stt->pkts++;
unlock:
if (p->flow) {
FLOWLOCK_UNLOCK(p->flow);
}
return ret;
}
@ -5114,11 +4993,6 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
if (stt->ssn_pool_id < 0 || ssn_pool == NULL)
SCReturnInt(TM_ECODE_FAILED);
/* see if need to enable the TCP reuse handling in the stream engine */
stt->runmode_flow_stream_async = RunmodeGetFlowStreamAsync();
SCLogDebug("Flow and Stream engine run %s",
stt->runmode_flow_stream_async ? "asynchronous" : "synchronous");
SCReturnInt(TM_ECODE_OK);
}

@ -76,10 +76,6 @@ typedef struct TcpStreamCnf_ {
typedef struct StreamTcpThread_ {
int ssn_pool_id;
/** if set to true, we activate the TCP tuple reuse code in the
* stream engine. */
int runmode_flow_stream_async;
uint64_t pkts;
/** queue for pseudo packet(s) that were created in the stream

@ -61,7 +61,9 @@ void TmqhFlowRegister(void)
if (strcasecmp(scheduler, "round-robin") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin;
} else if (strcasecmp(scheduler, "active-packets") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
//tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
SCLogNotice("FIXME: using flow hash instead of active packets");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "hash") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "ippair") == 0) {
@ -73,7 +75,8 @@ void TmqhFlowRegister(void)
exit(EXIT_FAILURE);
}
} else {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
//tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets;
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
}
return;
@ -330,7 +333,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p)
* \param tv thread vars.
* \param p packet.
*/
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p)
{
int16_t qid = 0;
@ -371,6 +374,31 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
return;
}
void TmqhOutputFlowHash(ThreadVars *tv, Packet *p)
{
int16_t qid = 0;
TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx;
if (p->flags & PKT_WANTS_FLOW) {
uint32_t hash = p->flow_hash;
qid = hash % ctx->size;
} else {
qid = ctx->last++;
if (ctx->last == ctx->size)
ctx->last = 0;
}
(void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1);
PacketQueue *q = ctx->queues[qid].q;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
return;
}
/**
* \brief select the queue to output based on IP address pair.
*

@ -51,21 +51,6 @@
#include "flow-hash.h"
/** set to true if flow engine and stream engine run in different
* threads. */
static int runmode_flow_stream_async = 0;
void RunmodeSetFlowStreamAsync(void)
{
runmode_flow_stream_async = 1;
FlowDisableTcpReuseHandling();
}
int RunmodeGetFlowStreamAsync(void)
{
return runmode_flow_stream_async;
}
/** \brief create a queue string for autofp to pass to
* the flow queue handler.
*
@ -121,8 +106,6 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser,
if (thread_max < 1)
thread_max = 1;
RunmodeSetFlowStreamAsync();
queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
if (queues == NULL) {
SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");
@ -497,8 +480,6 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser,
if (thread_max < 1)
thread_max = 1;
RunmodeSetFlowStreamAsync();
queues = RunmodeAutoFpCreatePickupQueuesString(thread_max);
if (queues == NULL) {
SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed");

@ -854,8 +854,10 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir)
p->dst.addr_data32[0] = i;
}
FlowHandlePacket(NULL, NULL, p);
if (p->flow != NULL)
if (p->flow != NULL) {
SC_ATOMIC_RESET(p->flow->use_cnt);
FLOWLOCK_UNLOCK(p->flow);
}
/* Now the queues shoul be updated */
UTHFreePacket(p);

Loading…
Cancel
Save