From 5f198e3a1de3f507d07247ecbc72c440e3a0c4f1 Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Thu, 19 Jul 2012 11:00:42 +0530 Subject: [PATCH] Suricata shutdown updates + minor cleanup --- src/flow-timeout.c | 31 +++++++++++++++++++++-- src/suricata.c | 56 ++++++++++++++++++++++++++++++----------- src/tm-threads.c | 63 +++------------------------------------------- src/tm-threads.h | 3 +-- 4 files changed, 74 insertions(+), 79 deletions(-) diff --git a/src/flow-timeout.c b/src/flow-timeout.c index 9b86b51f6c..020f5cc09a 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -70,6 +70,29 @@ static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL; static TmSlot *stream_pseudo_pkt_decode_tm_slot = NULL; static ThreadVars *stream_pseudo_pkt_decode_TV = NULL; +/** + * \internal + * \brief Flush out if we have any unattended packets. + */ +static inline void FlowForceReassemblyFlushPendingPseudoPackets(void) +{ + /* we don't lock the queue, since flow manager is dead */ + if (stream_pseudo_pkt_decode_tm_slot->slot_post_pq.len == 0) + return; + + SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); + Packet *p = PacketDequeue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq); + SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); + if (TmThreadsSlotProcessPkt(stream_pseudo_pkt_decode_TV, + stream_pseudo_pkt_decode_tm_slot, + p) != TM_ECODE_OK) { + SCLogError(SC_ERR_TM_THREADS_ERROR, "Received error from FFR on " + "flushing packets through decode->.. TMs"); + } + + return; +} + /** * \internal * \brief Pseudo packet setup for flow forced reassembly. @@ -604,7 +627,11 @@ void FlowForceReassembly(void) { /* Do remember. We need to have packet acquire disabled by now */ - /** ----- Part 1 ----- **/ + /** ----- Part 1 ------*/ + /* Flush out unattended packets */ + FlowForceReassemblyFlushPendingPseudoPackets(); + + /** ----- Part 2 ----- **/ /* Check if all threads are idle. We need this so that we have all * packets freeds. As a consequence, no flows are in use */ @@ -632,7 +659,7 @@ void FlowForceReassembly(void) SCMutexUnlock(&tv_root_lock); - /** ----- Part 2 ----- **/ + /** ----- Part 3 ----- **/ /* Carry out flow reassembly for unattended flows */ FlowForceReassemblyForHash(); diff --git a/src/suricata.c b/src/suricata.c index 0ae3719452..e48d8ea792 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -1441,50 +1441,75 @@ int main(int argc, char **argv) } + /* nfq */ TmModuleReceiveNFQRegister(); TmModuleVerdictNFQRegister(); TmModuleDecodeNFQRegister(); + /* ipfw */ TmModuleReceiveIPFWRegister(); TmModuleVerdictIPFWRegister(); TmModuleDecodeIPFWRegister(); + /* pcap live */ TmModuleReceivePcapRegister(); TmModuleDecodePcapRegister(); + /* pcap file */ + TmModuleReceivePcapFileRegister(); + TmModuleDecodePcapFileRegister(); + /* af-packet */ TmModuleReceiveAFPRegister(); TmModuleDecodeAFPRegister(); + /* pfring */ TmModuleReceivePfringRegister(); TmModuleDecodePfringRegister(); - TmModuleReceivePcapFileRegister(); - TmModuleDecodePcapFileRegister(); + /* dag file */ + TmModuleReceiveErfFileRegister(); + TmModuleDecodeErfFileRegister(); + /* dag live */ + TmModuleReceiveErfDagRegister(); + TmModuleDecodeErfDagRegister(); + /* napatech */ + TmModuleNapatechFeedRegister(); + TmModuleNapatechDecodeRegister(); + + /* stream engine */ + TmModuleStreamTcpRegister(); + /* detection */ TmModuleDetectRegister(); - TmModuleAlertFastLogRegister(); - TmModuleAlertDebugLogRegister(); - TmModuleAlertPreludeRegister(); + /* respond-reject */ TmModuleRespondRejectRegister(); + + /* fast log */ + TmModuleAlertFastLogRegister(); TmModuleAlertFastLogIPv4Register(); TmModuleAlertFastLogIPv6Register(); + /* debug log */ + TmModuleAlertDebugLogRegister(); + /* prelue log */ + TmModuleAlertPreludeRegister(); + /* syslog log */ + TmModuleAlertSyslogRegister(); TmModuleAlertSyslogIPv4Register(); TmModuleAlertSyslogIPv6Register(); + /* unified2 log */ TmModuleUnified2AlertRegister(); - TmModuleAlertSyslogRegister(); + /* pcap info log */ TmModuleAlertPcapInfoRegister(); + /* drop log */ TmModuleLogDropLogRegister(); - TmModuleStreamTcpRegister(); + /* http log */ TmModuleLogHttpLogRegister(); TmModuleLogHttpLogIPv4Register(); TmModuleLogHttpLogIPv6Register(); + /* pcap log */ TmModulePcapLogRegister(); + /* file log */ TmModuleLogFileLogRegister(); TmModuleLogFilestoreRegister(); + /* cuda */ #ifdef __SC_CUDA_SUPPORT__ TmModuleCudaMpmB2gRegister(); TmModuleCudaPacketBatcherRegister(); #endif - TmModuleReceiveErfFileRegister(); - TmModuleDecodeErfFileRegister(); - TmModuleReceiveErfDagRegister(); - TmModuleDecodeErfDagRegister(); - TmModuleNapatechFeedRegister(); - TmModuleNapatechDecodeRegister(); TmModuleDebugList(); AppLayerHtpNeedFileInspection(); @@ -1903,7 +1928,7 @@ int main(int argc, char **argv) FlowKillFlowManagerThread(); /* Disable packet acquire thread first */ - TmThreadDisableReceiveThreads(); + TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM); FlowForceReassembly(); @@ -1916,7 +1941,8 @@ int main(int argc, char **argv) if (rule_reload == 1) { /* Disable detect threads first. This is required by live rule swap */ - TmThreadDisableUptoDetectThreads(); + TmThreadDisableThreadsWithTMS(TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM | + TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM); /* wait if live rule swap is in progress */ if (UtilSignalIsHandler(SIGUSR2, SignalHandlerSigusr2Idle)) { diff --git a/src/tm-threads.c b/src/tm-threads.c index afdb4446a1..f24abcaafc 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1494,65 +1494,9 @@ void TmThreadKillThread(ThreadVars *tv) } /** - * \brief Disable receive threads. + * \brief Disable all threads having the specified TMs. */ -void TmThreadDisableReceiveThreads(void) -{ - /* value in seconds */ -#define THREAD_KILL_MAX_WAIT_TIME 60 - /* value in microseconds */ -#define WAIT_TIME 100 - - double total_wait_time = 0; - - ThreadVars *tv = NULL; - - SCMutexLock(&tv_root_lock); - - /* all receive threads are part of packet processing threads */ - tv = tv_root[TVT_PPT]; - - /* we do have to keep in mind that TVs are arranged in the order - * right from receive to log. The moment we fail to find a - * receive TM amongst the slots in a tv, it indicates we are done - * with all receive threads */ - while (tv) { - /* obtain the slots for this TV */ - TmSlot *slots = tv->tm_slots; - TmModule *tm = TmModuleGetById(slots->tm_id); - - if (!(tm->flags & TM_FLAG_RECEIVE_TM)) { - tv = tv->next; - continue; - } - - /* we found our receive TV. Send it a KILL signal. This is all - * we need to do to kill receive threads */ - TmThreadsSetFlag(tv, THV_KILL); - - while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { - usleep(WAIT_TIME); - total_wait_time += WAIT_TIME / 1000000.0; - if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) { - SCLogError(SC_ERR_FATAL, "Engine unable to " - "disable receive thread - \"%s\". " - "Killing engine", tv->name); - exit(EXIT_FAILURE); - } - } - - tv = tv->next; - } - - SCMutexUnlock(&tv_root_lock); - - return; -} - -/** - * \brief Disable all threads <= detect. - */ -void TmThreadDisableUptoDetectThreads(void) +void TmThreadDisableThreadsWithTMS(uint8_t tm_flags) { /* value in seconds */ #define THREAD_KILL_MAX_WAIT_TIME 60 @@ -1579,8 +1523,7 @@ void TmThreadDisableUptoDetectThreads(void) while (slots != NULL) { TmModule *tm = TmModuleGetById(slots->tm_id); - if (tm->flags & (TM_FLAG_RECEIVE_TM | TM_FLAG_DECODE_TM | - TM_FLAG_STREAM_TM | TM_FLAG_DETECT_TM)) { + if (tm->flags & tm_flags) { disable = 1; break; } diff --git a/src/tm-threads.h b/src/tm-threads.h index 7f16384be8..daee37c6b9 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -112,8 +112,7 @@ void TmThreadWaitForFlag(ThreadVars *, uint8_t); TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot); ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *); -void TmThreadDisableReceiveThreads(void); -void TmThreadDisableUptoDetectThreads(void); +void TmThreadDisableThreadsWithTMS(uint8_t tm_flags); TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *); /**