From c53dfea379575821afb6264171e9d4c789900c5b Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 30 Jul 2009 11:49:26 +0200 Subject: [PATCH] Improve the threading code to enable a single pcap file processing thread. --- src/detect.c | 6 ++- src/eidps.c | 113 +++++++++++++++++++++++++++++++++-------- src/source-pcap-file.c | 12 +++-- src/source-pcap.c | 19 +++++-- src/tm-threads.c | 53 ++++++++++++++++++- 5 files changed, 169 insertions(+), 34 deletions(-) diff --git a/src/detect.c b/src/detect.c index 694e76a8b4..d121933b6b 100644 --- a/src/detect.c +++ b/src/detect.c @@ -590,7 +590,11 @@ int SigMatchSignatures(ThreadVars *th_v, PatternMatcherThread *pmt, Packet *p) int Detect(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) { PatternMatcherThread *pmt = (PatternMatcherThread *)data; - return SigMatchSignatures(t,pmt,p); + int r = SigMatchSignatures(t,pmt,p); + if (r >= 0) { + return 0; + } + return 1; } int DetectThreadInit(ThreadVars *t, void *initdata, void **data) { diff --git a/src/eidps.c b/src/eidps.c index 22c2b76fce..ca3ae12bd7 100644 --- a/src/eidps.c +++ b/src/eidps.c @@ -572,7 +572,7 @@ int RunModeFilePcap(char *file) { TimeModeSetOffline(); /* create the threads */ - ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot_noinout"); + ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot"); if (tv_receivepcap == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(1); @@ -623,9 +623,9 @@ int RunModeFilePcap(char *file) { exit(1); } - ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","packetpool","packetpool","1slot"); + ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","alert-queue1","simple","1slot"); //#endif - //ThreadVars *tv_detect1 = TmThreadCreate("Detect1","decode-queue1","simple","packetpool","packetpool","1slot"); + //ThreadVars *tv_detect1 = TmThreadCreate("Detect1","decode-queue1","simple","alert-queue1","simple","1slot"); if (tv_detect1 == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(1); @@ -642,7 +642,7 @@ int RunModeFilePcap(char *file) { exit(1); } - ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","packetpool","packetpool","1slot"); + ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","alert-queue1","simple","1slot"); if (tv_detect2 == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(1); @@ -659,23 +659,6 @@ int RunModeFilePcap(char *file) { exit(1); } - ThreadVars *tv_rreject = TmThreadCreate("RespondReject","respond-queue","simple","alert-queue1","simple","1slot"); - if (tv_rreject == NULL) { - printf("ERROR: TmThreadsCreate failed\n"); - exit(1); - } - tm_module = TmModuleGetByName("RespondReject"); - if (tm_module == NULL) { - printf("ERROR: TmModuleGetByName for RespondReject failed\n"); - exit(1); - } - Tm1SlotSetFunc(tv_rreject,tm_module,NULL); - - if (TmThreadSpawn(tv_rreject) != 0) { - printf("ERROR: TmThreadSpawn failed\n"); - exit(1); - } - ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot"); if (tv_alert == NULL) { printf("ERROR: TmThreadsCreate failed\n"); @@ -744,6 +727,91 @@ int RunModeFilePcap(char *file) { return 0; } +/** + * \brief Single thread version of the Pcap file processing. + */ +int RunModeFilePcap2(char *file) { + printf("RunModeFilePcap2: file %s\n", file); + TimeModeSetOffline(); + + /* create the threads */ + ThreadVars *tv = TmThreadCreate("PcapFile","packetpool","packetpool","packetpool","packetpool","varslot"); + if (tv == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + + TmModule *tm_module = TmModuleGetByName("ReceivePcapFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,file); + + tm_module = TmModuleGetByName("DecodePcapFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodePcap failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("StreamTcp"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("Detect"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName Detect failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,(void *)g_de_ctx); + + tm_module = TmModuleGetByName("AlertFastlog"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName for AlertFastlog failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("LogHttplog"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("AlertUnifiedLog"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("AlertUnifiedAlert"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("AlertDebuglog"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed\n"); + exit(1); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + if (TmThreadSpawn(tv) != 0) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + + return 0; +} + int main(int argc, char **argv) { int rc; @@ -854,6 +922,7 @@ int main(int argc, char **argv) //RunModeIpsNFQ(); //RunModeIdsPcap(argv[1]); RunModeFilePcap(argv[1]); + //RunModeFilePcap2(argv[1]); ThreadVars tv_flowmgr; memset(&tv_flowmgr, 0, sizeof(ThreadVars)); @@ -884,7 +953,7 @@ int main(int argc, char **argv) if (sigflags) { printf("signal received\n"); - if (sigflags & EIDPS_SIGINT || sigflags & EIDPS_STOP) { + if (sigflags & EIDPS_STOP) { printf ("SIGINT or EngineStop received\n"); /* Stop the engine so it quits after processing the pcap file diff --git a/src/source-pcap-file.c b/src/source-pcap-file.c index c3f203cea1..f12648003b 100644 --- a/src/source-pcap-file.c +++ b/src/source-pcap-file.c @@ -38,6 +38,8 @@ typedef struct PcapFileThreadVars_ u_int32_t errs; ThreadVars *tv; + + Packet *in_p; } PcapFileThreadVars; static PcapFileGlobalVars pcap_g = { NULL, NULL, }; @@ -70,7 +72,7 @@ void TmModuleDecodePcapFileRegister (void) { void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { //printf("PcapFileCallback: user %p, h %p, pkt %p\n", user, h, pkt); PcapFileThreadVars *ptv = (PcapFileThreadVars *)user; - ThreadVars *tv = ptv->tv; + //ThreadVars *tv = ptv->tv; mutex_lock(&mutex_pending); if (pending > MAX_PENDING) { @@ -78,7 +80,7 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { } mutex_unlock(&mutex_pending); - Packet *p = tv->tmqh_in(tv); + Packet *p = ptv->in_p; p->ts.tv_sec = h->ts.tv_sec; p->ts.tv_usec = h->ts.tv_usec; @@ -90,14 +92,14 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { p->pktlen = h->caplen; memcpy(p->pkt, pkt, p->pktlen); //printf("PcapFileCallback: p->pktlen: %u (pkt %02x, p->pkt %02x)\n", p->pktlen, *pkt, *p->pkt); - - /* pass on... */ - tv->tmqh_out(tv, p); } int ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { PcapFileThreadVars *ptv = (PcapFileThreadVars *)data; + ptv->in_p = p; + + /// Right now we just support reading packets one at a time. int r = pcap_dispatch(pcap_g.pcap_handle, 1, (pcap_handler)PcapFileCallback, (u_char *)ptv); if (r <= 0) { printf("ReceivePcap: code %d error %s\n", r, pcap_geterr(pcap_g.pcap_handle)); diff --git a/src/source-pcap.c b/src/source-pcap.c index 83edd1d420..d1d93f9dd9 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -120,10 +120,21 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { int ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { PcapThreadVars *ptv = (PcapThreadVars *)data; - //printf("ReceivePcap: tv %p\n", tv); - int r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv); - if (r <= 0) { - //printf("ReceivePcap: error %s\n", pcap_geterr(ptv->pcap_handle)); + /// Just read one packet at a time for now. + int r = 0; + while (r == 0) { + //printf("ReceivePcap: call pcap_dispatch %u\n", tv->flags); + + r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv); + if (r < 0) { + printf("ReceivePcap: error %s\n", pcap_geterr(ptv->pcap_handle)); + break; + } + + if (tv->flags != 0) { + printf("ReceivePcap: interrupted.\n"); + return 0; + } } return 0; diff --git a/src/tm-threads.c b/src/tm-threads.c index 52ffa249fd..c45a316a2c 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -84,6 +84,9 @@ void *TmThreadsSlot1NoIn(void *td) { } /* XXX handle error */ + if (r == 1) { + run = 0; + } tv->tmqh_out(tv, p); @@ -131,6 +134,9 @@ void *TmThreadsSlot1NoOut(void *td) { r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL); /* XXX handle error */ + if (r == 1) { + run = 0; + } if (tv->flags & THV_KILL) run = 0; @@ -240,6 +246,9 @@ void *TmThreadsSlot1(void *td) { //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + if (r == 1) { + run = 0; + } /* output the packet */ tv->tmqh_out(tv, p); @@ -313,8 +322,16 @@ void *TmThreadsSlot2(void *td) { Packet *extra_p2 = PacketDequeue(&s->s2.slot_pq); tv->tmqh_out(tv, extra_p2); } + if (r == 1) { + run = 0; + } + tv->tmqh_out(tv, extra_p); } + if (r == 1) { + run = 0; + } + r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq); while (s->s2.slot_pq.len > 0) { /* handle new packets from this func */ @@ -324,6 +341,9 @@ void *TmThreadsSlot2(void *td) { //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + if (r == 1) { + run = 0; + } /* output the packet */ tv->tmqh_out(tv, p); @@ -422,10 +442,19 @@ void *TmThreadsSlot3(void *td) { Packet *extra_p3 = PacketDequeue(&s->s3.slot_pq); tv->tmqh_out(tv, extra_p3); } + if (r == 1) { + run = 0; + } tv->tmqh_out(tv, extra_p2); } + if (r == 1) { + run = 0; + } tv->tmqh_out(tv, extra_p); } + if (r == 1) { + run = 0; + } /* slot 2 */ r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq); @@ -439,6 +468,9 @@ void *TmThreadsSlot3(void *td) { Packet *extra_p2 = PacketDequeue(&s->s3.slot_pq); tv->tmqh_out(tv, extra_p2); } + if (r == 1) { + run = 0; + } tv->tmqh_out(tv, extra_p); } @@ -452,6 +484,9 @@ void *TmThreadsSlot3(void *td) { //printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r); /* XXX handle error */ + if (r == 1) { + run = 0; + } /* output the packet */ tv->tmqh_out(tv, p); @@ -508,10 +543,15 @@ void *TmThreadsSlot3(void *td) { static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) { int r = 0; TmSlot *s = NULL; + int retval = 0; for (s = slot; s != NULL; s = s->slot_next) { r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pq); /* XXX handle error */ + if (r == 1) { + //printf("TmThreadsSlotVarRun: s->SlotFunc %p returned 1\n", s->SlotFunc); + retval = 1; + } /* handle new packets */ while (s->slot_pq.len > 0) { @@ -519,14 +559,18 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) /* see if we need to process the packet */ if (s->slot_next != NULL) { - TmThreadsSlotVarRun(tv, extra_p, s->slot_next); + r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); /* XXX handle error */ + if (r == 1) { + //printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n"); + retval = 1; + } } tv->tmqh_out(tv, extra_p); } } - return 0; + return retval; } void *TmThreadsSlotVar(void *td) { @@ -556,12 +600,17 @@ void *TmThreadsSlotVar(void *td) { while(run) { /* input a packet */ p = tv->tmqh_in(tv); + //printf("TmThreadsSlotVar: %p\n", p); if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { r = TmThreadsSlotVarRun(tv, p, s->s); /* XXX handle error */ + if (r == 1) { + //printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n"); + run = 0; + } /* output the packet */ tv->tmqh_out(tv, p);