diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 5ab7b2b0eb..4f146ab83f 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -523,10 +523,10 @@ static inline void FlowForceReassemblyForHash(void) { Flow *f; TcpSession *ssn; - int client_ok; - int server_ok; - + int client_ok = 0; + int server_ok = 0; uint32_t idx = 0; + #if 0 /* We use this packet just for reassembly purpose */ Packet *reassemble_p = PacketGetFromAlloc(); @@ -676,7 +676,7 @@ static inline void FlowForceReassemblyForHash(void) void FlowForceReassembly(void) { /* Do remember. We need to have packet acquire disabled by now */ - +#if 0 /** ----- Part 1 ------*/ /* Flush out unattended packets */ FlowForceReassemblyFlushPendingPseudoPackets(); @@ -721,7 +721,7 @@ void FlowForceReassembly(void) } SCMutexUnlock(&tv_root_lock); - +#endif /** ----- Part 3 ----- **/ /* Carry out flow reassembly for unattended flows */ FlowForceReassemblyForHash(); diff --git a/src/runmode-unix-socket.c b/src/runmode-unix-socket.c index 83daaaf0c9..54c85fe739 100644 --- a/src/runmode-unix-socket.c +++ b/src/runmode-unix-socket.c @@ -290,21 +290,23 @@ TmEcode UnixSocketPcapFilesCheck(void *data) /* handle graceful shutdown of the flow engine, it's helper * threads and the packet threads */ FlowKillFlowManagerThread(); - TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM); + TmThreadDisableReceiveThreads(); FlowForceReassembly(); - TmThreadKillThreadsFamily(TVT_PPT); - TmThreadClearThreadsFamily(TVT_PPT); + TmThreadDisablePacketThreads(); FlowKillFlowRecyclerThread(); - /* kill remaining mgt threads */ + /* kill the stats threads */ TmThreadKillThreadsFamily(TVT_MGMT); TmThreadClearThreadsFamily(TVT_MGMT); - SCPerfReleaseResources(); - RunModeShutDown(); + /* kill packet threads -- already in 'disabled' state */ + TmThreadKillThreadsFamily(TVT_PPT); + TmThreadClearThreadsFamily(TVT_PPT); /* mgt and ppt threads killed, we can run non thread-safe * shutdown functions */ + SCPerfReleaseResources(); + RunModeShutDown(); FlowShutdown(); HostCleanup(); StreamTcpFreeConfig(STREAM_VERBOSE); diff --git a/src/suricata.c b/src/suricata.c index 31a2411fe7..25dab7e149 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -2392,20 +2392,19 @@ int main(int argc, char **argv) FlowKillFlowManagerThread(); } - /* Disable packet acquire thread first */ - TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM); + /* Disable packet acquisition first */ + TmThreadDisableReceiveThreads(); if (suri.run_mode != RUNMODE_UNIX_SOCKET) { FlowForceReassembly(); + /* kill receive threads when they have processed all + * flow timeout packets */ + TmThreadDisablePacketThreads(); } SCPrintElapsedTime(&suri); if (suri.rule_reload == 1) { - /* Disable detect threads first. This is required by live rule swap */ - TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM | - TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM); - /* wait if live rule swap is in progress */ if (UtilSignalIsHandler(SIGUSR2, SignalHandlerSigusr2Idle)) { SCLogInfo("Live rule swap in progress. Waiting for it to end " @@ -2430,6 +2429,7 @@ int main(int argc, char **argv) FlowKillFlowRecyclerThread(); } + /* kill remaining threads */ TmThreadKillThreads(); if (suri.run_mode != RUNMODE_UNIX_SOCKET) { diff --git a/src/threadvars.h b/src/threadvars.h index f8277b7758..64c215bcca 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -45,6 +45,8 @@ struct TmSlot_; #define THV_DEINIT (1 << 7) #define THV_RUNNING_DONE (1 << 8) /** thread has completed running and is entering * the de-init phase */ +#define THV_KILL_PKTACQ (1 << 9) /**< flag thread to stop packet acq */ +#define THV_FLOW_LOOP (1 << 10) /**< thread is in flow shutdown loop */ /** Thread flags set and read by threads, to control the threads, when they * encounter certain conditions like failure */ diff --git a/src/tm-threads.c b/src/tm-threads.c index 60b18c1daf..64964c308f 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -185,6 +185,56 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, return TM_ECODE_OK; } +/** \internal + * + * \brief Process flow timeout packets + * + * Process flow timeout pseudo packets. During shutdown this loop + * is run until the flow engine kills the thread and the queue is + * empty. + */ +static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) +{ + TmSlot *stream_slot = NULL, *slot = NULL; + int run = 1; + int r = TM_ECODE_OK; + + for (slot = s; slot != NULL; slot = slot->slot_next) { + if (slot->tm_id == TMM_STREAMTCP) { + stream_slot = slot; + break; + } + } + + if (tv->stream_pq == NULL || stream_slot == NULL) + return r; + + SCLogDebug("flow end loop starting"); + while(run) { + Packet *p; + if (tv->stream_pq->len != 0) { + SCMutexLock(&tv->stream_pq->mutex_q); + p = PacketDequeue(tv->stream_pq); + SCMutexUnlock(&tv->stream_pq->mutex_q); + BUG_ON(p == NULL); + + if ((r = TmThreadsSlotProcessPkt(tv, stream_slot, p) != TM_ECODE_OK)) { + if (r == TM_ECODE_FAILED) + run = 0; + } + } else { + usleep(1); + } + + if (tv->stream_pq->len == 0 && TmThreadsCheckFlag(tv, THV_KILL)) { + run = 0; + } + } + SCLogDebug("flow end loop complete"); + + return r; +} + /* pcap/nfq @@ -297,7 +347,7 @@ void *TmThreadsSlotPktAcqLoop(void *td) r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); - if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL) + if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { run = 0; } @@ -307,6 +357,11 @@ void *TmThreadsSlotPktAcqLoop(void *td) } SCPerfSyncCounters(tv); + TmThreadsSetFlag(tv, THV_FLOW_LOOP); + + /* process all pseudo packets the flow timeout may throw at us */ + TmThreadTimeoutLoop(tv, s); + PacketPoolDestroy(); TmThreadsSetFlag(tv, THV_RUNNING_DONE); @@ -1442,9 +1497,13 @@ void TmThreadKillThread(ThreadVars *tv) } /** - * \brief Disable all threads having the specified TMs. + * \brief Disable all threads having the specified TMs. + * + * Breaks out of the packet acquisition loop, and bumps + * into the 'flow loop', where it will process packets + * from the flow engine's shutdown handling. */ -void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) +void TmThreadDisableReceiveThreads(void) { /* value in seconds */ #define THREAD_KILL_MAX_WAIT_TIME 60 @@ -1471,7 +1530,7 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) while (slots != NULL) { TmModule *tm = TmModuleGetById(slots->tm_id); - if (tm->flags & tm_flags) { + if (tm->flags & TM_FLAG_RECEIVE_TM) { disable = 1; break; } @@ -1494,9 +1553,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) } } - /* we found our receive TV. Send it a KILL signal. This is all - * we need to do to kill receive threads */ - TmThreadsSetFlag(tv, THV_KILL); + /* we found a receive TV. Send it a KILL_PKTACQ signal. */ + TmThreadsSetFlag(tv, THV_KILL_PKTACQ); if (tv->inq != NULL) { int i; @@ -1509,7 +1567,8 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } - while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + /* wait for it to enter the 'flow loop' stage */ + while (!TmThreadsCheckFlag(tv, THV_FLOW_LOOP)) { usleep(WAIT_TIME); total_wait_time += WAIT_TIME / 1000000.0; if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { @@ -1529,6 +1588,77 @@ void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) return; } +/** + * \brief Disable all threads having the specified TMs. + */ +void TmThreadDisablePacketThreads(void) +{ + /* value in seconds */ +#define THREAD_KILL_MAX_WAIT_TIME 60 + /* value in microseconds */ +#define WAIT_TIME 100 + + double total_wait_time = 0; + + ThreadVars *tv = NULL; + + SCMutexLock(&tv_root_lock); + + /* all receive threads are part of packet processing threads */ + tv = tv_root[TVT_PPT]; + + /* we do have to keep in mind that TVs are arranged in the order + * right from receive to log. The moment we fail to find a + * receive TM amongst the slots in a tv, it indicates we are done + * with all receive threads */ + while (tv) { + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + while (q->len != 0) { + usleep(1000); + } + } + } + + /* we found our receive TV. Send it a KILL signal. This is all + * we need to do to kill receive threads */ + TmThreadsSetFlag(tv, THV_KILL); + + if (tv->inq != NULL) { + int i; + for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { + if (tv->inq->q_type == 0) + SCCondSignal(&trans_q[tv->inq->id].cond_q); + else + SCCondSignal(&data_queues[tv->inq->id].cond_q); + } + SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); + } + + while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { + usleep(WAIT_TIME); + total_wait_time += WAIT_TIME / 1000000.0; + if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { + SCLogError(SC_ERR_FATAL, "Engine unable to " + "disable detect thread - \"%s\". " + "Killing engine", tv->name); + exit(EXIT_FAILURE); + } + } + + tv = tv->next; + } + + SCMutexUnlock(&tv_root_lock); + + return; +} + TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name) { ThreadVars *tv = NULL; diff --git a/src/tm-threads.h b/src/tm-threads.h index 4d864d6ebe..9ffc9efba8 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -130,7 +130,8 @@ void TmThreadWaitForFlag(ThreadVars *, uint16_t); TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot); ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *); -void TmThreadDisableThreadsWithTMS(uint8_t tm_flags); +void TmThreadDisablePacketThreads(void); +void TmThreadDisableReceiveThreads(void); TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *); /**