detect loader: move to own file

pull/1608/head
Victor Julien 11 years ago
parent cfeaf42cab
commit 979bd35277

@ -112,6 +112,7 @@ detect-engine-hscd.c detect-engine-hscd.h \
detect-engine-hsmd.c detect-engine-hsmd.h \
detect-engine-hua.c detect-engine-hua.h \
detect-engine-iponly.c detect-engine-iponly.h \
detect-engine-loader.c detect-engine-loader.h \
detect-engine-mpm.c detect-engine-mpm.h \
detect-engine-payload.c detect-engine-payload.h \
detect-engine-port.c detect-engine-port.h \

@ -0,0 +1,300 @@
/* Copyright (C) 2015 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*/
#include "suricata-common.h"
#include "suricata.h"
#include "conf.h"
#include "debug.h"
#include "detect.h"
#include "threads.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "queue.h"
#include "util-signal.h"
#include "detect-engine-loader.h"
#define NLOADERS 4
static DetectLoaderControl *loaders = NULL;
static int cur_loader = 0;
void TmThreadWakeupDetectLoaderThreads(void);
static int num_loaders = NLOADERS;
/** \param loader -1 for auto select
* \retval loader_id or negative in case of error */
int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx)
{
if (loader_id == -1) {
loader_id = cur_loader;
cur_loader++;
if (cur_loader >= num_loaders)
cur_loader = 0;
}
if (loader_id >= num_loaders || loader_id < 0) {
return -ERANGE;
}
DetectLoaderControl *loader = &loaders[loader_id];
DetectLoaderTask *t = SCCalloc(1, sizeof(*t));
if (t == NULL)
return -ENOMEM;
t->Func = Func;
t->ctx = func_ctx;
SCMutexLock(&loader->m);
TAILQ_INSERT_TAIL(&loader->task_list, t, next);
SCMutexUnlock(&loader->m);
TmThreadWakeupDetectLoaderThreads();
SCLogDebug("%d %p %p", loader_id, Func, func_ctx);
return loader_id;
}
/** \brief wait for loader tasks to complete
* \retval result 0 for ok, -1 for errors */
int DetectLoadersSync(void)
{
SCLogDebug("waiting");
int errors = 0;
int i;
for (i = 0; i < num_loaders; i++) {
int done = 0;
DetectLoaderControl *loader = &loaders[i];
while (!done) {
SCMutexLock(&loader->m);
if (TAILQ_EMPTY(&loader->task_list)) {
done = 1;
}
SCMutexUnlock(&loader->m);
}
SCMutexLock(&loader->m);
if (loader->result != 0) {
errors++;
loader->result = 0;
}
SCMutexUnlock(&loader->m);
}
if (errors) {
SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors);
return -1;
}
SCLogDebug("done");
return 0;
}
static void DetectLoaderInit(DetectLoaderControl *loader)
{
memset(loader, 0x00, sizeof(*loader));
SCMutexInit(&loader->m, NULL);
TAILQ_INIT(&loader->task_list);
}
void DetectLoadersInit(void)
{
intmax_t setting = NLOADERS;
(void)ConfGetInt("multi-detect.loaders", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid multi-detect.loaders setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
num_loaders = (int32_t)setting;
SCLogInfo("using %d detect loader threads", num_loaders);
BUG_ON(loaders != NULL);
loaders = SCCalloc(num_loaders, sizeof(DetectLoaderControl));
BUG_ON(loaders == NULL);
int i;
for (i = 0; i < num_loaders; i++) {
DetectLoaderInit(&loaders[i]);
}
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadWakeupDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0) {
BUG_ON(tv->ctrl_cond == NULL);
pthread_cond_broadcast(tv->ctrl_cond);
}
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadContinueDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0)
TmThreadContinue(tv);
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
SC_ATOMIC_DECLARE(int, detect_loader_cnt);
typedef struct DetectLoaderThreadData_ {
uint32_t instance;
} DetectLoaderThreadData;
static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data)
{
DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */
SCLogDebug("detect loader instance %u", ftd->instance);
/* pass thread data back to caller */
*data = ftd;
return TM_ECODE_OK;
}
static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data)
{
SCFree(data);
return TM_ECODE_OK;
}
static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data;
BUG_ON(ftd == NULL);
SCLogDebug("loader thread started");
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
/* see if we have tasks */
DetectLoaderControl *loader = &loaders[ftd->instance];
SCMutexLock(&loader->m);
DetectLoaderTask *task = NULL, *tmptask = NULL;
TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) {
int r = task->Func(task->ctx, ftd->instance);
loader->result |= r;
TAILQ_REMOVE(&loader->task_list, task, next);
SCFree(task);
}
SCMutexUnlock(&loader->m);
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
break;
}
/* just wait until someone wakes us up */
SCCtrlMutexLock(th_v->ctrl_mutex);
SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex);
SCCtrlMutexUnlock(th_v->ctrl_mutex);
SCLogDebug("woke up...");
}
return TM_ECODE_OK;
}
/** \brief spawn the detect loader manager thread */
void DetectLoaderThreadSpawn()
{
int i;
for (i = 0; i < num_loaders; i++) {
ThreadVars *tv_loader = NULL;
char name[32] = "";
snprintf(name, sizeof(name), "DetectLoader%02d", i+1);
tv_loader = TmThreadCreateCmdThreadByName("DetectLoader",
"DetectLoader", 1);
BUG_ON(tv_loader == NULL);
if (tv_loader == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
void TmModuleDetectLoaderRegister (void)
{
tmm_modules[TMM_DETECTLOADER].name = "DetectLoader";
tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit;
tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit;
tmm_modules[TMM_DETECTLOADER].Management = DetectLoader;
tmm_modules[TMM_DETECTLOADER].cap_flags = 0;
tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name);
SC_ATOMIC_INIT(detect_loader_cnt);
}

@ -0,0 +1,57 @@
/* Copyright (C) 2015 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* Detect loader API, for using multiple 'loader' threads
* that can load multiple detection engines in parallel.
*/
#ifndef __DETECT_ENGINE_LOADER_H__
#define __DETECT_ENGINE_LOADER_H__
/**
* \param ctx function specific data
* \param loader_id id of the loader that executed the task
*/
typedef int (*LoaderFunc)(void *ctx, int loader_id);
typedef struct DetectLoaderTask_ {
LoaderFunc Func;
void *ctx;
TAILQ_ENTRY(DetectLoaderTask_) next;
} DetectLoaderTask;
typedef struct DetectLoaderControl_ {
int id;
int result; /* 0 for ok, error otherwise */
SCMutex m;
TAILQ_HEAD(, DetectLoaderTask_) task_list;
} DetectLoaderControl;
int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx);
int DetectLoadersSync(void);
void DetectLoadersInit(void);
void TmThreadContinueDetectLoaderThreads();
void DetectLoaderThreadSpawn();
void TmModuleDetectLoaderRegister (void);
#endif /* __DETECT_ENGINE_LOADER_H__ */

@ -71,6 +71,8 @@
#include "detect-uricontent.h"
#include "detect-engine-threshold.h"
#include "detect-engine-loader.h"
#include "util-classification-config.h"
#include "util-reference-config.h"
#include "util-threshold-config.h"
@ -1795,128 +1797,6 @@ error:
}
/**
* \param ctx function specific data
* \param loader_id id of the loader that executed the task
*/
typedef int (*LoaderFunc)(void *ctx, int loader_id);
typedef struct DetectLoaderTask_ {
LoaderFunc Func;
void *ctx;
TAILQ_ENTRY(DetectLoaderTask_) next;
} DetectLoaderTask;
typedef struct DetectLoaderControl_ {
int id;
int result; /* 0 for ok, error otherwise */
SCMutex m;
TAILQ_HEAD(, DetectLoaderTask_) task_list;
} DetectLoaderControl;
#define NLOADERS 4
static DetectLoaderControl *loaders = NULL;
static int cur_loader = 0;
void TmThreadWakeupDetectLoaderThreads(void);
static int num_loaders = NLOADERS;
/** \param loader -1 for auto select
* \retval loader_id or negative in case of error */
int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx)
{
if (loader_id == -1) {
loader_id = cur_loader;
cur_loader++;
if (cur_loader >= num_loaders)
cur_loader = 0;
}
if (loader_id >= num_loaders || loader_id < 0) {
return -ERANGE;
}
DetectLoaderControl *loader = &loaders[loader_id];
DetectLoaderTask *t = SCCalloc(1, sizeof(*t));
if (t == NULL)
return -ENOMEM;
t->Func = Func;
t->ctx = func_ctx;
SCMutexLock(&loader->m);
TAILQ_INSERT_TAIL(&loader->task_list, t, next);
SCMutexUnlock(&loader->m);
TmThreadWakeupDetectLoaderThreads();
SCLogDebug("%d %p %p", loader_id, Func, func_ctx);
return loader_id;
}
/** \brief wait for loader tasks to complete
* \retval result 0 for ok, -1 for errors */
int DetectLoadersSync(void)
{
SCLogDebug("waiting");
int errors = 0;
int i;
for (i = 0; i < num_loaders; i++) {
int done = 0;
DetectLoaderControl *loader = &loaders[i];
while (!done) {
SCMutexLock(&loader->m);
if (TAILQ_EMPTY(&loader->task_list)) {
done = 1;
}
SCMutexUnlock(&loader->m);
}
SCMutexLock(&loader->m);
if (loader->result != 0) {
errors++;
loader->result = 0;
}
SCMutexUnlock(&loader->m);
}
if (errors) {
SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors);
return -1;
}
SCLogDebug("done");
return 0;
}
void DetectLoaderInit(DetectLoaderControl *loader)
{
memset(loader, 0x00, sizeof(*loader));
SCMutexInit(&loader->m, NULL);
TAILQ_INIT(&loader->task_list);
}
void DetectLoadersInit(void)
{
intmax_t setting = NLOADERS;
(void)ConfGetInt("multi-detect.loaders", &setting);
if (setting < 1 || setting > 1024) {
SCLogError(SC_ERR_INVALID_ARGUMENTS,
"invalid multi-detect.loaders setting %"PRIdMAX, setting);
exit(EXIT_FAILURE);
}
num_loaders = (int32_t)setting;
SCLogInfo("using %d detect loader threads", num_loaders);
BUG_ON(loaders != NULL);
loaders = SCCalloc(num_loaders, sizeof(DetectLoaderControl));
BUG_ON(loaders == NULL);
int i;
for (i = 0; i < num_loaders; i++) {
DetectLoaderInit(&loaders[i]);
}
}
typedef struct TenantLoaderCtx_ {
uint32_t tenant_id;
int reload_cnt; /**< used by reload */
@ -2007,168 +1887,6 @@ int DetectEngineReloadTenantBlocking(uint32_t tenant_id, const char *yaml, int r
return 0;
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadWakeupDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0) {
BUG_ON(tv->ctrl_cond == NULL);
pthread_cond_broadcast(tv->ctrl_cond);
}
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadContinueDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0)
TmThreadContinue(tv);
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
SC_ATOMIC_DECLARE(int, detect_loader_cnt);
typedef struct DetectLoaderThreadData_ {
uint32_t instance;
} DetectLoaderThreadData;
static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data)
{
DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */
SCLogDebug("detect loader instance %u", ftd->instance);
/* pass thread data back to caller */
*data = ftd;
return TM_ECODE_OK;
}
static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data)
{
SCFree(data);
return TM_ECODE_OK;
}
static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data;
BUG_ON(ftd == NULL);
SCLogDebug("loader thread started");
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
/* see if we have tasks */
DetectLoaderControl *loader = &loaders[ftd->instance];
SCMutexLock(&loader->m);
DetectLoaderTask *task = NULL, *tmptask = NULL;
TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) {
int r = task->Func(task->ctx, ftd->instance);
loader->result |= r;
TAILQ_REMOVE(&loader->task_list, task, next);
SCFree(task);
}
SCMutexUnlock(&loader->m);
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
break;
}
/* just wait until someone wakes us up */
SCCtrlMutexLock(th_v->ctrl_mutex);
SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex);
SCCtrlMutexUnlock(th_v->ctrl_mutex);
SCLogDebug("woke up...");
}
return TM_ECODE_OK;
}
/** \brief spawn the detect loader manager thread */
void DetectLoaderThreadSpawn()
{
int i;
for (i = 0; i < num_loaders; i++) {
ThreadVars *tv_loader = NULL;
char name[32] = "";
snprintf(name, sizeof(name), "DetectLoader%02d", i+1);
tv_loader = TmThreadCreateCmdThreadByName("DetectLoader",
"DetectLoader", 1);
BUG_ON(tv_loader == NULL);
if (tv_loader == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
void TmModuleDetectLoaderRegister (void)
{
tmm_modules[TMM_DETECTLOADER].name = "DetectLoader";
tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit;
tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit;
tmm_modules[TMM_DETECTLOADER].Management = DetectLoader;
tmm_modules[TMM_DETECTLOADER].cap_flags = 0;
tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name);
SC_ATOMIC_INIT(detect_loader_cnt);
}
/**
* \brief setup multi-detect / multi-tenancy
*

Loading…
Cancel
Save