Adding a "flow" queue handler. This queue handler passes packets of the same flow to the same queue. Changed the default IDS mode to use this.

Some output cleanups, shutdown should be cleaner now.
remotes/origin/master-1.0.x
Victor Julien 17 years ago
parent e7206623bb
commit 3636ca9703

@ -85,6 +85,7 @@ tm-threads.c tm-threads.h \
tmqh-simple.c tmqh-simple.h \
tmqh-nfq.c tmqh-nfq.h \
tmqh-packetpool.c tmqh-packetpool.h \
tmqh-flow.c tmqh-flow.h \
alert-fastlog.c alert-fastlog.h \
alert-debuglog.c alert-debuglog.h \
log-httplog.c log-httplog.h \

@ -248,7 +248,9 @@ int AlertUnifiedLogThreadInit(ThreadVars *t, void *initdata, void **data)
int AlertUnifiedLogThreadDeinit(ThreadVars *t, void *data)
{
#ifdef DEBUG
printf("AlertUnifiedLogThreadDeinit started\n");
#endif
AlertUnifiedLogThread *aun = (AlertUnifiedLogThread *)data;
if (aun == NULL) {
@ -261,7 +263,9 @@ int AlertUnifiedLogThreadDeinit(ThreadVars *t, void *data)
/* clear memory */
memset(aun, 0, sizeof(AlertUnifiedLogThread));
free(aun);
#ifdef DEBUG
printf("AlertUnifiedLogThreadDeinit done\n");
#endif
return 0;
error:

@ -39,6 +39,8 @@
#include "tm-modules.h"
#include "tm-threads.h"
#include "tmqh-flow.h"
#include "alert-fastlog.h"
#include "alert-unified-log.h"
#include "alert-unified-alert.h"
@ -395,6 +397,233 @@ int RunModeIdsPcap(DetectEngineCtx *de_ctx, char *iface) {
return 0;
}
/** \brief Live pcap mode with 4 stream tracking and reassembly threads, testing the flow queuehandler */
int RunModeIdsPcap2(DetectEngineCtx *de_ctx, char *iface) {
TimeModeSetLive();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreate("ReceivePcap","packetpool","packetpool","pickup-queue","simple","1slot_noinout", NULL, 0);
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceivePcap");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceivePcap\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_receivepcap,tm_module,(void *)iface);
if (TmThreadSpawn(tv_receivepcap, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreate("Decode1","pickup-queue","simple","decode-queue1,decode-queue2,decode-queue3,decode-queue4","flow","1slot", NULL, 0);
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodePcap");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePcap failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_decode1,tm_module,NULL);
if (TmThreadSpawn(tv_decode1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 = TmThreadCreate("Stream1","decode-queue1","simple","stream-queue1","simple","1slot", NULL, 0);
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_stream1,tm_module,NULL);
if (TmThreadSpawn(tv_stream1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream2 = TmThreadCreate("Stream2","decode-queue2","simple","stream-queue1","simple","1slot", NULL, 0);
if (tv_stream2 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream2\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_stream2,tm_module,NULL);
if (TmThreadSpawn(tv_stream2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream3 = TmThreadCreate("Stream3","decode-queue3","simple","stream-queue2","simple","1slot", NULL, 0);
if (tv_stream3 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_stream3,tm_module,NULL);
if (TmThreadSpawn(tv_stream3, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream4 = TmThreadCreate("Stream4","decode-queue4","simple","stream-queue2","simple","1slot", NULL, 0);
if (tv_stream4 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_stream4,tm_module,NULL);
if (TmThreadSpawn(tv_stream4, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect1 = TmThreadCreate("Detect1","stream-queue1","simple","verdict-queue","simple","1slot", NULL, 0);
if (tv_detect1 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_detect1,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect1, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_detect2 = TmThreadCreate("Detect2","stream-queue2","simple","verdict-queue","simple","1slot", NULL, 0);
if (tv_detect2 == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_detect2,tm_module,(void *)de_ctx);
if (TmThreadSpawn(tv_detect2, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_rreject = TmThreadCreate("RespondReject","verdict-queue","simple","alert-queue1","simple","1slot", NULL, 0);
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for RespondReject failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_rreject,tm_module,NULL);
if (TmThreadSpawn(tv_rreject, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_alert = TmThreadCreate("AlertFastlog&Httplog","alert-queue1","simple","alert-queue2","simple","2slot", NULL, 0);
if (tv_alert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("AlertFastlog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertFastlog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_alert,tm_module,NULL);
tm_module = TmModuleGetByName("LogHttplog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_alert,tm_module,NULL);
if (TmThreadSpawn(tv_alert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_unified = TmThreadCreate("AlertUnifiedLog","alert-queue2","simple","alert-queue3","simple","2slot", NULL, 0);
if (tv_unified == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("AlertUnifiedLog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedLog failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc1(tv_unified,tm_module,NULL);
tm_module = TmModuleGetByName("AlertUnifiedAlert");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for AlertUnifiedAlert failed\n");
exit(EXIT_FAILURE);
}
Tm2SlotSetFunc2(tv_unified,tm_module,NULL);
if (TmThreadSpawn(tv_unified, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_debugalert = TmThreadCreate("AlertDebuglog","alert-queue3","simple","packetpool","packetpool","1slot", NULL, 0);
if (tv_debugalert == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("AlertDebuglog");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_debugalert,tm_module,NULL);
if (TmThreadSpawn(tv_debugalert, TVT_PPT, THV_USE | THV_PAUSE) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
return 0;
}
int RunModeIpsNFQ(DetectEngineCtx *de_ctx) {
TimeModeSetLive();
@ -983,6 +1212,7 @@ int main(int argc, char **argv)
DecodeGRERegisterTests();
AlpDetectRegisterTests();
ConfRegisterTests();
TmqhFlowRegisterTests();
uint32_t failed = UtRunTests();
UtCleanup();
if (failed) exit(EXIT_FAILURE);
@ -1024,7 +1254,8 @@ int main(int argc, char **argv)
gettimeofday(&start_time, NULL);
if (mode == MODE_PCAP_DEV) {
RunModeIdsPcap(de_ctx, pcap_dev);
RunModeIdsPcap2(de_ctx, pcap_dev);
//RunModeIdsPcap(de_ctx, pcap_dev);
}
else if (mode == MODE_PCAP_FILE) {
RunModeFilePcap(de_ctx, pcap_file);

@ -336,23 +336,23 @@ void FlowInitConfig (char quiet)
* \warning Not thread safe */
void FlowPrintQueueInfo (void)
{
printf("Flow Queue info:\n");
printf("SPARE %" PRIu32 "\n", flow_spare_q.len);
printf("* Flow Queue info:\n");
printf(" - SPARE %" PRIu32 " (", flow_spare_q.len);
#ifdef DBG_PERF
printf(" flow_spare_q.dbg_maxlen %" PRIu32 "\n", flow_spare_q.dbg_maxlen);
printf("flow_spare_q.dbg_maxlen %" PRIu32 ")\n", flow_spare_q.dbg_maxlen);
#endif
printf("NEW %" PRIu32 "\n", flow_new_q.len);
printf(" - NEW %" PRIu32 " (", flow_new_q.len);
#ifdef DBG_PERF
printf(" flow_new_q.dbg_maxlen %" PRIu32 "\n", flow_new_q.dbg_maxlen);
printf("flow_new_q.dbg_maxlen %" PRIu32 ")\n", flow_new_q.dbg_maxlen);
#endif
printf("ESTABLISHED %" PRIu32 "\n", flow_est_q.len);
printf(" - ESTABLISHED %" PRIu32 " (", flow_est_q.len);
#ifdef DBG_PERF
printf(" flow_est_q.dbg_maxlen %" PRIu32 "\n", flow_est_q.dbg_maxlen);
printf("flow_est_q.dbg_maxlen %" PRIu32 ")\n", flow_est_q.dbg_maxlen);
#endif
#ifdef FLOWBITS_STATS
printf("Flowbits added: %" PRIu32 ", removed: %" PRIu32 "\n", flowbits_added, flowbits_removed);
printf("Max memory usage: %" PRIu32 "\n", flowbits_memuse_max);
printf("* Flowbits added: %" PRIu32 ", removed: %" PRIu32 ", ", flowbits_added, flowbits_removed);
printf("max memory usage: %" PRIu32 "\n", flowbits_memuse_max);
#endif /* FLOWBITS_STATS */
}
@ -458,7 +458,7 @@ void *FlowManagerThread(void *td)
sleeping += 10;
}
printf("%s ended: %" PRIu32 " new flows, %" PRIu32 " established flows pruned\n", th_v->name, new_cnt, established_cnt);
printf("* %s ended: %" PRIu32 " new flows, %" PRIu32 " established flows were pruned\n", th_v->name, new_cnt, established_cnt);
pthread_exit((void *) 0);
}

@ -22,6 +22,7 @@ typedef struct ThreadVars_ {
/** queue's */
Tmq *inq;
Tmq *outq;
void *outctx;
/** queue handlers */
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);

@ -10,6 +10,7 @@
#include "tmqh-simple.h"
#include "tmqh-nfq.h"
#include "tmqh-packetpool.h"
#include "tmqh-flow.h"
void TmqhSetup (void) {
memset(&tmqh_table, 0, sizeof(tmqh_table));
@ -17,6 +18,7 @@ void TmqhSetup (void) {
TmqhSimpleRegister();
TmqhNfqRegister();
TmqhPacketpoolRegister();
TmqhFlowRegister();
}
Tmqh* TmqhGetQueueHandlerByName(char *name) {

@ -7,6 +7,7 @@ enum {
TMQH_SIMPLE,
TMQH_NFQ,
TMQH_PACKETPOOL,
TMQH_FLOW,
TMQH_SIZE,
};
@ -15,6 +16,9 @@ typedef struct Tmqh_ {
char *name;
Packet *(*InHandler)(ThreadVars *);
void (*OutHandler)(ThreadVars *, Packet *);
void *(*OutHandlerCtxSetup)(char *);
void (*OutHandlerCtxFree)(void *);
void (*RegisterTests)(void);
} Tmqh;
Tmqh tmqh_table[TMQH_SIZE];

@ -54,3 +54,8 @@ void TmqDebugList(void) {
}
}
void TmqResetQueues(void) {
memset(&tmqs, 0x00, sizeof(tmqs));
tmq_id = 0;
}

@ -11,6 +11,7 @@ Tmq* TmqCreateQueue(char *name);
Tmq* TmqGetQueueByName(char *name);
void TmqDebugList(void);
void TmqResetQueues(void);
#endif /* __TM_QUEUES_H__ */

@ -924,23 +924,28 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
}
/* set the outgoing queue */
if (outq_name != NULL) {
tmq = TmqGetQueueByName(outq_name);
if (tmq == NULL) {
tmq = TmqCreateQueue(outq_name);
if (tmq == NULL) goto error;
}
tv->outq = tmq;
tv->outq->usecnt++;
//printf("TmThreadCreate: tv->outq->id %" PRIu32 "\n", tv->outq->id);
}
if (outqh_name != NULL) {
tmqh = TmqhGetQueueHandlerByName(outqh_name);
if (tmqh == NULL) goto error;
tv->tmqh_out = tmqh->OutHandler;
//printf("TmThreadCreate: tv->tmqh_out %p\n", tv->tmqh_out);
if (outq_name != NULL) {
if (tmqh->OutHandlerCtxSetup != NULL) {
tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
} else {
tmq = TmqGetQueueByName(outq_name);
if (tmq == NULL) {
tmq = TmqCreateQueue(outq_name);
if (tmq == NULL) goto error;
}
tv->outq = tmq;
tv->outq->usecnt++;
}
//printf("TmThreadCreate: tv->outq->id %" PRIu32 "\n", tv->outq->id);
}
}
if (TmThreadSetSlots(tv, slots, fn_p) != 0) {
@ -998,7 +1003,9 @@ void TmThreadKillThreads(void) {
while (t) {
t->flags |= THV_KILL;
#ifdef DEBUG
printf("TmThreadKillThreads: told thread %s to stop\n", t->name);
#endif
/* XXX hack */
StreamMsgSignalQueueHack();
@ -1020,7 +1027,9 @@ void TmThreadKillThreads(void) {
int cnt = 0;
while (1) {
if (t->flags & THV_CLOSED) {
#ifdef DEBUG
printf("signalled the thread %" PRId32 " times\n", cnt);
#endif
break;
}
@ -1032,7 +1041,9 @@ void TmThreadKillThreads(void) {
usleep(100);
}
#ifdef DEBUG
printf("TmThreadKillThreads: signalled t->inq->id %" PRIu32 "\n", t->inq->id);
#endif
}
@ -1040,7 +1051,9 @@ void TmThreadKillThreads(void) {
int cnt = 0;
while (1) {
if (t->flags & THV_CLOSED) {
#ifdef DEBUG
printf("signalled the thread %" PRId32 " times\n", cnt);
#endif
break;
}
@ -1054,7 +1067,9 @@ void TmThreadKillThreads(void) {
/* join it */
pthread_join(t->t, NULL);
#ifdef DEBUG
printf("TmThreadKillThreads: thread %s stopped\n", t->name);
#endif
t = t->next;
}

Loading…
Cancel
Save