Improve the threading code to enable a single pcap file processing thread.

remotes/origin/master-1.0.x
Victor Julien 15 years ago
parent d79b85d251
commit c53dfea379

@ -590,7 +590,11 @@ int SigMatchSignatures(ThreadVars *th_v, PatternMatcherThread *pmt, Packet *p)
int Detect(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) {
PatternMatcherThread *pmt = (PatternMatcherThread *)data;
return SigMatchSignatures(t,pmt,p);
int r = SigMatchSignatures(t,pmt,p);
if (r >= 0) {
return 0;
}
return 1;
}
int DetectThreadInit(ThreadVars *t, void *initdata, void **data) {

@ -572,7 +572,7 @@ int RunModeFilePcap(char *file) {
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot_noinout");
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
@ -623,9 +623,9 @@ int RunModeFilePcap(char *file) {
exit(1);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","packetpool","packetpool","1slot");
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","alert-queue1","simple","1slot");
//#endif
//ThreadVars *tv_detect1 = TmThreadCreate("Detect1","decode-queue1","simple","packetpool","packetpool","1slot");
//ThreadVars *tv_detect1 = TmThreadCreate("Detect1","decode-queue1","simple","alert-queue1","simple","1slot");
if (tv_detect1 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
@ -642,7 +642,7 @@ int RunModeFilePcap(char *file) {
exit(1);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","packetpool","packetpool","1slot");
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue1","simple","alert-queue1","simple","1slot");
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
@ -659,23 +659,6 @@ int RunModeFilePcap(char *file) {
exit(1);
}
ThreadVars *tv_rreject = TmThreadCreate("RespondReject","respond-queue","simple","alert-queue1","simple","1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for RespondReject failed\n");
exit(1);
}
Tm1SlotSetFunc(tv_rreject,tm_module,NULL);
if (TmThreadSpawn(tv_rreject) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot");
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
@ -744,6 +727,91 @@ int RunModeFilePcap(char *file) {
return 0;
}
/**
* \brief Single thread version of the Pcap file processing.
*/
int RunModeFilePcap2(char *file) {
printf("RunModeFilePcap2: file %s\n", file);
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv = TmThreadCreate("PcapFile","packetpool","packetpool","packetpool","packetpool","varslot");
if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,file);
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,(void *)g_de_ctx);
tm_module = TmModuleGetByName("AlertFastlog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("AlertUnifiedLog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("AlertDebuglog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(1);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
if (TmThreadSpawn(tv) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
return 0;
}
int main(int argc, char **argv)
{
int rc;
@ -854,6 +922,7 @@ int main(int argc, char **argv)
//RunModeIpsNFQ();
//RunModeIdsPcap(argv[1]);
RunModeFilePcap(argv[1]);
//RunModeFilePcap2(argv[1]);
ThreadVars tv_flowmgr;
memset(&tv_flowmgr, 0, sizeof(ThreadVars));
@ -884,7 +953,7 @@ int main(int argc, char **argv)
if (sigflags) {
printf("signal received\n");
if (sigflags & EIDPS_SIGINT || sigflags & EIDPS_STOP) {
if (sigflags & EIDPS_STOP) {
printf ("SIGINT or EngineStop received\n");
/* Stop the engine so it quits after processing the pcap file

@ -38,6 +38,8 @@ typedef struct PcapFileThreadVars_
u_int32_t errs;
ThreadVars *tv;
Packet *in_p;
} PcapFileThreadVars;
static PcapFileGlobalVars pcap_g = { NULL, NULL, };
@ -70,7 +72,7 @@ void TmModuleDecodePcapFileRegister (void) {
void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
//printf("PcapFileCallback: user %p, h %p, pkt %p\n", user, h, pkt);
PcapFileThreadVars *ptv = (PcapFileThreadVars *)user;
ThreadVars *tv = ptv->tv;
//ThreadVars *tv = ptv->tv;
mutex_lock(&mutex_pending);
if (pending > MAX_PENDING) {
@ -78,7 +80,7 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
}
mutex_unlock(&mutex_pending);
Packet *p = tv->tmqh_in(tv);
Packet *p = ptv->in_p;
p->ts.tv_sec = h->ts.tv_sec;
p->ts.tv_usec = h->ts.tv_usec;
@ -90,14 +92,14 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
p->pktlen = h->caplen;
memcpy(p->pkt, pkt, p->pktlen);
//printf("PcapFileCallback: p->pktlen: %u (pkt %02x, p->pkt %02x)\n", p->pktlen, *pkt, *p->pkt);
/* pass on... */
tv->tmqh_out(tv, p);
}
int ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
ptv->in_p = p;
/// Right now we just support reading packets one at a time.
int r = pcap_dispatch(pcap_g.pcap_handle, 1, (pcap_handler)PcapFileCallback, (u_char *)ptv);
if (r <= 0) {
printf("ReceivePcap: code %d error %s\n", r, pcap_geterr(pcap_g.pcap_handle));

@ -120,10 +120,21 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
int ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
PcapThreadVars *ptv = (PcapThreadVars *)data;
//printf("ReceivePcap: tv %p\n", tv);
int r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv);
if (r <= 0) {
//printf("ReceivePcap: error %s\n", pcap_geterr(ptv->pcap_handle));
/// Just read one packet at a time for now.
int r = 0;
while (r == 0) {
//printf("ReceivePcap: call pcap_dispatch %u\n", tv->flags);
r = pcap_dispatch(ptv->pcap_handle, 1, (pcap_handler)PcapCallback, (u_char *)ptv);
if (r < 0) {
printf("ReceivePcap: error %s\n", pcap_geterr(ptv->pcap_handle));
break;
}
if (tv->flags != 0) {
printf("ReceivePcap: interrupted.\n");
return 0;
}
}
return 0;

@ -84,6 +84,9 @@ void *TmThreadsSlot1NoIn(void *td) {
}
/* XXX handle error */
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, p);
@ -131,6 +134,9 @@ void *TmThreadsSlot1NoOut(void *td) {
r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL);
/* XXX handle error */
if (r == 1) {
run = 0;
}
if (tv->flags & THV_KILL)
run = 0;
@ -240,6 +246,9 @@ void *TmThreadsSlot1(void *td) {
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
@ -313,8 +322,16 @@ void *TmThreadsSlot2(void *td) {
Packet *extra_p2 = PacketDequeue(&s->s2.slot_pq);
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
if (r == 1) {
run = 0;
}
r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq);
while (s->s2.slot_pq.len > 0) {
/* handle new packets from this func */
@ -324,6 +341,9 @@ void *TmThreadsSlot2(void *td) {
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
@ -422,10 +442,19 @@ void *TmThreadsSlot3(void *td) {
Packet *extra_p3 = PacketDequeue(&s->s3.slot_pq);
tv->tmqh_out(tv, extra_p3);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
if (r == 1) {
run = 0;
}
/* slot 2 */
r = s->s2.SlotFunc(tv, p, s->s2.slot_data, &s->s2.slot_pq);
@ -439,6 +468,9 @@ void *TmThreadsSlot3(void *td) {
Packet *extra_p2 = PacketDequeue(&s->s3.slot_pq);
tv->tmqh_out(tv, extra_p2);
}
if (r == 1) {
run = 0;
}
tv->tmqh_out(tv, extra_p);
}
@ -452,6 +484,9 @@ void *TmThreadsSlot3(void *td) {
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
if (r == 1) {
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);
@ -508,10 +543,15 @@ void *TmThreadsSlot3(void *td) {
static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) {
int r = 0;
TmSlot *s = NULL;
int retval = 0;
for (s = slot; s != NULL; s = s->slot_next) {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pq);
/* XXX handle error */
if (r == 1) {
//printf("TmThreadsSlotVarRun: s->SlotFunc %p returned 1\n", s->SlotFunc);
retval = 1;
}
/* handle new packets */
while (s->slot_pq.len > 0) {
@ -519,14 +559,18 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
/* see if we need to process the packet */
if (s->slot_next != NULL) {
TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
/* XXX handle error */
if (r == 1) {
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
retval = 1;
}
}
tv->tmqh_out(tv, extra_p);
}
}
return 0;
return retval;
}
void *TmThreadsSlotVar(void *td) {
@ -556,12 +600,17 @@ void *TmThreadsSlotVar(void *td) {
while(run) {
/* input a packet */
p = tv->tmqh_in(tv);
//printf("TmThreadsSlotVar: %p\n", p);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = TmThreadsSlotVarRun(tv, p, s->s);
/* XXX handle error */
if (r == 1) {
//printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n");
run = 0;
}
/* output the packet */
tv->tmqh_out(tv, p);

Loading…
Cancel
Save