updated to fix unix shutdown sequence

Should fix crashes occuring from unix mode shutdown/cleanup phase.
pull/263/merge
Anoop Saldanha 12 years ago committed by Victor Julien
parent d771e08156
commit 34a9c047fc

@ -698,7 +698,11 @@ void *AppLayerDetectProtoThread(void *td)
/* main loop */ /* main loop */
while(run) { while(run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
/* grab a msg, can return NULL on signals */ /* grab a msg, can return NULL on signals */
StreamMsg *smsg = StreamMsgGetFromQueue(stream_q); StreamMsg *smsg = StreamMsgGetFromQueue(stream_q);

@ -465,7 +465,11 @@ static void *SCPerfMgmtThread(void *arg)
TmThreadsSetFlag(tv_local, THV_INIT_DONE); TmThreadsSetFlag(tv_local, THV_INIT_DONE);
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv_local); if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
TmThreadTestThreadUnPaused(tv_local);
TmThreadsUnsetFlag(tv_local, THV_PAUSED);
}
cond_time.tv_sec = time(NULL) + sc_counter_tts; cond_time.tv_sec = time(NULL) + sc_counter_tts;
cond_time.tv_nsec = 0; cond_time.tv_nsec = 0;
@ -529,7 +533,11 @@ static void *SCPerfWakeupThread(void *arg)
TmThreadsSetFlag(tv_local, THV_INIT_DONE); TmThreadsSetFlag(tv_local, THV_INIT_DONE);
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv_local); if (TmThreadsCheckFlag(tv_local, THV_PAUSE)) {
TmThreadsSetFlag(tv_local, THV_PAUSED);
TmThreadTestThreadUnPaused(tv_local);
TmThreadsUnsetFlag(tv_local, THV_PAUSED);
}
cond_time.tv_sec = time(NULL) + SC_PERF_WUT_TTS; cond_time.tv_sec = time(NULL) + SC_PERF_WUT_TTS;
cond_time.tv_nsec = 0; cond_time.tv_nsec = 0;

@ -445,7 +445,11 @@ void *FlowManagerThread(void *td)
TmThreadsSetFlag(th_v, THV_INIT_DONE); TmThreadsSetFlag(th_v, THV_INIT_DONE);
while (1) while (1)
{ {
TmThreadTestThreadUnPaused(th_v); if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
TmThreadsSetFlag(th_v, THV_PAUSED);
TmThreadTestThreadUnPaused(th_v);
TmThreadsUnsetFlag(th_v, THV_PAUSED);
}
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
emerg = TRUE; emerg = TRUE;

@ -681,6 +681,19 @@ void FlowForceReassembly(void)
while (q->len != 0) { while (q->len != 0) {
usleep(100); usleep(100);
} }
TmThreadsSetFlag(tv, THV_PAUSE);
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
else
SCCondSignal(&data_queues[tv->inq->id].cond_q);
while (!TmThreadsCheckFlag(tv, THV_PAUSED)) {
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
else
SCCondSignal(&data_queues[tv->inq->id].cond_q);
usleep(100);
}
TmThreadsUnsetFlag(tv, THV_PAUSE);
} }
} }
tv = tv->next; tv = tv->next;

@ -270,6 +270,7 @@ TmEcode UnixSocketPcapFilesCheck(void *data)
this->running = 0; this->running = 0;
TmThreadKillThreadsFamily(TVT_MGMT); TmThreadKillThreadsFamily(TVT_MGMT);
TmThreadClearThreadsFamily(TVT_MGMT); TmThreadClearThreadsFamily(TVT_MGMT);
TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
FlowForceReassembly(); FlowForceReassembly();
TmThreadKillThreadsFamily(TVT_PPT); TmThreadKillThreadsFamily(TVT_PPT);
TmThreadClearThreadsFamily(TVT_PPT); TmThreadClearThreadsFamily(TVT_PPT);

@ -33,17 +33,18 @@
struct TmSlot_; struct TmSlot_;
/** Thread flags set and read by threads to control the threads */ /** Thread flags set and read by threads to control the threads */
#define THV_USE 0x01 /** thread is in use */ #define THV_USE 1 /** thread is in use */
#define THV_INIT_DONE 0x02 /** thread initialization done */ #define THV_INIT_DONE (1 << 1) /** thread initialization done */
#define THV_PAUSE 0x04 /** thread has been paused */ #define THV_PAUSE (1 << 2) /** signal thread to pause itself */
#define THV_KILL 0x08 /** thread has been asked to cleanup and exit */ #define THV_PAUSED (1 << 3) /** the thread is paused atm */
#define THV_FAILED 0x10 /** thread has encountered an error and failed */ #define THV_KILL (1 << 4) /** thread has been asked to cleanup and exit */
#define THV_CLOSED 0x20 /** thread done, should be joinable */ #define THV_FAILED (1 << 5) /** thread has encountered an error and failed */
#define THV_CLOSED (1 << 6) /** thread done, should be joinable */
/* used to indicate the thread is going through de-init. Introduced as more /* used to indicate the thread is going through de-init. Introduced as more
* of a hack for solving stream-timeout-shutdown. Is set by the main thread. */ * of a hack for solving stream-timeout-shutdown. Is set by the main thread. */
#define THV_DEINIT 0x40 #define THV_DEINIT (1 << 7)
#define THV_RUNNING_DONE 0x80 /** thread has completed running and is entering #define THV_RUNNING_DONE (1 << 8) /** thread has completed running and is entering
* the de-init phase */ * the de-init phase */
/** Thread flags set and read by threads, to control the threads, when they /** Thread flags set and read by threads, to control the threads, when they
* encounter certain conditions like failure */ * encounter certain conditions like failure */
@ -59,7 +60,7 @@ typedef struct ThreadVars_ {
char *name; char *name;
char *thread_group_name; char *thread_group_name;
SC_ATOMIC_DECLARE(unsigned char, flags); SC_ATOMIC_DECLARE(unsigned short, flags);
/** aof(action on failure) determines what should be done with the thread /** aof(action on failure) determines what should be done with the thread
when it encounters certain conditions like failures */ when it encounters certain conditions like failures */

@ -89,7 +89,7 @@ uint8_t tv_aof = THV_RESTART_THREAD;
* \retval 1 flag is set. * \retval 1 flag is set.
* \retval 0 flag is not set. * \retval 0 flag is not set.
*/ */
int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) int TmThreadsCheckFlag(ThreadVars *tv, uint16_t flag)
{ {
return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0; return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0;
} }
@ -97,7 +97,7 @@ int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag)
/** /**
* \brief Set a thread flag. * \brief Set a thread flag.
*/ */
void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) void TmThreadsSetFlag(ThreadVars *tv, uint16_t flag)
{ {
SC_ATOMIC_OR(tv->flags, flag); SC_ATOMIC_OR(tv->flags, flag);
} }
@ -105,7 +105,7 @@ void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag)
/** /**
* \brief Unset a thread flag. * \brief Unset a thread flag.
*/ */
void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) void TmThreadsUnsetFlag(ThreadVars *tv, uint16_t flag)
{ {
SC_ATOMIC_AND(tv->flags, ~flag); SC_ATOMIC_AND(tv->flags, ~flag);
} }
@ -160,7 +160,11 @@ void *TmThreadsSlot1NoIn(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsSetFlag(tv, THV_INIT_DONE);
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq); r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), &s->slot_pre_pq, &s->slot_post_pq);
@ -262,7 +266,11 @@ void *TmThreadsSlot1NoOut(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsSetFlag(tv, THV_INIT_DONE);
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
p = tv->tmqh_in(tv); p = tv->tmqh_in(tv);
@ -348,7 +356,11 @@ void *TmThreadsSlot1NoInOut(void *td)
while (run) { while (run) {
TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc); TmSlotFunc SlotFunc = SC_ATOMIC_GET(s->SlotFunc);
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL); r = SlotFunc(tv, NULL, SC_ATOMIC_GET(s->slot_data), /* no outqh, no pq */NULL, NULL);
@ -428,7 +440,11 @@ void *TmThreadsSlot1(void *td)
TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsSetFlag(tv, THV_INIT_DONE);
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
/* input a packet */ /* input a packet */
p = tv->tmqh_in(tv); p = tv->tmqh_in(tv);
@ -657,7 +673,11 @@ void *TmThreadsSlotPktAcqLoop(void *td) {
TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsSetFlag(tv, THV_INIT_DONE);
while(run) { while(run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
@ -755,7 +775,11 @@ void *TmThreadsSlotVar(void *td)
s = (TmSlot *)tv->tm_slots; s = (TmSlot *)tv->tm_slots;
while (run) { while (run) {
TmThreadTestThreadUnPaused(tv); if (TmThreadsCheckFlag(tv, THV_PAUSE)) {
TmThreadsSetFlag(tv, THV_PAUSED);
TmThreadTestThreadUnPaused(tv);
TmThreadsUnsetFlag(tv, THV_PAUSED);
}
/* input a packet */ /* input a packet */
p = tv->tmqh_in(tv); p = tv->tmqh_in(tv);
@ -1954,7 +1978,7 @@ void TmThreadTestThreadUnPaused(ThreadVars *tv)
* *
* \param tv Pointer to the TV instance. * \param tv Pointer to the TV instance.
*/ */
void TmThreadWaitForFlag(ThreadVars *tv, uint8_t flags) void TmThreadWaitForFlag(ThreadVars *tv, uint16_t flags)
{ {
while (!TmThreadsCheckFlag(tv, flags)) { while (!TmThreadsCheckFlag(tv, flags)) {
usleep(100); usleep(100);

@ -113,10 +113,10 @@ ThreadVars *TmThreadsGetCallingThread(void);
void TmThreadActivateDummySlot(void); void TmThreadActivateDummySlot(void);
void TmThreadDeActivateDummySlot(void); void TmThreadDeActivateDummySlot(void);
int TmThreadsCheckFlag(ThreadVars *, uint8_t); int TmThreadsCheckFlag(ThreadVars *, uint16_t);
void TmThreadsSetFlag(ThreadVars *, uint8_t); void TmThreadsSetFlag(ThreadVars *, uint16_t);
void TmThreadsUnsetFlag(ThreadVars *, uint8_t); void TmThreadsUnsetFlag(ThreadVars *, uint16_t);
void TmThreadWaitForFlag(ThreadVars *, uint8_t); void TmThreadWaitForFlag(ThreadVars *, uint16_t);
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot); TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);

Loading…
Cancel
Save