Delay Detect threads initialization

This patch modifies the init of Detect threads. They are now started
with a dummy function and their initialisation is done after the
signatures are loaded. Just after this, the dummy function is switched
to normal one.

In IPS mode, this permit to route packets without waiting for the
signature to start and should fix #488.

Offline mode such as pcap file don't use this mode to be sure to
analyse all packets in the file.

The patch introduces a "delayed-detect" configuration variable
under detect-engine. It can be used to activate the feature
(set to "yes" to have signature loaded after capture is started).
pull/39/head
Eric Leblond 13 years ago
parent eaea832a4e
commit 7e09cdc265

@ -338,6 +338,7 @@ void *SCCudaPBTmThreadsSlot1(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
TmThreadTestThreadUnPaused(tv);
/* input a packet */
@ -354,9 +355,9 @@ void *SCCudaPBTmThreadsSlot1(void *td)
* the Batcher TM(which is waiting on a cond from the previous
* feeder TM). Please handle the NULL packet case in the
* function that you now call */
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL);
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL);
} else {
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL);
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), NULL, NULL);
/* handle error */
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, p);

@ -671,6 +671,9 @@ typedef struct DetectEngineCtx_ {
/** Store rule file and line so that parsers can use them in errors. */
char *rule_file;
int rule_line;
/** Is detect engine using a delayed init */
int delayed_detect;
} DetectEngineCtx;
/* Engine groups profiles (low, medium, high, custom) */

@ -569,7 +569,8 @@ static inline void FlowForceReassemblyForHash(void)
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
s->SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}
@ -598,7 +599,8 @@ static inline void FlowForceReassemblyForHash(void)
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
s->SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
SlotFunc(NULL, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}

@ -686,6 +686,7 @@ int main(int argc, char **argv)
#endif /* OS_WIN32 */
int build_info = 0;
int rule_reload = 0;
int delayed_detect = 0;
char *log_dir;
#ifdef OS_WIN32
@ -1734,18 +1735,42 @@ int main(int argc, char **argv)
if (MagicInit() != 0)
exit(EXIT_FAILURE);
if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) {
if (sig_file == NULL) {
SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided");
} else {
SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed.");
/* In offline mode delayed init of detect is a bad idea */
if ((run_mode == RUNMODE_PCAP_FILE) ||
(run_mode == RUNMODE_ERF_FILE) ||
engine_analysis) {
delayed_detect = 0;
} else {
ConfNode *denode = NULL;
ConfNode *decnf = ConfGetNode("detect-engine");
if (decnf != NULL) {
TAILQ_FOREACH(denode, &decnf->head, next) {
if (strcmp(denode->val, "delayed-detect") == 0) {
(void)ConfGetChildValueBool(denode, "delayed-detect", &delayed_detect);
}
}
}
if (de_ctx->failure_fatal)
exit(EXIT_FAILURE);
}
de_ctx->delayed_detect = delayed_detect;
if (engine_analysis) {
exit(EXIT_SUCCESS);
SCLogInfo("Delayed detect %s", delayed_detect ? "enabled" : "disabled");
if (delayed_detect) {
SCLogInfo("Packets will start being processed before signatures are active.");
}
if (!delayed_detect) {
if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) {
if (sig_file == NULL) {
SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided");
} else {
SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed.");
}
if (de_ctx->failure_fatal)
exit(EXIT_FAILURE);
}
if (engine_analysis) {
exit(EXIT_SUCCESS);
}
}
/* registering singal handlers we use. We register usr2 here, so that one
@ -1855,6 +1880,21 @@ int main(int argc, char **argv)
/* Un-pause all the paused threads */
TmThreadContinueThreads();
if (delayed_detect) {
if (SigLoadSignatures(de_ctx, sig_file, sig_file_exclusive) < 0) {
if (sig_file == NULL) {
SCLogError(SC_ERR_OPENING_FILE, "Signature file has not been provided");
} else {
SCLogError(SC_ERR_NO_RULES_LOADED, "Loading signatures failed.");
}
if (de_ctx->failure_fatal)
exit(EXIT_FAILURE);
}
TmThreadActivateDummySlot();
SCLogInfo("Signature(s) loaded, Detect thread(s) activated.");
}
#ifdef DBG_MEM_ALLOC
SCLogInfo("Memory used at startup: %"PRIdMAX, (intmax_t)global_mem);
#ifdef DBG_MEM_ALLOC_SKIP_STARTUP

@ -1,4 +1,4 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
/* Copyright (C) 2007-2012 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
@ -20,7 +20,7 @@
*
* \author Victor Julien <victor@inliniac.net>
* \author Anoop Saldanha <anoopsaldanha@gmail.com>
* \author Eric Leblond <eleblond@edenwall.com>
* \author Eric Leblond <eric@regit.org>
*
* Thread management functions.
*/
@ -43,6 +43,7 @@
#include "util-optimize.h"
#include "util-profiling.h"
#include "util-signal.h"
#include "queue.h"
#ifdef PROFILE_LOCKING
__thread uint64_t mutex_lock_contention;
@ -119,6 +120,16 @@ void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag)
SC_ATOMIC_AND(tv->flags, ~flag);
}
/**
* \brief Function to use as dummy stack function
*
* \retval TM_ECODE_OK
*/
TmEcode TmDummyFunc(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{
return TM_ECODE_OK;
}
/* 1 slot functions */
void *TmThreadsSlot1NoIn(void *td)
{
@ -159,8 +170,9 @@ void *TmThreadsSlot1NoIn(void *td)
while (run) {
TmThreadTestThreadUnPaused(tv);
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
r = s->SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
/* handle error */
if (r == TM_ECODE_FAILED) {
@ -257,11 +269,12 @@ void *TmThreadsSlot1NoOut(void *td)
while (run) {
TmThreadTestThreadUnPaused(tv);
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
p = tv->tmqh_in(tv);
PACKET_PROFILING_TMM_START(p, s->tm_id);
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), /* no outqh no pq */ NULL,
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), /* no outqh no pq */ NULL,
/* no outqh no pq */ NULL);
PACKET_PROFILING_TMM_END(p, s->tm_id);
@ -337,9 +350,10 @@ void *TmThreadsSlot1NoInOut(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE);
while (run) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
TmThreadTestThreadUnPaused(tv);
r = s->SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL);
r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL);
/* handle error */
if (r == TM_ECODE_FAILED) {
@ -420,8 +434,9 @@ void *TmThreadsSlot1(void *td)
p = tv->tmqh_in(tv);
if (p != NULL) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
PACKET_PROFILING_TMM_START(p, s->tm_id);
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq,
&s->slot_post_pq);
PACKET_PROFILING_TMM_END(p, s->tm_id);
@ -500,12 +515,13 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
Packet *extra_p;
for (s = slot; s != NULL; s = s->slot_next) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
PACKET_PROFILING_TMM_START(p, s->tm_id);
if (unlikely(s->id == 0)) {
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
} else {
r = s->SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
r = SlotFunc(tv, p, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, NULL);
}
PACKET_PROFILING_TMM_END(p, s->tm_id);
@ -885,20 +901,23 @@ ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
* \param tv TV the slot is attached to.
* \param tm TM to append.
* \param data Data to be passed on to the slot init function.
*
* \retval The allocated TmSlot or NULL if there is an error
*/
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
static inline TmSlot * _TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
{
TmSlot *s = (TmSlot *)tv->tm_slots;
TmSlot *slot = SCMalloc(sizeof(TmSlot));
if (slot == NULL)
return;
return NULL;
memset(slot, 0, sizeof(TmSlot));
SC_ATOMIC_INIT(slot->slot_data);
slot->tv = tv;
slot->SlotThreadInit = tm->ThreadInit;
slot->slot_initdata = data;
slot->SlotFunc = tm->Func;
SC_ATOMIC_INIT(slot->SlotFunc);
SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
slot->PktAcqLoop = tm->PktAcqLoop;
slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
slot->SlotThreadDeinit = tm->ThreadDeinit;
@ -925,9 +944,106 @@ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
}
}
return slot;
}
/**
* \brief Appends a new entry to the slots.
*
* \param tv TV the slot is attached to.
* \param tm TM to append.
* \param data Data to be passed on to the slot init function.
*/
void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data)
{
_TmSlotSetFuncAppend(tv, tm, data);
}
typedef struct TmDummySlot_ {
TmSlot *slot;
TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);
TmEcode (*SlotThreadInit)(ThreadVars *, void *, void **);
TAILQ_ENTRY(TmDummySlot_) next;
} TmDummySlot;
static TAILQ_HEAD(, TmDummySlot_) dummy_slots =
TAILQ_HEAD_INITIALIZER(dummy_slots);
/**
* \brief Appends a new entry to the slots with a delayed option.
*
* \param tv TV the slot is attached to.
* \param tm TM to append.
* \param data Data to be passed on to the slot init function.
* \param delayed Delayed start of slot if equal to 1
*/
void TmSlotSetFuncAppendDelayed(ThreadVars *tv, TmModule *tm, void *data,
int delayed)
{
TmSlot *slot = _TmSlotSetFuncAppend(tv, tm, data);
TmDummySlot *dslot = NULL;
if ((slot == NULL) || (delayed == 0)) {
return;
}
dslot = SCMalloc(sizeof(TmDummySlot));
if (dslot == NULL) {
return;
}
memset(dslot, 0, sizeof(*dslot));
dslot->SlotFunc = SC_ATOMIC_GET(slot->SlotFunc);
SC_ATOMIC_SET(slot->SlotFunc, TmDummyFunc);
dslot->SlotThreadInit = slot->SlotThreadInit;
slot->SlotThreadInit = NULL;
dslot->slot = slot;
TAILQ_INSERT_TAIL(&dummy_slots, dslot, next);
return;
}
/**
* \brief Activate slots that have been started in delayed mode
*/
void TmThreadActivateDummySlot()
{
TmDummySlot *dslot;
TmSlot *s;
TmEcode r = TM_ECODE_OK;
TAILQ_FOREACH(dslot, &dummy_slots, next) {
void *slot_data = NULL;
s = dslot->slot;
if (dslot->SlotThreadInit != NULL) {
s->SlotThreadInit = dslot->SlotThreadInit;
r = s->SlotThreadInit(s->tv, s->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
EngineKill();
TmThreadsSetFlag(s->tv, THV_CLOSED | THV_RUNNING_DONE);
}
SC_ATOMIC_SET(s->slot_data, slot_data);
}
SC_ATOMIC_CAS(&s->SlotFunc, TmDummyFunc, dslot->SlotFunc);
}
}
/**
* \brief Deactivate slots that have been started in delayed mode.
*/
void TmThreadDeActivateDummySlot()
{
TmDummySlot *dslot;
TAILQ_FOREACH(dslot, &dummy_slots, next) {
SC_ATOMIC_CAS(&dslot->slot->SlotFunc, dslot->SlotFunc, TmDummyFunc);
dslot->slot->SlotThreadInit = NULL;
}
}
/**
* \brief Returns the slot holding a TM with the particular tm_id.
*

@ -29,13 +29,15 @@
#include "tm-threads-common.h"
#include "tm-modules.h"
typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);
typedef struct TmSlot_ {
/* the TV holding this slot */
ThreadVars *tv;
/* function pointers */
TmEcode (*SlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);
SC_ATOMIC_DECLARE(TmSlotFunc, SlotFunc);
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
@ -72,6 +74,7 @@ extern ThreadVars *tv_root[TVT_MAX];
extern SCMutex tv_root_lock;
void TmSlotSetFuncAppend(ThreadVars *, TmModule *, void *);
void TmSlotSetFuncAppendDelayed(ThreadVars *, TmModule *, void *, int delayed);
TmSlot *TmSlotGetSlotForTM(int);
ThreadVars *TmThreadCreate(char *, char *, char *, char *, char *, char *,
@ -104,6 +107,9 @@ void TmThreadCheckThreadState(void);
TmEcode TmThreadWaitOnThreadInit(void);
ThreadVars *TmThreadsGetCallingThread(void);
void TmThreadActivateDummySlot(void);
void TmThreadDeActivateDummySlot(void);
int TmThreadsCheckFlag(ThreadVars *, uint8_t);
void TmThreadsSetFlag(ThreadVars *, uint8_t);
void TmThreadsUnsetFlag(ThreadVars *, uint8_t);

@ -2267,7 +2267,8 @@ void *CudaMpmB2gThreadsSlot1(void *td)
* If the MPM is configured to use multiple CUstreams, buffer (1) and buffer (2) are
* processed in parallel using multiple streams; In this case
* data_queues[tctx->tmq_streamq->id] will contain the results of packet buffer (2). */
r = s->SlotFunc(tv,
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
r = SlotFunc(tv,
(Packet *)data,
(void *)tctx,
(PacketQueue *)&data_queues[tv->inq->id],

@ -330,7 +330,8 @@ int RunModeSetLiveCaptureAuto(DetectEngineCtx *de_ctx,
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
@ -574,7 +575,8 @@ int RunModeSetLiveCaptureAutoFp(DetectEngineCtx *de_ctx,
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
@ -674,7 +676,8 @@ static int RunModeSetLiveCaptureWorkersForDevice(DetectEngineCtx *de_ctx,
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
@ -879,7 +882,8 @@ int RunModeSetIPSAuto(DetectEngineCtx *de_ctx,
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
@ -1064,7 +1068,8 @@ int RunModeSetIPSAutoFp(DetectEngineCtx *de_ctx,
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv_detect_ncpu, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
TmThreadSetCPU(tv_detect_ncpu, DETECT_CPU_SET);
@ -1181,7 +1186,8 @@ int RunModeSetIPSWorker(DetectEngineCtx *de_ctx,
SCLogError(SC_ERR_RUNMODE, "TmModuleGetByName Detect failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, (void *)de_ctx);
TmSlotSetFuncAppendDelayed(tv, tm_module,
(void *)de_ctx, de_ctx->delayed_detect);
tm_module = TmModuleGetByName(verdict_mod_name);
if (tm_module == NULL) {

@ -294,6 +294,9 @@ detect-engine:
# When rule-reload is enabled, sending a USR2 signal to the Suricata process
# will trigger a live rule reload. Experimental feature, use with care.
#- rule-reload: true
# If set to yes, the loading of signatures will be made after the capture
# is started. This will limit the downtime in IPS mode.
#- delayed-detect: yes
# Suricata is multi-threaded. Here the threading can be influenced.
threading:

Loading…
Cancel
Save