Make pcap file mode read multiple packets per 'read'. Update threading model to deal with this.

remotes/origin/master-1.0.x
Victor Julien 15 years ago
parent 6f502f0da5
commit 4e7df60b2f

@ -51,9 +51,9 @@
#define MODULE_NAME "AlertDebugLog"
TmEcode AlertDebugLog (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertDebugLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertDebugLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertDebugLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertDebugLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertDebugLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertDebugLogThreadInit(ThreadVars *, void*, void **);
TmEcode AlertDebugLogThreadDeinit(ThreadVars *, void *);
void AlertDebugLogExitPrintStats(ThreadVars *, void *);
@ -88,7 +88,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size)
(uint32_t) ts->tv_usec);
}
TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertDebugLogThread *aft = (AlertDebugLogThread *)data;
int i;
@ -169,7 +169,7 @@ TmEcode AlertDebugLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq
return TM_ECODE_OK;
}
TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertDebugLogThread *aft = (AlertDebugLogThread *)data;
int i;
@ -199,12 +199,12 @@ TmEcode AlertDebugLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq
return TM_ECODE_OK;
}
TmEcode AlertDebugLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertDebugLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
if (PKT_IS_IPV4(p)) {
return AlertDebugLogIPv4(tv, p, data, pq);
return AlertDebugLogIPv4(tv, p, data, pq, postpq);
} else if (PKT_IS_IPV6(p)) {
return AlertDebugLogIPv6(tv, p, data, pq);
return AlertDebugLogIPv6(tv, p, data, pq, postpq);
}
return TM_ECODE_OK;

@ -60,9 +60,9 @@
#define MODULE_NAME "AlertFastLog"
TmEcode AlertFastLog (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertFastLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertFastLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertFastLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertFastLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertFastLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertFastLogThreadInit(ThreadVars *, void *, void **);
TmEcode AlertFastLogThreadDeinit(ThreadVars *, void *);
void AlertFastLogExitPrintStats(ThreadVars *, void *);
@ -117,7 +117,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size)
(uint32_t) ts->tv_usec);
}
TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertFastLogThread *aft = (AlertFastLogThread *)data;
int i;
@ -160,7 +160,7 @@ TmEcode AlertFastLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
return TM_ECODE_OK;
}
TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertFastLogThread *aft = (AlertFastLogThread *)data;
int i;
@ -202,12 +202,12 @@ TmEcode AlertFastLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
return TM_ECODE_OK;
}
TmEcode AlertFastLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertFastLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
if (PKT_IS_IPV4(p)) {
return AlertFastLogIPv4(tv, p, data, pq);
return AlertFastLogIPv4(tv, p, data, pq, postpq);
} else if (PKT_IS_IPV6(p)) {
return AlertFastLogIPv6(tv, p, data, pq);
return AlertFastLogIPv6(tv, p, data, pq, postpq);
}
return TM_ECODE_OK;

@ -59,7 +59,7 @@
*
*/
TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertPreludeThreadInit(ThreadVars *, void *, void **);
TmEcode AlertPreludeThreadDeinit(ThreadVars *, void *);
int AlertPreludeOpenFileCtx(LogFileCtx *, char *);
@ -85,7 +85,7 @@ TmEcode AlertPreludeThreadInit(ThreadVars *t, void *initdata, void **data)
return TM_ECODE_FAILED;
}
TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
return TM_ECODE_OK;
}
@ -119,7 +119,7 @@ static unsigned int mid_priority = 2;
OutputCtx *AlertPreludeInitCtx(ConfNode *conf);
TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertPrelude (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertPreludeThreadInit(ThreadVars *, void *, void **);
TmEcode AlertPreludeThreadDeinit(ThreadVars *, void *);
int AlertPreludeOpenFileCtx(LogFileCtx *, char *);
@ -621,7 +621,7 @@ static int EventToReference(PacketAlert *pa, Packet *p, idmef_classification_t *
*
* \return TM_ECODE_OK if ok, else TM_ECODE_FAILED
*/
TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertPrelude (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertPreludeThread *apn = (AlertPreludeThread *)data;
uint8_t ethh_offset = 0;

@ -58,7 +58,7 @@
#define MODULE_NAME "AlertUnifiedAlert"
TmEcode AlertUnifiedAlert (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertUnifiedAlert (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertUnifiedAlertThreadInit(ThreadVars *, void *, void **);
TmEcode AlertUnifiedAlertThreadDeinit(ThreadVars *, void *);
int AlertUnifiedAlertOpenFileCtx(LogFileCtx *, const char *);
@ -179,7 +179,7 @@ int AlertUnifiedAlertRotateFile(ThreadVars *t, AlertUnifiedAlertThread *aun) {
return 0;
}
TmEcode AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertUnifiedAlert (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
AlertUnifiedAlertThread *aun = (AlertUnifiedAlertThread *)data;
AlertUnifiedAlertPacketHeader hdr;

@ -60,7 +60,7 @@
#define MODULE_NAME "AlertUnifiedLog"
TmEcode AlertUnifiedLog (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode AlertUnifiedLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode AlertUnifiedLogThreadInit(ThreadVars *, void *, void **);
TmEcode AlertUnifiedLogThreadDeinit(ThreadVars *, void *);
int AlertUnifiedLogOpenFileCtx(LogFileCtx *, const char *);
@ -182,7 +182,7 @@ int AlertUnifiedLogRotateFile(ThreadVars *t, AlertUnifiedLogThread *aun) {
return 0;
}
TmEcode AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode AlertUnifiedLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *post_pq)
{
AlertUnifiedLogThread *aun = (AlertUnifiedLogThread *)data;
AlertUnifiedLogPacketHeader hdr;

@ -60,7 +60,7 @@
#define MIN_LIMIT 1
/*prototypes*/
TmEcode Unified2Alert (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode Unified2Alert (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode Unified2AlertThreadInit(ThreadVars *, void *, void **);
TmEcode Unified2AlertThreadDeinit(ThreadVars *, void *);
int Unified2IPv4TypeAlert(ThreadVars *, Packet *, void *, PacketQueue *);
@ -203,7 +203,7 @@ int Unified2AlertRotateFile(ThreadVars *t, Unified2AlertThread *aun) {
return 0;
}
TmEcode Unified2Alert (ThreadVars *t, Packet *p, void *data, PacketQueue *pq)
TmEcode Unified2Alert (ThreadVars *t, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
int ret = 0;
@ -763,7 +763,7 @@ static int Unified2Test01 (void) {
ret = Unified2AlertThreadInit(&tv, oc, &data);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2Alert(&tv, &p, data, &pq);
ret = Unified2Alert(&tv, &p, data, &pq, NULL);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2AlertThreadDeinit(&tv, data);
@ -832,7 +832,7 @@ static int Unified2Test02 (void) {
ret = Unified2AlertThreadInit(&tv, oc, &data);
if(ret == -1)
return 0;
ret = Unified2Alert(&tv, &p, data, &pq);
ret = Unified2Alert(&tv, &p, data, &pq, NULL);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2AlertThreadDeinit(&tv, data);
@ -907,7 +907,7 @@ static int Unified2Test03 (void) {
ret = Unified2AlertThreadInit(&tv, oc, &data);
if(ret == -1)
return 0;
ret = Unified2Alert(&tv, &p, data, &pq);
ret = Unified2Alert(&tv, &p, data, &pq, NULL);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2AlertThreadDeinit(&tv, data);
@ -976,7 +976,7 @@ static int Unified2Test04 (void) {
ret = Unified2AlertThreadInit(&tv, oc, &data);
if(ret == -1)
return 0;
ret = Unified2Alert(&tv, &p, data, &pq);
ret = Unified2Alert(&tv, &p, data, &pq, NULL);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2AlertThreadDeinit(&tv, data);
@ -1045,7 +1045,7 @@ static int Unified2Test05 (void) {
ret = Unified2AlertThreadInit(&tv, oc, &data);
if(ret == -1)
return 0;
ret = Unified2Alert(&tv, &p, data, &pq);
ret = Unified2Alert(&tv, &p, data, &pq, NULL);
if(ret == TM_ECODE_FAILED)
return 0;
ret = Unified2AlertThreadDeinit(&tv, data);

@ -150,7 +150,7 @@ void DbgPrintSigs(DetectEngineCtx *, SigGroupHead *);
void DbgPrintSigs2(DetectEngineCtx *, SigGroupHead *);
/* tm module api functions */
TmEcode Detect(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode Detect(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode DetectThreadInit(ThreadVars *, void *, void **);
TmEcode DetectThreadDeinit(ThreadVars *, void *);
@ -871,7 +871,7 @@ end:
* \retval TM_ECODE_FAILED error
* \retval TM_ECODE_OK ok
*/
TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode Detect(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
/* No need to perform any detection on this packet, if the the given flag is set.*/
if (p->flags & PKT_NOPACKET_INSPECTION)
@ -6974,7 +6974,7 @@ int SigTest40NoPacketInspection01(void) {
//DetectEngineIPOnlyThreadInit(de_ctx,&det_ctx->io_ctx);
det_ctx->de_ctx = de_ctx;
Detect(&th_v, &p, det_ctx, &pq);
Detect(&th_v, &p, det_ctx, &pq, NULL);
if (PacketAlertCheck(&p, 2))
result = 0;
else
@ -8622,20 +8622,20 @@ static int SigTestDetectAlertCounter(void)
p.payload = (uint8_t *)"boo";
p.payload_len = strlen((char *)p.payload);
p.proto = IPPROTO_TCP;
Detect(&tv, &p, det_ctx, NULL);
Detect(&tv, &p, det_ctx, NULL, NULL);
result = (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 1);
Detect(&tv, &p, det_ctx, NULL);
Detect(&tv, &p, det_ctx, NULL, NULL);
result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 2);
p.payload = (uint8_t *)"roo";
p.payload_len = strlen((char *)p.payload);
Detect(&tv, &p, det_ctx, NULL);
Detect(&tv, &p, det_ctx, NULL, NULL);
result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 2);
p.payload = (uint8_t *)"laboosa";
p.payload_len = strlen((char *)p.payload);
Detect(&tv, &p, det_ctx, NULL);
Detect(&tv, &p, det_ctx, NULL, NULL);
result &= (SCPerfGetLocalCounterValue(det_ctx->counter_alerts, tv.sc_perf_pca) == 3);
end:

@ -50,9 +50,9 @@
#define MODULE_NAME "LogHttpLog"
TmEcode LogHttpLog (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode LogHttpLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode LogHttpLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode LogHttpLog (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode LogHttpLogIPv4(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode LogHttpLogIPv6(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode LogHttpLogThreadInit(ThreadVars *, void *, void **);
TmEcode LogHttpLogThreadDeinit(ThreadVars *, void *);
void LogHttpLogExitPrintStats(ThreadVars *, void *);
@ -107,7 +107,7 @@ static void CreateTimeString (const struct timeval *ts, char *str, size_t size)
(uint32_t) ts->tv_usec);
}
TmEcode LogHttpLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode LogHttpLogIPv4(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
LogHttpLogThread *aft = (LogHttpLogThread *)data;
@ -224,7 +224,7 @@ end:
SCReturnInt(TM_ECODE_OK);
}
TmEcode LogHttpLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode LogHttpLogIPv6(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
LogHttpLogThread *aft = (LogHttpLogThread *)data;
@ -341,7 +341,7 @@ end:
SCReturnInt(TM_ECODE_OK);
}
TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
@ -355,9 +355,9 @@ TmEcode LogHttpLog (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
}
if (PKT_IS_IPV4(p)) {
SCReturnInt(LogHttpLogIPv4(tv, p, data, pq));
SCReturnInt(LogHttpLogIPv4(tv, p, data, pq, postpq));
} else if (PKT_IS_IPV6(p)) {
SCReturnInt(LogHttpLogIPv6(tv, p, data, pq));
SCReturnInt(LogHttpLogIPv6(tv, p, data, pq, postpq));
}
SCReturnInt(TM_ECODE_OK);

@ -56,7 +56,7 @@ void TmModuleRespondRejectRegister (void) {
tmm_modules[TMM_RESPONDREJECT].cap_flags = 0; /* libnet is not compat with caps */
}
TmEcode RespondRejectFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode RespondRejectFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
int ret = 0;
/* ACTION_REJECT defaults to rejecting the SRC */

@ -28,6 +28,6 @@
#define REJECT_DIR_DST 1
void TmModuleRespondRejectRegister (void);
TmEcode RespondRejectFunc(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode RespondRejectFunc(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
#endif /* __RESPOND_REJECT_H__ */

@ -2191,6 +2191,106 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
return 0;
}
int RunModeFilePcapAuto2(DetectEngineCtx *de_ctx, char *file) {
SCEnter();
char tname[12];
uint16_t cpu = 0;
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
SCLogDebug("file %s", file);
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","varslot");
if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceivePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv,tm_module,file);
tm_module = TmModuleGetByName("DecodePcapFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv,tm_module,NULL);
TmThreadSetCPUAffinity(tv, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv, PRIO_MEDIUM);
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
for (cpu = 0; cpu < ncpus; cpu++) {
snprintf(tname, sizeof(tname),"Detect%"PRIu16, cpu+1);
if (tname == NULL)
break;
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"pickup-queue","simple","alert-queue1","simple","1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_detect_ncpu,tm_module,(void *)de_ctx);
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
* (at cpu 0) will have less priority (higher 'nice' value)
* In this case we will set the thread priority to +10 (default is 0)
*/
if (cpu == 0 && ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
} else if (ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
}
ThreadVars *tv_outputs = TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "simple", "packetpool", "packetpool", "varslot");
SetupOutputs(tv_outputs);
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
return 0;
}
/**
* \brief RunModeIpsIPFWAuto set up the following thread packet handlers:
* - Receive thread (from IPFW)

@ -36,6 +36,7 @@ int RunModeIpsNFQAuto(DetectEngineCtx *, char *);
int RunModeFilePcap(DetectEngineCtx *, char *);
int RunModeFilePcap2(DetectEngineCtx *, char *);
int RunModeFilePcapAuto(DetectEngineCtx *, char *);
int RunModeFilePcapAuto2(DetectEngineCtx *, char *);
int RunModeIdsPfring(DetectEngineCtx *, char *);
int RunModeIdsPfring2(DetectEngineCtx *, char *);

@ -59,13 +59,13 @@ typedef struct ErfFileThreadVars_ {
uint64_t bytes;
} ErfFileThreadVars;
TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **);
void ReceiveErfFileThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *);
TmEcode DecodeErfFileThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeErfFile(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode DecodeErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
/**
* \brief Register the ERF file receiver (reader) module.
@ -105,7 +105,7 @@ TmModuleDecodeErfFileRegister(void)
* decoding.
*/
TmEcode
ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
@ -218,7 +218,7 @@ DecodeErfFileThreadInit(ThreadVars *tv, void *initdata, void **data)
* off to the ethernet decoder.
*/
TmEcode
DecodeErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
DecodeErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
DecodeThreadVars *dtv = (DecodeThreadVars *)data;

@ -105,16 +105,16 @@ static uint16_t receive_queue_num = 0;
static uint16_t verdict_queue_num = 0;
static SCMutex nfq_init_lock;
TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode ReceiveNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceiveNFQThreadInit(ThreadVars *, void *, void **);
void ReceiveNFQThreadExitStats(ThreadVars *, void *);
TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode VerdictNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode VerdictNFQThreadInit(ThreadVars *, void *, void **);
void VerdictNFQThreadExitStats(ThreadVars *, void *);
TmEcode VerdictNFQThreadDeinit(ThreadVars *, void *);
TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode DecodeNFQ(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode DecodeNFQThreadInit(ThreadVars *, void *, void **);
void TmModuleReceiveNFQRegister (void) {
@ -497,7 +497,7 @@ process_rv:
/**
* \brief NFQ receive module main entry function: receive a packet from NFQ
*/
TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode ReceiveNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
@ -572,7 +572,7 @@ void NFQSetVerdict(NFQThreadVars *t, Packet *p) {
/**
* \brief NFQ verdict module packet entry function
*/
TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
NFQThreadVars *ntv = (NFQThreadVars *)data;
@ -609,7 +609,7 @@ TmEcode VerdictNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
/**
* \brief Decode a packet coming from NFQ
*/
TmEcode DecodeNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode DecodeNFQ(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
IPV4Hdr *ip4h = (IPV4Hdr *)p->pkt;

@ -45,6 +45,7 @@
#include "util-privs.h"
extern int max_pending_packets;
static int pcap_max_read_packets = 0;
typedef struct PcapFileGlobalVars_ {
pcap_t *pcap_handle;
@ -54,6 +55,9 @@ typedef struct PcapFileGlobalVars_ {
uint64_t cnt; /** packet counter */
} PcapFileGlobalVars;
/** max packets < 65536 */
#define PCAP_FILE_MAX_PKTS 256
typedef struct PcapFileThreadVars_
{
/* counters */
@ -64,16 +68,21 @@ typedef struct PcapFileThreadVars_
ThreadVars *tv;
Packet *in_p;
Packet *array[PCAP_FILE_MAX_PKTS];
uint16_t array_idx;
uint8_t done;
} PcapFileThreadVars;
static PcapFileGlobalVars pcap_g;
TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode ReceivePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceivePcapFileThreadInit(ThreadVars *, void *, void **);
void ReceivePcapFileThreadExitStats(ThreadVars *, void *);
TmEcode ReceivePcapFileThreadDeinit(ThreadVars *, void *);
TmEcode DecodePcapFile(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode DecodePcapFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode DecodePcapFileThreadInit(ThreadVars *, void *, void **);
void TmModuleReceivePcapFileRegister (void) {
@ -103,7 +112,16 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
PcapFileThreadVars *ptv = (PcapFileThreadVars *)user;
Packet *p = ptv->in_p;
Packet *p = NULL;
if (ptv->array_idx == 0) {
p = ptv->in_p;
} else {
p = PacketGetFromQueueOrAlloc();
}
if (p == NULL) {
return;
}
p->ts.tv_sec = h->ts.tv_sec;
p->ts.tv_usec = h->ts.tv_usec;
@ -118,21 +136,46 @@ void PcapFileCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
memcpy(p->pkt, pkt, p->pktlen);
//printf("PcapFileCallback: p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)\n", p->pktlen, *pkt, *p->pkt);
/* store the packet in our array */
ptv->array[ptv->array_idx] = p;
ptv->array_idx++;
SCReturn;
}
/**
* \brief Main PCAP file reading function
*/
TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
SCEnter();
PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
if (ptv->done == 1) {
SCReturnInt(TM_ECODE_FAILED);
}
ptv->array_idx = 0;
ptv->in_p = p;
/* Right now we just support reading packets one at a time. */
int r = pcap_dispatch(pcap_g.pcap_handle, 1,
int r = pcap_dispatch(pcap_g.pcap_handle, pcap_max_read_packets,
(pcap_handler)PcapFileCallback, (u_char *)ptv);
uint16_t cnt = 0;
for (cnt = 0; cnt < ptv->array_idx; cnt++) {
Packet *pp = ptv->array[cnt];
pcap_g.cnt++;
if (cnt > 0) {
pp->pcap_cnt = pcap_g.cnt;
PacketEnqueue(postpq, pp);
} else {
p->pcap_cnt = pcap_g.cnt;
}
}
if (r < 0) {
SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
r, pcap_geterr(pcap_g.pcap_handle));
@ -143,10 +186,9 @@ TmEcode ReceivePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
SCLogInfo("pcap file end of file reached (pcap err code %" PRId32 ")", r);
EngineStop();
SCReturnInt(TM_ECODE_FAILED);
ptv->done = 1;
SCReturnInt(TM_ECODE_OK);
}
pcap_g.cnt++;
p->pcap_cnt = pcap_g.cnt;
SCReturnInt(TM_ECODE_OK);
}
@ -159,6 +201,11 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, void *initdata, void **data) {
SCReturnInt(TM_ECODE_FAILED);
}
/* use max_pending_packets as pcap read size unless it's bigger than
* our size limit */
pcap_max_read_packets = (PCAP_FILE_MAX_PKTS < max_pending_packets) ?
PCAP_FILE_MAX_PKTS : max_pending_packets;
SCLogInfo("reading pcap file %s", (char *)initdata);
PcapFileThreadVars *ptv = SCMalloc(sizeof(PcapFileThreadVars));
@ -234,7 +281,7 @@ TmEcode ReceivePcapFileThreadDeinit(ThreadVars *tv, void *data) {
SCReturnInt(TM_ECODE_OK);
}
TmEcode DecodePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode DecodePcapFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
DecodeThreadVars *dtv = (DecodeThreadVars *)data;

@ -68,13 +68,13 @@ typedef struct PcapThreadVars_
ThreadVars *tv;
} PcapThreadVars;
TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode ReceivePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceivePcapThreadInit(ThreadVars *, void *, void **);
void ReceivePcapThreadExitStats(ThreadVars *, void *);
TmEcode ReceivePcapThreadDeinit(ThreadVars *, void *);
TmEcode DecodePcapThreadInit(ThreadVars *, void *, void **);
TmEcode DecodePcap(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode DecodePcap(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
/**
* \brief Registration Function for RecievePcap.
@ -146,7 +146,7 @@ void PcapCallback(char *user, struct pcap_pkthdr *h, u_char *pkt) {
* \param pq pointer to the PacketQueue (not used here but part of the api)
* \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success
*/
TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq) {
TmEcode ReceivePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
SCEnter();
PcapThreadVars *ptv = (PcapThreadVars *)data;
@ -376,7 +376,7 @@ TmEcode ReceivePcapThreadDeinit(ThreadVars *tv, void *data) {
* \param data pointer that gets cast into PcapThreadVars for ptv
* \param pq pointer to the current PacketQueue
*/
TmEcode DecodePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode DecodePcap(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
SCEnter();
DecodeThreadVars *dtv = (DecodeThreadVars *)data;

@ -65,7 +65,7 @@ typedef struct StreamTcpThread_ {
TcpReassemblyThreadCtx *ra_ctx; /**< tcp reassembly thread data */
} StreamTcpThread;
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **);
TmEcode StreamTcpThreadDeinit(ThreadVars *, void *);
void StreamTcpExitPrintStats(ThreadVars *, void *);
@ -2512,7 +2512,7 @@ static int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt)
SCReturnInt(0);
}
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq)
TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
StreamTcpThread *stt = (StreamTcpThread *)data;
TmEcode ret = TM_ECODE_OK;

@ -922,6 +922,7 @@ int main(int argc, char **argv)
//RunModeFilePcap(de_ctx, pcap_file);
//RunModeFilePcap2(de_ctx, pcap_file);
RunModeFilePcapAuto(de_ctx, pcap_file);
//RunModeFilePcapAuto2(de_ctx, pcap_file);
}
else if (run_mode == MODE_PFRING) {
//RunModeIdsPfring3(de_ctx, pfring_dev);

@ -41,7 +41,7 @@ typedef struct TmModule_ {
TmEcode (*ThreadDeinit)(ThreadVars *, void *);
/** the packet processing function */
TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
void (*RegisterTests)(void);

@ -70,7 +70,7 @@ uint8_t tv_aof = THV_RESTART_THREAD;
typedef struct TmSlot_ {
/* function pointers */
TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *);
TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **);
void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
@ -79,10 +79,20 @@ typedef struct TmSlot_ {
/* data storage */
void *slot_initdata;
void *slot_data;
PacketQueue slot_pq;
/**< queue filled by the SlotFunc with packets that will
* be processed futher _before_ the current packet. */
PacketQueue slot_pre_pq;
/**< queue filled by the SlotFunc with packets that will
* be processed futher _after_ the current packet. */
PacketQueue slot_post_pq;
/* linked list, only used by TmVarSlot */
struct TmSlot_ *slot_next;
int id; /**< slot id, only used my TmVarSlot to know what the first
* slot is. */
} TmSlot;
/* 1 function slot */
@ -165,29 +175,38 @@ void *TmThreadsSlot1NoIn(void *td) {
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue));
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq);
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pre_pq, &s->s.slot_post_pq);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhReleasePacketsToPacketPool(&s->s.slot_pre_pq);
TmqhReleasePacketsToPacketPool(&s->s.slot_post_pq);
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
while (s->s.slot_pq.len > 0) {
Packet *extra = PacketDequeue(&s->s.slot_pq);
/* handle pre queue */
while (s->s.slot_pre_pq.len > 0) {
Packet *extra = PacketDequeue(&s->s.slot_pre_pq);
tv->tmqh_out(tv, extra);
}
tv->tmqh_out(tv, p);
/* handle post queue */
while (s->s.slot_post_pq.len > 0) {
Packet *extra = PacketDequeue(&s->s.slot_post_pq);
tv->tmqh_out(tv, extra);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
@ -235,7 +254,8 @@ void *TmThreadsSlot1NoOut(void *td) {
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue));
TmThreadsSetFlag(tv, THV_INIT_DONE);
@ -244,7 +264,7 @@ void *TmThreadsSlot1NoOut(void *td) {
p = tv->tmqh_in(tv);
r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL);
r = s->s.SlotFunc(tv, p, s->s.slot_data, /* no outqh no pq */NULL, /* no outqh no pq */NULL);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, p);
@ -300,14 +320,15 @@ void *TmThreadsSlot1NoInOut(void *td) {
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue));
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmThreadTestThreadUnPaused(tv);
r = s->s.SlotFunc(tv, NULL, s->s.slot_data, /* no outqh, no pq */NULL);
r = s->s.SlotFunc(tv, NULL, s->s.slot_data, /* no outqh, no pq */NULL, NULL);
//printf("%s: TmThreadsSlot1NoInNoOut: r %" PRId32 "\n", tv->name, r);
/* handle error */
@ -367,7 +388,8 @@ void *TmThreadsSlot1(void *td) {
pthread_exit((void *) -1);
}
}
memset(&s->s.slot_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->s.slot_post_pq, 0, sizeof(PacketQueue));
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
@ -379,25 +401,30 @@ void *TmThreadsSlot1(void *td) {
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pq);
r = s->s.SlotFunc(tv, p, s->s.slot_data, &s->s.slot_pre_pq, &s->s.slot_post_pq);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhReleasePacketsToPacketPool(&s->s.slot_pq);
TmqhReleasePacketsToPacketPool(&s->s.slot_pre_pq);
TmqhReleasePacketsToPacketPool(&s->s.slot_post_pq);
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
while (s->s.slot_pq.len > 0) {
while (s->s.slot_pre_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s.slot_pq);
Packet *extra_p = PacketDequeue(&s->s.slot_pre_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %" PRId32 "\n", tv->name, p, r);
/* output the packet */
tv->tmqh_out(tv, p);
while (s->s.slot_post_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s->s.slot_post_pq);
tv->tmqh_out(tv, extra_p);
}
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
@ -424,24 +451,32 @@ void *TmThreadsSlot1(void *td) {
pthread_exit((void *) 0);
}
/* separate run function so we can call it recursively */
/** \brief separate run function so we can call it recursively
*
* \todo deal with post_pq for slots beyond the first
*/
static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) {
TmEcode r = TM_ECODE_OK;
TmSlot *s = NULL;
for (s = slot; s != NULL; s = s->slot_next) {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pq);
if (s->id == 0) {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq);
} else {
r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, NULL);
}
/* handle error */
if (r == TM_ECODE_FAILED) {
/* Encountered error. Return packets to packetpool and return */
TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
TmThreadsSetFlag(tv, THV_FAILED);
return TM_ECODE_FAILED;
}
/* handle new packets */
while (s->slot_pq.len > 0) {
Packet *extra_p = PacketDequeue(&s->slot_pq);
while (s->slot_pre_pq.len > 0) {
Packet *extra_p = PacketDequeue(&s->slot_pre_pq);
/* see if we need to process the packet */
if (s->slot_next != NULL) {
@ -449,7 +484,8 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl
/* XXX handle error */
if (r == TM_ECODE_FAILED) {
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmqhReleasePacketsToPacketPool(&s->slot_pre_pq);
TmqhReleasePacketsToPacketPool(&s->slot_post_pq);
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
return TM_ECODE_FAILED;
@ -457,11 +493,17 @@ static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *sl
}
tv->tmqh_out(tv, extra_p);
}
/** \todo post pq */
}
return TM_ECODE_OK;
}
/**
* \todo only the first "slot" currently makes the "post_pq" available
* to the thread module.
*/
void *TmThreadsSlotVar(void *td) {
ThreadVars *tv = (ThreadVars *)td;
TmVarSlot *s = (TmVarSlot *)tv->tm_slots;
@ -479,8 +521,6 @@ void *TmThreadsSlotVar(void *td) {
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
//printf("TmThreadsSlot1: %s starting\n", tv->name);
for (slot = s->s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot->slot_data);
@ -491,7 +531,8 @@ void *TmThreadsSlotVar(void *td) {
pthread_exit((void *) -1);
}
}
memset(&slot->slot_pq, 0, sizeof(PacketQueue));
memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue));
memset(&slot->slot_post_pq, 0, sizeof(PacketQueue));
}
TmThreadsSetFlag(tv, THV_INIT_DONE);
@ -501,15 +542,13 @@ void *TmThreadsSlotVar(void *td) {
/* input a packet */
p = tv->tmqh_in(tv);
//printf("TmThreadsSlotVar: %p\n", p);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
//printf("%s: TmThreadsSlotVar: p == NULL\n", tv->name);
} else {
r = TmThreadsSlotVarRun(tv, p, s->s);
/* XXX handle error */
if (r == TM_ECODE_FAILED) {
//printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n");
TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
@ -517,14 +556,30 @@ void *TmThreadsSlotVar(void *td) {
/* output the packet */
tv->tmqh_out(tv, p);
/* now handle the post_pq packets */
while (s->s->slot_post_pq.len > 0) {
Packet *extra_p = PacketDequeue(&s->s->slot_post_pq);
if (s->s->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, s->s->slot_next);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
}
/* output the packet */
tv->tmqh_out(tv, extra_p);
}
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
//printf("%s: TmThreadsSlot1: KILL is set\n", tv->name);
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
run = 0;
}
}
SCPerfUpdateCounterArray(tv->sc_perf_pca, &tv->sc_perf_pctx, 0);
for (slot = s->s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
@ -626,6 +681,7 @@ void TmVarSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) {
if (s->s == NULL) {
s->s = slot;
slot->id = 0;
} else {
TmSlot *a = s->s, *b = NULL;
@ -636,6 +692,7 @@ void TmVarSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) {
/* append the new slot */
if (b != NULL) {
b->slot_next = slot;
slot->id = b->id + 1;
}
}
}

Loading…
Cancel
Save