threads: add management API

Currently management threads do their own thread setup and handling. This
patch introduces a new way of handling management threads.

Functionality that needs to run as a management thread can now register
itself as a regular 'thread module' (TmModule), where the 'Management'
callback is registered.
pull/1058/head
Victor Julien 11 years ago
parent f1185d051c
commit 46cee88ef8

@ -33,6 +33,7 @@
#define TM_FLAG_STREAM_TM 0x04
#define TM_FLAG_DETECT_TM 0x08
#define TM_FLAG_LOGAPI_TM 0x10 /**< TM is run by Log API */
#define TM_FLAG_MANAGEMENT_TM 0x20
typedef struct TmModule_ {
char *name;
@ -47,6 +48,8 @@ typedef struct TmModule_ {
TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
TmEcode (*Management)(ThreadVars *, void *);
/** global Init/DeInit */
TmEcode (*Init)(void);
TmEcode (*DeInit)(void);

@ -895,6 +895,82 @@ void *TmThreadsSlotVar(void *td)
return NULL;
}
static void *TmThreadsManagement(void *td)
{
/* block usr2. usr2 to be handled by the main thread only */
UtilSignalBlock(SIGUSR2);
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = (TmSlot *)tv->tm_slots;
TmEcode r = TM_ECODE_OK;
BUG_ON(s == NULL);
/* Set the thread name */
if (SCSetThreadName(tv->name) < 0) {
SCLogWarning(SC_ERR_THREAD_INIT, "Unable to set thread name");
}
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
/* Drop the capabilities for this thread */
SCDropCaps(tv);
SCLogDebug("%s starting", tv->name);
if (s->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = s->SlotThreadInit(tv, s->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
EngineKill();
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit((void *) -1);
return NULL;
}
(void)SC_ATOMIC_SET(s->slot_data, slot_data);
}
memset(&s->slot_pre_pq, 0, sizeof(PacketQueue));
memset(&s->slot_post_pq, 0, sizeof(PacketQueue));
tv->sc_perf_pca = SCPerfGetAllCountersArray(&tv->sc_perf_pctx);
SCPerfAddToClubbedTMTable((tv->thread_group_name != NULL) ?
tv->thread_group_name : tv->name, &tv->sc_perf_pctx);
TmThreadsSetFlag(tv, THV_INIT_DONE);
r = s->Management(tv, SC_ATOMIC_GET(s->slot_data));
/* handle error */
if (r == TM_ECODE_FAILED) {
TmThreadsSetFlag(tv, THV_FAILED);
}
if (TmThreadsCheckFlag(tv, THV_KILL)) {
SCPerfSyncCounters(tv);
}
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
if (s->SlotThreadExitPrintStats != NULL) {
s->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(s->slot_data));
}
if (s->SlotThreadDeinit != NULL) {
r = s->SlotThreadDeinit(tv, SC_ATOMIC_GET(s->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) -1);
return NULL;
}
}
TmThreadsSetFlag(tv, THV_CLOSED);
pthread_exit((void *) 0);
return NULL;
}
/**
* \brief We set the slot functions.
*
@ -929,6 +1005,8 @@ TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *))
tv->tm_func = TmThreadsSlotVar;
} else if (strcmp(name, "pktacqloop") == 0) {
tv->tm_func = TmThreadsSlotPktAcqLoop;
} else if (strcmp(name, "management") == 0) {
tv->tm_func = TmThreadsManagement;
} else if (strcmp(name, "custom") == 0) {
if (fn_p == NULL)
goto error;
@ -994,6 +1072,7 @@ static inline TmSlot * _TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *
SC_ATOMIC_INIT(slot->SlotFunc);
(void)SC_ATOMIC_SET(slot->SlotFunc, tm->Func);
slot->PktAcqLoop = tm->PktAcqLoop;
slot->Management = tm->Management;
slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats;
slot->SlotThreadDeinit = tm->ThreadDeinit;
/* we don't have to check for the return value "-1". We wouldn't have
@ -1538,6 +1617,38 @@ ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *),
return tv;
}
/**
* \brief Creates and returns the TV instance for a Management thread(MGMT).
* This function supports only custom slot functions and hence a
* function pointer should be sent as an argument.
*
* \param name Name of this TV instance
* \param module Name of TmModule with MANAGEMENT flag set.
* \param mucond Flag to indicate whether to initialize the condition
* and the mutex variables for this newly created TV.
*
* \retval the newly created TV instance, or NULL on error
*/
ThreadVars *TmThreadCreateMgmtThreadByName(char *name, char *module,
int mucond)
{
ThreadVars *tv = NULL;
tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "management", NULL, mucond);
if (tv != NULL) {
tv->type = TVT_MGMT;
TmThreadSetCPU(tv, MANAGEMENT_CPU_SET);
TmModule *m = TmModuleGetByName(module);
if (m) {
TmSlotSetFuncAppend(tv, m, NULL);
}
}
return tv;
}
/**
* \brief Creates and returns the TV instance for a CMD thread.
* This function supports only custom slot functions and hence a

@ -70,6 +70,10 @@ typedef struct TmSlot_ {
/* linked list, only used when you have multiple slots(used by TmVarSlot) */
struct TmSlot_ *slot_next;
/* just called once, so not perf critical */
TmEcode (*Management)(ThreadVars *, void *);
} TmSlot;
extern ThreadVars *tv_root[TVT_MAX];
@ -85,6 +89,8 @@ ThreadVars *TmThreadCreate(char *, char *, char *, char *, char *, char *,
ThreadVars *TmThreadCreatePacketHandler(char *, char *, char *, char *, char *,
char *);
ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), int);
ThreadVars *TmThreadCreateMgmtThreadByName(char *name, char *module,
int mucond);
ThreadVars *TmThreadCreateCmdThread(char *name, void *(fn_p)(void *), int);
TmEcode TmThreadSpawn(ThreadVars *);
void TmThreadSetFlags(ThreadVars *, uint8_t);

Loading…
Cancel
Save