initial thread code support

remotes/origin/master-1.0.x
Gurvinder Singh 16 years ago committed by Victor Julien
parent 47eb168713
commit 48c94bbf03

@ -390,7 +390,7 @@ void AppLayerDetectProtoThreadSpawn()
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(1); exit(1);
} }
if (TmThreadSpawn(tv_applayerdetect) != 0) { if (TmThreadSpawn(tv_applayerdetect) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(1); exit(1);
} }

@ -103,7 +103,7 @@ void PerfSpawnThreads(void)
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(1); exit(1);
} }
if (TmThreadSpawn(tv_wakeup) != 0) { if (TmThreadSpawn(tv_wakeup) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(1); exit(1);
} }
@ -114,7 +114,7 @@ void PerfSpawnThreads(void)
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(1); exit(1);
} }
if (TmThreadSpawn(tv_mgmt) != 0) { if (TmThreadSpawn(tv_mgmt) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(1); exit(1);
} }

@ -533,7 +533,7 @@ int main(int argc, char **argv)
TmValidateQueueState(); TmValidateQueueState();
/* Wait till all the threads have been initialized */ /* Wait till all the threads have been initialized */
if (TmThreadWaitOnThreadInit() == -1) { if (TmThreadWaitOnThreadInit() == TM_ECODE_FAILED) {
printf("ERROR: Engine initialization failed, aborting...\n"); printf("ERROR: Engine initialization failed, aborting...\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }

@ -599,7 +599,7 @@ void FlowManagerThreadSpawn()
printf("ERROR: TmThreadsCreate failed\n"); printf("ERROR: TmThreadsCreate failed\n");
exit(1); exit(1);
} }
if (TmThreadSpawn(tv_flowmgr) != 0) { if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n"); printf("ERROR: TmThreadSpawn failed\n");
exit(1); exit(1);
} }

@ -45,6 +45,12 @@ enum {
TmModule tmm_modules[TMM_SIZE]; TmModule tmm_modules[TMM_SIZE];
/*Error codes for the thread modules*/
typedef enum {
TM_ECODE_OK = 0, /**< Thread module exits OK*/
TM_ECODE_FAILED, /**< Thread module exits due to failure*/
}TmEcode;
/** Global structure for Output Context */ /** Global structure for Output Context */
typedef struct LogFileCtx_ { typedef struct LogFileCtx_ {
FILE *fp; FILE *fp;
@ -61,7 +67,7 @@ LogFileCtx *LogFileNewCtx();
int LogFileFreeCtx(LogFileCtx *); int LogFileFreeCtx(LogFileCtx *);
TmModule *TmModuleGetByName(char *name); TmModule *TmModuleGetByName(char *name);
int TmModuleRegister(char *name, int (*module_func)(ThreadVars *, Packet *, void *)); TmEcode TmModuleRegister(char *name, int (*module_func)(ThreadVars *, Packet *, void *));
void TmModuleDebugList(void); void TmModuleDebugList(void);
void TmModuleRegisterTests(void); void TmModuleRegisterTests(void);

@ -343,7 +343,7 @@ void *TmThreadsSlot1(void *td) {
} }
/* separate run function so we can call it recursively */ /* separate run function so we can call it recursively */
static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) { static inline TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot) {
int r = 0; int r = 0;
TmSlot *s = NULL; TmSlot *s = NULL;
@ -355,7 +355,7 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
/* Encountered error. Return packets to packetpool and return */ /* Encountered error. Return packets to packetpool and return */
TmqhReleasePacketsToPacketPool(&s->slot_pq); TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmThreadsSetFlag(tv, THV_FAILED); TmThreadsSetFlag(tv, THV_FAILED);
return 1; return TM_ECODE_FAILED;
} }
/* handle new packets */ /* handle new packets */
@ -366,19 +366,19 @@ static inline int TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot)
if (s->slot_next != NULL) { if (s->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next);
/* XXX handle error */ /* XXX handle error */
if (r == 1) { if (r == TM_ECODE_FAILED) {
//printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n"); //printf("TmThreadsSlotVarRun: recursive TmThreadsSlotVarRun returned 1\n");
TmqhReleasePacketsToPacketPool(&s->slot_pq); TmqhReleasePacketsToPacketPool(&s->slot_pq);
TmqhOutputPacketpool(tv, extra_p); TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED); TmThreadsSetFlag(tv, THV_FAILED);
return 1; return TM_ECODE_FAILED;
} }
} }
tv->tmqh_out(tv, extra_p); tv->tmqh_out(tv, extra_p);
} }
} }
return 0; return TM_ECODE_OK;
} }
void *TmThreadsSlotVar(void *td) { void *TmThreadsSlotVar(void *td) {
@ -421,7 +421,7 @@ void *TmThreadsSlotVar(void *td) {
} else { } else {
r = TmThreadsSlotVarRun(tv, p, s->s); r = TmThreadsSlotVarRun(tv, p, s->s);
/* XXX handle error */ /* XXX handle error */
if (r == 1) { if (r == TM_ECODE_FAILED) {
//printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n"); //printf("TmThreadsSlotVar: TmThreadsSlotVarRun returned 1, breaking out of the loop.\n");
TmqhOutputPacketpool(tv, p); TmqhOutputPacketpool(tv, p);
TmThreadsSetFlag(tv, THV_FAILED); TmThreadsSetFlag(tv, THV_FAILED);
@ -458,7 +458,7 @@ void *TmThreadsSlotVar(void *td) {
pthread_exit((void *) 0); pthread_exit((void *) 0);
} }
int TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) { TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) {
uint16_t size = 0; uint16_t size = 0;
if (name == NULL) { if (name == NULL) {
@ -501,9 +501,9 @@ int TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) {
if (tv->tm_slots == NULL) goto error; if (tv->tm_slots == NULL) goto error;
memset(tv->tm_slots, 0, size); memset(tv->tm_slots, 0, size);
return 0; return TM_ECODE_OK;
error: error:
return -1; return TM_ECODE_FAILED;
} }
void Tm1SlotSetFunc(ThreadVars *tv, TmModule *tm, void *data) { void Tm1SlotSetFunc(ThreadVars *tv, TmModule *tm, void *data) {
@ -569,10 +569,10 @@ static int SetCPUAffinity(int cpu) {
return 0; return 0;
} }
int TmThreadSetCPUAffinity(ThreadVars *tv, int cpu) { TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, int cpu) {
tv->set_cpu_affinity = 1; tv->set_cpu_affinity = 1;
tv->cpu_affinity = cpu; tv->cpu_affinity = cpu;
return 0; return TM_ECODE_OK;
} }
/** /**
@ -661,7 +661,7 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
} }
} }
if (TmThreadSetSlots(tv, slots, fn_p) != 0) { if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) {
goto error; goto error;
} }
@ -841,15 +841,15 @@ void TmThreadKillThreads(void) {
/** /**
* \brief Spawns a thread associated with the ThreadVars instance tv * \brief Spawns a thread associated with the ThreadVars instance tv
* *
* \retval 0 on success and -1 on failure * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
*/ */
int TmThreadSpawn(ThreadVars *tv) TmEcode TmThreadSpawn(ThreadVars *tv)
{ {
pthread_attr_t attr; pthread_attr_t attr;
if (tv->tm_func == NULL) { if (tv->tm_func == NULL) {
printf("ERROR: no thread function set\n"); printf("ERROR: no thread function set\n");
return -1; return TM_ECODE_FAILED;
} }
/* Initialize and set thread detached attribute */ /* Initialize and set thread detached attribute */
@ -859,12 +859,12 @@ int TmThreadSpawn(ThreadVars *tv)
int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv);
if (rc) { if (rc) {
printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc); printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc);
return -1; return TM_ECODE_FAILED;
} }
TmThreadAppend(tv, tv->type); TmThreadAppend(tv, tv->type);
return 0; return TM_ECODE_OK;
} }
/** /**
@ -1021,7 +1021,7 @@ static void TmThreadRestartThread(ThreadVars *tv)
TmThreadsUnsetFlag(tv, THV_CLOSED); TmThreadsUnsetFlag(tv, THV_CLOSED);
TmThreadsUnsetFlag(tv, THV_FAILED); TmThreadsUnsetFlag(tv, THV_FAILED);
if (TmThreadSpawn(tv) != 0) { if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("Error: TmThreadSpawn failed\n"); printf("Error: TmThreadSpawn failed\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -1068,10 +1068,10 @@ void TmThreadCheckThreadState(void)
/** \brief Used to check if all threads have finished their initialization. On /** \brief Used to check if all threads have finished their initialization. On
* finding an un-initialized thread, it waits till that thread completes * finding an un-initialized thread, it waits till that thread completes
* its initialization, before proceeding to the next thread. * its initialization, before proceeding to the next thread.
* \retval 0 all initialized properly * \retval TM_ECODE_OK all initialized properly
* \retval -1 failure * \retval TM_ECODE_FAILED failure
*/ */
int TmThreadWaitOnThreadInit(void) TmEcode TmThreadWaitOnThreadInit(void)
{ {
ThreadVars *tv = NULL; ThreadVars *tv = NULL;
int i = 0; int i = 0;
@ -1091,7 +1091,7 @@ int TmThreadWaitOnThreadInit(void)
TmThreadsCheckFlag(tv, THV_FAILED)) TmThreadsCheckFlag(tv, THV_FAILED))
{ {
printf("Thread \"%s\" failed to initialize...\n", tv->name); printf("Thread \"%s\" failed to initialize...\n", tv->name);
return -1; return TM_ECODE_FAILED;
} }
} }
@ -1104,7 +1104,7 @@ int TmThreadWaitOnThreadInit(void)
SCLogInfo("all %"PRIu16" packet processing threads, %"PRIu16" management " SCLogInfo("all %"PRIu16" packet processing threads, %"PRIu16" management "
"threads initialized, engine started.", ppt_num, mgt_num); "threads initialized, engine started.", ppt_num, mgt_num);
return 0; return TM_ECODE_OK;
} }
/** /**

@ -26,7 +26,7 @@ ThreadVars *TmThreadCreatePacketHandler(char *, char *, char *, char *, char *,
ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), int); ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), int);
int TmThreadSpawn(ThreadVars *); TmEcode TmThreadSpawn(ThreadVars *);
void TmThreadSetFlags(ThreadVars *, uint8_t); void TmThreadSetFlags(ThreadVars *, uint8_t);
@ -36,7 +36,7 @@ void TmThreadKillThreads(void);
void TmThreadAppend(ThreadVars *, int); void TmThreadAppend(ThreadVars *, int);
int TmThreadSetCPUAffinity(ThreadVars *, int); TmEcode TmThreadSetCPUAffinity(ThreadVars *, int);
void TmThreadInitMC(ThreadVars *); void TmThreadInitMC(ThreadVars *);
@ -52,7 +52,7 @@ void TmThreadPauseThreads(void);
void TmThreadCheckThreadState(void); void TmThreadCheckThreadState(void);
int TmThreadWaitOnThreadInit(void); TmEcode TmThreadWaitOnThreadInit(void);
inline int TmThreadsCheckFlag(ThreadVars *, uint8_t); inline int TmThreadsCheckFlag(ThreadVars *, uint8_t);
inline void TmThreadsSetFlag(ThreadVars *, uint8_t); inline void TmThreadsSetFlag(ThreadVars *, uint8_t);

Loading…
Cancel
Save