detect: create loader threads

To speed up startup with many tenants, tenant loading will be parallelized.
As no tempary threads should be used for these memory allocation heavy
tasks, this patch adds new type of 'command' thread that can be used to
load and reload tenants.

This patch hardcodes the number of loaders to 4. Future work will make it
dynamic.

The loader thread essentially sleeps constantly. When a tasks is sent to
it, it will wake up and execute it.
pull/1608/head
Victor Julien 10 years ago
parent 82aa419431
commit eb09118d64

@ -1677,11 +1677,12 @@ int DetectEngineMultiTenantEnabled(void)
*
* \param tenant_id the tenant id by which the config is known
* \param filename full path of a yaml file
* \param loader_id id of loader thread or -1
*
* \retval 0 ok
* \retval -1 failed
*/
int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename)
int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id)
{
DetectEngineCtx *de_ctx = NULL;
char prefix[64];
@ -1744,6 +1745,300 @@ error:
return -1;
}
/**
* \param ctx function specific data
* \param loader_id id of the loader that executed the task
*/
typedef int (*LoaderFunc)(void *ctx, int loader_id);
typedef struct DetectLoaderTask_ {
LoaderFunc Func;
void *ctx;
TAILQ_ENTRY(DetectLoaderTask_) next;
} DetectLoaderTask;
typedef struct DetectLoaderControl_ {
int id;
int result; /* 0 for ok, error otherwise */
SCMutex m;
TAILQ_HEAD(, DetectLoaderTask_) task_list;
} DetectLoaderControl;
#define NLOADERS 4
static DetectLoaderControl loaders[NLOADERS];
static int cur_loader = 0;
void TmThreadWakeupDetectLoaderThreads(void);
/** \param loader -1 for auto select
* \retval loader_id or negative in case of error */
int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx)
{
if (loader_id == -1) {
loader_id = cur_loader;
cur_loader++;
if (cur_loader >= NLOADERS)
cur_loader = 0;
}
if (loader_id >= NLOADERS || loader_id < 0) {
return -ERANGE;
}
DetectLoaderControl *loader = &loaders[loader_id];
DetectLoaderTask *t = SCCalloc(1, sizeof(*t));
if (t == NULL)
return -ENOMEM;
t->Func = Func;
t->ctx = func_ctx;
SCMutexLock(&loader->m);
TAILQ_INSERT_TAIL(&loader->task_list, t, next);
SCMutexUnlock(&loader->m);
TmThreadWakeupDetectLoaderThreads();
SCLogDebug("%d %p %p", loader_id, Func, func_ctx);
return loader_id;
}
/** \brief wait for loader tasks to complete
* \retval result 0 for ok, -1 for errors */
int DetectLoadersSync(void)
{
SCLogDebug("waiting");
int errors = 0;
int i;
for (i = 0; i < NLOADERS; i++) {
int done = 0;
DetectLoaderControl *loader = &loaders[i];
while (!done) {
SCMutexLock(&loader->m);
if (TAILQ_EMPTY(&loader->task_list)) {
done = 1;
}
SCMutexUnlock(&loader->m);
}
SCMutexLock(&loader->m);
if (loader->result != 0) {
errors++;
loader->result = 0;
}
SCMutexUnlock(&loader->m);
}
if (errors) {
SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors);
return -1;
}
SCLogDebug("done");
return 0;
}
void DetectLoaderInit(DetectLoaderControl *loader)
{
memset(loader, 0x00, sizeof(*loader));
SCMutexInit(&loader->m, NULL);
TAILQ_INIT(&loader->task_list);
}
void DetectLoadersInit(void)
{
int i;
for (i = 0; i < NLOADERS; i++) {
DetectLoaderInit(&loaders[i]);
}
}
typedef struct TenantLoaderCtx_ {
uint32_t tenant_id;
const char *yaml;
} TenantLoaderCtx;
static int DetectLoaderFuncLoadTenant(void *vctx, int loader_id)
{
TenantLoaderCtx *ctx = (TenantLoaderCtx *)vctx;
/* TODO we need to somehow store the loader id for when we free */
if (DetectEngineMultiTenantLoadTenant(ctx->tenant_id, ctx->yaml, loader_id) != 0) {
return -1;
}
return 0;
}
int DetectLoaderSetupLoadTenant(uint32_t tenant_id, const char *yaml)
{
TenantLoaderCtx *t = SCCalloc(1, sizeof(*t));
if (t == NULL)
return -ENOMEM;
t->tenant_id = tenant_id;
t->yaml = yaml;
return DetectLoaderQueueTask(-1, DetectLoaderFuncLoadTenant, t);
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadWakeupDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0) {
BUG_ON(tv->ctrl_cond == NULL);
pthread_cond_broadcast(tv->ctrl_cond);
}
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
/**
* \brief Unpauses all threads present in tv_root
*/
void TmThreadContinueDetectLoaderThreads()
{
ThreadVars *tv = NULL;
int i = 0;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv != NULL) {
if (strcmp(tv->name,"DetectLoader") == 0)
TmThreadContinue(tv);
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return;
}
SC_ATOMIC_DECLARE(int, detect_loader_cnt);
typedef struct DetectLoaderThreadData_ {
uint32_t instance;
} DetectLoaderThreadData;
static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data)
{
DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData));
if (ftd == NULL)
return TM_ECODE_FAILED;
ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */
SCLogDebug("detect loader instance %u", ftd->instance);
/* pass thread data back to caller */
*data = ftd;
return TM_ECODE_OK;
}
static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data)
{
SCFree(data);
return TM_ECODE_OK;
}
static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data;
BUG_ON(ftd == NULL);
SCLogDebug("loader thread started");
while (1)
{
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
/* see if we have tasks */
DetectLoaderControl *loader = &loaders[ftd->instance];
SCMutexLock(&loader->m);
DetectLoaderTask *task = NULL, *tmptask = NULL;
TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) {
int r = task->Func(task->ctx, ftd->instance);
loader->result |= r;
TAILQ_REMOVE(&loader->task_list, task, next);
SCFree(task);
}
SCMutexUnlock(&loader->m);
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
break;
}
/* just wait until someone wakes us up */
SCCtrlMutexLock(th_v->ctrl_mutex);
SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex);
SCCtrlMutexUnlock(th_v->ctrl_mutex);
SCLogDebug("woke up...");
}
return TM_ECODE_OK;
}
/** \brief spawn the detect loader manager thread */
void DetectLoaderThreadSpawn()
{
uint32_t u;
for (u = 0; u < NLOADERS; u++) {
ThreadVars *tv_loader = NULL;
char name[32] = "";
snprintf(name, sizeof(name), "DetectLoader%02u", u+1);
tv_loader = TmThreadCreateCmdThreadByName("DetectLoader",
"DetectLoader", 1);
BUG_ON(tv_loader == NULL);
if (tv_loader == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(1);
}
if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
}
return;
}
void TmModuleDetectLoaderRegister (void)
{
tmm_modules[TMM_DETECTLOADER].name = "DetectLoader";
tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit;
tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit;
tmm_modules[TMM_DETECTLOADER].Management = DetectLoader;
tmm_modules[TMM_DETECTLOADER].cap_flags = 0;
tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM;
SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name);
SC_ATOMIC_INIT(detect_loader_cnt);
}
/**
* \brief setup multi-detect / multi-tenancy
*
@ -1761,6 +2056,11 @@ void DetectEngineMultiTenantSetup(void)
int enabled = 0;
(void)ConfGetBool("multi-detect.enabled", &enabled);
if (enabled == 1) {
DetectLoadersInit();
TmModuleDetectLoaderRegister();
DetectLoaderThreadSpawn();
TmThreadContinueDetectLoaderThreads();
SCMutexLock(&master->lock);
master->multi_tenant_enabled = 1;
@ -1858,7 +2158,7 @@ void DetectEngineMultiTenantSetup(void)
}
SCLogInfo("tenant id: %u, %s", tenant_id, yaml_node->val);
if (DetectEngineMultiTenantLoadTenant(tenant_id, yaml_node->val) != 0) {
if (DetectLoaderSetupLoadTenant(tenant_id, yaml_node->val) != 0) {
/* error logged already */
goto bad_tenant;
}
@ -1869,6 +2169,11 @@ void DetectEngineMultiTenantSetup(void)
goto error;
}
}
/* wait for our loaders to complete their tasks */
if (DetectLoadersSync() != 0)
goto error;
if (DetectEngineMTApply() < 0) {
SCLogError(SC_ERR_DETECT_PREPARE, "initializing the detection engine failed");
goto error;

@ -86,7 +86,7 @@ int DetectEngineReloadIsStart(void);
void DetectEngineReloadSetDone(void);
int DetectEngineReloadIsDone(void);
int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename);
int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id);
int DetectEngineTentantRegisterVlanId(uint32_t tenant_id, uint16_t vlan_id);
int DetectEngineTentantUnregisterVlanId(uint32_t tenant_id, uint16_t vlan_id);

@ -719,6 +719,10 @@ typedef struct DetectEngineCtx_ {
uint32_t ref_cnt;
/** list in master: either active or freelist */
struct DetectEngineCtx_ *next;
/** id of loader thread 'owning' this de_ctx */
int loader_id;
} DetectEngineCtx;
/* Engine groups profiles (low, medium, high, custom) */

@ -639,7 +639,7 @@ TmEcode UnixSocketRegisterTenant(json_t *cmd, json_t* answer, void *data)
SCLogDebug("add-tenant: %d %s", tenant_id, filename);
/* 3 load into the system */
if (DetectEngineMultiTenantLoadTenant(tenant_id, filename) != 0) {
if (DetectEngineMultiTenantLoadTenant(tenant_id, filename, -1) != 0) {
json_object_set_new(answer, "message", json_string("adding tenant failed"));
return TM_ECODE_FAILED;
}

@ -269,6 +269,7 @@ const char * TmModuleTmmIdToString(TmmId id)
CASE_CODE (TMM_FLOWMANAGER);
CASE_CODE (TMM_FLOWRECYCLER);
CASE_CODE (TMM_UNIXMANAGER);
CASE_CODE (TMM_DETECTLOADER);
CASE_CODE (TMM_LUALOG);
CASE_CODE (TMM_LOGSTATSLOG);
CASE_CODE (TMM_RECEIVENETMAP);

@ -104,6 +104,7 @@ typedef enum {
TMM_FLOWMANAGER,
TMM_FLOWRECYCLER,
TMM_DETECTLOADER,
TMM_UNIXMANAGER,

Loading…
Cancel
Save