diff --git a/src/alert-fastlog.c b/src/alert-fastlog.c index 6b098baaa3..c610c9b333 100644 --- a/src/alert-fastlog.c +++ b/src/alert-fastlog.c @@ -33,9 +33,9 @@ #include "util-unittest.h" -int AlertFastlog (ThreadVars *, Packet *, void *); -int AlertFastlogIPv4(ThreadVars *, Packet *, void *); -int AlertFastlogIPv6(ThreadVars *, Packet *, void *); +int AlertFastlog (ThreadVars *, Packet *, void *, PacketQueue *); +int AlertFastlogIPv4(ThreadVars *, Packet *, void *, PacketQueue *); +int AlertFastlogIPv6(ThreadVars *, Packet *, void *, PacketQueue *); int AlertFastlogThreadInit(ThreadVars *, void **); int AlertFastlogThreadDeinit(ThreadVars *, void *); @@ -78,7 +78,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size) (u_int32_t) ts->tv_usec); } -int AlertFastlogIPv4(ThreadVars *tv, Packet *p, void *data) +int AlertFastlogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { AlertFastlogThread *aft = (AlertFastlogThread *)data; int i; @@ -103,7 +103,7 @@ int AlertFastlogIPv4(ThreadVars *tv, Packet *p, void *data) return 0; } -int AlertFastlogIPv6(ThreadVars *tv, Packet *p, void *data) +int AlertFastlogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { AlertFastlogThread *aft = (AlertFastlogThread *)data; int i; @@ -129,12 +129,12 @@ int AlertFastlogIPv6(ThreadVars *tv, Packet *p, void *data) return 0; } -int AlertFastlog (ThreadVars *tv, Packet *p, void *data) +int AlertFastlog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { if (PKT_IS_IPV4(p)) { - return AlertFastlogIPv4(tv, p, data); + return AlertFastlogIPv4(tv, p, data, pq); } else if (PKT_IS_IPV6(p)) { - return AlertFastlogIPv6(tv, p, data); + return AlertFastlogIPv6(tv, p, data, pq); } return 0; diff --git a/src/alert-unified-alert.c b/src/alert-unified-alert.c index 8d15460fbc..ee90c51d19 100644 --- a/src/alert-unified-alert.c +++ b/src/alert-unified-alert.c @@ -33,7 +33,7 @@ #include "util-unittest.h" -int AlertUnifiedAlert (ThreadVars *, Packet *, void *); +int AlertUnifiedAlert (ThreadVars *, Packet *, void *, PacketQueue *); int AlertUnifiedAlertThreadInit(ThreadVars *, void **); int AlertUnifiedAlertThreadDeinit(ThreadVars *, void *); @@ -145,7 +145,7 @@ int AlertUnifiedAlertRotateFile(ThreadVars *t, AlertUnifiedAlertThread *aun) { return 0; } -int AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data) +int AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { AlertUnifiedAlertThread *aun = (AlertUnifiedAlertThread *)data; AlertUnifiedAlertPacketHeader hdr; diff --git a/src/alert-unified-log.c b/src/alert-unified-log.c index 8d812e10ab..bd908193cc 100644 --- a/src/alert-unified-log.c +++ b/src/alert-unified-log.c @@ -33,7 +33,7 @@ #include "util-unittest.h" -int AlertUnifiedLog (ThreadVars *, Packet *, void *); +int AlertUnifiedLog (ThreadVars *, Packet *, void *, PacketQueue *); int AlertUnifiedLogThreadInit(ThreadVars *, void **); int AlertUnifiedLogThreadDeinit(ThreadVars *, void *); @@ -147,7 +147,7 @@ int AlertUnifiedLogRotateFile(ThreadVars *t, AlertUnifiedLogThread *aun) { return 0; } -int AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data) +int AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { AlertUnifiedLogThread *aun = (AlertUnifiedLogThread *)data; AlertUnifiedLogPacketHeader hdr; diff --git a/src/decode-ipv4.c b/src/decode-ipv4.c index d5e7442808..52f513c9a0 100644 --- a/src/decode-ipv4.c +++ b/src/decode-ipv4.c @@ -49,7 +49,7 @@ static int DecodeIPV4Packet(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t l return 0; } -void DecodeIPV4(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len) +void DecodeIPV4(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len, PacketQueue *pq) { int ret; @@ -96,24 +96,23 @@ void DecodeIPV4(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len) break; case IPPROTO_IPV6: { -// #if 0 - printf("DecodeIPV4: next layer is IPV6\n"); - printf("DecodeIPV4: we are p %p\n", p); - - /* spawn off tunnel packet */ - Packet *tp = TunnelPktSetup(t, p, pkt + IPV4_GET_HLEN(p), len - IPV4_GET_HLEN(p), IPV4_GET_IPPROTO(p)); - printf("DecodeIPV4: tunnel is tp %p\n", tp); - - /* send that to the Tunnel decoder */ - DecodeTunnel(t, tp, tp->pkt, tp->pktlen); - printf("DecodeIPV4: DecodeTunnel done, outputing\n"); - t->tmqh_out(t,tp); - - /* the current packet is now a tunnel packet */ - SET_TUNNEL_PKT(p); - printf("DecodeIPV4: packet is now a tunnel (root) packet: %p\n", p); + if (pq != NULL) { + //printf("DecodeIPV4: next layer is IPV6\n"); + //printf("DecodeIPV4: we are p %p\n", p); + + /* spawn off tunnel packet */ + Packet *tp = TunnelPktSetup(t, p, pkt + IPV4_GET_HLEN(p), len - IPV4_GET_HLEN(p), IPV4_GET_IPPROTO(p)); + //printf("DecodeIPV4: tunnel is tp %p\n", tp); + + /* send that to the Tunnel decoder */ + DecodeTunnel(t, tp, tp->pkt, tp->pktlen, pq); + /* add the tp to the packet queue. */ + PacketEnqueue(pq,tp); + + /* the current packet is now a tunnel packet */ + SET_TUNNEL_PKT(p); + } break; -// #endif } } diff --git a/src/decode-ipv6.c b/src/decode-ipv6.c index f33a4e44db..2ea1fb5486 100644 --- a/src/decode-ipv6.c +++ b/src/decode-ipv6.c @@ -364,7 +364,17 @@ void DecodeIPV6(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len) if (ret < 0) return; - /* now process the L4 Layer */ +#ifdef DEBUG + /* debug print */ + char s[46], d[46]; + inet_ntop(AF_INET6, (const void *)GET_IPV6_SRC_ADDR(p), s, sizeof(s)); + inet_ntop(AF_INET6, (const void *)GET_IPV6_DST_ADDR(p), d, sizeof(d)); + printf("IPV6 %s->%s - CLASS: %u FLOW: %u NH: %u PLEN: %u HLIM: %u\n", s,d, + IPV6_GET_CLASS(p), IPV6_GET_FLOW(p), IPV6_GET_NH(p), IPV6_GET_PLEN(p), + IPV6_GET_HLIM(p)); +#endif /* DEBUG */ + + /* now process the Ext headers and/or the L4 Layer */ switch(IPV6_GET_NH(p)) { case IPPROTO_TCP: return(DecodeTCP(t, p, pkt + IPV6_HEADER_LEN, len - IPV6_HEADER_LEN)); @@ -387,14 +397,6 @@ void DecodeIPV6(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len) } #ifdef DEBUG - /* debug print */ - char s[46], d[46]; - inet_ntop(AF_INET6, (const void *)GET_IPV6_SRC_ADDR(p), s, sizeof(s)); - inet_ntop(AF_INET6, (const void *)GET_IPV6_DST_ADDR(p), d, sizeof(d)); - printf("IPV6 %s->%s - CLASS: %u FLOW: %u NH: %u PLEN: %u HLIM: %u\n", s,d, - IPV6_GET_CLASS(p), IPV6_GET_FLOW(p), IPV6_GET_NH(p), IPV6_GET_PLEN(p), - IPV6_GET_HLIM(p)); - if (IPV6_EXTHDR_ISSET_FH(p)) { printf("IPV6 FRAG - HDRLEN: %u NH: %u OFFSET: %u ID: %u\n", IPV6_EXTHDR_GET_FH_HDRLEN(p), IPV6_EXTHDR_GET_FH_NH(p), diff --git a/src/decode.c b/src/decode.c index 76275bc994..36cc8ed284 100644 --- a/src/decode.c +++ b/src/decode.c @@ -4,14 +4,14 @@ #include "decode.h" -void DecodeTunnel(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len) +void DecodeTunnel(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len, PacketQueue *pq) { switch (p->tunnel_proto) { case IPPROTO_IP: - return DecodeIPV4(t, p, pkt, len); + return DecodeIPV4(t, p, pkt, len, pq); break; case IPPROTO_IPV6: - printf("DecodeTunnel: IPv6 packet\n"); + //printf("DecodeTunnel: IPv6 packet\n"); return DecodeIPV6(t, p, pkt, len); break; default: diff --git a/src/decode.h b/src/decode.h index a813c9ddbf..c756888fbe 100644 --- a/src/decode.h +++ b/src/decode.h @@ -262,6 +262,16 @@ typedef struct _Packet } Packet; +typedef struct _PacketQueue { + Packet *top; + Packet *bot; + u_int16_t len; + pthread_mutex_t mutex_q; + pthread_cond_t cond_q; +#ifdef DBG_PERF + u_int16_t dbg_maxlen; +#endif /* DBG_PERF */ +} PacketQueue; /* clear key vars so we don't need to call the expensive * memset or bzero */ @@ -332,8 +342,8 @@ typedef struct _Packet /* decoder functions */ -void DecodeTunnel(ThreadVars *, Packet *, u_int8_t *, u_int16_t); -void DecodeIPV4(ThreadVars *, Packet *, u_int8_t *, u_int16_t); +void DecodeTunnel(ThreadVars *, Packet *, u_int8_t *, u_int16_t, PacketQueue *); +void DecodeIPV4(ThreadVars *, Packet *, u_int8_t *, u_int16_t, PacketQueue *); void DecodeIPV6(ThreadVars *, Packet *, u_int8_t *, u_int16_t); void DecodeICMPV4(ThreadVars *, Packet *, u_int8_t *, u_int16_t); void DecodeICMPV6(ThreadVars *, Packet *, u_int8_t *, u_int16_t); diff --git a/src/detect.c b/src/detect.c index 2c20989db8..ce265b037f 100644 --- a/src/detect.c +++ b/src/detect.c @@ -53,7 +53,7 @@ int SignatureTupleCmp(SignatureTuple *a, SignatureTuple *b); int SignatureTupleCmpRaw(DetectAddressGroup *src, DetectAddressGroup *dst, DetectPort *sp, DetectPort *dp, u_int8_t proto, SignatureTuple *b); /* tm module api functions */ -int Detect(ThreadVars *, Packet *, void *); +int Detect(ThreadVars *, Packet *, void *, PacketQueue *); int DetectThreadInit(ThreadVars *, void **); int DetectThreadDeinit(ThreadVars *, void *); @@ -476,7 +476,7 @@ int SigMatchSignatures(ThreadVars *th_v, PatternMatcherThread *pmt, Packet *p) } /* tm module api functions */ -int Detect(ThreadVars *t, Packet *p, void *data) { +int Detect(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) { PatternMatcherThread *pmt = (PatternMatcherThread *)data; return SigMatchSignatures(t,pmt,p); diff --git a/src/log-httplog.c b/src/log-httplog.c index 464d7db050..593e4fc052 100644 --- a/src/log-httplog.c +++ b/src/log-httplog.c @@ -28,9 +28,9 @@ #include "util-unittest.h" -int LogHttplog (ThreadVars *, Packet *, void *); -int LogHttplogIPv4(ThreadVars *, Packet *, void *); -int LogHttplogIPv6(ThreadVars *, Packet *, void *); +int LogHttplog (ThreadVars *, Packet *, void *, PacketQueue *); +int LogHttplogIPv4(ThreadVars *, Packet *, void *, PacketQueue *); +int LogHttplogIPv6(ThreadVars *, Packet *, void *, PacketQueue *); int LogHttplogThreadInit(ThreadVars *, void **); int LogHttplogThreadDeinit(ThreadVars *, void *); @@ -73,7 +73,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size) (u_int32_t) ts->tv_usec); } -int LogHttplogIPv4(ThreadVars *tv, Packet *p, void *data) +int LogHttplogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { LogHttplogThread *aft = (LogHttplogThread *)data; int i; @@ -120,7 +120,7 @@ int LogHttplogIPv4(ThreadVars *tv, Packet *p, void *data) return 0; } -int LogHttplogIPv6(ThreadVars *tv, Packet *p, void *data) +int LogHttplogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { LogHttplogThread *aft = (LogHttplogThread *)data; int i; @@ -167,15 +167,15 @@ int LogHttplogIPv6(ThreadVars *tv, Packet *p, void *data) return 0; } -int LogHttplog (ThreadVars *tv, Packet *p, void *data) +int LogHttplog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { if (!(PKT_IS_TCP(p))) return 0; if (PKT_IS_IPV4(p)) { - return LogHttplogIPv4(tv, p, data); + return LogHttplogIPv4(tv, p, data, pq); } else if (PKT_IS_IPV6(p)) { - return LogHttplogIPv6(tv, p, data); + return LogHttplogIPv6(tv, p, data, pq); } return 0; diff --git a/src/packet-queue.h b/src/packet-queue.h index b8af710a8c..4d3775cbbd 100644 --- a/src/packet-queue.h +++ b/src/packet-queue.h @@ -5,7 +5,7 @@ #include #include "decode.h" - +#if 0 typedef struct _PacketQueue { Packet *top; Packet *bot; @@ -16,7 +16,7 @@ typedef struct _PacketQueue { u_int16_t dbg_maxlen; #endif /* DBG_PERF */ } PacketQueue; - +#endif void PacketEnqueue (PacketQueue *, Packet *); Packet *PacketDequeue (PacketQueue *); diff --git a/src/respond-reject.c b/src/respond-reject.c index 8a906cc295..54ff190cc1 100644 --- a/src/respond-reject.c +++ b/src/respond-reject.c @@ -41,7 +41,7 @@ void TmModuleRespondRejectRegister (void) { tmm_modules[TMM_RESPONDREJECT].RegisterTests = NULL; } -int RespondRejectFunc(ThreadVars *tv, Packet *p, void *data) { +int RespondRejectFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { /* ACTION_REJECT defaults to rejecting the SRC */ if (p->action != ACTION_REJECT && p->action != ACTION_REJECT_DST && diff --git a/src/respond-reject.h b/src/respond-reject.h index d9be166a8f..ed0e505463 100644 --- a/src/respond-reject.h +++ b/src/respond-reject.h @@ -7,6 +7,6 @@ #define REJECT_DIR_DST 1 void TmModuleRespondRejectRegister (void); -int RespondRejectFunc(ThreadVars *, Packet *, void *); +int RespondRejectFunc(ThreadVars *, Packet *, void *, PacketQueue *); #endif /* __RESPOND_REJECT_H__ */ diff --git a/src/source-nfq.c b/src/source-nfq.c index d4879ad63f..06f3cb1f8b 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -27,12 +27,12 @@ static NFQThreadVars nfq_t[NFQ_MAX_QUEUE]; static u_int16_t receive_queue_num = 0; static u_int16_t verdict_queue_num = 0; -int ReceiveNFQ(ThreadVars *, Packet *, void *); +int ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *); int ReceiveNFQThreadInit(ThreadVars *, void **); -int VerdictNFQ(ThreadVars *, Packet *, void *); +int VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *); int VerdictNFQThreadInit(ThreadVars *, void **); int VerdictNFQThreadDeinit(ThreadVars *, void *); -int DecodeNFQ(ThreadVars *, Packet *, void *); +int DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *); void TmModuleReceiveNFQRegister (void) { /* XXX create a general NFQ setup function */ @@ -292,7 +292,7 @@ void NFQRecvPkt(NFQThreadVars *t) { } } -int ReceiveNFQ(ThreadVars *tv, Packet *p, void *data) { +int ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { NFQThreadVars *ntv = (NFQThreadVars *)data; /* XXX can we move this to initialization? */ @@ -330,28 +330,28 @@ void NFQSetVerdict(NFQThreadVars *t, Packet *p) { printf("NFQSetVerdict: nfq_set_verdict of %p failed %d\n", p, ret); } -int VerdictNFQ(ThreadVars *tv, Packet *p, void *data) { +int VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { NFQThreadVars *ntv = (NFQThreadVars *)data; /* if this is a tunnel packet we check if we are ready to verdict * already. */ if (IS_TUNNEL_PKT(p)) { char verdict = 1; - printf("VerdictNFQ: tunnel pkt: %p %s\n", p, p->root ? "upper layer" : "root"); + //printf("VerdictNFQ: tunnel pkt: %p %s\n", p, p->root ? "upper layer" : "root"); pthread_mutex_t *m = p->root ? &p->root->mutex_rtv_cnt : &p->mutex_rtv_cnt; mutex_lock(m); /* if there are more tunnel packets than ready to verdict packets, * we won't verdict this one */ if (TUNNEL_PKT_TPR(p) > TUNNEL_PKT_RTV(p)) { - printf("VerdictNFQ: not ready to verdict yet: TUNNEL_PKT_TPR(p) > TUNNEL_PKT_RTV(p) = %d > %d\n", TUNNEL_PKT_TPR(p), TUNNEL_PKT_RTV(p)); + //printf("VerdictNFQ: not ready to verdict yet: TUNNEL_PKT_TPR(p) > TUNNEL_PKT_RTV(p) = %d > %d\n", TUNNEL_PKT_TPR(p), TUNNEL_PKT_RTV(p)); verdict = 0; } mutex_unlock(m); /* don't verdict if we are not ready */ if (verdict == 1) { - printf("VerdictNFQ: setting verdict\n"); + //printf("VerdictNFQ: setting verdict\n"); NFQSetVerdict(ntv, p->root ? p->root : p); } else { TUNNEL_INCR_PKT_RTV(p); @@ -368,7 +368,7 @@ int VerdictNFQ(ThreadVars *tv, Packet *p, void *data) { * * */ -int DecodeNFQ(ThreadVars *t, Packet *p, void *data) +int DecodeNFQ(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) { IPV4Hdr *ip4h = (IPV4Hdr *)p->pkt; IPV6Hdr *ip6h = (IPV6Hdr *)p->pkt; @@ -378,7 +378,7 @@ int DecodeNFQ(ThreadVars *t, Packet *p, void *data) #endif if (IPV4_GET_RAW_VER(ip4h) == 4) - DecodeIPV4(t, p, p->pkt, p->pktlen); + DecodeIPV4(t, p, p->pkt, p->pktlen, pq); else if(IPV6_GET_RAW_VER(ip6h) == 6) DecodeIPV6(t, p, p->pkt, p->pktlen); diff --git a/src/tm-modules.h b/src/tm-modules.h index 1de785f4be..d5dacc068b 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -4,7 +4,7 @@ typedef struct _TmModule { char *name; int (*Init)(ThreadVars *, void **); - int (*Func)(ThreadVars *, Packet *, void *); + int (*Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Deinit)(ThreadVars *, void *); void (*RegisterTests)(void); } TmModule; diff --git a/src/tm-threads.c b/src/tm-threads.c index f4323389a3..afe2c431a0 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -12,40 +12,46 @@ static ThreadVars *tv_root; /* 1 function slot */ typedef struct _Tm1Slot { int (*Slot1Init)(ThreadVars *, void **); - int (*Slot1Func)(ThreadVars *, Packet *, void *); + int (*Slot1Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot1Deinit)(ThreadVars *, void *); void *slot1_data; + PacketQueue slot1_pq; } Tm1Slot; /* 2 function slot */ typedef struct _Tm2Slot { int (*Slot1Init)(ThreadVars *, void **); - int (*Slot1Func)(ThreadVars *, Packet *, void *); + int (*Slot1Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot1Deinit)(ThreadVars *, void *); void *slot1_data; + PacketQueue slot1_pq; int (*Slot2Init)(ThreadVars *, void **); - int (*Slot2Func)(ThreadVars *, Packet *, void *); + int (*Slot2Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot2Deinit)(ThreadVars *, void *); void *slot2_data; + PacketQueue slot2_pq; } Tm2Slot; /* 3 function slot */ typedef struct _Tm3Slot { int (*Slot1Init)(ThreadVars *, void **); - int (*Slot1Func)(ThreadVars *, Packet *, void *); + int (*Slot1Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot1Deinit)(ThreadVars *, void *); void *slot1_data; + PacketQueue slot1_pq; int (*Slot2Init)(ThreadVars *, void **); - int (*Slot2Func)(ThreadVars *, Packet *, void *); + int (*Slot2Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot2Deinit)(ThreadVars *, void *); void *slot2_data; + PacketQueue slot2_pq; int (*Slot3Init)(ThreadVars *, void **); - int (*Slot3Func)(ThreadVars *, Packet *, void *); + int (*Slot3Func)(ThreadVars *, Packet *, void *, PacketQueue *); int (*Slot3Deinit)(ThreadVars *, void *); void *slot3_data; + PacketQueue slot3_pq; } Tm3Slot; @@ -62,9 +68,15 @@ void *TmThreadsSlot1NoIn(void *td) { pthread_exit((void *) -1); } } + memset(&s1->slot1_pq, 0, sizeof(PacketQueue)); while(run) { - r = s1->Slot1Func(tv, p, s1->slot1_data); + r = s1->Slot1Func(tv, p, s1->slot1_data, &s1->slot1_pq); + while (s1->slot1_pq.len > 0) { + Packet *extra = PacketDequeue(&s1->slot1_pq); + tv->tmqh_out(tv, extra); + } + /* XXX handle error */ tv->tmqh_out(tv, p); @@ -96,11 +108,12 @@ void *TmThreadsSlot1NoOut(void *td) { pthread_exit((void *) -1); } } + memset(&s1->slot1_pq, 0, sizeof(PacketQueue)); while(run) { p = tv->tmqh_in(tv); - r = s1->Slot1Func(tv, p, s1->slot1_data); + r = s1->Slot1Func(tv, p, s1->slot1_data, /* no outqh no pq */NULL); /* XXX handle error */ if (tv->flags & THV_KILL) @@ -131,9 +144,10 @@ void *TmThreadsSlot1NoInOut(void *td) { pthread_exit((void *) -1); } } + memset(&s1->slot1_pq, 0, sizeof(PacketQueue)); while(run) { - r = s1->Slot1Func(tv, NULL, s1->slot1_data); + r = s1->Slot1Func(tv, NULL, s1->slot1_data, /* no outqh, no pq */NULL); //printf("%s: TmThreadsSlot1NoInNoOut: r %d\n", tv->name, r); /* XXX handle error */ @@ -169,13 +183,19 @@ void *TmThreadsSlot1(void *td) { pthread_exit((void *) -1); } } + memset(&s1->slot1_pq, 0, sizeof(PacketQueue)); while(run) { p = tv->tmqh_in(tv); if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { - r = s1->Slot1Func(tv, p, s1->slot1_data); + r = s1->Slot1Func(tv, p, s1->slot1_data, &s1->slot1_pq); + while (s1->slot1_pq.len > 0) { + Packet *extra = PacketDequeue(&s1->slot1_pq); + tv->tmqh_out(tv, extra); + } + //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ @@ -226,8 +246,8 @@ void *TmThreadsSlot2(void *td) { if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { - r = s2->Slot1Func(tv, p, s2->slot1_data); - r = s2->Slot2Func(tv, p, s2->slot2_data); + r = s2->Slot1Func(tv, p, s2->slot1_data, &s2->slot1_pq); + r = s2->Slot2Func(tv, p, s2->slot2_data, &s2->slot2_pq); //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ @@ -273,16 +293,16 @@ void *TmThreadsSlot3(void *td) { } } if (s3->Slot2Init != NULL) { - r = s3->Slot2Init(tv, &s3->slot2_data); - if (r != 0) { - pthread_exit((void *) -1); - } + r = s3->Slot2Init(tv, &s3->slot2_data); + if (r != 0) { + pthread_exit((void *) -1); + } } if (s3->Slot3Init != NULL) { - r = s3->Slot3Init(tv, &s3->slot3_data); - if (r != 0) { - pthread_exit((void *) -1); - } + r = s3->Slot3Init(tv, &s3->slot3_data); + if (r != 0) { + pthread_exit((void *) -1); + } } while(run) { @@ -290,9 +310,9 @@ void *TmThreadsSlot3(void *td) { if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { - r = s3->Slot1Func(tv, p, s3->slot1_data); - r = s3->Slot2Func(tv, p, s3->slot2_data); - r = s3->Slot3Func(tv, p, s3->slot3_data); + r = s3->Slot1Func(tv, p, s3->slot1_data, &s3->slot1_pq); + r = s3->Slot2Func(tv, p, s3->slot2_data, &s3->slot2_pq); + r = s3->Slot3Func(tv, p, s3->slot3_data, &s3->slot3_pq); //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index c112ca73e9..7b3879b8bc 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -35,22 +35,22 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) char proot = 0; if (IS_TUNNEL_PKT(p)) { - printf("TmqhOutputPacketpool: tunnel packet: %p %s\n", p,p->root ? "upper layer":"root"); + //printf("TmqhOutputPacketpool: tunnel packet: %p %s\n", p,p->root ? "upper layer":"root"); /* get a lock */ pthread_mutex_t *m = p->root ? &p->root->mutex_rtv_cnt : &p->mutex_rtv_cnt; mutex_lock(m); if (IS_TUNNEL_ROOT_PKT(p)) { - printf("TmqhOutputPacketpool: IS_TUNNEL_ROOT_PKT\n"); + //printf("TmqhOutputPacketpool: IS_TUNNEL_ROOT_PKT\n"); if (TUNNEL_PKT_TPR(p) == 0) { - printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) == 0\n"); + //printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) == 0\n"); /* if this packet is the root and there are no * more tunnel packets, enqueue it */ /* fall through */ } else { - printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) > 0\n"); + //printf("TmqhOutputPacketpool: TUNNEL_PKT_TPR(p) > 0\n"); /* if this is the root and there are more tunnel * packets, don't add this. It's still referenced * by the tunnel packets, and we will enqueue it @@ -60,27 +60,27 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) return; } } else { - printf("TmqhOutputPacketpool: NOT IS_TUNNEL_ROOT_PKT\n"); + //printf("TmqhOutputPacketpool: NOT IS_TUNNEL_ROOT_PKT\n"); if (p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1) { - printf("TmqhOutputPacketpool: p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1\n"); + //printf("TmqhOutputPacketpool: p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1\n"); /* the root is ready and we are the last tunnel packet, * lets enqueue them both. */ TUNNEL_DECR_PKT_TPR_NOLOCK(p); /* handle the root */ - printf("TmqhOutputPacketpool: calling PacketEnqueue for root pkt\n"); + //printf("TmqhOutputPacketpool: calling PacketEnqueue for root pkt\n"); proot = 1; /* fall through */ } else { - printf("TmqhOutputPacketpool: NOT p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1 (%u)\n", TUNNEL_PKT_TPR(p)); + //printf("TmqhOutputPacketpool: NOT p->root->tunnel_verdicted == 1 && TUNNEL_PKT_TPR(p) == 1 (%u)\n", TUNNEL_PKT_TPR(p)); TUNNEL_DECR_PKT_TPR_NOLOCK(p); /* fall through */ } } mutex_unlock(m); - printf("TmqhOutputPacketpool: tunnel stuff done, move on\n"); + //printf("TmqhOutputPacketpool: tunnel stuff done, move on\n"); } mutex_lock(&q->mutex_q); @@ -89,7 +89,11 @@ void TmqhOutputPacketpool(ThreadVars *t, Packet *p) mutex_unlock(&q->mutex_q); mutex_lock(&mutex_pending); - if (pending) pending--; + if (pending) { + pending--; + } else { + printf("TmqhOutputPacketpool: warning, trying to subtract from 0 pending counter.\n"); + } if (pending <= MAX_PENDING) pthread_cond_signal(&cond_pending); mutex_unlock(&mutex_pending); diff --git a/src/vips.c b/src/vips.c index e3c03c1100..83fdc17246 100644 --- a/src/vips.c +++ b/src/vips.c @@ -90,13 +90,21 @@ Packet *SetupPkt (void) Packet *TunnelPktSetup(ThreadVars *t, Packet *parent, u_int8_t *pkt, u_int16_t len, u_int8_t proto) { - printf("TunnelPktSetup: pkt %p, len %u, proto %u\n", pkt, len, proto); + //printf("TunnelPktSetup: pkt %p, len %u, proto %u\n", pkt, len, proto); /* get us a packet */ mutex_lock(&packet_q.mutex_q); Packet *p = PacketDequeue(&packet_q); mutex_unlock(&packet_q.mutex_q); + mutex_lock(&mutex_pending); + pending++; +#ifdef DBG_PERF + if (pending > dbg_maxpending) + dbg_maxpending = pending; +#endif /* DBG_PERF */ + mutex_unlock(&mutex_pending); + CLEAR_PACKET(p); /* set the root ptr to the lowest layer */