diff --git a/src/alert-debuglog.c b/src/alert-debuglog.c index 54437b6b56..4e243ef9bc 100644 --- a/src/alert-debuglog.c +++ b/src/alert-debuglog.c @@ -51,9 +51,9 @@ #define MODULE_NAME "AlertDebugLog" -TmEcode AlertDebugLog (ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode AlertDebugLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode AlertDebugLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertDebugLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode AlertDebugLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode AlertDebugLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertDebugLogThreadInit(ThreadVars *, void*, void **); TmEcode AlertDebugLogThreadDeinit(ThreadVars *, void *); void AlertDebugLogExitPrintStats(ThreadVars *, void *); @@ -88,7 +88,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size) (uint32_t) ts->tv_usec); } -TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertDebugLogThread *aft = (AlertDebugLogThread *)data; int i; @@ -169,7 +169,7 @@ TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq return TM_ECODE_OK; } -TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertDebugLogThread *aft = (AlertDebugLogThread *)data; int i; @@ -199,12 +199,12 @@ TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq return TM_ECODE_OK; } -TmEcode AlertDebugLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertDebugLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { if (PKT_IS_IPV4(p)) { - return AlertDebugLogIPv4(tv, p, data, pq); + return AlertDebugLogIPv4(tv, p, data, pq, postpq); } else if (PKT_IS_IPV6(p)) { - return AlertDebugLogIPv6(tv, p, data, pq); + return AlertDebugLogIPv6(tv, p, data, pq, postpq); } return TM_ECODE_OK; diff --git a/src/alert-fastlog.c b/src/alert-fastlog.c index 66427bcbfe..2f3adec06f 100644 --- a/src/alert-fastlog.c +++ b/src/alert-fastlog.c @@ -60,9 +60,9 @@ #define MODULE_NAME "AlertFastLog" -TmEcode AlertFastLog (ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode AlertFastLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode AlertFastLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertFastLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode AlertFastLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode AlertFastLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertFastLogThreadInit(ThreadVars *, void *, void **); TmEcode AlertFastLogThreadDeinit(ThreadVars *, void *); void AlertFastLogExitPrintStats(ThreadVars *, void *); @@ -117,7 +117,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size) (uint32_t) ts->tv_usec); } -TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertFastLogThread *aft = (AlertFastLogThread *)data; int i; @@ -160,7 +160,7 @@ TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) return TM_ECODE_OK; } -TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertFastLogThread *aft = (AlertFastLogThread *)data; int i; @@ -202,12 +202,12 @@ TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) return TM_ECODE_OK; } -TmEcode AlertFastLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertFastLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { if (PKT_IS_IPV4(p)) { - return AlertFastLogIPv4(tv, p, data, pq); + return AlertFastLogIPv4(tv, p, data, pq, postpq); } else if (PKT_IS_IPV6(p)) { - return AlertFastLogIPv6(tv, p, data, pq); + return AlertFastLogIPv6(tv, p, data, pq, postpq); } return TM_ECODE_OK; diff --git a/src/alert-prelude.c b/src/alert-prelude.c index 94dd9aaf18..0ce7fd129d 100644 --- a/src/alert-prelude.c +++ b/src/alert-prelude.c @@ -59,7 +59,7 @@ * */ -TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertPreludeThreadInit(ThreadVars *, void *, void **); TmEcode AlertPreludeThreadDeinit(ThreadVars *, void *); int AlertPreludeOpenFileCtx(LogFileCtx *, char *); @@ -85,7 +85,7 @@ TmEcode AlertPreludeThreadInit(ThreadVars *t, void *initdata, void **data) return TM_ECODE_FAILED; } -TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { return TM_ECODE_OK; } @@ -119,7 +119,7 @@ static unsigned int mid_priority = 2; OutputCtx *AlertPreludeInitCtx(ConfNode *conf); -TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertPreludeThreadInit(ThreadVars *, void *, void **); TmEcode AlertPreludeThreadDeinit(ThreadVars *, void *); int AlertPreludeOpenFileCtx(LogFileCtx *, char *); @@ -621,7 +621,7 @@ static int EventToReference(PacketAlert *pa, Packet *p, idmef_classification_t * * * \return TM_ECODE_OK if ok, else TM_ECODE_FAILED */ -TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertPreludeThread *apn = (AlertPreludeThread *)data; uint8_t ethh_offset = 0; diff --git a/src/alert-unified-alert.c b/src/alert-unified-alert.c index 06573305b3..b4124a2f78 100644 --- a/src/alert-unified-alert.c +++ b/src/alert-unified-alert.c @@ -58,7 +58,7 @@ #define MODULE_NAME "AlertUnifiedAlert" -TmEcode AlertUnifiedAlert (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertUnifiedAlert (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertUnifiedAlertThreadInit(ThreadVars *, void *, void **); TmEcode AlertUnifiedAlertThreadDeinit(ThreadVars *, void *); int AlertUnifiedAlertOpenFileCtx(LogFileCtx *, const char *); @@ -179,7 +179,7 @@ int AlertUnifiedAlertRotateFile(ThreadVars *t, AlertUnifiedAlertThread *aun) { return 0; } -TmEcode AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { AlertUnifiedAlertThread *aun = (AlertUnifiedAlertThread *)data; AlertUnifiedAlertPacketHeader hdr; diff --git a/src/alert-unified-log.c b/src/alert-unified-log.c index a7f6473821..2718115e8e 100644 --- a/src/alert-unified-log.c +++ b/src/alert-unified-log.c @@ -60,7 +60,7 @@ #define MODULE_NAME "AlertUnifiedLog" -TmEcode AlertUnifiedLog (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode AlertUnifiedLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode AlertUnifiedLogThreadInit(ThreadVars *, void *, void **); TmEcode AlertUnifiedLogThreadDeinit(ThreadVars *, void *); int AlertUnifiedLogOpenFileCtx(LogFileCtx *, const char *); @@ -182,7 +182,7 @@ int AlertUnifiedLogRotateFile(ThreadVars *t, AlertUnifiedLogThread *aun) { return 0; } -TmEcode AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *post_pq) { AlertUnifiedLogThread *aun = (AlertUnifiedLogThread *)data; AlertUnifiedLogPacketHeader hdr; diff --git a/src/alert-unified2-alert.c b/src/alert-unified2-alert.c index cbc7183a86..acaf514489 100644 --- a/src/alert-unified2-alert.c +++ b/src/alert-unified2-alert.c @@ -60,7 +60,7 @@ #define MIN_LIMIT 1 /*prototypes*/ -TmEcode Unified2Alert (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode Unified2Alert (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode Unified2AlertThreadInit(ThreadVars *, void *, void **); TmEcode Unified2AlertThreadDeinit(ThreadVars *, void *); int Unified2IPv4TypeAlert(ThreadVars *, Packet *, void *, PacketQueue *); @@ -203,7 +203,7 @@ int Unified2AlertRotateFile(ThreadVars *t, Unified2AlertThread *aun) { return 0; } -TmEcode Unified2Alert (ThreadVars *t, Packet *p, void *data, PacketQueue *pq) +TmEcode Unified2Alert (ThreadVars *t, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { int ret = 0; @@ -763,7 +763,7 @@ static int Unified2Test01 (void) { ret = Unified2AlertThreadInit(&tv, oc, &data); if(ret == TM_ECODE_FAILED) return 0; - ret = Unified2Alert(&tv, &p, data, &pq); + ret = Unified2Alert(&tv, &p, data, &pq, NULL); if(ret == TM_ECODE_FAILED) return 0; ret = Unified2AlertThreadDeinit(&tv, data); @@ -832,7 +832,7 @@ static int Unified2Test02 (void) { ret = Unified2AlertThreadInit(&tv, oc, &data); if(ret == -1) return 0; - ret = Unified2Alert(&tv, &p, data, &pq); + ret = Unified2Alert(&tv, &p, data, &pq, NULL); if(ret == TM_ECODE_FAILED) return 0; ret = Unified2AlertThreadDeinit(&tv, data); @@ -907,7 +907,7 @@ static int Unified2Test03 (void) { ret = Unified2AlertThreadInit(&tv, oc, &data); if(ret == -1) return 0; - ret = Unified2Alert(&tv, &p, data, &pq); + ret = Unified2Alert(&tv, &p, data, &pq, NULL); if(ret == TM_ECODE_FAILED) return 0; ret = Unified2AlertThreadDeinit(&tv, data); @@ -976,7 +976,7 @@ static int Unified2Test04 (void) { ret = Unified2AlertThreadInit(&tv, oc, &data); if(ret == -1) return 0; - ret = Unified2Alert(&tv, &p, data, &pq); + ret = Unified2Alert(&tv, &p, data, &pq, NULL); if(ret == TM_ECODE_FAILED) return 0; ret = Unified2AlertThreadDeinit(&tv, data); @@ -1045,7 +1045,7 @@ static int Unified2Test05 (void) { ret = Unified2AlertThreadInit(&tv, oc, &data); if(ret == -1) return 0; - ret = Unified2Alert(&tv, &p, data, &pq); + ret = Unified2Alert(&tv, &p, data, &pq, NULL); if(ret == TM_ECODE_FAILED) return 0; ret = Unified2AlertThreadDeinit(&tv, data); diff --git a/src/detect.c b/src/detect.c index 293b7492b3..ad6c2d6b3d 100644 --- a/src/detect.c +++ b/src/detect.c @@ -150,7 +150,7 @@ void DbgPrintSigs(DetectEngineCtx *, SigGroupHead *); void DbgPrintSigs2(DetectEngineCtx *, SigGroupHead *); /* tm module api functions */ -TmEcode Detect(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode Detect(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode DetectThreadInit(ThreadVars *, void *, void **); TmEcode DetectThreadDeinit(ThreadVars *, void *); @@ -871,7 +871,7 @@ end: * \retval TM_ECODE_FAILED error * \retval TM_ECODE_OK ok */ -TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { /* No need to perform any detection on this packet, if the the given flag is set.*/ if (p->flags & PKT_NOPACKET_INSPECTION) @@ -6974,7 +6974,7 @@ int SigTest40NoPacketInspection01(void) { //DetectEngineIPOnlyThreadInit(de_ctx,&det_ctx->io_ctx); det_ctx->de_ctx = de_ctx; - Detect(&th_v, &p, det_ctx, &pq); + Detect(&th_v, &p, det_ctx, &pq, NULL); if (PacketAlertCheck(&p, 2)) result = 0; else @@ -8622,20 +8622,20 @@ static int SigTestDetectAlertCounter(void) p.payload = (uint8_t *)"boo"; p.payload_len = strlen((char *)p.payload); p.proto = IPPROTO_TCP; - Detect(&tv, &p, det_ctx, NULL); + Detect(&tv, &p, det_ctx, NULL, NULL); result = (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 1); - Detect(&tv, &p, det_ctx, NULL); + Detect(&tv, &p, det_ctx, NULL, NULL); result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 2); p.payload = (uint8_t *)"roo"; p.payload_len = strlen((char *)p.payload); - Detect(&tv, &p, det_ctx, NULL); + Detect(&tv, &p, det_ctx, NULL, NULL); result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 2); p.payload = (uint8_t *)"laboosa"; p.payload_len = strlen((char *)p.payload); - Detect(&tv, &p, det_ctx, NULL); + Detect(&tv, &p, det_ctx, NULL, NULL); result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 3); end: diff --git a/src/log-httplog.c b/src/log-httplog.c index 451f598664..84c76f97bb 100644 --- a/src/log-httplog.c +++ b/src/log-httplog.c @@ -50,9 +50,9 @@ #define MODULE_NAME "LogHttpLog" -TmEcode LogHttpLog (ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode LogHttpLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *); -TmEcode LogHttpLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode LogHttpLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode LogHttpLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode LogHttpLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode LogHttpLogThreadInit(ThreadVars *, void *, void **); TmEcode LogHttpLogThreadDeinit(ThreadVars *, void *); void LogHttpLogExitPrintStats(ThreadVars *, void *); @@ -107,7 +107,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size) (uint32_t) ts->tv_usec); } -TmEcode LogHttpLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode LogHttpLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); LogHttpLogThread *aft = (LogHttpLogThread *)data; @@ -224,7 +224,7 @@ end: SCReturnInt(TM_ECODE_OK); } -TmEcode LogHttpLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode LogHttpLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); LogHttpLogThread *aft = (LogHttpLogThread *)data; @@ -341,7 +341,7 @@ end: SCReturnInt(TM_ECODE_OK); } -TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); @@ -355,9 +355,9 @@ TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) } if (PKT_IS_IPV4(p)) { - SCReturnInt(LogHttpLogIPv4(tv, p, data, pq)); + SCReturnInt(LogHttpLogIPv4(tv, p, data, pq, postpq)); } else if (PKT_IS_IPV6(p)) { - SCReturnInt(LogHttpLogIPv6(tv, p, data, pq)); + SCReturnInt(LogHttpLogIPv6(tv, p, data, pq, postpq)); } SCReturnInt(TM_ECODE_OK); diff --git a/src/respond-reject.c b/src/respond-reject.c index 7936b8dae0..052abd3d2c 100644 --- a/src/respond-reject.c +++ b/src/respond-reject.c @@ -56,7 +56,7 @@ void TmModuleRespondRejectRegister (void) { tmm_modules[TMM_RESPONDREJECT].cap_flags = 0; /* libnet is not compat with caps */ } -TmEcode RespondRejectFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode RespondRejectFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { int ret = 0; /* ACTION_REJECT defaults to rejecting the SRC */ diff --git a/src/respond-reject.h b/src/respond-reject.h index 962e4d29c3..bf2eafd627 100644 --- a/src/respond-reject.h +++ b/src/respond-reject.h @@ -28,6 +28,6 @@ #define REJECT_DIR_DST 1 void TmModuleRespondRejectRegister (void); -TmEcode RespondRejectFunc(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode RespondRejectFunc(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); #endif /* __RESPOND_REJECT_H__ */ diff --git a/src/runmodes.c b/src/runmodes.c index 34c0dc550f..5a9a923d9c 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -2191,6 +2191,106 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { return 0; } +int RunModeFilePcapAuto2(DetectEngineCtx *de_ctx, char *file) { + SCEnter(); + char tname[12]; + uint16_t cpu = 0; + + /* Available cpus */ + uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); + + SCLogDebug("file %s", file); + TimeModeSetOffline(); + + /* create the threads */ + ThreadVars *tv = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","varslot"); + if (tv == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + TmModule *tm_module = TmModuleGetByName("ReceivePcapFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceivePcap\n"); + exit(EXIT_FAILURE); + } + TmVarSlotSetFuncAppend(tv,tm_module,file); + + tm_module = TmModuleGetByName("DecodePcapFile"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodePcap failed\n"); + exit(EXIT_FAILURE); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + tm_module = TmModuleGetByName("StreamTcp"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + exit(EXIT_FAILURE); + } + TmVarSlotSetFuncAppend(tv,tm_module,NULL); + + TmThreadSetCPUAffinity(tv, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv, PRIO_MEDIUM); + + if (TmThreadSpawn(tv) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + for (cpu = 0; cpu < ncpus; cpu++) { + snprintf(tname, sizeof(tname),"Detect%"PRIu16, cpu+1); + if (tname == NULL) + break; + + char *thread_name = SCStrdup(tname); + SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); + + ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"pickup-queue","simple","alert-queue1","simple","1slot"); + if (tv_detect_ncpu == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("Detect"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName Detect failed\n"); + exit(EXIT_FAILURE); + } + Tm1SlotSetFunc(tv_detect_ncpu,tm_module,(void *)de_ctx); + + TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu); + /* If we have more than one core/cpu, the first Detect thread + * (at cpu 0) will have less priority (higher 'nice' value) + * In this case we will set the thread priority to +10 (default is 0) + */ + if (cpu == 0 && ncpus > 1) { + TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW); + } else if (ncpus > 1) { + TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM); + } + + if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + } + + ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs", + "alert-queue1", "simple", "packetpool", "packetpool", "varslot"); + SetupOutputs(tv_outputs); + + TmThreadSetCPUAffinity(tv_outputs, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM); + + if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + return 0; +} + /** * \brief RunModeIpsIPFWAuto set up the following thread packet handlers: * - Receive thread (from IPFW) diff --git a/src/runmodes.h b/src/runmodes.h index 8ec7a673c6..16ebab800d 100644 --- a/src/runmodes.h +++ b/src/runmodes.h @@ -36,6 +36,7 @@ int RunModeIpsNFQAuto(DetectEngineCtx *, char *); int RunModeFilePcap(DetectEngineCtx *, char *); int RunModeFilePcap2(DetectEngineCtx *, char *); int RunModeFilePcapAuto(DetectEngineCtx *, char *); +int RunModeFilePcapAuto2(DetectEngineCtx *, char *); int RunModeIdsPfring(DetectEngineCtx *, char *); int RunModeIdsPfring2(DetectEngineCtx *, char *); diff --git a/src/source-erf-file.c b/src/source-erf-file.c index e48d22960f..a9e2e1c295 100644 --- a/src/source-erf-file.c +++ b/src/source-erf-file.c @@ -59,13 +59,13 @@ typedef struct ErfFileThreadVars_ { uint64_t bytes; } ErfFileThreadVars; -TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **); void ReceiveErfFileThreadExitStats(ThreadVars *, void *); TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *); TmEcode DecodeErfFileThreadInit(ThreadVars *, void *, void **); -TmEcode DecodeErfFile(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode DecodeErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); /** * \brief Register the ERF file receiver (reader) module. @@ -105,7 +105,7 @@ TmModuleDecodeErfFileRegister(void) * decoding. */ TmEcode -ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); @@ -218,7 +218,7 @@ DecodeErfFileThreadInit(ThreadVars *tv, void *initdata, void **data) * off to the ethernet decoder. */ TmEcode -DecodeErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +DecodeErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); DecodeThreadVars *dtv = (DecodeThreadVars *)data; diff --git a/src/source-nfq.c b/src/source-nfq.c index 40e68d1e4b..b702a2e0ad 100644 --- a/src/source-nfq.c +++ b/src/source-nfq.c @@ -105,16 +105,16 @@ static uint16_t receive_queue_num = 0; static uint16_t verdict_queue_num = 0; static SCMutex nfq_init_lock; -TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode ReceiveNFQThreadInit(ThreadVars *, void *, void **); void ReceiveNFQThreadExitStats(ThreadVars *, void *); -TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode VerdictNFQThreadInit(ThreadVars *, void *, void **); void VerdictNFQThreadExitStats(ThreadVars *, void *); TmEcode VerdictNFQThreadDeinit(ThreadVars *, void *); -TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode DecodeNFQThreadInit(ThreadVars *, void *, void **); void TmModuleReceiveNFQRegister (void) { @@ -497,7 +497,7 @@ process_rv: /** * \brief NFQ receive module main entry function: receive a packet from NFQ */ -TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { NFQThreadVars *ntv = (NFQThreadVars *)data; @@ -572,7 +572,7 @@ void NFQSetVerdict(NFQThreadVars *t, Packet *p) { /** * \brief NFQ verdict module packet entry function */ -TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { NFQThreadVars *ntv = (NFQThreadVars *)data; @@ -609,7 +609,7 @@ TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { /** * \brief Decode a packet coming from NFQ */ -TmEcode DecodeNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode DecodeNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { IPV4Hdr *ip4h = (IPV4Hdr *)p->pkt; diff --git a/src/source-pcap-file.c b/src/source-pcap-file.c index c12d04c081..11ebece35b 100644 --- a/src/source-pcap-file.c +++ b/src/source-pcap-file.c @@ -45,6 +45,7 @@ #include "util-privs.h" extern int max_pending_packets; +static int pcap_max_read_packets = 0; typedef struct PcapFileGlobalVars_ { pcap_t *pcap_handle; @@ -54,6 +55,9 @@ typedef struct PcapFileGlobalVars_ { uint64_t cnt; /** packet counter */ } PcapFileGlobalVars; +/** max packets < 65536 */ +#define PCAP_FILE_MAX_PKTS 256 + typedef struct PcapFileThreadVars_ { /* counters */ @@ -64,16 +68,21 @@ typedef struct PcapFileThreadVars_ ThreadVars *tv; Packet *in_p; + + Packet *array[PCAP_FILE_MAX_PKTS]; + uint16_t array_idx; + + uint8_t done; } PcapFileThreadVars; static PcapFileGlobalVars pcap_g; -TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode ReceivePcapFileThreadInit(ThreadVars *, void *, void **); void ReceivePcapFileThreadExitStats(ThreadVars *, void *); TmEcode ReceivePcapFileThreadDeinit(ThreadVars *, void *); -TmEcode DecodePcapFile(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode DecodePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode DecodePcapFileThreadInit(ThreadVars *, void *, void **); void TmModuleReceivePcapFileRegister (void) { @@ -103,7 +112,16 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { PcapFileThreadVars *ptv = (PcapFileThreadVars *)user; - Packet *p = ptv->in_p; + Packet *p = NULL; + if (ptv->array_idx == 0) { + p = ptv->in_p; + } else { + p = PacketGetFromQueueOrAlloc(); + } + + if (p == NULL) { + return; + } p->ts.tv_sec = h->ts.tv_sec; p->ts.tv_usec = h->ts.tv_usec; @@ -118,21 +136,46 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { memcpy(p->pkt, pkt, p->pktlen); //printf("PcapFileCallback: p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)\n", p->pktlen, *pkt, *p->pkt); + /* store the packet in our array */ + ptv->array[ptv->array_idx] = p; + ptv->array_idx++; + SCReturn; } /** * \brief Main PCAP file reading function */ -TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); PcapFileThreadVars *ptv = (PcapFileThreadVars *)data; + + if (ptv->done == 1) { + SCReturnInt(TM_ECODE_FAILED); + } + + ptv->array_idx = 0; ptv->in_p = p; /* Right now we just support reading packets one at a time. */ - int r = pcap_dispatch(pcap_g.pcap_handle, 1, + int r = pcap_dispatch(pcap_g.pcap_handle, pcap_max_read_packets, (pcap_handler)PcapFileCallback, (u_char *)ptv); + + uint16_t cnt = 0; + for (cnt = 0; cnt < ptv->array_idx; cnt++) { + Packet *pp = ptv->array[cnt]; + + pcap_g.cnt++; + + if (cnt > 0) { + pp->pcap_cnt = pcap_g.cnt; + PacketEnqueue(postpq, pp); + } else { + p->pcap_cnt = pcap_g.cnt; + } + } + if (r < 0) { SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s", r, pcap_geterr(pcap_g.pcap_handle)); @@ -143,10 +186,9 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r); EngineStop(); - SCReturnInt(TM_ECODE_FAILED); + ptv->done = 1; + SCReturnInt(TM_ECODE_OK); } - pcap_g.cnt++; - p->pcap_cnt = pcap_g.cnt; SCReturnInt(TM_ECODE_OK); } @@ -159,6 +201,11 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, void *initdata, void **data) { SCReturnInt(TM_ECODE_FAILED); } + /* use max_pending_packets as pcap read size unless it's bigger than + * our size limit */ + pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ? + PCAP_FILE_MAX_PKTS : max_pending_packets; + SCLogInfo("reading pcap file %s", (char *)initdata); PcapFileThreadVars *ptv = SCMalloc(sizeof(PcapFileThreadVars)); @@ -234,7 +281,7 @@ TmEcode ReceivePcapFileThreadDeinit(ThreadVars *tv, void *data) { SCReturnInt(TM_ECODE_OK); } -TmEcode DecodePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode DecodePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); DecodeThreadVars *dtv = (DecodeThreadVars *)data; diff --git a/src/source-pcap.c b/src/source-pcap.c index c23da71c28..42ec468adf 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -68,13 +68,13 @@ typedef struct PcapThreadVars_ ThreadVars *tv; } PcapThreadVars; -TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode ReceivePcapThreadInit(ThreadVars *, void *, void **); void ReceivePcapThreadExitStats(ThreadVars *, void *); TmEcode ReceivePcapThreadDeinit(ThreadVars *, void *); TmEcode DecodePcapThreadInit(ThreadVars *, void *, void **); -TmEcode DecodePcap(ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode DecodePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); /** * \brief Registration Function for RecievePcap. @@ -146,7 +146,7 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) { * \param pq pointer to the PacketQueue (not used here but part of the api) * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success */ -TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) { +TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); PcapThreadVars *ptv = (PcapThreadVars *)data; @@ -376,7 +376,7 @@ TmEcode ReceivePcapThreadDeinit(ThreadVars *tv, void *data) { * \param data pointer that gets cast into PcapThreadVars for ptv * \param pq pointer to the current PacketQueue */ -TmEcode DecodePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode DecodePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { SCEnter(); DecodeThreadVars *dtv = (DecodeThreadVars *)data; diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 392e4e859b..e0e1d390df 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -65,7 +65,7 @@ typedef struct StreamTcpThread_ { TcpReassemblyThreadCtx *ra_ctx; /**< tcp reassembly thread data */ } StreamTcpThread; -TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *); +TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **); TmEcode StreamTcpThreadDeinit(ThreadVars *, void *); void StreamTcpExitPrintStats(ThreadVars *, void *); @@ -2512,7 +2512,7 @@ static int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt) SCReturnInt(0); } -TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) +TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) { StreamTcpThread *stt = (StreamTcpThread *)data; TmEcode ret = TM_ECODE_OK; diff --git a/src/suricata.c b/src/suricata.c index d08ef91adc..cf08f9024b 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -922,6 +922,7 @@ int main(int argc, char **argv) //RunModeFilePcap(de_ctx, pcap_file); //RunModeFilePcap2(de_ctx, pcap_file); RunModeFilePcapAuto(de_ctx, pcap_file); + //RunModeFilePcapAuto2(de_ctx, pcap_file); } else if (run_mode == MODE_PFRING) { //RunModeIdsPfring3(de_ctx, pfring_dev); diff --git a/src/tm-modules.h b/src/tm-modules.h index 2cacbb7f4b..aecad59c9d 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -41,7 +41,7 @@ typedef struct TmModule_ { TmEcode (*ThreadDeinit)(ThreadVars *, void *); /** the packet processing function */ - TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *); + TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); void (*RegisterTests)(void); diff --git a/src/tm-threads.c b/src/tm-threads.c index d23279ad5f..67378c797f 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -70,7 +70,7 @@ uint8_t tv_aof = THV_RESTART_THREAD; typedef struct TmSlot_ { /* function pointers */ - TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *); + TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **); void (*SlotThreadExitPrintStats)(ThreadVars *, void *); @@ -79,10 +79,20 @@ typedef struct TmSlot_ { /* data storage */ void *slot_initdata; void *slot_data; - PacketQueue slot_pq; + + /**< queue filled by the SlotFunc with packets that will + * be processed futher _before_ the current packet. */ + PacketQueue slot_pre_pq; + + /**< queue filled by the SlotFunc with packets that will + * be processed futher _after_ the current packet. */ + PacketQueue slot_post_pq; /* linked list, only used by TmVarSlot */ struct TmSlot_ *slot_next; + + int id; /**< slot id, only used my TmVarSlot to know what the first + * slot is. */ } TmSlot; /* 1 function slot */ @@ -165,29 +175,38 @@ void *TmThreadsSlot1NoIn(void *td) { pthread_exit((void *) -1); } } - memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { TmThreadTestThreadUnPaused(tv); - r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq); + r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pre_pq, &s->s.slot_post_pq); /* handle error */ if (r == TM_ECODE_FAILED) { - TmqhReleasePacketsToPacketPool(&s->s.slot_pq); + TmqhReleasePacketsToPacketPool(&s->s.slot_pre_pq); + TmqhReleasePacketsToPacketPool(&s->s.slot_post_pq); TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } - while (s->s.slot_pq.len > 0) { - Packet *extra = PacketDequeue(&s->s.slot_pq); + /* handle pre queue */ + while (s->s.slot_pre_pq.len > 0) { + Packet *extra = PacketDequeue(&s->s.slot_pre_pq); tv->tmqh_out(tv, extra); } tv->tmqh_out(tv, p); + /* handle post queue */ + while (s->s.slot_post_pq.len > 0) { + Packet *extra = PacketDequeue(&s->s.slot_post_pq); + tv->tmqh_out(tv, extra); + } + if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; @@ -235,7 +254,8 @@ void *TmThreadsSlot1NoOut(void *td) { pthread_exit((void *) -1); } } - memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); @@ -244,7 +264,7 @@ void *TmThreadsSlot1NoOut(void *td) { p = tv->tmqh_in(tv); - r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL); + r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL, /* no outqh no pq */NULL); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhOutputPacketpool(tv, p); @@ -300,14 +320,15 @@ void *TmThreadsSlot1NoInOut(void *td) { pthread_exit((void *) -1); } } - memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { TmThreadTestThreadUnPaused(tv); - r = s->s.SlotFunc(tv, NULL, s->s.slot_data, /* no outqh, no pq */NULL); + r = s->s.SlotFunc(tv, NULL, s->s.slot_data, /* no outqh, no pq */NULL, NULL); //printf("%s: TmThreadsSlot1NoInNoOut: r %" PRId32 "\n", tv->name, r); /* handle error */ @@ -367,7 +388,8 @@ void *TmThreadsSlot1(void *td) { pthread_exit((void *) -1); } } - memset(&s->s.slot_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { @@ -379,25 +401,30 @@ void *TmThreadsSlot1(void *td) { if (p == NULL) { //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); } else { - r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq); + r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pre_pq, &s->s.slot_post_pq); /* handle error */ if (r == TM_ECODE_FAILED) { - TmqhReleasePacketsToPacketPool(&s->s.slot_pq); + TmqhReleasePacketsToPacketPool(&s->s.slot_pre_pq); + TmqhReleasePacketsToPacketPool(&s->s.slot_post_pq); TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } - while (s->s.slot_pq.len > 0) { + while (s->s.slot_pre_pq.len > 0) { /* handle new packets from this func */ - Packet *extra_p = PacketDequeue(&s->s.slot_pq); + Packet *extra_p = PacketDequeue(&s->s.slot_pre_pq); tv->tmqh_out(tv, extra_p); } - //printf("%s: TmThreadsSlot1: p %p, r %" PRId32 "\n", tv->name, p, r); - /* output the packet */ tv->tmqh_out(tv, p); + + while (s->s.slot_post_pq.len > 0) { + /* handle new packets from this func */ + Packet *extra_p = PacketDequeue(&s->s.slot_post_pq); + tv->tmqh_out(tv, extra_p); + } } if (TmThreadsCheckFlag(tv, THV_KILL)) { @@ -424,24 +451,32 @@ void *TmThreadsSlot1(void *td) { pthread_exit((void *) 0); } -/* separate run function so we can call it recursively */ +/** \brief separate run function so we can call it recursively + * + * \todo deal with post_pq for slots beyond the first + */ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) { TmEcode r = TM_ECODE_OK; TmSlot *s = NULL; for (s = slot; s != NULL; s = s->slot_next) { - r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pq); + if (s->id == 0) { + r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); + } else { + r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, NULL); + } /* handle error */ if (r == TM_ECODE_FAILED) { /* Encountered error. Return packets to packetpool and return */ - TmqhReleasePacketsToPacketPool(&s->slot_pq); + TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); + TmqhReleasePacketsToPacketPool(&s->slot_post_pq); TmThreadsSetFlag(tv, THV_FAILED); return TM_ECODE_FAILED; } /* handle new packets */ - while (s->slot_pq.len > 0) { - Packet *extra_p = PacketDequeue(&s->slot_pq); + while (s->slot_pre_pq.len > 0) { + Packet *extra_p = PacketDequeue(&s->slot_pre_pq); /* see if we need to process the packet */ if (s->slot_next != NULL) { @@ -449,7 +484,8 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl /* XXX handle error */ if (r == TM_ECODE_FAILED) { //printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n"); - TmqhReleasePacketsToPacketPool(&s->slot_pq); + TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); + TmqhReleasePacketsToPacketPool(&s->slot_post_pq); TmqhOutputPacketpool(tv, extra_p); TmThreadsSetFlag(tv, THV_FAILED); return TM_ECODE_FAILED; @@ -457,11 +493,17 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl } tv->tmqh_out(tv, extra_p); } + + /** \todo post pq */ } return TM_ECODE_OK; } +/** + * \todo only the first "slot" currently makes the "post_pq" available + * to the thread module. + */ void *TmThreadsSlotVar(void *td) { ThreadVars *tv = (ThreadVars *)td; TmVarSlot *s = (TmVarSlot *)tv->tm_slots; @@ -479,8 +521,6 @@ void *TmThreadsSlotVar(void *td) { if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); - //printf("TmThreadsSlot1: %s starting\n", tv->name); - for (slot = s->s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadInit != NULL) { r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot->slot_data); @@ -491,7 +531,8 @@ void *TmThreadsSlotVar(void *td) { pthread_exit((void *) -1); } } - memset(&slot->slot_pq, 0, sizeof(PacketQueue)); + memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); + memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); } TmThreadsSetFlag(tv, THV_INIT_DONE); @@ -501,15 +542,13 @@ void *TmThreadsSlotVar(void *td) { /* input a packet */ p = tv->tmqh_in(tv); - //printf("TmThreadsSlotVar: %p\n", p); if (p == NULL) { - //printf("%s: TmThreadsSlot1: p == NULL\n", tv->name); + //printf("%s: TmThreadsSlotVar: p == NULL\n", tv->name); } else { + r = TmThreadsSlotVarRun(tv, p, s->s); - /* XXX handle error */ if (r == TM_ECODE_FAILED) { - //printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n"); TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; @@ -517,14 +556,30 @@ void *TmThreadsSlotVar(void *td) { /* output the packet */ tv->tmqh_out(tv, p); + + /* now handle the post_pq packets */ + while (s->s->slot_post_pq.len > 0) { + Packet *extra_p = PacketDequeue(&s->s->slot_post_pq); + + if (s->s->slot_next != NULL) { + r = TmThreadsSlotVarRun(tv, extra_p, s->s->slot_next); + if (r == TM_ECODE_FAILED) { + TmqhOutputPacketpool(tv, extra_p); + TmThreadsSetFlag(tv, THV_FAILED); + break; + } + } + + /* output the packet */ + tv->tmqh_out(tv, extra_p); + } } if (TmThreadsCheckFlag(tv, THV_KILL)) { - //printf("%s: TmThreadsSlot1: KILL is set\n", tv->name); - SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); run = 0; } } + SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0); for (slot = s->s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadExitPrintStats != NULL) { @@ -626,6 +681,7 @@ void TmVarSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) { if (s->s == NULL) { s->s = slot; + slot->id = 0; } else { TmSlot *a = s->s, *b = NULL; @@ -636,6 +692,7 @@ void TmVarSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) { /* append the new slot */ if (b != NULL) { b->slot_next = slot; + slot->id = b->id + 1; } } }