Add "workers" runmode.

Previous commits have considerabily empowered the "single" mode which
could contain multiple threads. This behaviour was not a target for
this runmode and the following patch remedies to the situation by
introducing the "workers" mode where each thread do all the tasks
from acquisition to logging. This runmode is currently implemented
for af-packet and pf-ring.
remotes/origin/master-1.1.x
Eric Leblond 14 years ago
parent 788fa1e5a1
commit beaa909eb8

@ -69,6 +69,10 @@ void RunModeIdsAFPRegister(void)
RunModeRegisterNewRunMode(RUNMODE_AFP_DEV, "single", RunModeRegisterNewRunMode(RUNMODE_AFP_DEV, "single",
"Single threaded af-packet mode", "Single threaded af-packet mode",
RunModeIdsAFPSingle); RunModeIdsAFPSingle);
RunModeRegisterNewRunMode(RUNMODE_AFP_DEV, "workers",
"Workers af-packet mode, each thread does all"
" tasks from acquisition to logging",
RunModeIdsAFPWorkers);
default_mode_autofp = "autofp"; default_mode_autofp = "autofp";
RunModeRegisterNewRunMode(RUNMODE_AFP_DEV, "autofp", RunModeRegisterNewRunMode(RUNMODE_AFP_DEV, "autofp",
"Multi socket AF_PACKET mode. Packets from " "Multi socket AF_PACKET mode. Packets from "
@ -325,3 +329,40 @@ int RunModeIdsAFPSingle(DetectEngineCtx *de_ctx)
#endif /* HAVE_AF_PACKET */ #endif /* HAVE_AF_PACKET */
SCReturnInt(0); SCReturnInt(0);
} }
/**
* \brief Workers version of the AF_PACKET processing.
*
* Start N threads with each thread doing all the work.
*
*/
int RunModeIdsAFPWorkers(DetectEngineCtx *de_ctx)
{
#ifdef HAVE_AF_PACKET
int ret;
char *live_dev = NULL;
#endif
SCEnter();
#ifdef HAVE_AF_PACKET
RunModeInitialize();
TimeModeSetLive();
ConfGet("af-packet.live-interface", &live_dev);
ret = RunModeSetLiveCaptureWorkers(de_ctx,
ParseAFPConfig,
AFPConfigGeThreadsCount,
"ReceiveAFP",
"DecodeAFP", "AFPacket",
live_dev);
if (ret != 0) {
printf("ERROR: Unable to start runmode\n");
exit(EXIT_FAILURE);
}
SCLogInfo("RunModeIdsAFPSingle initialised");
#endif /* HAVE_AF_PACKET */
SCReturnInt(0);
}

@ -26,6 +26,7 @@
int RunModeIdsAFPAuto(DetectEngineCtx *); int RunModeIdsAFPAuto(DetectEngineCtx *);
int RunModeIdsAFPSingle(DetectEngineCtx *); int RunModeIdsAFPSingle(DetectEngineCtx *);
int RunModeIdsAFPAutoFp(DetectEngineCtx *); int RunModeIdsAFPAutoFp(DetectEngineCtx *);
int RunModeIdsAFPWorkers(DetectEngineCtx *);
void RunModeIdsAFPRegister(void); void RunModeIdsAFPRegister(void);
const char *RunModeAFPGetDefaultMode(void); const char *RunModeAFPGetDefaultMode(void);

@ -70,8 +70,12 @@ void RunModeIdsPfringRegister(void)
"detect thread", "detect thread",
RunModeIdsPfringAutoFp); RunModeIdsPfringAutoFp);
RunModeRegisterNewRunMode(RUNMODE_PFRING, "single", RunModeRegisterNewRunMode(RUNMODE_PFRING, "single",
"Single threaded pfringt mode", "Single threaded pfring mode",
RunModeIdsPfringSingle); RunModeIdsPfringSingle);
RunModeRegisterNewRunMode(RUNMODE_PFRING, "workers",
"Workers pfring mode, each thread does all"
" tasks from acquisition to logging",
RunModeIdsPfringWorkers);
return; return;
} }
@ -454,3 +458,40 @@ int RunModeIdsPfringSingle(DetectEngineCtx *de_ctx)
return 0; return 0;
} }
int RunModeIdsPfringWorkers(DetectEngineCtx *de_ctx)
{
SCEnter();
/* We include only if pfring is enabled */
#ifdef HAVE_PFRING
int ret;
char *live_dev = NULL;
ConfigIfaceParserFunc tparser;
RunModeInitialize();
TimeModeSetLive();
ret = GetDevAndParser(&live_dev, &tparser);
if (ret != 0) {
printf("ERROR: Unabme to get parser and interface params\n");
exit(EXIT_FAILURE);
}
ret = RunModeSetLiveCaptureWorkers(de_ctx,
tparser,
PfringConfigGeThreadsCount,
"ReceivePfring",
"DecodePfring", "RxPFR",
live_dev);
if (ret != 0) {
printf("ERROR: Unable to start runmode\n");
exit(EXIT_FAILURE);
}
SCLogInfo("RunModeIdsPfringSingle initialised");
#endif /* HAVE_PFRING */
return 0;
}

@ -28,6 +28,7 @@
int RunModeIdsPfringAuto(DetectEngineCtx *); int RunModeIdsPfringAuto(DetectEngineCtx *);
int RunModeIdsPfringAutoFp(DetectEngineCtx *de_ctx); int RunModeIdsPfringAutoFp(DetectEngineCtx *de_ctx);
int RunModeIdsPfringSingle(DetectEngineCtx *de_ctx); int RunModeIdsPfringSingle(DetectEngineCtx *de_ctx);
int RunModeIdsPfringWorkers(DetectEngineCtx *de_ctx);
void RunModeIdsPfringRegister(void); void RunModeIdsPfringRegister(void);
const char *RunModeIdsPfringGetDefaultMode(void); const char *RunModeIdsPfringGetDefaultMode(void);

@ -567,41 +567,37 @@ int RunModeSetLiveCaptureAutoFp(DetectEngineCtx *de_ctx,
return 0; return 0;
} }
int RunModeSetLiveCaptureSingle(DetectEngineCtx *de_ctx, static int RunModeSetLiveCaptureWorkersForDevice(DetectEngineCtx *de_ctx,
ConfigIfaceParserFunc ConfigParser, ConfigIfaceParserFunc ConfigParser,
ConfigIfaceThreadsCountFunc mod_threads_count, ConfigIfaceThreadsCountFunc mod_threads_count,
char *recv_mod_name, char *recv_mod_name,
char *decode_mod_name, char *thread_name, char *decode_mod_name, char *thread_name,
const char *live_dev) const char *live_dev, void *aconf,
unsigned char single_mode)
{ {
int nlive = LiveGetDeviceCount();
void *aconf;
int threads_count;
int thread; int thread;
int threads_count;
if (nlive > 1) { if (single_mode) {
SCLogError(SC_ERR_RUNMODE, threads_count = 1;
"Can't use single runmode with multiple device");
exit(EXIT_FAILURE);
}
if (live_dev != NULL) {
aconf = ConfigParser(live_dev);
} else { } else {
char *live_dev_c = LiveGetDevice(0);
aconf = ConfigParser(live_dev_c);
}
threads_count = mod_threads_count(aconf); threads_count = mod_threads_count(aconf);
SCLogInfo("Going to use %" PRId32 " thread(s)", threads_count); SCLogInfo("Going to use %" PRId32 " thread(s)", threads_count);
}
/* create the threads */ /* create the threads */
for (thread = 0; thread < threads_count; thread++) { for (thread = 0; thread < threads_count; thread++) {
char tname[12]; char tname[20];
char *n_thread_name = NULL; char *n_thread_name = NULL;
ThreadVars *tv = NULL; ThreadVars *tv = NULL;
TmModule *tm_module = NULL; TmModule *tm_module = NULL;
snprintf(tname, sizeof(tname), "%s%"PRIu16, thread_name, thread+1); if (single_mode) {
snprintf(tname, sizeof(tname), "%s", thread_name);
} else {
snprintf(tname, sizeof(tname), "%s%s%"PRIu16,
thread_name, live_dev, thread+1);
}
n_thread_name = SCStrdup(tname); n_thread_name = SCStrdup(tname);
tv = TmThreadCreatePacketHandler(n_thread_name, tv = TmThreadCreatePacketHandler(n_thread_name,
"packetpool", "packetpool", "packetpool", "packetpool",
@ -650,3 +646,73 @@ int RunModeSetLiveCaptureSingle(DetectEngineCtx *de_ctx,
return 0; return 0;
} }
int RunModeSetLiveCaptureWorkers(DetectEngineCtx *de_ctx,
ConfigIfaceParserFunc ConfigParser,
ConfigIfaceThreadsCountFunc mod_threads_count,
char *recv_mod_name,
char *decode_mod_name, char *thread_name,
const char *live_dev)
{
int nlive = LiveGetDeviceCount();
void *aconf;
int ldev;
for (ldev = 0; ldev < nlive; ldev++) {
char *live_dev_c = NULL;
aconf = ConfigParser(live_dev_c);
if (live_dev != NULL) {
aconf = ConfigParser(live_dev);
live_dev_c = SCStrdup(live_dev);
} else {
live_dev_c = LiveGetDevice(ldev);
aconf = ConfigParser(live_dev_c);
}
RunModeSetLiveCaptureWorkersForDevice(de_ctx,
ConfigParser,
mod_threads_count,
recv_mod_name,
decode_mod_name,
thread_name,
live_dev_c,
aconf,
0);
}
return 0;
}
int RunModeSetLiveCaptureSingle(DetectEngineCtx *de_ctx,
ConfigIfaceParserFunc ConfigParser,
ConfigIfaceThreadsCountFunc mod_threads_count,
char *recv_mod_name,
char *decode_mod_name, char *thread_name,
const char *live_dev)
{
int nlive = LiveGetDeviceCount();
void *aconf;
if (nlive > 1) {
SCLogError(SC_ERR_RUNMODE,
"Can't use single runmode with multiple device");
exit(EXIT_FAILURE);
}
if (live_dev != NULL) {
aconf = ConfigParser(live_dev);
} else {
char *live_dev_c = LiveGetDevice(0);
aconf = ConfigParser(live_dev_c);
/* \todo Set threads number in config to 1 */
}
return RunModeSetLiveCaptureWorkersForDevice(de_ctx,
ConfigParser,
mod_threads_count,
recv_mod_name,
decode_mod_name,
thread_name,
live_dev,
aconf,
1);
}

@ -46,4 +46,12 @@ int RunModeSetLiveCaptureSingle(DetectEngineCtx *de_ctx,
char *decode_mod_name, char *thread_name, char *decode_mod_name, char *thread_name,
const char *live_dev); const char *live_dev);
int RunModeSetLiveCaptureWorkers(DetectEngineCtx *de_ctx,
ConfigIfaceParserFunc configparser,
ConfigIfaceThreadsCountFunc mod_threads_count,
char *recv_mod_name,
char *decode_mod_name, char *thread_name,
const char *live_dev);
#endif /* __UTIL_RUNMODES_H__ */ #endif /* __UTIL_RUNMODES_H__ */

Loading…
Cancel
Save