diff --git a/src/app-layer.c b/src/app-layer.c index b0955769c9..db4fa3abed 100644 --- a/src/app-layer.c +++ b/src/app-layer.c @@ -461,7 +461,7 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, * If the protocol is yet unknown, the proto detection code is run first. * * \param dp_ctx Thread app layer detect context - * \param f unlocked flow + * \param f *locked* flow * \param p UDP packet * * \retval 0 ok @@ -473,8 +473,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow * int r = 0; - FLOWLOCK_WRLOCK(f); - uint8_t flags = 0; if (p->flowflags & FLOW_PKT_TOSERVER) { flags |= STREAM_TOSERVER; @@ -527,7 +525,6 @@ int AppLayerHandleUdp(ThreadVars *tv, AppLayerThreadCtx *tctx, Packet *p, Flow * } } - FLOWLOCK_UNLOCK(f); PACKET_PROFILING_APP_STORE(tctx, p); SCReturnInt(r); diff --git a/src/decode-icmpv4.c b/src/decode-icmpv4.c index 3e08daa197..1cfaf78078 100644 --- a/src/decode-icmpv4.c +++ b/src/decode-icmpv4.c @@ -194,7 +194,7 @@ int DecodeICMPV4(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, { /* ICMP ICMP_DEST_UNREACH influence TCP/UDP flows */ if (ICMPV4_DEST_UNREACH_IS_VALID(p)) { - FlowHandlePacket(tv, dtv, p); + FlowSetupPacket(p); } } } diff --git a/src/decode-icmpv6.c b/src/decode-icmpv6.c index 238a6cf6fa..26a48b7439 100644 --- a/src/decode-icmpv6.c +++ b/src/decode-icmpv6.c @@ -362,8 +362,7 @@ int DecodeICMPV6(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, SCLogDebug("Unknown Type, ICMPV6_UNKNOWN_TYPE"); #endif - /* Flow is an integral part of us */ - FlowHandlePacket(tv, dtv, p); + FlowSetupPacket(p); return TM_ECODE_OK; } diff --git a/src/decode-ipv4.c b/src/decode-ipv4.c index a41c35bfd7..c21d224600 100644 --- a/src/decode-ipv4.c +++ b/src/decode-ipv4.c @@ -1518,7 +1518,6 @@ int DecodeIPV4DefragTest03(void) 0x80, 0x00, 0xb1, 0xa3, 0x00, 0x00 }; - Flow *f = NULL; Packet *p = PacketGetFromAlloc(); if (unlikely(p == NULL)) return 0; @@ -1542,12 +1541,11 @@ int DecodeIPV4DefragTest03(void) result = 0; goto end; } - if (p->flow == NULL) { + if (!(p->flags & PKT_WANTS_FLOW)) { printf("packet flow shouldn't be NULL\n"); result = 0; goto end; } - f = p->flow; PACKET_RECYCLE(p); PacketCopyData(p, pkt1, sizeof(pkt1)); @@ -1585,11 +1583,11 @@ int DecodeIPV4DefragTest03(void) result = 0; goto end; } - if (tp->flow == NULL) { + if (!(tp->flags & PKT_WANTS_FLOW)) { result = 0; goto end; } - if (tp->flow != f) { + if (tp->flow_hash != p->flow_hash) { result = 0; goto end; } diff --git a/src/decode-sctp.c b/src/decode-sctp.c index 2d493c6690..f787933ebd 100644 --- a/src/decode-sctp.c +++ b/src/decode-sctp.c @@ -73,8 +73,7 @@ int DecodeSCTP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, u SCTP_GET_SRC_PORT(p), SCTP_GET_DST_PORT(p)); #endif - /* Flow is an integral part of us */ - FlowHandlePacket(tv, dtv, p); + FlowSetupPacket(p); return TM_ECODE_OK; } diff --git a/src/decode-tcp.c b/src/decode-tcp.c index b3c73b5d89..51dbfaaa26 100644 --- a/src/decode-tcp.c +++ b/src/decode-tcp.c @@ -214,8 +214,7 @@ int DecodeTCP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui TCP_HAS_MSS(p) ? "MSS " : ""); #endif - /* Flow is an integral part of us */ - FlowHandlePacket(tv, dtv, p); + FlowSetupPacket(p); return TM_ECODE_OK; } diff --git a/src/decode-udp.c b/src/decode-udp.c index 24f9207b8c..d45d57f254 100644 --- a/src/decode-udp.c +++ b/src/decode-udp.c @@ -85,17 +85,11 @@ int DecodeUDP(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p, uint8_t *pkt, ui if (unlikely(DecodeTeredo(tv, dtv, p, p->payload, p->payload_len, pq) == TM_ECODE_OK)) { /* Here we have a Teredo packet and don't need to handle app * layer */ - FlowHandlePacket(tv, dtv, p); + FlowSetupPacket(p); return TM_ECODE_OK; } - /* Flow is an integral part of us */ - FlowHandlePacket(tv, dtv, p); - - /* handle the app layer part of the UDP packet payload */ - if (unlikely(p->flow != NULL)) { - AppLayerHandleUdp(tv, dtv->app_tctx, p, p->flow); - } + FlowSetupPacket(p); return TM_ECODE_OK; } diff --git a/src/decode.h b/src/decode.h index 4c83d08300..a8641a23a0 100644 --- a/src/decode.h +++ b/src/decode.h @@ -399,6 +399,10 @@ typedef struct Packet_ struct Flow_ *flow; + /* raw hash value for looking up the flow, will need to modulated to the + * hash size still */ + uint32_t flow_hash; + struct timeval ts; union { @@ -1049,6 +1053,10 @@ int DecoderParseDataFromFile(char *filename, DecoderFunc Decoder); #define PKT_IS_INVALID (1<<20) #define PKT_PROFILE (1<<21) +/** indication by decoder that it feels the packet should be handled by + * flow engine: Packet::flow_hash will be set */ +#define PKT_WANTS_FLOW (1<<22) + /** \brief return 1 if the packet is a pseudo packet */ #define PKT_IS_PSEUDOPKT(p) ((p)->flags & PKT_PSEUDO_STREAM_END) diff --git a/src/flow-hash.c b/src/flow-hash.c index 20f1d36c61..c7418aa294 100644 --- a/src/flow-hash.c +++ b/src/flow-hash.c @@ -53,12 +53,6 @@ SC_ATOMIC_EXTERN(unsigned int, flow_prune_idx); SC_ATOMIC_EXTERN(unsigned int, flow_flags); static Flow *FlowGetUsedFlow(ThreadVars *tv, DecodeThreadVars *dtv); -static int handle_tcp_reuse = 1; - -void FlowDisableTcpReuseHandling(void) -{ - handle_tcp_reuse = 0; -} /** \brief compare two raw ipv6 addrs * @@ -297,6 +291,12 @@ static inline int FlowCompareICMPv4(Flow *f, const Packet *p) return 0; } +void FlowSetupPacket(Packet *p) +{ + p->flags |= PKT_WANTS_FLOW; + p->flow_hash = FlowGetHash(p); +} + int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, void *tcp_ssn); static inline int FlowCompare(Flow *f, const Packet *p) @@ -312,17 +312,6 @@ static inline int FlowCompare(Flow *f, const Packet *p) if (f->flags & FLOW_TCP_REUSED) return 0; - if (handle_tcp_reuse == 1) { - /* lets see if we need to consider the existing session reuse */ - if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) { - /* okay, we need to setup a new flow for this packet. - * Flag the flow that it's been replaced by a new one */ - f->flags |= FLOW_TCP_REUSED; - SCLogDebug("flow obsolete: TCP reuse will use a new flow " - "starting with packet %"PRIu64, p->pcap_cnt); - return 0; - } - } return 1; } else { return CMP_FLOW(f, p); @@ -418,120 +407,41 @@ static Flow *FlowGetNew(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p) return f; } -Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest) +static Flow *TcpReuseReplace(ThreadVars *tv, DecodeThreadVars *dtv, + FlowBucket *fb, Flow *old_f, + const uint32_t hash, const Packet *p) { - Flow *f = NULL; - - /* get the key to our bucket */ - uint32_t hash = FlowGetHash(p); - /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[hash % flow_config.hash_size]; - FBLOCK_LOCK(fb); + /* tag flow as reused so future lookups won't find it */ + old_f->flags |= FLOW_TCP_REUSED; + /* get some settings that we move over to the new flow */ + FlowThreadId thread_id = old_f->thread_id; + int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); - SCLogDebug("fb %p fb->head %p", fb, fb->head); - - f = FlowGetNew(NULL, NULL, p); - if (f != NULL) { - /* flow is locked */ - if (fb->head == NULL) { - fb->head = f; - fb->tail = f; - } else { - f->hprev = fb->tail; - fb->tail->hnext = f; - fb->tail = f; - } - - /* got one, now lock, initialize and return */ - FlowInit(f, p); - f->flow_hash = hash; - f->fb = fb; - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - FlowReference(dest, f); + /* since fb lock is still held this flow won't be found until we are done */ + FLOWLOCK_UNLOCK(old_f); - } - FBLOCK_UNLOCK(fb); - return f; -} - -/** \brief Lookup flow based on packet - * - * Find the flow belonging to this packet. If not found, no new flow - * is set up. - * - * \param p packet to lookup the flow for - * - * \retval f flow or NULL if not found - */ -Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest) -{ - Flow *f = NULL; - - /* get the key to our bucket */ - uint32_t hash = FlowGetHash(p); - /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[hash % flow_config.hash_size]; - FBLOCK_LOCK(fb); - - SCLogDebug("fb %p fb->head %p", fb, fb->head); - - /* see if the bucket already has a flow */ - if (fb->head == NULL) { - FBLOCK_UNLOCK(fb); + /* Get a new flow. It will be either a locked flow or NULL */ + Flow *f = FlowGetNew(tv, dtv, p); + if (f == NULL) { return NULL; } - /* ok, we have a flow in the bucket. Let's find out if it is our flow */ - f = fb->head; - - /* see if this is the flow we are looking for */ - if (FlowCompare(f, p) == 0) { - while (f) { - f = f->hnext; - - if (f == NULL) { - FBLOCK_UNLOCK(fb); - return NULL; - } + /* flow is locked */ - if (FlowCompare(f, p) != 0) { - /* we found our flow, lets put it on top of the - * hash list -- this rewards active flows */ - if (f->hnext) { - f->hnext->hprev = f->hprev; - } - if (f->hprev) { - f->hprev->hnext = f->hnext; - } - if (f == fb->tail) { - fb->tail = f->hprev; - } + /* put at the start of the list */ + f->hnext = fb->head; + fb->head->hprev = f; + fb->head = f; - f->hnext = fb->head; - f->hprev = NULL; - fb->head->hprev = f; - fb->head = f; + /* initialize and return */ + FlowInit(f, p); + f->flow_hash = hash; + f->fb = fb; - /* found our flow, lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - FlowReference(dest, f); - - FBLOCK_UNLOCK(fb); - return f; - } - } + f->thread_id = thread_id; + if (autofp_tmqh_flow_qid != -1) { + SC_ATOMIC_SET(f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); } - - /* lock & return */ - FLOWLOCK_WRLOCK(f); - /* update the last seen timestamp of this flow */ - COPY_TIMESTAMP(&p->ts,&f->lastts); - FlowReference(dest, f); - - FBLOCK_UNLOCK(fb); return f; } @@ -556,10 +466,9 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p { Flow *f = NULL; - /* get the key to our bucket */ - uint32_t hash = FlowGetHash(p); /* get our hash bucket and lock it */ - FlowBucket *fb = &flow_hash[hash % flow_config.hash_size]; + const uint32_t hash = p->flow_hash; + FlowBucket *fb = &flow_hash[hash % flow_config.hash_size]; FBLOCK_LOCK(fb); SCLogDebug("fb %p fb->head %p", fb, fb->head); @@ -645,6 +554,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p /* found our flow, lock & return */ FLOWLOCK_WRLOCK(f); + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) { + f = TcpReuseReplace(tv, dtv, fb, f, hash, p); + if (f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + } + /* update the last seen timestamp of this flow */ COPY_TIMESTAMP(&p->ts,&f->lastts); FlowReference(dest, f); @@ -657,6 +574,14 @@ Flow *FlowGetFlowFromHash(ThreadVars *tv, DecodeThreadVars *dtv, const Packet *p /* lock & return */ FLOWLOCK_WRLOCK(f); + if (unlikely(TcpSessionPacketSsnReuse(p, f, f->protoctx) == 1)) { + f = TcpReuseReplace(tv, dtv, fb, f, hash, p); + if (f == NULL) { + FBLOCK_UNLOCK(fb); + return NULL; + } + } + /* update the last seen timestamp of this flow */ COPY_TIMESTAMP(&p->ts,&f->lastts); FlowReference(dest, f); diff --git a/src/flow.c b/src/flow.c index 9c3973e654..9c6b1653d6 100644 --- a/src/flow.c +++ b/src/flow.c @@ -226,37 +226,6 @@ static inline int FlowUpdateSeenFlag(const Packet *p) return 1; } -/** - * - * Remove packet from flow. This assumes this happens *before* the packet - * is added to the stream engine and other higher state. - * - * \todo we can't restore the lastts - */ -void FlowHandlePacketUpdateRemove(Flow *f, Packet *p) -{ - if (p->flowflags & FLOW_PKT_TOSERVER) { - f->todstpktcnt--; - f->todstbytecnt -= GET_PKT_LEN(p); - p->flowflags &= ~(FLOW_PKT_TOSERVER|FLOW_PKT_TOSERVER_FIRST); - } else { - f->tosrcpktcnt--; - f->tosrcbytecnt -= GET_PKT_LEN(p); - p->flowflags &= ~(FLOW_PKT_TOCLIENT|FLOW_PKT_TOCLIENT_FIRST); - } - p->flowflags &= ~FLOW_PKT_ESTABLISHED; - - /*set the detection bypass flags*/ - if (f->flags & FLOW_NOPACKET_INSPECTION) { - SCLogDebug("unsetting FLOW_NOPACKET_INSPECTION flag on flow %p", f); - DecodeUnsetNoPacketInspectionFlag(p); - } - if (f->flags & FLOW_NOPAYLOAD_INSPECTION) { - SCLogDebug("unsetting FLOW_NOPAYLOAD_INSPECTION flag on flow %p", f); - DecodeUnsetNoPayloadInspectionFlag(p); - } -} - /** \brief Update Packet and Flow * * Updates packet and flow based on the new packet. @@ -330,10 +299,6 @@ void FlowHandlePacket(ThreadVars *tv, DecodeThreadVars *dtv, Packet *p) if (f == NULL) return; - FlowHandlePacketUpdate(f, p); - - FLOWLOCK_UNLOCK(f); - /* set the flow in the packet */ p->flags |= PKT_HAS_FLOW; return; diff --git a/src/flow.h b/src/flow.h index 2085f32242..f0fcc226ab 100644 --- a/src/flow.h +++ b/src/flow.h @@ -432,6 +432,11 @@ typedef struct FlowProto_ { void (*Freefunc)(void *); } FlowProto; +/** \brief prepare packet for a life with flow + * Set PKT_WANTS_FLOW flag to incidate workers should do a flow lookup + * and calc the hash value to be used in the lookup and autofp flow + * balancing. */ +void FlowSetupPacket(Packet *p); void FlowHandlePacket (ThreadVars *, DecodeThreadVars *, Packet *); void FlowInitConfig (char); void FlowPrintQueueInfo (void); @@ -577,11 +582,7 @@ AppProto FlowGetAppProtocol(const Flow *f); void *FlowGetAppState(const Flow *f); uint8_t FlowGetDisruptionFlags(const Flow *f, uint8_t flags); -void FlowHandlePacketUpdateRemove(Flow *f, Packet *p); void FlowHandlePacketUpdate(Flow *f, Packet *p); -Flow *FlowGetFlowFromHashByPacket(const Packet *p, Flow **dest); -Flow *FlowLookupFlowFromHash(const Packet *p, Flow **dest); - #endif /* __FLOW_H__ */ diff --git a/src/runmode-erf-file.c b/src/runmode-erf-file.c index f449a953a2..e2ffde4b4f 100644 --- a/src/runmode-erf-file.c +++ b/src/runmode-erf-file.c @@ -132,7 +132,6 @@ int RunModeErfFileAutoFp(void) int thread; RunModeInitialize(); - RunmodeSetFlowStreamAsync(); char *file = NULL; if (ConfGet("erf-file.file", &file) == 0) { diff --git a/src/runmode-pcap-file.c b/src/runmode-pcap-file.c index 29a65eeebf..cf99fac6fc 100644 --- a/src/runmode-pcap-file.c +++ b/src/runmode-pcap-file.c @@ -165,7 +165,6 @@ int RunModeFilePcapAutoFp(void) int thread; RunModeInitialize(); - RunmodeSetFlowStreamAsync(); char *file = NULL; if (ConfGet("pcap-file.file", &file) == 0) { diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 22cb446c8f..eec197f9fd 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -4863,145 +4863,14 @@ int TcpSessionPacketSsnReuse(const Packet *p, const Flow *f, const void *tcp_ssn return 0; } -/** \brief Handle TCP reuse of tuple - * - * Logic: - * 1. see if packet could trigger a new session - * 2. see if the flow/ssn is in a state where we want to support the reuse - * 3. disconnect packet from the old flow - * -> at this point new packets can still find the old flow - * -> as the flow's reference count != 0, it can't disappear - * 4. setup a new flow unconditionally - * 5. attach packet to new flow - * 6. tag old flow as FLOW_TCP_REUSED - * -> NEW packets won't find it - * -> existing packets in our queues may still reference it - * 7. dereference the old flow (reference cnt *may* now be 0, - * if no other packets reference it) - * - * The packets that still hold a reference to the old flow are updated - * by HandleFlowReuseApplyToPacket() - */ -static void TcpSessionReuseHandle(Packet *p) { - if (likely(TcpSessionPacketIsStreamStarter(p) == 0)) - return; - - int reuse = 0; - FLOWLOCK_RDLOCK(p->flow); - reuse = TcpSessionReuseDoneEnough(p, p->flow, p->flow->protoctx); - if (!reuse) { - SCLogDebug("steam starter packet %"PRIu64", but state not " - "ready to be reused", p->pcap_cnt); - FLOWLOCK_UNLOCK(p->flow); - return; - } - - SCLogDebug("steam starter packet %"PRIu64", and state " - "ready to be reused", p->pcap_cnt); - - /* ok, this packet needs a new flow */ - - /* first, get a reference to the old flow */ - Flow *old_f = NULL; - FlowReference(&old_f, p->flow); - - /* get some settings that we move over to the new flow */ - FlowThreadId thread_id = old_f->thread_id; - int16_t autofp_tmqh_flow_qid = SC_ATOMIC_GET(old_f->autofp_tmqh_flow_qid); - - /* disconnect the packet from the old flow */ - FlowHandlePacketUpdateRemove(p->flow, p); - FLOWLOCK_UNLOCK(p->flow); - FlowDeReference(&p->flow); // < can't disappear while usecnt >0 - - /* Can't tag flow as reused yet, would be a race condition: - * new packets will not get old flow because of FLOW_TCP_REUSED, - * so new flow may be created. This new flow could be handled in - * a different thread. */ - - /* Get a flow. It will be either a locked flow or NULL */ - Flow *new_f = FlowGetFlowFromHashByPacket(p, &p->flow); - if (new_f == NULL) { - FlowDeReference(&old_f); // < can't disappear while usecnt >0 - return; - } - - /* update flow and packet */ - FlowHandlePacketUpdate(new_f, p); - BUG_ON(new_f != p->flow); - - /* copy flow balancing settings */ - new_f->thread_id = thread_id; - SC_ATOMIC_SET(new_f->autofp_tmqh_flow_qid, autofp_tmqh_flow_qid); - - FLOWLOCK_UNLOCK(new_f); - - /* tag original flow that it's now unused */ - FLOWLOCK_WRLOCK(old_f); - SCLogDebug("old flow %p tagged with FLOW_TCP_REUSED by packet %"PRIu64"!", old_f, p->pcap_cnt); - old_f->flags |= FLOW_TCP_REUSED; - FLOWLOCK_UNLOCK(old_f); - FlowDeReference(&old_f); // < can't disappear while usecnt >0 - - SCLogDebug("new flow %p set up for packet %"PRIu64"!", p->flow, p->pcap_cnt); -} - -/** \brief Handle packets that reference the wrong flow because of TCP reuse - * - * In the case of TCP reuse we can have many packets that were assigned - * a flow by the capture/decode threads before the stream engine decided - * that a new flow was needed for these packets. - * When HandleFlowReuse creates a new flow, the packets already processed - * by the flow engine will still reference the old flow. - * - * This function detects this case and replaces the flow for those packets. - * It's a fairly expensive operation, but it should be rare as it's only - * done for packets that were already in the engine when the TCP reuse - * case was handled. New packets are assigned the correct flow by the - * flow engine. - */ -static void TcpSessionReuseHandleApplyToPacket(Packet *p) +void FlowUpdate(ThreadVars *tv, StreamTcpThread *stt, Packet *p) { - int need_flow_replace = 0; - - FLOWLOCK_WRLOCK(p->flow); - if (p->flow->flags & FLOW_TCP_REUSED) { - SCLogDebug("packet %"PRIu64" attached to outdated flow and ssn", p->pcap_cnt); - need_flow_replace = 1; - } - - if (likely(need_flow_replace == 0)) { - /* Work around a race condition: if HandleFlowReuse has inserted a new flow, - * it will not have seen both sides of the session yet. The packet we have here - * may be the first that got the flow directly from the hash right after the - * flow was added. In this case it won't have FLOW_PKT_ESTABLISHED flag set. */ - if ((p->flow->flags & FLOW_TO_DST_SEEN) && (p->flow->flags & FLOW_TO_SRC_SEEN)) { - p->flowflags |= FLOW_PKT_ESTABLISHED; - SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags |= FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); - } else { - SCLogDebug("packet %"PRIu64" / flow %p: p->flowflags NOT FLOW_PKT_ESTABLISHED (%u/%u)", p->pcap_cnt, p->flow, p->flow->todstpktcnt, p->flow->tosrcpktcnt); - } - SCLogDebug("packet %"PRIu64" attached to regular flow %p and ssn", p->pcap_cnt, p->flow); - FLOWLOCK_UNLOCK(p->flow); - return; - } - - /* disconnect packet from old flow */ - FlowHandlePacketUpdateRemove(p->flow, p); - FLOWLOCK_UNLOCK(p->flow); - FlowDeReference(&p->flow); // < can't disappear while usecnt >0 + FlowHandlePacketUpdate(p->flow, p); - /* find the new flow that does belong to this packet */ - Flow *new_f = FlowLookupFlowFromHash(p, &p->flow); - if (new_f == NULL) { - // TODO reset packet flag wrt flow: direction, HAS_FLOW etc - p->flags &= ~PKT_HAS_FLOW; - return; + /* handle the app layer part of the UDP packet payload */ + if (p->proto == IPPROTO_UDP) { + AppLayerHandleUdp(tv, stt->ra_ctx->app_tctx, p, p->flow); } - FlowHandlePacketUpdate(new_f, p); - BUG_ON(new_f != p->flow); - FLOWLOCK_UNLOCK(new_f); - SCLogDebug("packet %"PRIu64" switched over to new flow %p!", p->pcap_cnt, p->flow); } TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) @@ -5009,49 +4878,59 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe StreamTcpThread *stt = (StreamTcpThread *)data; TmEcode ret = TM_ECODE_OK; - if (!(PKT_IS_TCP(p))) + SCLogDebug("p->pcap_cnt %"PRIu64, p->pcap_cnt); + + if (p->flow && p->flags & PKT_PSEUDO_STREAM_END) { + FLOWLOCK_WRLOCK(p->flow); + AppLayerProfilingReset(stt->ra_ctx->app_tctx); + (void)StreamTcpPacket(tv, p, stt, pq); + p->flags |= PKT_IGNORE_CHECKSUM; + stt->pkts++; + FLOWLOCK_UNLOCK(p->flow); return TM_ECODE_OK; + } + + if (!(p->flags & PKT_WANTS_FLOW)) { + return TM_ECODE_OK; + } + + FlowHandlePacket(tv, NULL, p); //TODO what to do about decoder thread vars + if (likely(p->flow != NULL)) { + FlowUpdate(tv, stt, p); + } + + if (!(PKT_IS_TCP(p))) { + goto unlock; + } if (p->flow == NULL) { StatsIncr(tv, stt->counter_tcp_no_flow); - return TM_ECODE_OK; + goto unlock; } + /* only TCP packets with a flow from here */ + if (stream_config.flags & STREAMTCP_INIT_FLAG_CHECKSUM_VALIDATION) { if (StreamTcpValidateChecksum(p) == 0) { StatsIncr(tv, stt->counter_tcp_invalid_checksum); - return TM_ECODE_OK; + goto unlock; } } else { p->flags |= PKT_IGNORE_CHECKSUM; } - - if (stt->runmode_flow_stream_async) { - /* "autofp" handling of TCP session/flow reuse */ - if (!(p->flags & PKT_PSEUDO_STREAM_END)) { - /* apply previous reuses to this packet */ - TcpSessionReuseHandleApplyToPacket(p); - if (p->flow == NULL) - return ret; - - if (!(p->flowflags & FLOW_PKT_TOSERVER_FIRST)) { - /* after that, check for 'new' reuse */ - TcpSessionReuseHandle(p); - if (p->flow == NULL) - return ret; - } - } - } AppLayerProfilingReset(stt->ra_ctx->app_tctx); - FLOWLOCK_WRLOCK(p->flow); ret = StreamTcpPacket(tv, p, stt, pq); - FLOWLOCK_UNLOCK(p->flow); //if (ret) // return TM_ECODE_FAILED; stt->pkts++; + + unlock: + if (p->flow) { + FLOWLOCK_UNLOCK(p->flow); + } return ret; } @@ -5114,11 +4993,6 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) if (stt->ssn_pool_id < 0 || ssn_pool == NULL) SCReturnInt(TM_ECODE_FAILED); - /* see if need to enable the TCP reuse handling in the stream engine */ - stt->runmode_flow_stream_async = RunmodeGetFlowStreamAsync(); - SCLogDebug("Flow and Stream engine run %s", - stt->runmode_flow_stream_async ? "asynchronous" : "synchronous"); - SCReturnInt(TM_ECODE_OK); } diff --git a/src/stream-tcp.h b/src/stream-tcp.h index 416cd5aa80..e23d9eab71 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -76,10 +76,6 @@ typedef struct TcpStreamCnf_ { typedef struct StreamTcpThread_ { int ssn_pool_id; - /** if set to true, we activate the TCP tuple reuse code in the - * stream engine. */ - int runmode_flow_stream_async; - uint64_t pkts; /** queue for pseudo packet(s) that were created in the stream diff --git a/src/tmqh-flow.c b/src/tmqh-flow.c index 5a6b40c9f5..881c5985ca 100644 --- a/src/tmqh-flow.c +++ b/src/tmqh-flow.c @@ -61,7 +61,9 @@ void TmqhFlowRegister(void) if (strcasecmp(scheduler, "round-robin") == 0) { tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowRoundRobin; } else if (strcasecmp(scheduler, "active-packets") == 0) { - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; + //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; + SCLogNotice("FIXME: using flow hash instead of active packets"); + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "hash") == 0) { tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } else if (strcasecmp(scheduler, "ippair") == 0) { @@ -73,7 +75,8 @@ void TmqhFlowRegister(void) exit(EXIT_FAILURE); } } else { - tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; + //tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowActivePackets; + tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash; } return; @@ -330,7 +333,7 @@ void TmqhOutputFlowActivePackets(ThreadVars *tv, Packet *p) * \param tv thread vars. * \param p packet. */ -void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) +void TmqhOutputFlowHash2(ThreadVars *tv, Packet *p) { int16_t qid = 0; @@ -371,6 +374,31 @@ void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) return; } +void TmqhOutputFlowHash(ThreadVars *tv, Packet *p) +{ + int16_t qid = 0; + + TmqhFlowCtx *ctx = (TmqhFlowCtx *)tv->outctx; + + if (p->flags & PKT_WANTS_FLOW) { + uint32_t hash = p->flow_hash; + qid = hash % ctx->size; + } else { + qid = ctx->last++; + + if (ctx->last == ctx->size) + ctx->last = 0; + } + (void) SC_ATOMIC_ADD(ctx->queues[qid].total_packets, 1); + + PacketQueue *q = ctx->queues[qid].q; + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); + + return; +} /** * \brief select the queue to output based on IP address pair. * diff --git a/src/util-runmodes.c b/src/util-runmodes.c index 13724e98fe..8242c81724 100644 --- a/src/util-runmodes.c +++ b/src/util-runmodes.c @@ -51,21 +51,6 @@ #include "flow-hash.h" -/** set to true if flow engine and stream engine run in different - * threads. */ -static int runmode_flow_stream_async = 0; - -void RunmodeSetFlowStreamAsync(void) -{ - runmode_flow_stream_async = 1; - FlowDisableTcpReuseHandling(); -} - -int RunmodeGetFlowStreamAsync(void) -{ - return runmode_flow_stream_async; -} - /** \brief create a queue string for autofp to pass to * the flow queue handler. * @@ -121,8 +106,6 @@ int RunModeSetLiveCaptureAutoFp(ConfigIfaceParserFunc ConfigParser, if (thread_max < 1) thread_max = 1; - RunmodeSetFlowStreamAsync(); - queues = RunmodeAutoFpCreatePickupQueuesString(thread_max); if (queues == NULL) { SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed"); @@ -497,8 +480,6 @@ int RunModeSetIPSAutoFp(ConfigIPSParserFunc ConfigParser, if (thread_max < 1) thread_max = 1; - RunmodeSetFlowStreamAsync(); - queues = RunmodeAutoFpCreatePickupQueuesString(thread_max); if (queues == NULL) { SCLogError(SC_ERR_RUNMODE, "RunmodeAutoFpCreatePickupQueuesString failed"); diff --git a/src/util-unittest-helper.c b/src/util-unittest-helper.c index dd2c20dc9c..e58644a530 100644 --- a/src/util-unittest-helper.c +++ b/src/util-unittest-helper.c @@ -854,8 +854,10 @@ uint32_t UTHBuildPacketOfFlows(uint32_t start, uint32_t end, uint8_t dir) p->dst.addr_data32[0] = i; } FlowHandlePacket(NULL, NULL, p); - if (p->flow != NULL) + if (p->flow != NULL) { SC_ATOMIC_RESET(p->flow->use_cnt); + FLOWLOCK_UNLOCK(p->flow); + } /* Now the queues shoul be updated */ UTHFreePacket(p);