diff --git a/src/detect-engine.c b/src/detect-engine.c index db2c71bff3..305f939f9f 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -1677,11 +1677,12 @@ int DetectEngineMultiTenantEnabled(void) * * \param tenant_id the tenant id by which the config is known * \param filename full path of a yaml file + * \param loader_id id of loader thread or -1 * * \retval 0 ok * \retval -1 failed */ -int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename) +int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id) { DetectEngineCtx *de_ctx = NULL; char prefix[64]; @@ -1744,6 +1745,300 @@ error: return -1; } +/** + * \param ctx function specific data + * \param loader_id id of the loader that executed the task + */ +typedef int (*LoaderFunc)(void *ctx, int loader_id); + +typedef struct DetectLoaderTask_ { + LoaderFunc Func; + void *ctx; + TAILQ_ENTRY(DetectLoaderTask_) next; +} DetectLoaderTask; + +typedef struct DetectLoaderControl_ { + int id; + int result; /* 0 for ok, error otherwise */ + SCMutex m; + TAILQ_HEAD(, DetectLoaderTask_) task_list; +} DetectLoaderControl; + +#define NLOADERS 4 +static DetectLoaderControl loaders[NLOADERS]; +static int cur_loader = 0; +void TmThreadWakeupDetectLoaderThreads(void); + +/** \param loader -1 for auto select + * \retval loader_id or negative in case of error */ +int DetectLoaderQueueTask(int loader_id, LoaderFunc Func, void *func_ctx) +{ + if (loader_id == -1) { + loader_id = cur_loader; + cur_loader++; + if (cur_loader >= NLOADERS) + cur_loader = 0; + } + if (loader_id >= NLOADERS || loader_id < 0) { + return -ERANGE; + } + + DetectLoaderControl *loader = &loaders[loader_id]; + + DetectLoaderTask *t = SCCalloc(1, sizeof(*t)); + if (t == NULL) + return -ENOMEM; + + t->Func = Func; + t->ctx = func_ctx; + + SCMutexLock(&loader->m); + TAILQ_INSERT_TAIL(&loader->task_list, t, next); + SCMutexUnlock(&loader->m); + + TmThreadWakeupDetectLoaderThreads(); + + SCLogDebug("%d %p %p", loader_id, Func, func_ctx); + return loader_id; +} + +/** \brief wait for loader tasks to complete + * \retval result 0 for ok, -1 for errors */ +int DetectLoadersSync(void) +{ + SCLogDebug("waiting"); + int errors = 0; + int i; + for (i = 0; i < NLOADERS; i++) { + int done = 0; + DetectLoaderControl *loader = &loaders[i]; + while (!done) { + SCMutexLock(&loader->m); + if (TAILQ_EMPTY(&loader->task_list)) { + done = 1; + } + SCMutexUnlock(&loader->m); + } + SCMutexLock(&loader->m); + if (loader->result != 0) { + errors++; + loader->result = 0; + } + SCMutexUnlock(&loader->m); + + } + if (errors) { + SCLogError(SC_ERR_INITIALIZATION, "%d loaders reported errors", errors); + return -1; + } + SCLogDebug("done"); + return 0; +} + +void DetectLoaderInit(DetectLoaderControl *loader) +{ + memset(loader, 0x00, sizeof(*loader)); + SCMutexInit(&loader->m, NULL); + TAILQ_INIT(&loader->task_list); +} + +void DetectLoadersInit(void) +{ + int i; + for (i = 0; i < NLOADERS; i++) { + DetectLoaderInit(&loaders[i]); + } +} + +typedef struct TenantLoaderCtx_ { + uint32_t tenant_id; + const char *yaml; +} TenantLoaderCtx; + +static int DetectLoaderFuncLoadTenant(void *vctx, int loader_id) +{ + TenantLoaderCtx *ctx = (TenantLoaderCtx *)vctx; + +/* TODO we need to somehow store the loader id for when we free */ + if (DetectEngineMultiTenantLoadTenant(ctx->tenant_id, ctx->yaml, loader_id) != 0) { + return -1; + } + return 0; +} + +int DetectLoaderSetupLoadTenant(uint32_t tenant_id, const char *yaml) +{ + TenantLoaderCtx *t = SCCalloc(1, sizeof(*t)); + if (t == NULL) + return -ENOMEM; + + t->tenant_id = tenant_id; + t->yaml = yaml; + + return DetectLoaderQueueTask(-1, DetectLoaderFuncLoadTenant, t); +} + +/** + * \brief Unpauses all threads present in tv_root + */ +void TmThreadWakeupDetectLoaderThreads() +{ + ThreadVars *tv = NULL; + int i = 0; + + SCMutexLock(&tv_root_lock); + for (i = 0; i < TVT_MAX; i++) { + tv = tv_root[i]; + while (tv != NULL) { + if (strcmp(tv->name,"DetectLoader") == 0) { + BUG_ON(tv->ctrl_cond == NULL); + pthread_cond_broadcast(tv->ctrl_cond); + } + tv = tv->next; + } + } + SCMutexUnlock(&tv_root_lock); + + return; +} + +/** + * \brief Unpauses all threads present in tv_root + */ +void TmThreadContinueDetectLoaderThreads() +{ + ThreadVars *tv = NULL; + int i = 0; + + SCMutexLock(&tv_root_lock); + for (i = 0; i < TVT_MAX; i++) { + tv = tv_root[i]; + while (tv != NULL) { + if (strcmp(tv->name,"DetectLoader") == 0) + TmThreadContinue(tv); + + tv = tv->next; + } + } + SCMutexUnlock(&tv_root_lock); + + return; +} + + +SC_ATOMIC_DECLARE(int, detect_loader_cnt); + +typedef struct DetectLoaderThreadData_ { + uint32_t instance; +} DetectLoaderThreadData; + +static TmEcode DetectLoaderThreadInit(ThreadVars *t, void *initdata, void **data) +{ + DetectLoaderThreadData *ftd = SCCalloc(1, sizeof(DetectLoaderThreadData)); + if (ftd == NULL) + return TM_ECODE_FAILED; + + ftd->instance = SC_ATOMIC_ADD(detect_loader_cnt, 1) - 1; /* id's start at 0 */ + SCLogDebug("detect loader instance %u", ftd->instance); + + /* pass thread data back to caller */ + *data = ftd; + + return TM_ECODE_OK; +} + +static TmEcode DetectLoaderThreadDeinit(ThreadVars *t, void *data) +{ + SCFree(data); + return TM_ECODE_OK; +} + + +static TmEcode DetectLoader(ThreadVars *th_v, void *thread_data) +{ + /* block usr2. usr2 to be handled by the main thread only */ + UtilSignalBlock(SIGUSR2); + + DetectLoaderThreadData *ftd = (DetectLoaderThreadData *)thread_data; + BUG_ON(ftd == NULL); + + SCLogDebug("loader thread started"); + while (1) + { + if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { + TmThreadsSetFlag(th_v, THV_PAUSED); + TmThreadTestThreadUnPaused(th_v); + TmThreadsUnsetFlag(th_v, THV_PAUSED); + } + + /* see if we have tasks */ + + DetectLoaderControl *loader = &loaders[ftd->instance]; + SCMutexLock(&loader->m); + + DetectLoaderTask *task = NULL, *tmptask = NULL; + TAILQ_FOREACH_SAFE(task, &loader->task_list, next, tmptask) { + int r = task->Func(task->ctx, ftd->instance); + loader->result |= r; + TAILQ_REMOVE(&loader->task_list, task, next); + SCFree(task); + } + + SCMutexUnlock(&loader->m); + + if (TmThreadsCheckFlag(th_v, THV_KILL)) { + break; + } + + /* just wait until someone wakes us up */ + SCCtrlMutexLock(th_v->ctrl_mutex); + SCCtrlCondWait(th_v->ctrl_cond, th_v->ctrl_mutex); + SCCtrlMutexUnlock(th_v->ctrl_mutex); + + SCLogDebug("woke up..."); + } + return TM_ECODE_OK; +} + +/** \brief spawn the detect loader manager thread */ +void DetectLoaderThreadSpawn() +{ + uint32_t u; + for (u = 0; u < NLOADERS; u++) { + ThreadVars *tv_loader = NULL; + + char name[32] = ""; + snprintf(name, sizeof(name), "DetectLoader%02u", u+1); + + tv_loader = TmThreadCreateCmdThreadByName("DetectLoader", + "DetectLoader", 1); + BUG_ON(tv_loader == NULL); + + if (tv_loader == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(1); + } + if (TmThreadSpawn(tv_loader) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(1); + } + } + return; +} + +void TmModuleDetectLoaderRegister (void) +{ + tmm_modules[TMM_DETECTLOADER].name = "DetectLoader"; + tmm_modules[TMM_DETECTLOADER].ThreadInit = DetectLoaderThreadInit; + tmm_modules[TMM_DETECTLOADER].ThreadDeinit = DetectLoaderThreadDeinit; + tmm_modules[TMM_DETECTLOADER].Management = DetectLoader; + tmm_modules[TMM_DETECTLOADER].cap_flags = 0; + tmm_modules[TMM_DETECTLOADER].flags = TM_FLAG_MANAGEMENT_TM; + SCLogDebug("%s registered", tmm_modules[TMM_DETECTLOADER].name); + + SC_ATOMIC_INIT(detect_loader_cnt); +} + /** * \brief setup multi-detect / multi-tenancy * @@ -1761,6 +2056,11 @@ void DetectEngineMultiTenantSetup(void) int enabled = 0; (void)ConfGetBool("multi-detect.enabled", &enabled); if (enabled == 1) { + DetectLoadersInit(); + TmModuleDetectLoaderRegister(); + DetectLoaderThreadSpawn(); + TmThreadContinueDetectLoaderThreads(); + SCMutexLock(&master->lock); master->multi_tenant_enabled = 1; @@ -1858,7 +2158,7 @@ void DetectEngineMultiTenantSetup(void) } SCLogInfo("tenant id: %u, %s", tenant_id, yaml_node->val); - if (DetectEngineMultiTenantLoadTenant(tenant_id, yaml_node->val) != 0) { + if (DetectLoaderSetupLoadTenant(tenant_id, yaml_node->val) != 0) { /* error logged already */ goto bad_tenant; } @@ -1869,6 +2169,11 @@ void DetectEngineMultiTenantSetup(void) goto error; } } + + /* wait for our loaders to complete their tasks */ + if (DetectLoadersSync() != 0) + goto error; + if (DetectEngineMTApply() < 0) { SCLogError(SC_ERR_DETECT_PREPARE, "initializing the detection engine failed"); goto error; diff --git a/src/detect-engine.h b/src/detect-engine.h index ee2d89de0a..c9ee62e82e 100644 --- a/src/detect-engine.h +++ b/src/detect-engine.h @@ -86,7 +86,7 @@ int DetectEngineReloadIsStart(void); void DetectEngineReloadSetDone(void); int DetectEngineReloadIsDone(void); -int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename); +int DetectEngineMultiTenantLoadTenant(uint32_t tenant_id, const char *filename, int loader_id); int DetectEngineTentantRegisterVlanId(uint32_t tenant_id, uint16_t vlan_id); int DetectEngineTentantUnregisterVlanId(uint32_t tenant_id, uint16_t vlan_id); diff --git a/src/detect.h b/src/detect.h index 2ebfb43f96..590ce24c6b 100644 --- a/src/detect.h +++ b/src/detect.h @@ -719,6 +719,10 @@ typedef struct DetectEngineCtx_ { uint32_t ref_cnt; /** list in master: either active or freelist */ struct DetectEngineCtx_ *next; + + /** id of loader thread 'owning' this de_ctx */ + int loader_id; + } DetectEngineCtx; /* Engine groups profiles (low, medium, high, custom) */ diff --git a/src/runmode-unix-socket.c b/src/runmode-unix-socket.c index bc3e2955c3..1fdd49dfb2 100644 --- a/src/runmode-unix-socket.c +++ b/src/runmode-unix-socket.c @@ -639,7 +639,7 @@ TmEcode UnixSocketRegisterTenant(json_t *cmd, json_t* answer, void *data) SCLogDebug("add-tenant: %d %s", tenant_id, filename); /* 3 load into the system */ - if (DetectEngineMultiTenantLoadTenant(tenant_id, filename) != 0) { + if (DetectEngineMultiTenantLoadTenant(tenant_id, filename, -1) != 0) { json_object_set_new(answer, "message", json_string("adding tenant failed")); return TM_ECODE_FAILED; } diff --git a/src/tm-modules.c b/src/tm-modules.c index c6f8c9ac76..73e9f235fd 100644 --- a/src/tm-modules.c +++ b/src/tm-modules.c @@ -269,6 +269,7 @@ const char * TmModuleTmmIdToString(TmmId id) CASE_CODE (TMM_FLOWMANAGER); CASE_CODE (TMM_FLOWRECYCLER); CASE_CODE (TMM_UNIXMANAGER); + CASE_CODE (TMM_DETECTLOADER); CASE_CODE (TMM_LUALOG); CASE_CODE (TMM_LOGSTATSLOG); CASE_CODE (TMM_RECEIVENETMAP); diff --git a/src/tm-threads-common.h b/src/tm-threads-common.h index e59c2a8764..f6629eaa82 100644 --- a/src/tm-threads-common.h +++ b/src/tm-threads-common.h @@ -104,6 +104,7 @@ typedef enum { TMM_FLOWMANAGER, TMM_FLOWRECYCLER, + TMM_DETECTLOADER, TMM_UNIXMANAGER,