/* Copyright (C) 2007-2010 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free * Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * version 2 along with this program; if not, write to the Free Software * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA * 02110-1301, USA. */ /** * \file * * \author Victor Julien * \author Anoop Saldanha * \author Eric Leblond * * Thread management functions. */ #include "suricata-common.h" #include "suricata.h" #include "stream.h" #include "runmodes.h" #include "threadvars.h" #include "tm-queues.h" #include "tm-queuehandlers.h" #include "tm-threads.h" #include "tmqh-packetpool.h" #include "threads.h" #include "util-debug.h" #include #include #include "util-privs.h" #include "util-cpu.h" #include "util-optimize.h" #include "util-profiling.h" #ifdef OS_FREEBSD #include #include #include #include #include #define cpu_set_t cpuset_t #elif OS_DARWIN #include #include #include #define cpu_set_t thread_affinity_policy_data_t #define CPU_SET(cpu_id, new_mask) ((*(new_mask)).affinity_tag = ((cpu_id) + 1)) #define CPU_ISSET(cpu_id, new_mask) (((*(new_mask)).affinity_tag == ((cpu_id) + 1))) #define CPU_ZERO(new_mask) ((*(new_mask)).affinity_tag = THREAD_AFFINITY_TAG_NULL) #endif /* OS_FREEBSD */ /* prototypes */ static int SetCPUAffinity(uint16_t cpu); /* root of the threadvars list */ ThreadVars *tv_root[TVT_MAX] = { NULL }; /* lock to protect tv_root */ SCMutex tv_root_lock = PTHREAD_MUTEX_INITIALIZER; /* Action On Failure(AOF). Determines how the engine should behave when a * thread encounters a failure. Defaults to restart the failed thread */ uint8_t tv_aof = THV_RESTART_THREAD; /** * \brief Check if a thread flag is set. * * \retval 1 flag is set. * \retval 0 flag is not set. */ int TmThreadsCheckFlag(ThreadVars *tv, uint8_t flag) { return (SC_ATOMIC_GET(tv->flags) & flag) ? 1 : 0; } /** * \brief Set a thread flag. */ void TmThreadsSetFlag(ThreadVars *tv, uint8_t flag) { SC_ATOMIC_OR(tv->flags, flag); } /** * \brief Unset a thread flag. */ void TmThreadsUnsetFlag(ThreadVars *tv, uint8_t flag) { SC_ATOMIC_AND(tv->flags, ~flag); } /* 1 slot functions */ void *TmThreadsSlot1NoIn(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; char run = 1; TmEcode r = TM_ECODE_OK; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); if (s->SlotThreadInit != NULL) { r = s->SlotThreadInit(tv, s->slot_initdata, &s->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while (run) { TmThreadTestThreadUnPaused(tv); PACKET_PROFILING_TMM_START(p, s->tm_id); r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); PACKET_PROFILING_TMM_END(p, s->tm_id); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); SCMutexLock(&s->slot_post_pq.mutex_q); TmqhReleasePacketsToPacketPool(&s->slot_post_pq); SCMutexUnlock(&s->slot_post_pq.mutex_q); if (p != NULL) TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } /* handle pre queue */ while (s->slot_pre_pq.top != NULL) { Packet *extra_p = PacketDequeue(&s->slot_pre_pq); if (extra_p != NULL) tv->tmqh_out(tv, extra_p); } tv->tmqh_out(tv, p); if (p != NULL) tv->tmqh_out(tv, p); /* handle post queue */ if (s->slot_post_pq.top != NULL) { SCMutexLock(&s->slot_post_pq.mutex_q); while (s->slot_post_pq.top != NULL) { Packet *extra_p = PacketDequeue(&s->slot_post_pq); if (extra_p != NULL) tv->tmqh_out(tv, extra_p); } SCMutexUnlock(&s->slot_post_pq.mutex_q); } if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfSyncCounters(tv, 0); run = 0; } } /* while (run) */ TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); } if (s->SlotThreadDeinit != NULL) { r = s->SlotThreadDeinit(tv, s->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } void *TmThreadsSlot1NoOut(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; char run = 1; TmEcode r = TM_ECODE_OK; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); if (s->SlotThreadInit != NULL) { r = s->SlotThreadInit(tv, s->slot_initdata, &s->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while (run) { TmThreadTestThreadUnPaused(tv); p = tv->tmqh_in(tv); PACKET_PROFILING_TMM_START(p, s->tm_id); r = s->SlotFunc(tv, p, s->slot_data, /* no outqh no pq */ NULL, /* no outqh no pq */ NULL); PACKET_PROFILING_TMM_END(p, s->tm_id); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfSyncCounters(tv, 0); run = 0; } } /* while (run) */ TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); } if (s->SlotThreadDeinit != NULL) { r = s->SlotThreadDeinit(tv, s->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } void *TmThreadsSlot1NoInOut(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; char run = 1; TmEcode r = TM_ECODE_OK; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); SCLogDebug("%s starting", tv->name); if (s->SlotThreadInit != NULL) { r = s->SlotThreadInit(tv, s->slot_initdata, &s->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); TmThreadsSetFlag(tv, THV_INIT_DONE); while (run) { TmThreadTestThreadUnPaused(tv); r = s->SlotFunc(tv, NULL, s->slot_data, /* no outqh, no pq */NULL, NULL); /* handle error */ if (r == TM_ECODE_FAILED) { TmThreadsSetFlag(tv, THV_FAILED); break; } if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfSyncCounters(tv, 0); run = 0; } } /* while (run) */ TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); } if (s->SlotThreadDeinit != NULL) { r = s->SlotThreadDeinit(tv, s->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } void *TmThreadsSlot1(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; char run = 1; TmEcode r = TM_ECODE_OK; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); SCLogDebug("%s starting", tv->name); if (s->SlotThreadInit != NULL) { r = s->SlotThreadInit(tv, s->slot_initdata, &s->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_pre_pq.mutex_q, NULL); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_post_pq.mutex_q, NULL); TmThreadsSetFlag(tv, THV_INIT_DONE); while (run) { TmThreadTestThreadUnPaused(tv); /* input a packet */ p = tv->tmqh_in(tv); if (p != NULL) { PACKET_PROFILING_TMM_START(p, s->tm_id); r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); PACKET_PROFILING_TMM_END(p, s->tm_id); /* handle error */ if (r == TM_ECODE_FAILED) { TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); SCMutexLock(&s->slot_post_pq.mutex_q); TmqhReleasePacketsToPacketPool(&s->slot_post_pq); SCMutexUnlock(&s->slot_post_pq.mutex_q); TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } while (s->slot_pre_pq.top != NULL) { /* handle new packets from this func */ Packet *extra_p = PacketDequeue(&s->slot_pre_pq); if (extra_p != NULL) { tv->tmqh_out(tv, extra_p); } } /* output the packet */ tv->tmqh_out(tv, p); } if (s->slot_post_pq.top != NULL) { SCMutexLock(&s->slot_post_pq.mutex_q); while (s->slot_post_pq.top != NULL) { /* handle new packets from this func */ Packet *extra_p = PacketDequeue(&s->slot_post_pq); if (extra_p != NULL) { tv->tmqh_out(tv, extra_p); } } SCMutexUnlock(&s->slot_post_pq.mutex_q); } if (TmThreadsCheckFlag(tv, THV_KILL)) { SCPerfSyncCounters(tv, 0); run = 0; } } /* while (run) */ TmThreadWaitForFlag(tv, THV_DEINIT); if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); } if (s->SlotThreadDeinit != NULL) { r = s->SlotThreadDeinit(tv, s->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } /** * \brief Separate run function so we can call it recursively. * * \todo Deal with post_pq for slots beyond the first. */ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot) { TmEcode r; TmSlot *s; Packet *extra_p; for (s = slot; s != NULL; s = s->slot_next) { PACKET_PROFILING_TMM_START(p, s->tm_id); if (unlikely(s->id == 0)) { r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, &s->slot_post_pq); } else { r = s->SlotFunc(tv, p, s->slot_data, &s->slot_pre_pq, NULL); } PACKET_PROFILING_TMM_END(p, s->tm_id); /* handle error */ if (unlikely(r == TM_ECODE_FAILED)) { /* Encountered error. Return packets to packetpool and return */ TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); SCMutexLock(&s->slot_post_pq.mutex_q); TmqhReleasePacketsToPacketPool(&s->slot_post_pq); SCMutexUnlock(&s->slot_post_pq.mutex_q); TmThreadsSetFlag(tv, THV_FAILED); return TM_ECODE_FAILED; } /* handle new packets */ while (s->slot_pre_pq.top != NULL) { extra_p = PacketDequeue(&s->slot_pre_pq); if (unlikely(extra_p == NULL)) continue; /* see if we need to process the packet */ if (s->slot_next != NULL) { r = TmThreadsSlotVarRun(tv, extra_p, s->slot_next); if (unlikely(r == TM_ECODE_FAILED)) { TmqhReleasePacketsToPacketPool(&s->slot_pre_pq); SCMutexLock(&s->slot_post_pq.mutex_q); TmqhReleasePacketsToPacketPool(&s->slot_post_pq); SCMutexUnlock(&s->slot_post_pq.mutex_q); TmqhOutputPacketpool(tv, extra_p); TmThreadsSetFlag(tv, THV_FAILED); return TM_ECODE_FAILED; } } tv->tmqh_out(tv, extra_p); } } return TM_ECODE_OK; } /* pcap/nfq pkt read callback process_pkt pfring pkt read process_pkt slot: setup pkt_ack_loop(tv, slot_data) deinit process_pkt: while(s) run s; queue; */ void *TmThreadsSlotPktAcqLoop(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; char run = 1; TmEcode r = TM_ECODE_OK; TmSlot *slot = NULL; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); /* check if we are setup properly */ if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { SCLogError(SC_ERR_FATAL, "TmSlot or ThreadVars badly setup: s=%p," " PktAcqLoop=%p, tmqh_in=%p," " tmqh_out=%p", s, s->PktAcqLoop, tv->tmqh_in, tv->tmqh_out); EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } for (slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadInit != NULL) { r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&slot->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&slot->slot_pre_pq.mutex_q, NULL); memset(&slot->slot_post_pq, 0, sizeof(PacketQueue)); SCMutexInit(&slot->slot_post_pq.mutex_q, NULL); } TmThreadsSetFlag(tv, THV_INIT_DONE); while(run) { TmThreadTestThreadUnPaused(tv); r = s->PktAcqLoop(tv, s->slot_data, s); if (r == TM_ECODE_FAILED || TmThreadsCheckFlag(tv, THV_KILL)) { run = 0; } } SCPerfSyncCounters(tv, 0); TmThreadWaitForFlag(tv, THV_DEINIT); for (slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadExitPrintStats != NULL) { slot->SlotThreadExitPrintStats(tv, slot->slot_data); } if (slot->SlotThreadDeinit != NULL) { r = slot->SlotThreadDeinit(tv, slot->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } } SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } /** * \todo Only the first "slot" currently makes the "post_pq" available * to the thread module. */ void *TmThreadsSlotVar(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = (TmSlot *)tv->tm_slots; Packet *p = NULL; char run = 1; TmEcode r = TM_ECODE_OK; /* Set the thread name */ SCSetThreadName(tv->name); /* Drop the capabilities for this thread */ SCDropCaps(tv); if (tv->thread_setup_flags != 0) TmThreadSetupOptions(tv); /* check if we are setup properly */ if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } for (; s != NULL; s = s->slot_next) { if (s->SlotThreadInit != NULL) { r = s->SlotThreadInit(tv, s->slot_initdata, &s->slot_data); if (r != TM_ECODE_OK) { EngineKill(); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } memset(&s->slot_pre_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_pre_pq.mutex_q, NULL); memset(&s->slot_post_pq, 0, sizeof(PacketQueue)); SCMutexInit(&s->slot_post_pq.mutex_q, NULL); } TmThreadsSetFlag(tv, THV_INIT_DONE); s = (TmSlot *)tv->tm_slots; while (run) { TmThreadTestThreadUnPaused(tv); /* input a packet */ p = tv->tmqh_in(tv); if (p != NULL) { /* run the thread module(s) */ r = TmThreadsSlotVarRun(tv, p, s); if (r == TM_ECODE_FAILED) { TmqhOutputPacketpool(tv, p); TmThreadsSetFlag(tv, THV_FAILED); break; } /* output the packet */ tv->tmqh_out(tv, p); } /* if (p != NULL) */ /* now handle the post_pq packets */ TmSlot *slot; for (slot = s; slot != NULL; slot = slot->slot_next) { if (slot->slot_post_pq.top != NULL) { while (1) { SCMutexLock(&slot->slot_post_pq.mutex_q); Packet *extra_p = PacketDequeue(&slot->slot_post_pq); SCMutexUnlock(&slot->slot_post_pq.mutex_q); if (extra_p == NULL) break; if (slot->slot_next != NULL) { r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); if (r == TM_ECODE_FAILED) { SCMutexLock(&slot->slot_post_pq.mutex_q); TmqhReleasePacketsToPacketPool(&slot->slot_post_pq); SCMutexUnlock(&slot->slot_post_pq.mutex_q); TmqhOutputPacketpool(tv, extra_p); TmThreadsSetFlag(tv, THV_FAILED); break; } } /* output the packet */ tv->tmqh_out(tv, extra_p); } /* while */ } /* if */ } /* for */ if (TmThreadsCheckFlag(tv, THV_KILL)) { run = 0; } } /* while (run) */ SCPerfSyncCounters(tv, 0); TmThreadWaitForFlag(tv, THV_DEINIT); s = (TmSlot *)tv->tm_slots; for ( ; s != NULL; s = s->slot_next) { if (s->SlotThreadExitPrintStats != NULL) { s->SlotThreadExitPrintStats(tv, s->slot_data); } if (s->SlotThreadDeinit != NULL) { r = s->SlotThreadDeinit(tv, s->slot_data); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) -1); } } } SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); pthread_exit((void *) 0); } /** * \brief We set the slot functions. * * \param tv Pointer to the TV to set the slot function for. * \param name Name of the slot variant. * \param fn_p Pointer to a custom slot function. Used only if slot variant * "name" is "custom". * * \retval TmEcode TM_ECODE_OK on success; TM_ECODE_FAILED on failure. */ TmEcode TmThreadSetSlots(ThreadVars *tv, char *name, void *(*fn_p)(void *)) { if (name == NULL) { if (fn_p == NULL) { printf("Both slot name and function pointer can't be NULL inside " "TmThreadSetSlots\n"); goto error; } else { name = "custom"; } } if (strcmp(name, "1slot") == 0) { tv->tm_func = TmThreadsSlot1; } else if (strcmp(name, "1slot_noout") == 0) { tv->tm_func = TmThreadsSlot1NoOut; } else if (strcmp(name, "1slot_noin") == 0) { tv->tm_func = TmThreadsSlot1NoIn; } else if (strcmp(name, "1slot_noinout") == 0) { tv->tm_func = TmThreadsSlot1NoInOut; } else if (strcmp(name, "varslot") == 0) { tv->tm_func = TmThreadsSlotVar; } else if (strcmp(name, "pktacqloop") == 0) { tv->tm_func = TmThreadsSlotPktAcqLoop; } else if (strcmp(name, "custom") == 0) { if (fn_p == NULL) goto error; tv->tm_func = fn_p; } else { printf("Error: Slot \"%s\" not supported\n", name); goto error; } return TM_ECODE_OK; error: return TM_ECODE_FAILED; } ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot) { ThreadVars *tv; int i; SCMutexLock(&tv_root_lock); for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv) { TmSlot *slots = tv->tm_slots; while (slots != NULL) { if (slots == tm_slot) { SCMutexUnlock(&tv_root_lock); return tv; } slots = slots->slot_next; } tv = tv->next; } } SCMutexUnlock(&tv_root_lock); return NULL; } /** * \brief Appends a new entry to the slots. * * \param tv TV the slot is attached to. * \param tm TM to append. * \param data Data to be passed on to the slot init function. */ void TmSlotSetFuncAppend(ThreadVars *tv, TmModule *tm, void *data) { TmSlot *s = (TmSlot *)tv->tm_slots; TmSlot *slot = SCMalloc(sizeof(TmSlot)); if (slot == NULL) return; memset(slot, 0, sizeof(TmSlot)); slot->tv = tv; slot->SlotThreadInit = tm->ThreadInit; slot->slot_initdata = data; slot->SlotFunc = tm->Func; slot->PktAcqLoop = tm->PktAcqLoop; slot->SlotThreadExitPrintStats = tm->ThreadExitPrintStats; slot->SlotThreadDeinit = tm->ThreadDeinit; /* we don't have to check for the return value "-1". We wouldn't have * received a TM as arg, if it didn't exist */ slot->tm_id = TmModuleGetIDForTM(tm); tv->cap_flags |= tm->cap_flags; if (s == NULL) { tv->tm_slots = slot; slot->id = 0; } else { TmSlot *a = s, *b = NULL; /* get the last slot */ for ( ; a != NULL; a = a->slot_next) { b = a; } /* append the new slot */ if (b != NULL) { b->slot_next = slot; slot->id = b->id + 1; } } return; } /** * \brief Returns the slot holding a TM with the particular tm_id. * * \param tm_id TM id of the TM whose slot has to be returned. * * \retval slots Pointer to the slot. */ TmSlot *TmSlotGetSlotForTM(int tm_id) { ThreadVars *tv = NULL; TmSlot *slots; int i; SCMutexLock(&tv_root_lock); for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv) { slots = tv->tm_slots; while (slots != NULL) { if (slots->tm_id == tm_id) { SCMutexUnlock(&tv_root_lock); return slots; } slots = slots->slot_next; } tv = tv->next; } } SCMutexUnlock(&tv_root_lock); return NULL; } #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ static int SetCPUAffinitySet(cpu_set_t *cs) { #if defined OS_FREEBSD int r = cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, SCGetThreadIdLong(), sizeof(cpu_set_t),cs); #elif OS_DARWIN int r = thread_policy_set(mach_thread_self(), THREAD_AFFINITY_POLICY, (void*)cs, THREAD_AFFINITY_POLICY_COUNT); #else pid_t tid = syscall(SYS_gettid); int r = sched_setaffinity(tid, sizeof(cpu_set_t), cs); #endif /* OS_FREEBSD */ if (r != 0) { printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno)); return -1; } return 0; } #endif /** * \brief Set the thread affinity on the calling thread. * * \param cpuid Id of the core/cpu to setup the affinity. * * \retval 0 If all goes well; -1 if something is wrong. */ static int SetCPUAffinity(uint16_t cpuid) { #ifndef __CYGWIN__ #if !defined __OpenBSD__ int cpu = (int)cpuid; #endif #ifdef OS_WIN32 DWORD cs = 1 << cpu; #elif defined __OpenBSD__ return 0; #else cpu_set_t cs; CPU_ZERO(&cs); CPU_SET(cpu, &cs); #endif /* OS_WIN32 */ #ifdef OS_WIN32 int r = (0 == SetThreadAffinityMask(GetCurrentThread(), cs)); if (r != 0) { printf("Warning: sched_setaffinity failed (%" PRId32 "): %s\n", r, strerror(errno)); return -1; } SCLogDebug("CPU Affinity for thread %lu set to CPU %" PRId32, SCGetThreadIdLong(), cpu); return 0; #elif !defined __OpenBSD__ return SetCPUAffinitySet(&cs); #endif /* OS_WIN32 */ #endif } /** * \brief Set the thread options (thread priority). * * \param tv Pointer to the ThreadVars to setup the thread priority. * * \retval TM_ECODE_OK. */ TmEcode TmThreadSetThreadPriority(ThreadVars *tv, int prio) { tv->thread_setup_flags |= THREAD_SET_PRIORITY; tv->thread_priority = prio; return TM_ECODE_OK; } /** * \brief Adjusting nice value for threads. */ void TmThreadSetPrio(ThreadVars *tv) { SCEnter(); #ifndef __CYGWIN__ #ifdef OS_WIN32 if (0 == SetThreadPriority(GetCurrentThread(), tv->thread_priority)) { SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting priority for " "thread %s: %s", tv->name, strerror(errno)); } else { SCLogDebug("Priority set to %"PRId32" for thread %s", tv->thread_priority, tv->name); } #else int ret = nice(tv->thread_priority); if (ret == -1) { SCLogError(SC_ERR_THREAD_NICE_PRIO, "Error setting nice value " "for thread %s: %s", tv->name, strerror(errno)); } else { SCLogDebug("Nice value set to %"PRId32" for thread %s", tv->thread_priority, tv->name); } #endif /* OS_WIN32 */ #endif SCReturn; } /** * \brief Set the thread options (cpu affinity). * * \param tv pointer to the ThreadVars to setup the affinity. * \param cpu cpu on which affinity is set. * * \retval TM_ECODE_OK */ TmEcode TmThreadSetCPUAffinity(ThreadVars *tv, uint16_t cpu) { tv->thread_setup_flags |= THREAD_SET_AFFINITY; tv->cpu_affinity = cpu; return TM_ECODE_OK; } TmEcode TmThreadSetCPU(ThreadVars *tv, uint8_t type) { if (!threading_set_cpu_affinity) return TM_ECODE_OK; if (type > MAX_CPU_SET) { SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); return TM_ECODE_FAILED; } tv->thread_setup_flags |= THREAD_SET_AFFTYPE; tv->cpu_affinity = type; return TM_ECODE_OK; } int TmThreadGetNbThreads(uint8_t type) { if (type >= MAX_CPU_SET) { SCLogError(SC_ERR_INVALID_ARGUMENT, "invalid cpu type family"); return 0; } return thread_affinity[type].nb_threads; } /** * \brief Set the thread options (cpu affinitythread). * Priority should be already set by pthread_create. * * \param tv pointer to the ThreadVars of the calling thread. */ TmEcode TmThreadSetupOptions(ThreadVars *tv) { if (tv->thread_setup_flags & THREAD_SET_AFFINITY) { SCLogInfo("Setting affinity for \"%s\" Module to cpu/core " "%"PRIu16", thread id %lu", tv->name, tv->cpu_affinity, SCGetThreadIdLong()); SetCPUAffinity(tv->cpu_affinity); } #if !defined __CYGWIN__ && !defined OS_WIN32 && !defined __OpenBSD__ if (tv->thread_setup_flags & THREAD_SET_PRIORITY) TmThreadSetPrio(tv); if (tv->thread_setup_flags & THREAD_SET_AFFTYPE) { ThreadsAffinityType *taf = &thread_affinity[tv->cpu_affinity]; if (taf->mode_flag == EXCLUSIVE_AFFINITY) { int cpu = AffinityGetNextCPU(taf); SetCPUAffinity(cpu); /* If CPU is in a set overwrite the default thread prio */ if (CPU_ISSET(cpu, &taf->lowprio_cpu)) { tv->thread_priority = PRIO_LOW; } else if (CPU_ISSET(cpu, &taf->medprio_cpu)) { tv->thread_priority = PRIO_MEDIUM; } else if (CPU_ISSET(cpu, &taf->hiprio_cpu)) { tv->thread_priority = PRIO_HIGH; } else { tv->thread_priority = taf->prio; } SCLogInfo("Setting prio %d for \"%s\" Module to cpu/core " "%"PRIu16", thread id %lu", tv->thread_priority, tv->name, cpu, SCGetThreadIdLong()); } else { SetCPUAffinitySet(&taf->cpu_set); tv->thread_priority = taf->prio; } TmThreadSetPrio(tv); } #endif return TM_ECODE_OK; } /** * \brief Creates and returns the TV instance for a new thread. * * \param name Name of this TV instance * \param inq_name Incoming queue name * \param inqh_name Incoming queue handler name as set by TmqhSetup() * \param outq_name Outgoing queue name * \param outqh_name Outgoing queue handler as set by TmqhSetup() * \param slots String representation for the slot function to be used * \param fn_p Pointer to function when \"slots\" is of type \"custom\" * \param mucond Flag to indicate whether to initialize the condition * and the mutex variables for this newly created TV. * * \retval the newly created TV instance, or NULL on error */ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, char *outq_name, char *outqh_name, char *slots, void * (*fn_p)(void *), int mucond) { ThreadVars *tv = NULL; Tmq *tmq = NULL; Tmqh *tmqh = NULL; SCLogDebug("creating thread \"%s\"...", name); /* XXX create separate function for this: allocate a thread container */ tv = SCMalloc(sizeof(ThreadVars)); if (tv == NULL) goto error; memset(tv, 0, sizeof(ThreadVars)); SC_ATOMIC_INIT(tv->flags); SCMutexInit(&tv->sc_perf_pctx.m, NULL); tv->name = name; /* default state for every newly created thread */ TmThreadsSetFlag(tv, THV_PAUSE); TmThreadsSetFlag(tv, THV_USE); /* default aof for every newly created thread */ tv->aof = THV_RESTART_THREAD; /* set the incoming queue */ if (inq_name != NULL && strcmp(inq_name, "packetpool") != 0) { SCLogDebug("inq_name \"%s\"", inq_name); tmq = TmqGetQueueByName(inq_name); if (tmq == NULL) { tmq = TmqCreateQueue(inq_name); if (tmq == NULL) goto error; } SCLogDebug("tmq %p", tmq); tv->inq = tmq; tv->inq->reader_cnt++; SCLogDebug("tv->inq %p", tv->inq); } if (inqh_name != NULL) { SCLogDebug("inqh_name \"%s\"", inqh_name); tmqh = TmqhGetQueueHandlerByName(inqh_name); if (tmqh == NULL) goto error; tv->tmqh_in = tmqh->InHandler; tv->InShutdownHandler = tmqh->InShutdownHandler; SCLogDebug("tv->tmqh_in %p", tv->tmqh_in); } /* set the outgoing queue */ if (outqh_name != NULL) { SCLogDebug("outqh_name \"%s\"", outqh_name); tmqh = TmqhGetQueueHandlerByName(outqh_name); if (tmqh == NULL) goto error; tv->tmqh_out = tmqh->OutHandler; if (outq_name != NULL && strcmp(outq_name, "packetpool") != 0) { SCLogDebug("outq_name \"%s\"", outq_name); if (tmqh->OutHandlerCtxSetup != NULL) { tv->outctx = tmqh->OutHandlerCtxSetup(outq_name); tv->outq = NULL; } else { tmq = TmqGetQueueByName(outq_name); if (tmq == NULL) { tmq = TmqCreateQueue(outq_name); if (tmq == NULL) goto error; } SCLogDebug("tmq %p", tmq); tv->outq = tmq; tv->outctx = NULL; tv->outq->writer_cnt++; } } } if (TmThreadSetSlots(tv, slots, fn_p) != TM_ECODE_OK) { goto error; } if (mucond != 0) TmThreadInitMC(tv); return tv; error: printf("ERROR: failed to setup a thread.\n"); return NULL; } /** * \brief Creates and returns a TV instance for a Packet Processing Thread. * This function doesn't support custom slots, and hence shouldn't be * supplied \"custom\" as its slot type. All PPT threads are created * with a mucond(see TmThreadCreate declaration) of 0. Hence the tv * conditional variables are not used to kill the thread. * * \param name Name of this TV instance * \param inq_name Incoming queue name * \param inqh_name Incoming queue handler name as set by TmqhSetup() * \param outq_name Outgoing queue name * \param outqh_name Outgoing queue handler as set by TmqhSetup() * \param slots String representation for the slot function to be used * * \retval the newly created TV instance, or NULL on error */ ThreadVars *TmThreadCreatePacketHandler(char *name, char *inq_name, char *inqh_name, char *outq_name, char *outqh_name, char *slots) { ThreadVars *tv = NULL; tv = TmThreadCreate(name, inq_name, inqh_name, outq_name, outqh_name, slots, NULL, 0); if (tv != NULL) tv->type = TVT_PPT; return tv; } /** * \brief Creates and returns the TV instance for a Management thread(MGMT). * This function supports only custom slot functions and hence a * function pointer should be sent as an argument. * * \param name Name of this TV instance * \param fn_p Pointer to function when \"slots\" is of type \"custom\" * \param mucond Flag to indicate whether to initialize the condition * and the mutex variables for this newly created TV. * * \retval the newly created TV instance, or NULL on error */ ThreadVars *TmThreadCreateMgmtThread(char *name, void *(fn_p)(void *), int mucond) { ThreadVars *tv = NULL; tv = TmThreadCreate(name, NULL, NULL, NULL, NULL, "custom", fn_p, mucond); TmThreadSetCPU(tv, MANAGEMENT_CPU_SET); if (tv != NULL) tv->type = TVT_MGMT; return tv; } /** * \brief Appends this TV to tv_root based on its type * * \param type holds the type this TV belongs to. */ void TmThreadAppend(ThreadVars *tv, int type) { SCMutexLock(&tv_root_lock); if (tv_root[type] == NULL) { tv_root[type] = tv; tv->next = NULL; tv->prev = NULL; SCMutexUnlock(&tv_root_lock); return; } ThreadVars *t = tv_root[type]; while (t) { if (t->next == NULL) { t->next = tv; tv->prev = t; tv->next = NULL; break; } t = t->next; } SCMutexUnlock(&tv_root_lock); return; } /** * \brief Removes this TV from tv_root based on its type * * \param tv The tv instance to remove from the global tv list. * \param type Holds the type this TV belongs to. */ void TmThreadRemove(ThreadVars *tv, int type) { SCMutexLock(&tv_root_lock); if (tv_root[type] == NULL) { SCMutexUnlock(&tv_root_lock); return; } ThreadVars *t = tv_root[type]; while (t != tv) { t = t->next; } if (t != NULL) { if (t->prev != NULL) t->prev->next = t->next; if (t->next != NULL) t->next->prev = t->prev; if (t == tv_root[type]) tv_root[type] = t->next;; } SCMutexUnlock(&tv_root_lock); return; } /** * \brief Kill a thread. * * \param tv A ThreadVars instance corresponding to the thread that has to be * killed. */ void TmThreadKillThread(ThreadVars *tv) { int i = 0; if (tv == NULL) return; if (tv->inq != NULL) { /* we wait till we dry out all the inq packets, before we * kill this thread. Do note that you should have disabled * packet acquire by now using TmThreadDisableReceiveThreads()*/ if (!(strlen(tv->inq->name) == strlen("packetpool") && strcasecmp(tv->inq->name, "packetpool") == 0)) { PacketQueue *q = &trans_q[tv->inq->id]; while (q->len != 0) { usleep(1000); } } } /* set the thread flag informing the thread that it needs to be * terminated */ TmThreadsSetFlag(tv, THV_KILL); TmThreadsSetFlag(tv, THV_DEINIT); if (tv->inq != NULL) { /* signal the queue for the number of users */ if (tv->InShutdownHandler != NULL) { tv->InShutdownHandler(tv); } for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { if (tv->inq->q_type == 0) SCCondSignal(&trans_q[tv->inq->id].cond_q); else SCCondSignal(&data_queues[tv->inq->id].cond_q); } /* to be sure, signal more */ int cnt = 0; while (1) { if (TmThreadsCheckFlag(tv, THV_CLOSED)) { SCLogDebug("signalled the thread %" PRId32 " times", cnt); break; } cnt++; if (tv->InShutdownHandler != NULL) { tv->InShutdownHandler(tv); } for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { if (tv->inq->q_type == 0) SCCondSignal(&trans_q[tv->inq->id].cond_q); else SCCondSignal(&data_queues[tv->inq->id].cond_q); } usleep(100); } SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); } if (tv->cond != NULL ) { int cnt = 0; while (1) { if (TmThreadsCheckFlag(tv, THV_CLOSED)) { SCLogDebug("signalled the thread %" PRId32 " times", cnt); break; } cnt++; pthread_cond_broadcast(tv->cond); usleep(100); } } /* join it */ pthread_join(tv->t, NULL); SCLogDebug("thread %s stopped", tv->name); return; } /** * \brief Disable receive threads. */ void TmThreadDisableReceiveThreads(void) { ThreadVars *tv = NULL; SCMutexLock(&tv_root_lock); /* all receive threads are part of packet processing threads */ tv = tv_root[TVT_PPT]; /* we do have to keep in mind that TVs are arranged in the order * right from receive to log. The moment we fail to find a * receive TM amongst the slots in a tv, it indicates we are done * with all receive threads */ while (tv) { /* obtain the slots for this TV */ TmSlot *slots = tv->tm_slots; TmModule *tm = TmModuleGetById(slots->tm_id); if (!tm->flags & TM_FLAG_RECEIVE_TM) { tv = tv->next; continue; } /* we found our receive TV. Send it a KILL signal. This is all * we need to do to kill receive threads */ TmThreadsSetFlag(tv, THV_KILL); tv = tv->next; } SCMutexUnlock(&tv_root_lock); return; } TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name) { ThreadVars *tv = NULL; TmSlot *slots = NULL; SCMutexLock(&tv_root_lock); /* all receive threads are part of packet processing threads */ tv = tv_root[TVT_PPT]; while (tv) { slots = tv->tm_slots; while (slots != NULL) { TmModule *tm = TmModuleGetById(slots->tm_id); char *found = strstr(tm->name, tm_name); if (found != NULL) goto end; slots = slots->slot_next; } tv = tv->next; } end: SCMutexUnlock(&tv_root_lock); return slots; } void TmThreadKillThreads(void) { ThreadVars *tv = NULL; int i = 0; for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv) { TmThreadKillThread(tv); tv = tv->next; } } return; } /** * \brief Spawns a thread associated with the ThreadVars instance tv * * \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure */ TmEcode TmThreadSpawn(ThreadVars *tv) { pthread_attr_t attr; if (tv->tm_func == NULL) { printf("ERROR: no thread function set\n"); return TM_ECODE_FAILED; } /* Initialize and set thread detached attribute */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); int rc = pthread_create(&tv->t, &attr, tv->tm_func, (void *)tv); if (rc) { printf("ERROR; return code from pthread_create() is %" PRId32 "\n", rc); return TM_ECODE_FAILED; } TmThreadAppend(tv, tv->type); return TM_ECODE_OK; } /** * \brief Sets the thread flags for a thread instance(tv) * * \param tv Pointer to the thread instance for which the flag has to be set * \param flags Holds the thread state this thread instance has to be set to */ #if 0 void TmThreadSetFlags(ThreadVars *tv, uint8_t flags) { if (tv != NULL) tv->flags = flags; return; } #endif /** * \brief Sets the aof(Action on failure) for a thread instance(tv) * * \param tv Pointer to the thread instance for which the aof has to be set * \param aof Holds the aof this thread instance has to be set to */ void TmThreadSetAOF(ThreadVars *tv, uint8_t aof) { if (tv != NULL) tv->aof = aof; return; } /** * \brief Initializes the mutex and condition variables for this TV * * \param tv Pointer to a TV instance */ void TmThreadInitMC(ThreadVars *tv) { if ( (tv->m = SCMalloc(sizeof(SCMutex))) == NULL) { SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " "Exiting..."); exit(EXIT_FAILURE); } if (SCMutexInit(tv->m, NULL) != 0) { printf("Error initializing the tv->m mutex\n"); exit(0); } if ( (tv->cond = SCMalloc(sizeof(SCCondT))) == NULL) { SCLogError(SC_ERR_FATAL, "Fatal error encountered in TmThreadInitMC. " "Exiting..."); exit(0); } if (SCCondInit(tv->cond, NULL) != 0) { SCLogError(SC_ERR_FATAL, "Error initializing the tv->cond condition " "variable"); exit(0); } return; } /** * \brief Tests if the thread represented in the arg has been unpaused or not. * * The function would return if the thread tv has been unpaused or if the * kill flag for the thread has been set. * * \param tv Pointer to the TV instance. */ void TmThreadTestThreadUnPaused(ThreadVars *tv) { while (TmThreadsCheckFlag(tv, THV_PAUSE)) { usleep(100); if (TmThreadsCheckFlag(tv, THV_KILL)) break; } return; } /** * \brief Waits till the specified flag(s) is(are) set. We don't bother if * the kill flag has been set or not on the thread. * * \param tv Pointer to the TV instance. */ void TmThreadWaitForFlag(ThreadVars *tv, uint8_t flags) { while (!TmThreadsCheckFlag(tv, flags)) { usleep(100); } return; } /** * \brief Unpauses a thread * * \param tv Pointer to a TV instance that has to be unpaused */ void TmThreadContinue(ThreadVars *tv) { TmThreadsUnsetFlag(tv, THV_PAUSE); return; } /** * \brief Unpauses all threads present in tv_root */ void TmThreadContinueThreads() { ThreadVars *tv = NULL; int i = 0; for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv != NULL) { TmThreadContinue(tv); tv = tv->next; } } return; } /** * \brief Pauses a thread * * \param tv Pointer to a TV instance that has to be paused */ void TmThreadPause(ThreadVars *tv) { TmThreadsSetFlag(tv, THV_PAUSE); return; } /** * \brief Pauses all threads present in tv_root */ void TmThreadPauseThreads() { ThreadVars *tv = NULL; int i = 0; for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv != NULL) { TmThreadPause(tv); tv = tv->next; } } return; } /** * \brief Restarts the thread sent as the argument * * \param tv Pointer to the thread instance(tv) to be restarted */ static void TmThreadRestartThread(ThreadVars *tv) { if (tv->restarted >= THV_MAX_RESTARTS) { SCLogError(SC_ERR_TM_THREADS_ERROR,"thread restarts exceeded " "threshold limit for thread \"%s\"", tv->name); exit(EXIT_FAILURE); } TmThreadsUnsetFlag(tv, THV_CLOSED); TmThreadsUnsetFlag(tv, THV_FAILED); if (TmThreadSpawn(tv) != TM_ECODE_OK) { SCLogError(SC_ERR_THREAD_SPAWN, "thread \"%s\" failed to spawn", tv->name); exit(EXIT_FAILURE); } tv->restarted++; SCLogInfo("thread \"%s\" restarted", tv->name); return; } /** * \brief Used to check the thread for certain conditions of failure. If the * thread has been specified to restart on failure, the thread is * restarted. If the thread has been specified to gracefully shutdown * the engine on failure, it does so. The global aof flag, tv_aof * overrides the thread aof flag, if it holds a THV_ENGINE_EXIT; */ void TmThreadCheckThreadState(void) { ThreadVars *tv = NULL; int i = 0; for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv) { if (TmThreadsCheckFlag(tv, THV_FAILED)) { TmThreadsSetFlag(tv, THV_DEINIT); pthread_join(tv->t, NULL); if (tv_aof & THV_ENGINE_EXIT || tv->aof & THV_ENGINE_EXIT) { EngineKill(); return; } else { /* if the engine kill-stop has been received by now, chuck * restarting and return to kill the engine */ if (suricata_ctl_flags & SURICATA_KILL || suricata_ctl_flags & SURICATA_STOP) { return; } TmThreadRestartThread(tv); } } tv = tv->next; } } return; } /** * \brief Used to check if all threads have finished their initialization. On * finding an un-initialized thread, it waits till that thread completes * its initialization, before proceeding to the next thread. * * \retval TM_ECODE_OK all initialized properly * \retval TM_ECODE_FAILED failure */ TmEcode TmThreadWaitOnThreadInit(void) { ThreadVars *tv = NULL; int i = 0; uint16_t mgt_num = 0; uint16_t ppt_num = 0; for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv != NULL) { char started = FALSE; while (started == FALSE) { if (TmThreadsCheckFlag(tv, THV_INIT_DONE)) { started = TRUE; } else { /* sleep a little to give the thread some * time to finish initialization */ usleep(100); } if (TmThreadsCheckFlag(tv, THV_FAILED)) { SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" failed to " "initialize.", tv->name); return TM_ECODE_FAILED; } if (TmThreadsCheckFlag(tv, THV_CLOSED)) { SCLogError(SC_ERR_THREAD_INIT, "thread \"%s\" closed on " "initialization.", tv->name); return TM_ECODE_FAILED; } } if (i == TVT_MGMT) mgt_num++; else if (i == TVT_PPT) ppt_num++; tv = tv->next; } } SCLogInfo("all %"PRIu16" packet processing threads, %"PRIu16" management " "threads initialized, engine started.", ppt_num, mgt_num); return TM_ECODE_OK; } /** * \brief Returns the TV for the calling thread. * * \retval tv Pointer to the ThreadVars instance for the calling thread; * NULL on no match */ ThreadVars *TmThreadsGetCallingThread(void) { pthread_t self = pthread_self(); ThreadVars *tv = NULL; int i = 0; SCMutexLock(&tv_root_lock); for (i = 0; i < TVT_MAX; i++) { tv = tv_root[i]; while (tv) { if (pthread_equal(self, tv->t)) { SCMutexUnlock(&tv_root_lock); return tv; } tv = tv->next; } } SCMutexUnlock(&tv_root_lock); return NULL; }