detect: update detect engine management

Update detect engine management to make it easier to reload the detect
engine.

Core of the new approach is a 'master' ctx, that keeps a list of one or
more detect engines. The detect engines will not be passed to any thread
directly, but instead will only be accessed through the detect engine
thread contexts. As we can replace those atomically, replacing a detect
engine becomes easier.

Each thread keeps a reference to its detect context. When a detect engine
is replaced or removed, it's added to a free list. Once its reference
count reaches 0, it is freed.
pull/1389/head
Victor Julien 10 years ago
parent 664100c074
commit d66fa1add1

@ -101,6 +101,8 @@ static TmEcode DetectEngineThreadCtxInitForLiveRuleSwap(ThreadVars *, void *, vo
static uint8_t DetectEngineCtxLoadConf(DetectEngineCtx *); static uint8_t DetectEngineCtxLoadConf(DetectEngineCtx *);
static DetectEngineMasterCtx g_master_de_ctx = { SCMUTEX_INITIALIZER, NULL, NULL, };
/* 2 - for each direction */ /* 2 - for each direction */
DetectEngineAppInspectionEngine *app_inspection_engine[FLOW_PROTO_DEFAULT][ALPROTO_MAX][2]; DetectEngineAppInspectionEngine *app_inspection_engine[FLOW_PROTO_DEFAULT][ALPROTO_MAX][2];
@ -443,6 +445,208 @@ void DetectEngineRegisterAppInspectionEngine(uint8_t ipproto,
return; return;
} }
static int DetectEngineReloadThreads(DetectEngineCtx *new_de_ctx)
{
SCEnter();
int i = 0;
int no_of_detect_tvs = 0;
ThreadVars *tv = NULL;
SCLogNotice("rule reload starting");
/* count detect threads in use */
SCMutexLock(&tv_root_lock);
tv = tv_root[TVT_PPT];
while (tv) {
/* obtain the slots for this TV */
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (suricata_ctl_flags != 0) {
SCLogInfo("rule reload interupted by engine shutdown");
SCMutexUnlock(&tv_root_lock);
return -1;
}
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
slots = slots->slot_next;
continue;
}
no_of_detect_tvs++;
break;
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
if (no_of_detect_tvs == 0) {
return -1;
}
/* prepare swap structures */
DetectEngineThreadCtx *old_det_ctx[no_of_detect_tvs];
DetectEngineThreadCtx *new_det_ctx[no_of_detect_tvs];
ThreadVars *detect_tvs[no_of_detect_tvs];
memset(old_det_ctx, 0x00, (no_of_detect_tvs * sizeof(DetectEngineThreadCtx *)));
memset(new_det_ctx, 0x00, (no_of_detect_tvs * sizeof(DetectEngineThreadCtx *)));
memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
/* start the process of swapping detect threads ctxs */
/* get reference to tv's and setup new_det_ctx array */
SCMutexLock(&tv_root_lock);
tv = tv_root[TVT_PPT];
while (tv) {
/* obtain the slots for this TV */
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
goto error;
}
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
slots = slots->slot_next;
continue;
}
old_det_ctx[i] = SC_ATOMIC_GET(slots->slot_data);
detect_tvs[i] = tv;
TmEcode r = DetectEngineThreadCtxInitForLiveRuleSwap(tv, (void *)new_de_ctx,
(void **)&new_det_ctx[i]);
i++;
if (r == TM_ECODE_FAILED) {
SCLogError(SC_ERR_LIVE_RULE_SWAP, "Detect engine thread init "
"failure in live rule swap. Let's get out of here");
SCMutexUnlock(&tv_root_lock);
goto error;
}
SCLogDebug("live rule swap created new det_ctx - %p and de_ctx "
"- %p\n", new_det_ctx, new_de_ctx);
break;
}
tv = tv->next;
}
BUG_ON(i != no_of_detect_tvs);
/* atomicly replace the det_ctx data */
i = 0;
tv = tv_root[TVT_PPT];
while (tv) {
/* find the correct slot */
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
if (suricata_ctl_flags != 0) {
return -1;
}
TmModule *tm = TmModuleGetById(slots->tm_id);
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
slots = slots->slot_next;
continue;
}
SCLogDebug("swapping new det_ctx - %p with older one - %p",
new_det_ctx[i], SC_ATOMIC_GET(slots->slot_data));
(void)SC_ATOMIC_SET(slots->slot_data, new_det_ctx[i++]);
break;
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
/* threads now all have new data, however they may not have started using
* it and may still use the old data */
SCLogInfo("Live rule swap has swapped %d old det_ctx's with new ones, "
"along with the new de_ctx", no_of_detect_tvs);
/* inject a fake packet if the detect thread isn't using the new ctx yet,
* this speeds up the process */
for (i = 0; i < no_of_detect_tvs; i++) {
int break_out = 0;
int pseudo_pkt_inserted = 0;
usleep(1000);
while (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (suricata_ctl_flags != 0) {
break_out = 1;
break;
}
if (pseudo_pkt_inserted == 0) {
pseudo_pkt_inserted = 1;
if (detect_tvs[i]->inq != NULL) {
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END;
PacketQueue *q = &trans_q[detect_tvs[i]->inq->id];
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
}
usleep(1000);
}
if (break_out)
break;
SCLogDebug("new_det_ctx - %p used by detect engine", new_det_ctx[i]);
}
/* this is to make sure that if someone initiated shutdown during a live
* rule swap, the live rule swap won't clean up the old det_ctx and
* de_ctx, till all detect threads have stopped working and sitting
* silently after setting RUNNING_DONE flag and while waiting for
* THV_DEINIT flag */
if (i != no_of_detect_tvs) { // not all threads we swapped
ThreadVars *tv = tv_root[TVT_PPT];
while (tv) {
/* obtain the slots for this TV */
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
slots = slots->slot_next;
continue;
}
while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
usleep(100);
}
slots = slots->slot_next;
}
tv = tv->next;
}
}
/* free all the ctxs */
for (i = 0; i < no_of_detect_tvs; i++) {
SCLogDebug("Freeing old_det_ctx - %p used by detect",
old_det_ctx[i]);
DetectEngineThreadCtxDeinit(NULL, old_det_ctx[i]);
}
SRepReloadComplete();
SCLogNotice("rule reload complete");
return 0;
error:
for (i = 0; i < no_of_detect_tvs; i++) {
if (new_det_ctx[i] != NULL)
DetectEngineThreadCtxDeinit(NULL, new_det_ctx[i]);
}
return -1;
}
static void *DetectEngineLiveRuleSwap(void *arg) static void *DetectEngineLiveRuleSwap(void *arg)
{ {
SCEnter(); SCEnter();
@ -762,36 +966,6 @@ void DetectEngineSpawnLiveRuleSwapMgmtThread(void)
SCReturn; SCReturn;
} }
DetectEngineCtx *DetectEngineGetGlobalDeCtx(void)
{
DetectEngineCtx *de_ctx = NULL;
SCMutexLock(&tv_root_lock);
ThreadVars *tv = tv_root[TVT_PPT];
while (tv) {
/* obtain the slots for this TV */
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (tm->flags & TM_FLAG_DETECT_TM) {
DetectEngineThreadCtx *det_ctx = SC_ATOMIC_GET(slots->slot_data);
de_ctx = det_ctx->de_ctx;
SCMutexUnlock(&tv_root_lock);
return de_ctx;
}
slots = slots->slot_next;
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
return NULL;
}
DetectEngineCtx *DetectEngineCtxInit(void) DetectEngineCtx *DetectEngineCtxInit(void)
{ {
DetectEngineCtx *de_ctx; DetectEngineCtx *de_ctx;
@ -1363,10 +1537,6 @@ static TmEcode ThreadCtxDoInit (DetectEngineCtx *de_ctx, DetectEngineThreadCtx *
*/ */
TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data) TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
{ {
DetectEngineCtx *de_ctx = (DetectEngineCtx *)initdata;
if (de_ctx == NULL)
return TM_ECODE_FAILED;
/* first register the counter. In delayed detect mode we exit right after if the /* first register the counter. In delayed detect mode we exit right after if the
* rules haven't been loaded yet. */ * rules haven't been loaded yet. */
uint16_t counter_alerts = SCPerfTVRegisterCounter("detect.alert", tv, uint16_t counter_alerts = SCPerfTVRegisterCounter("detect.alert", tv,
@ -1381,21 +1551,22 @@ TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data)
uint16_t counter_match_list = SCPerfTVRegisterAvgCounter("detect.match_list", tv, uint16_t counter_match_list = SCPerfTVRegisterAvgCounter("detect.match_list", tv,
SC_PERF_TYPE_UINT64, "NULL"); SC_PERF_TYPE_UINT64, "NULL");
#endif #endif
if (de_ctx->delayed_detect == 1 && de_ctx->delayed_detect_initialized == 0) {
*data = NULL;
return TM_ECODE_OK;
}
DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx)); DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx));
if (unlikely(det_ctx == NULL)) if (unlikely(det_ctx == NULL))
return TM_ECODE_FAILED; return TM_ECODE_FAILED;
memset(det_ctx, 0, sizeof(DetectEngineThreadCtx)); memset(det_ctx, 0, sizeof(DetectEngineThreadCtx));
det_ctx->tv = tv; det_ctx->tv = tv;
det_ctx->de_ctx = de_ctx; det_ctx->de_ctx = DetectEngineGetCurrent();
if (det_ctx->de_ctx == NULL) {
DetectEngineThreadCtxDeinit(tv, det_ctx);
return TM_ECODE_FAILED;
}
if (ThreadCtxDoInit(de_ctx, det_ctx) != TM_ECODE_OK) if (ThreadCtxDoInit(det_ctx->de_ctx, det_ctx) != TM_ECODE_OK) {
DetectEngineThreadCtxDeinit(tv, det_ctx);
return TM_ECODE_FAILED; return TM_ECODE_FAILED;
}
/** alert counter setup */ /** alert counter setup */
det_ctx->counter_alerts = counter_alerts; det_ctx->counter_alerts = counter_alerts;
@ -1423,19 +1594,18 @@ static TmEcode DetectEngineThreadCtxInitForLiveRuleSwap(ThreadVars *tv, void *in
{ {
*data = NULL; *data = NULL;
DetectEngineCtx *de_ctx = (DetectEngineCtx *)initdata;
if (de_ctx == NULL)
return TM_ECODE_FAILED;
DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx)); DetectEngineThreadCtx *det_ctx = SCMalloc(sizeof(DetectEngineThreadCtx));
if (unlikely(det_ctx == NULL)) if (unlikely(det_ctx == NULL))
return TM_ECODE_FAILED; return TM_ECODE_FAILED;
memset(det_ctx, 0, sizeof(DetectEngineThreadCtx)); memset(det_ctx, 0, sizeof(DetectEngineThreadCtx));
det_ctx->tv = tv; det_ctx->tv = tv;
det_ctx->de_ctx = de_ctx; det_ctx->de_ctx = DetectEngineGetCurrent();
if (det_ctx->de_ctx == NULL) {
return TM_ECODE_FAILED;
}
if (ThreadCtxDoInit(de_ctx, det_ctx) != TM_ECODE_OK) if (ThreadCtxDoInit(det_ctx->de_ctx, det_ctx) != TM_ECODE_OK)
return TM_ECODE_FAILED; return TM_ECODE_FAILED;
/** alert counter setup */ /** alert counter setup */
@ -1535,6 +1705,7 @@ TmEcode DetectEngineThreadCtxDeinit(ThreadVars *tv, void *data)
} }
DetectEngineThreadCtxDeinitKeywords(det_ctx->de_ctx, det_ctx); DetectEngineThreadCtxDeinitKeywords(det_ctx->de_ctx, det_ctx);
DetectEngineDeReference(&det_ctx->de_ctx);
SCFree(det_ctx); SCFree(det_ctx);
return TM_ECODE_OK; return TM_ECODE_OK;
@ -1611,6 +1782,187 @@ void *DetectThreadCtxGetKeywordThreadCtx(DetectEngineThreadCtx *det_ctx, int id)
return det_ctx->keyword_ctxs_array[id]; return det_ctx->keyword_ctxs_array[id];
} }
DetectEngineCtx *DetectEngineGetCurrent(void)
{
DetectEngineMasterCtx *master = &g_master_de_ctx;
SCMutexLock(&master->lock);
if (master->list == NULL) {
SCMutexUnlock(&master->lock);
return NULL;
}
master->list->ref_cnt++;
SCLogDebug("master->list %p ref_cnt %u", master->list, master->list->ref_cnt);
SCMutexUnlock(&master->lock);
return master->list;
}
void DetectEngineDeReference(DetectEngineCtx **de_ctx)
{
BUG_ON((*de_ctx)->ref_cnt == 0);
(*de_ctx)->ref_cnt--;
*de_ctx = NULL;
}
static int DetectEngineAddToList(DetectEngineCtx *instance)
{
DetectEngineMasterCtx *master = &g_master_de_ctx;
if (instance == NULL)
return -1;
if (master->list == NULL) {
master->list = instance;
} else {
instance->next = master->list;
master->list = instance;
}
return 0;
}
int DetectEngineAddToMaster(DetectEngineCtx *de_ctx)
{
int r;
if (de_ctx == NULL)
return -1;
SCLogDebug("adding de_ctx %p to master", de_ctx);
DetectEngineMasterCtx *master = &g_master_de_ctx;
SCMutexLock(&master->lock);
r = DetectEngineAddToList(de_ctx);
SCMutexUnlock(&master->lock);
return r;
}
int DetectEngineMoveToFreeList(DetectEngineCtx *de_ctx)
{
DetectEngineMasterCtx *master = &g_master_de_ctx;
SCMutexLock(&master->lock);
DetectEngineCtx *instance = master->list;
if (instance == NULL) {
SCMutexUnlock(&master->lock);
return -1;
}
/* remove from active list */
if (instance == de_ctx) {
master->list = instance->next;
} else {
DetectEngineCtx *prev = instance;
instance = instance->next; /* already checked first element */
while (instance) {
DetectEngineCtx *next = instance->next;
if (instance == de_ctx) {
prev->next = instance->next;
break;
}
prev = instance;
instance = next;
}
if (instance == NULL) {
SCMutexUnlock(&master->lock);
return -1;
}
}
/* instance is now detached from list */
instance->next = NULL;
/* add to free list */
if (master->free_list == NULL) {
master->free_list = instance;
} else {
instance->next = master->free_list;
master->free_list = instance;
}
SCLogDebug("detect engine %p moved to free list (%u refs)", de_ctx, de_ctx->ref_cnt);
SCMutexUnlock(&master->lock);
return 0;
}
void DetectEnginePruneFreeList(void)
{
DetectEngineMasterCtx *master = &g_master_de_ctx;
SCMutexLock(&master->lock);
DetectEngineCtx *prev = NULL;
DetectEngineCtx *instance = master->free_list;
while (instance) {
DetectEngineCtx *next = instance->next;
SCLogDebug("detect engine %p has %u ref(s)", instance, instance->ref_cnt);
if (instance->ref_cnt == 0) {
if (prev == NULL) {
master->free_list = next;
} else {
prev->next = next;
}
SCLogDebug("freeing detect engine %p", instance);
DetectEngineCtxFree(instance);
instance = NULL;
}
prev = instance;
instance = next;
}
SCMutexUnlock(&master->lock);
}
int DetectEngineReload(void)
{
DetectEngineCtx *new_de_ctx = NULL;
DetectEngineCtx *old_de_ctx = NULL;
/* get a reference to the current de_ctx */
old_de_ctx = DetectEngineGetCurrent();
if (old_de_ctx == NULL)
return -1;
SCLogDebug("get ref to old_de_ctx %p", old_de_ctx);
/* get new detection engine */
new_de_ctx = DetectEngineCtxInit();
if (new_de_ctx == NULL) {
DetectEngineDeReference(&old_de_ctx);
return -1;
}
if (SigLoadSignatures(new_de_ctx, NULL, 0) != 0) {
DetectEngineCtxFree(new_de_ctx);
DetectEngineDeReference(&old_de_ctx);
return -1;
}
SCThresholdConfInitContext(new_de_ctx, NULL);
SCLogDebug("set up new_de_ctx %p", new_de_ctx);
/* add to master */
DetectEngineAddToMaster(new_de_ctx);
/* move to old free list */
DetectEngineMoveToFreeList(old_de_ctx);
DetectEngineDeReference(&old_de_ctx);
SCLogDebug("going to reload the threads to use new_de_ctx %p", new_de_ctx);
/* update the threads */
DetectEngineReloadThreads(new_de_ctx);
SCLogDebug("threads now run new_de_ctx %p", new_de_ctx);
/* walk free list, freeing the old_de_ctx */
DetectEnginePruneFreeList();
SCLogDebug("old_de_ctx should have been freed");
return 0;
}
const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type) const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type)
{ {
switch (type) { switch (type) {

@ -57,7 +57,6 @@ extern DetectEngineAppInspectionEngine *app_inspection_engine[FLOW_PROTO_DEFAULT
void DetectEngineRegisterAppInspectionEngines(void); void DetectEngineRegisterAppInspectionEngines(void);
void DetectEngineSpawnLiveRuleSwapMgmtThread(void); void DetectEngineSpawnLiveRuleSwapMgmtThread(void);
DetectEngineCtx *DetectEngineCtxInit(void); DetectEngineCtx *DetectEngineCtxInit(void);
DetectEngineCtx *DetectEngineGetGlobalDeCtx(void);
void DetectEngineCtxFree(DetectEngineCtx *); void DetectEngineCtxFree(DetectEngineCtx *);
TmEcode DetectEngineThreadCtxInit(ThreadVars *, void *, void **); TmEcode DetectEngineThreadCtxInit(ThreadVars *, void *, void **);
@ -69,6 +68,13 @@ void DetectEngineResetMaxSigId(DetectEngineCtx *);
void DetectEngineRegisterTests(void); void DetectEngineRegisterTests(void);
const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type); const char *DetectSigmatchListEnumToString(enum DetectSigmatchListEnum type);
int DetectEngineAddToMaster(DetectEngineCtx *de_ctx);
DetectEngineCtx *DetectEngineGetCurrent(void);
void DetectEnginePruneFreeList(void);
int DetectEngineMoveToFreeList(DetectEngineCtx *de_ctx);
void DetectEngineDeReference(DetectEngineCtx **de_ctx);
int DetectEngineReload(void);
/** /**
* \brief Registers an app inspection engine. * \brief Registers an app inspection engine.
* *

@ -736,6 +736,11 @@ typedef struct DetectEngineCtx_ {
struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx; struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx;
struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx_per_list[DETECT_SM_LIST_MAX]; struct SCProfileKeywordDetectCtx_ *profile_keyword_ctx_per_list[DETECT_SM_LIST_MAX];
#endif #endif
/** how many de_ctx' are referencing this */
uint32_t ref_cnt;
/** list in master: either active or freelist */
struct DetectEngineCtx_ *next;
} DetectEngineCtx; } DetectEngineCtx;
/* Engine groups profiles (low, medium, high, custom) */ /* Engine groups profiles (low, medium, high, custom) */
@ -1041,6 +1046,19 @@ typedef struct SigGroupHead_ {
* deal with both cases */ * deal with both cases */
#define SIGMATCH_OPTIONAL_OPT (1 << 5) #define SIGMATCH_OPTIONAL_OPT (1 << 5)
typedef struct DetectEngineMasterCtx_ {
SCMutex lock;
/** list of active detection engines. This list is used to generate the
* threads det_ctx's */
DetectEngineCtx *list;
/** free list, containing detection engines that will be removed but may
* still be referenced by det_ctx's. Freed as soon as all references are
* gone. */
DetectEngineCtx *free_list;
} DetectEngineMasterCtx;
/** Remember to add the options in SignatureIsIPOnly() at detect.c otherwise it wont be part of a signature group */ /** Remember to add the options in SignatureIsIPOnly() at detect.c otherwise it wont be part of a signature group */
enum { enum {

@ -2307,6 +2307,7 @@ int main(int argc, char **argv)
"context failed."); "context failed.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
#ifdef __SC_CUDA_SUPPORT__ #ifdef __SC_CUDA_SUPPORT__
if (PatternMatchDefaultMatcher() == MPM_AC_CUDA) if (PatternMatchDefaultMatcher() == MPM_AC_CUDA)
CudaVarsSetDeCtx(de_ctx); CudaVarsSetDeCtx(de_ctx);
@ -2332,6 +2333,7 @@ int main(int argc, char **argv)
if (suri.run_mode == RUNMODE_ENGINE_ANALYSIS) { if (suri.run_mode == RUNMODE_ENGINE_ANALYSIS) {
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
DetectEngineAddToMaster(de_ctx);
} }
} }
@ -2482,11 +2484,6 @@ int main(int argc, char **argv)
} }
} }
DetectEngineCtx *global_de_ctx = DetectEngineGetGlobalDeCtx();
if (suri.run_mode != RUNMODE_UNIX_SOCKET && de_ctx != NULL) {
BUG_ON(global_de_ctx == NULL);
}
/* before TmThreadKillThreads, as otherwise that kills it /* before TmThreadKillThreads, as otherwise that kills it
* but more slowly */ * but more slowly */
if (suri.run_mode != RUNMODE_UNIX_SOCKET) { if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
@ -2517,9 +2514,14 @@ int main(int argc, char **argv)
AppLayerHtpPrintStats(); AppLayerHtpPrintStats();
if (global_de_ctx) { /** TODO this can do into it's own func */
DetectEngineCtxFree(global_de_ctx); de_ctx = DetectEngineGetCurrent();
if (de_ctx) {
DetectEngineMoveToFreeList(de_ctx);
DetectEngineDeReference(&de_ctx);
} }
DetectEnginePruneFreeList();
AppLayerDeSetup(); AppLayerDeSetup();
TagDestroyCtx(); TagDestroyCtx();

Loading…
Cancel
Save