Update the ERF file runmodes to support autofp and single.

remotes/origin/master
Jason Ish 14 years ago committed by Victor Julien
parent 1f801d316c
commit 90548837e3

@ -44,96 +44,100 @@ const char *RunModeErfFileGetDefaultMode(void)
void RunModeErfFileRegister(void) void RunModeErfFileRegister(void)
{ {
default_mode = "auto"; default_mode = "autofp";
RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "auto",
"Multi threaded Erf File mode", RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "single",
RunModeErfFileAuto); "Single threaded ERF file mode",
RunModeErfFileSingle);
RunModeRegisterNewRunMode(RUNMODE_ERF_FILE, "autofp",
"Multi threaded ERF file mode. Packets from "
"each flow are assigned to a single detect thread",
RunModeErfFileAutoFp);
return; return;
} }
int RunModeErfFileAuto(DetectEngineCtx *de_ctx) int RunModeErfFileSingle(DetectEngineCtx *de_ctx)
{ {
SCEnter(); int ret;
char tname[12]; char *file;
uint16_t cpu = 0;
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
RunModeInitialize(); SCEnter();
char *file = NULL;
if (ConfGet("erf-file.file", &file) == 0) { if (ConfGet("erf-file.file", &file) == 0) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving erf-file.file " SCLogError(SC_ERR_RUNMODE, "Failed to get erf-file.file from config.");
"from Conf");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
SCLogDebug("file %s", file);
RunModeInitialize();
TimeModeSetOffline(); TimeModeSetOffline();
/* create the threads */ /* Basically the same setup as PCAP files. */
ThreadVars *tv_receiveerf =
TmThreadCreatePacketHandler("ReceiveErfFile", ThreadVars *tv = TmThreadCreatePacketHandler("ErfFile",
"packetpool", "packetpool",
"packetpool", "packetpool", "packetpool", "packetpool",
"pickup-queue", "simple", "pktacqloop");
"1slot"); if (tv == NULL) {
if (tv_receiveerf == NULL) {
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
TmModule *tm_module = TmModuleGetByName("ReceiveErfFile"); TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
if (tm_module == NULL) { if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n"); printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
TmSlotSetFuncAppend(tv_receiveerf, tm_module, file); TmSlotSetFuncAppend(tv, tm_module, file);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receiveerf, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode & Stream",
"pickup-queue", "simple",
"stream-queue1", "simple",
"varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodeErfFile"); tm_module = TmModuleGetByName("DecodeErfFile");
if (tm_module == NULL) { if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeErfFile failed\n"); printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); TmSlotSetFuncAppend(tv, tm_module, NULL);
tm_module = TmModuleGetByName("StreamTcp"); tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) { if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n"); printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL); TmSlotSetFuncAppend(tv, tm_module, NULL);
if (threading_set_cpu_affinity) { tm_module = TmModuleGetByName("Detect");
TmThreadSetCPUAffinity(tv_decode1, 0); if (tm_module == NULL) {
if (ncpus > 1) printf("ERROR: TmModuleGetByName Detect failed\n");
TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM); exit(EXIT_FAILURE);
} }
TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
SetupOutputs(tv);
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
SCLogInfo("RunModeErfFileSingle initialised");
SCReturnInt(0);
}
int RunModeErfFileAutoFp(DetectEngineCtx *de_ctx)
{
SCEnter();
char tname[12];
char qname[12];
uint16_t cpu = 0;
char queues[2048] = "";
RunModeInitialize();
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
/* start with cpu 1 so that if we're creating an odd number of detect /* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */ * threads we're not creating the most on CPU0. */
if (ncpus > 0) if (ncpus > 0)
@ -147,21 +151,84 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
thread_max = 1; thread_max = 1;
int thread; int thread;
for (thread = 0; thread < thread_max; thread++) {
if (strlen(queues) > 0)
strlcat(queues, ",", sizeof(queues));
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
strlcat(queues, qname, sizeof(queues));
}
SCLogDebug("queues %s", queues);
char *file = NULL;
if (ConfGet("erf-file.file", &file) == 0) {
SCLogError(SC_ERR_RUNMODE,
"Failed retrieving erf-file.file from config");
exit(EXIT_FAILURE);
}
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv =
TmThreadCreatePacketHandler("ReceiveErfFile",
"packetpool", "packetpool",
queues, "flow",
"pktacqloop");
if (tv == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceiveErfFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveErfFile\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, file);
tm_module = TmModuleGetByName("DecodeErfFile");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeErfFile failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
for (thread = 0; thread < thread_max; thread++) { for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1); snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
snprintf(qname, sizeof(qname), "pickup%"PRIu16, thread+1);
SCLogDebug("tname %s, qname %s", tname, qname);
char *thread_name = SCStrdup(tname); char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name, TmThreadCreatePacketHandler(thread_name,
"stream-queue1", "simple", qname, "flow",
"alert-queue1", "simple", "packetpool", "packetpool",
"1slot"); "varslot");
if (tv_detect_ncpu == NULL) { if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
tm_module = TmModuleGetByName("Detect"); tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) { if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n"); printf("ERROR: TmModuleGetByName Detect failed\n");
@ -189,6 +256,9 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
} }
tv_detect_ncpu->thread_group_name = thread_group_name; tv_detect_ncpu->thread_group_name = thread_group_name;
/* add outputs as well */
SetupOutputs(tv_detect_ncpu);
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -200,28 +270,7 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx)
cpu++; cpu++;
} }
ThreadVars *tv_outputs = SCLogInfo("RunModeErfFileAutoFp initialised");
TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "simple",
"packetpool", "packetpool",
"varslot");
if (tv_outputs == NULL) {
printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
exit(EXIT_FAILURE);
}
SetupOutputs(tv_outputs);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
return 0; SCReturnInt(0);
} }

@ -23,7 +23,8 @@
#ifndef __RUNMODE_ERF_FILE_H__ #ifndef __RUNMODE_ERF_FILE_H__
#define __RUNMODE_ERF_FILE_H__ #define __RUNMODE_ERF_FILE_H__
int RunModeErfFileAuto(DetectEngineCtx *); int RunModeErfFileSingle(DetectEngineCtx *);
int RunModeErfFileAutoFp(DetectEngineCtx *);
void RunModeErfFileRegister(void); void RunModeErfFileRegister(void);
const char *RunModeErfFileGetDefaultMode(void); const char *RunModeErfFileGetDefaultMode(void);

@ -52,14 +52,17 @@ typedef struct DagRecord_ {
} __attribute__((packed)) DagRecord; } __attribute__((packed)) DagRecord;
typedef struct ErfFileThreadVars_ { typedef struct ErfFileThreadVars_ {
FILE *erf;
ThreadVars *tv; ThreadVars *tv;
TmSlot *slot;
FILE *erf;
uint32_t pkts; uint32_t pkts;
uint64_t bytes; uint64_t bytes;
} ErfFileThreadVars; } ErfFileThreadVars;
TmEcode ReceiveErfFile(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); static inline TmEcode ReadErfRecord(ThreadVars *, Packet *, void *);
TmEcode ReceiveErfFileLoop(ThreadVars *, void *, void *);
TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **); TmEcode ReceiveErfFileThreadInit(ThreadVars *, void *, void **);
void ReceiveErfFileThreadExitStats(ThreadVars *, void *); void ReceiveErfFileThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *); TmEcode ReceiveErfFileThreadDeinit(ThreadVars *, void *);
@ -75,7 +78,8 @@ TmModuleReceiveErfFileRegister(void)
{ {
tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile"; tmm_modules[TMM_RECEIVEERFFILE].name = "ReceiveErfFile";
tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit; tmm_modules[TMM_RECEIVEERFFILE].ThreadInit = ReceiveErfFileThreadInit;
tmm_modules[TMM_RECEIVEERFFILE].Func = ReceiveErfFile; tmm_modules[TMM_RECEIVEERFFILE].Func = NULL;
tmm_modules[TMM_RECEIVEERFFILE].PktAcqLoop = ReceiveErfFileLoop;
tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats = tmm_modules[TMM_RECEIVEERFFILE].ThreadExitPrintStats =
ReceiveErfFileThreadExitStats; ReceiveErfFileThreadExitStats;
tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL; tmm_modules[TMM_RECEIVEERFFILE].ThreadDeinit = NULL;
@ -100,13 +104,51 @@ TmModuleDecodeErfFileRegister(void)
} }
/** /**
* \brief Thread entry function for ERF reading. * \brief ERF file reading loop.
*
* Reads a new ERF record from the file and sets up the Packet for
* decoding.
*/ */
TmEcode TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) {
ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
etv->slot = ((TmSlot *)slot)->slot_next;
Packet *p;
uint16_t packet_q_len = 0;
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
SCReturnInt(TM_ECODE_OK);
}
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc'ing packets at line rate. */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
p = PacketGetFromQueueOrAlloc();
if (unlikely(p == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate a packet.");
EngineStop();
SCReturnInt(TM_ECODE_FAILED);
}
if (ReadErfRecord(tv, p, data) != TM_ECODE_OK) {
TmqhOutputPacketpool(etv->tv, p);
EngineStop();
SCReturnInt(TM_ECODE_FAILED);
}
if (TmThreadsSlotProcessPkt(etv->tv, etv->slot, p) != TM_ECODE_OK) {
EngineStop();
SCReturnInt(TM_ECODE_FAILED);
}
}
}
static inline TmEcode ReadErfRecord(ThreadVars *tv, Packet *p, void *data)
{ {
SCEnter(); SCEnter();
@ -115,16 +157,24 @@ ReceiveErfFile(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQue
int r = fread(&dr, sizeof(DagRecord), 1, etv->erf); int r = fread(&dr, sizeof(DagRecord), 1, etv->erf);
if (r < 1) { if (r < 1) {
SCLogInfo("End of ERF file reached or an error occurred."); if (feof(etv->erf)) {
EngineStop(); SCLogInfo("End of ERF file reached");
}
else {
SCLogInfo("Error reading ERF record");
}
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
int rlen = ntohs(dr.rlen); int rlen = ntohs(dr.rlen);
int wlen = ntohs(dr.wlen); int wlen = ntohs(dr.wlen);
r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf); r = fread(GET_PKT_DATA(p), rlen - sizeof(DagRecord), 1, etv->erf);
if (r < 1) { if (r < 1) {
SCLogInfo("End of ERF file reached or an error occurred."); if (feof(etv->erf)) {
EngineStop(); SCLogInfo("End of ERF file reached");
}
else {
SCLogInfo("Error reading ERF record");
}
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }

Loading…
Cancel
Save