flow-time: use live threads at shutdown

Update pktacq loop to process flow timeouts in a running engine.

Add a new step to the shutdown phase of packet acquisition loop
threads (pktacqloop).

The shutdown code lets the pktacqloop break out of it's packet
acquisition loop. The thread then enters a flow timeout loop, where
it processes packets from it's tv->stream_pq queue until it's
empty _and_ the KILL flag is set.

Make sure receive threads are done before moving on to flow hash
cleanup (recycle all). Without this the flow recycler could start
it's unconditional hash clean up while detect threads are still
running on the flows.

Update unix socket to match live modes.
pull/1268/head
Victor Julien 11 years ago
parent c6ec92d9b1
commit 8e86f387a6

@ -523,10 +523,10 @@ static inline void FlowForceReassemblyForHash(void)
{
Flow *f;
TcpSession *ssn;
int client_ok;
int server_ok;
int client_ok = 0;
int server_ok = 0;
uint32_t idx = 0;
#if 0
/* We use this packet just for reassembly purpose */
Packet *reassemble_p = PacketGetFromAlloc();
@ -676,7 +676,7 @@ static inline void FlowForceReassemblyForHash(void)
void FlowForceReassembly(void)
{
/* Do remember. We need to have packet acquire disabled by now */
#if 0
/** ----- Part 1 ------*/
/* Flush out unattended packets */
FlowForceReassemblyFlushPendingPseudoPackets();
@ -721,7 +721,7 @@ void FlowForceReassembly(void)
}
SCMutexUnlock(&tv_root_lock);
#endif
/** ----- Part 3 ----- **/
/* Carry out flow reassembly for unattended flows */
FlowForceReassemblyForHash();

@ -290,21 +290,23 @@ TmEcode UnixSocketPcapFilesCheck(void *data)
/* handle graceful shutdown of the flow engine, it's helper
* threads and the packet threads */
FlowKillFlowManagerThread();
TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
TmThreadDisableReceiveThreads();
FlowForceReassembly();
TmThreadKillThreadsFamily(TVT_PPT);
TmThreadClearThreadsFamily(TVT_PPT);
TmThreadDisablePacketThreads();
FlowKillFlowRecyclerThread();
/* kill remaining mgt threads */
/* kill the stats threads */
TmThreadKillThreadsFamily(TVT_MGMT);
TmThreadClearThreadsFamily(TVT_MGMT);
SCPerfReleaseResources();
RunModeShutDown();
/* kill packet threads -- already in 'disabled' state */
TmThreadKillThreadsFamily(TVT_PPT);
TmThreadClearThreadsFamily(TVT_PPT);
/* mgt and ppt threads killed, we can run non thread-safe
* shutdown functions */
SCPerfReleaseResources();
RunModeShutDown();
FlowShutdown();
HostCleanup();
StreamTcpFreeConfig(STREAM_VERBOSE);

@ -2392,20 +2392,19 @@ int main(int argc, char **argv)
FlowKillFlowManagerThread();
}
/* Disable packet acquire thread first */
TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM);
/* Disable packet acquisition first */
TmThreadDisableReceiveThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
FlowForceReassembly();
/* kill receive threads when they have processed all
* flow timeout packets */
TmThreadDisablePacketThreads();
}
SCPrintElapsedTime(&suri);
if (suri.rule_reload == 1) {
/* Disable detect threads first. This is required by live rule swap */
TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM |
TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM);
/* wait if live rule swap is in progress */
if (UtilSignalIsHandler(SIGUSR2, SignalHandlerSigusr2Idle)) {
SCLogInfo("Live rule swap in progress. Waiting for it to end "
@ -2430,6 +2429,7 @@ int main(int argc, char **argv)
FlowKillFlowRecyclerThread();
}
/* kill remaining threads */
TmThreadKillThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {

@ -45,6 +45,8 @@ struct TmSlot_;
#define THV_DEINIT (1 << 7)
#define THV_RUNNING_DONE (1 << 8) /** thread has completed running and is entering
* the de-init phase */
#define THV_KILL_PKTACQ (1 << 9) /**< flag thread to stop packet acq */
#define THV_FLOW_LOOP (1 << 10) /**< thread is in flow shutdown loop */
/** Thread flags set and read by threads, to control the threads, when they
* encounter certain conditions like failure */

@ -185,6 +185,56 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p,
return TM_ECODE_OK;
}
/** \internal
*
* \brief Process flow timeout packets
*
* Process flow timeout pseudo packets. During shutdown this loop
* is run until the flow engine kills the thread and the queue is
* empty.
*/
static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
{
TmSlot *stream_slot = NULL, *slot = NULL;
int run = 1;
int r = TM_ECODE_OK;
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->tm_id == TMM_STREAMTCP) {
stream_slot = slot;
break;
}
}
if (tv->stream_pq == NULL || stream_slot == NULL)
return r;
SCLogDebug("flow end loop starting");
while(run) {
Packet *p;
if (tv->stream_pq->len != 0) {
SCMutexLock(&tv->stream_pq->mutex_q);
p = PacketDequeue(tv->stream_pq);
SCMutexUnlock(&tv->stream_pq->mutex_q);
BUG_ON(p == NULL);
if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) {
if (r == TM_ECODE_FAILED)
run = 0;
}
} else {
usleep(1);
}
if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0;
}
}
SCLogDebug("flow end loop complete");
return r;
}
/*
pcap/nfq
@ -297,7 +347,7 @@ void *TmThreadsSlotPktAcqLoop(void *td)
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)
if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL_PKTACQ)
|| suricata_ctl_flags) {
run = 0;
}
@ -307,6 +357,11 @@ void *TmThreadsSlotPktAcqLoop(void *td)
}
SCPerfSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
/* process all pseudo packets the flow timeout may throw at us */
TmThreadTimeoutLoop(tv, s);
PacketPoolDestroy();
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
@ -1442,9 +1497,13 @@ void TmThreadKillThread(ThreadVars *tv)
}
/**
* \brief Disable all threads having the specified TMs.
* \brief Disable all threads having the specified TMs.
*
* Breaks out of the packet acquisition loop, and bumps
* into the 'flow loop', where it will process packets
* from the flow engine's shutdown handling.
*/
void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
void TmThreadDisableReceiveThreads(void)
{
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
@ -1471,7 +1530,7 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
while (slots != NULL) {
TmModule *tm = TmModuleGetById(slots->tm_id);
if (tm->flags & tm_flags) {
if (tm->flags & TM_FLAG_RECEIVE_TM) {
disable = 1;
break;
}
@ -1494,9 +1553,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
}
}
/* we found our receive TV. Send it a KILL signal. This is all
* we need to do to kill receive threads */
TmThreadsSetFlag(tv, THV_KILL);
/* we found a receive TV. Send it a KILL_PKTACQ signal. */
TmThreadsSetFlag(tv, THV_KILL_PKTACQ);
if (tv->inq != NULL) {
int i;
@ -1509,7 +1567,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
/* wait for it to enter the 'flow loop' stage */
while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) {
usleep(WAIT_TIME);
total_wait_time += WAIT_TIME / 1000000.0;
if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
@ -1529,6 +1588,77 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags)
return;
}
/**
* \brief Disable all threads having the specified TMs.
*/
void TmThreadDisablePacketThreads(void)
{
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
/* value in microseconds */
#define WAIT_TIME 100
double total_wait_time = 0;
ThreadVars *tv = NULL;
SCMutexLock(&tv_root_lock);
/* all receive threads are part of packet processing threads */
tv = tv_root[TVT_PPT];
/* we do have to keep in mind that TVs are arranged in the order
* right from receive to log. The moment we fail to find a
* receive TM amongst the slots in a tv, it indicates we are done
* with all receive threads */
while (tv) {
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
while (q->len != 0) {
usleep(1000);
}
}
}
/* we found our receive TV. Send it a KILL signal. This is all
* we need to do to kill receive threads */
TmThreadsSetFlag(tv, THV_KILL);
if (tv->inq != NULL) {
int i;
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
if (tv->inq->q_type == 0)
SCCondSignal(&trans_q[tv->inq->id].cond_q);
else
SCCondSignal(&data_queues[tv->inq->id].cond_q);
}
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
}
while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
usleep(WAIT_TIME);
total_wait_time += WAIT_TIME / 1000000.0;
if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
SCLogError(SC_ERR_FATAL, "Engine unable to "
"disable detect thread - \"%s\". "
"Killing engine", tv->name);
exit(EXIT_FAILURE);
}
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
return;
}
TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
{
ThreadVars *tv = NULL;

@ -130,7 +130,8 @@ void TmThreadWaitForFlag(ThreadVars *, uint16_t);
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
void TmThreadDisableThreadsWithTMS(uint8_t tm_flags);
void TmThreadDisablePacketThreads(void);
void TmThreadDisableReceiveThreads(void);
TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *);
/**

Loading…
Cancel
Save