diff --git a/src/alert-unified2-alert.c b/src/alert-unified2-alert.c index 7a0371caba..6ff984fb56 100644 --- a/src/alert-unified2-alert.c +++ b/src/alert-unified2-alert.c @@ -2053,6 +2053,7 @@ error: */ void Unified2RegisterTests(void) { + PacketPoolInit(); #ifdef UNITTESTS UtRegisterTest("Unified2Test01 -- Ipv4 test", Unified2Test01, 1); UtRegisterTest("Unified2Test02 -- Ipv6 test", Unified2Test02, 1); diff --git a/src/source-erf-file.c b/src/source-erf-file.c index 24a1e47191..81305b8a0a 100644 --- a/src/source-erf-file.c +++ b/src/source-erf-file.c @@ -111,7 +111,6 @@ TmModuleDecodeErfFileRegister(void) TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot) { Packet *p = NULL; - uint16_t packet_q_len = 0; ErfFileThreadVars *etv = (ErfFileThreadVars *)data; etv->slot = ((TmSlot *)slot)->slot_next; diff --git a/src/source-mpipe.c b/src/source-mpipe.c index 285068162b..382ed63f99 100644 --- a/src/source-mpipe.c +++ b/src/source-mpipe.c @@ -330,7 +330,6 @@ TmEcode ReceiveMpipeLoop(ThreadVars *tv, void *data, void *slot) TmSlot *s = (TmSlot *)slot; ptv->slot = s->slot_next; Packet *p = NULL; - int cpu = tmc_cpus_get_my_cpu(); int rank = tv->rank; int max_queued = 0; char *ctype; @@ -918,7 +917,7 @@ TmEcode ReceiveMpipeThreadInit(ThreadVars *tv, void *initdata, void **data) SCReturnInt(TM_ECODE_FAILED); } } - gxio_mpipe_init(context, instance); + result = gxio_mpipe_init(context, instance); VERIFY(result, "gxio_mpipe_init()"); /* open ingress interfaces */ for (int i = 0; i < nlive; i++) { diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index 3653888dee..9acd490dd2 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -48,12 +48,68 @@ #include "util-profiling.h" #include "util-device.h" -/* TODO: Handle case without __thread */ +#ifdef TLS __thread PktPool thread_pkt_pool; +static inline PktPool *GetThreadPacketPool(void) +{ + return &thread_pkt_pool; +} + +static inline PktPool *ThreadPacketPoolCreate(void) +{ + /* Nothing to do since __thread allocates the memory. */ + return GetThreadPacketPool(); +} + + +#else +/* __thread not supported. */ +static pthread_key_t pkt_pool_thread_key; + +static inline PktPool *GetThreadPacketPool(void) +{ + return (PktPool*)pthread_getspecific(pkt_pool_thread_key); +} + +static inline PktPool *ThreadPacketPoolCreate(void) +{ + /* Check that the pool is not already created */ + PktPool *pool = GetThreadPacketPool(); + if (pool) + return pool; + + /* Create a new pool for this thread. */ + pool = (PktPool*)SCMallocAligned(sizeof(PktPool), CLS); + if (pool == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "malloc failed"); + exit(EXIT_FAILURE); + } + pthread_setspecific(pkt_pool_thread_key, pool); + + return pool; +} + +static void PktPoolThreadDestroy(void * buf) +{ + free(buf); +} + +#endif + /* Number of freed packet to save for one pool before freeing them. */ #define MAX_PENDING_RETURN_PACKETS 32 +void TmqhPacketpoolInit(void) +{ +#ifndef TLS + /* Create the pthread Key that is used to look up thread specific + * data buffer. Needs to be created only once. + */ + pthread_key_create(&pkt_pool_thread_key, PktPoolThreadDestroy); +#endif +} + /** * \brief TmqhPacketpoolRegister * \initonly @@ -62,12 +118,14 @@ void TmqhPacketpoolRegister (void) { tmqh_table[TMQH_PACKETPOOL].name = "packetpool"; tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool; tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool; + + TmqhPacketpoolInit(); } -static int PacketPoolIsEmpty(void) +static int PacketPoolIsEmpty(PktPool *pool) { /* Check local stack first. */ - if (thread_pkt_pool.head || thread_pkt_pool.return_stack.head) + if (pool->head || pool->return_stack.head) return 0; return 1; @@ -75,7 +133,9 @@ static int PacketPoolIsEmpty(void) void PacketPoolWait(void) { - while(PacketPoolIsEmpty()) + PktPool *my_pool = GetThreadPacketPool(); + + while(PacketPoolIsEmpty(my_pool)) ; } @@ -88,7 +148,7 @@ static void PacketPoolStorePacket(Packet *p) /* Clear the PKT_ALLOC flag, since that indicates to push back * onto the ring buffer. */ p->flags &= ~PKT_ALLOC; - p->pool = &thread_pkt_pool;; + p->pool = GetThreadPacketPool(); p->ReleasePacket = PacketPoolReturnPacket; PacketPoolReturnPacket(p); } @@ -102,7 +162,7 @@ static void PacketPoolStorePacket(Packet *p) */ Packet *PacketPoolGetPacket(void) { - PktPool *pool = &thread_pkt_pool; + PktPool *pool = GetThreadPacketPool(); if (pool->head) { /* Stack is not empty. */ @@ -125,7 +185,7 @@ Packet *PacketPoolGetPacket(void) */ if (pool->head) { /* Stack is not empty. */ - Packet *p = pool->return_stack.head; + Packet *p = pool->head; pool->head = p->next; p->pool = pool; return p; @@ -141,6 +201,8 @@ Packet *PacketPoolGetPacket(void) */ void PacketPoolReturnPacket(Packet *p) { + PktPool *my_pool = GetThreadPacketPool(); + PktPool *pool = p->pool; if (pool == NULL) { free(p); @@ -149,31 +211,31 @@ void PacketPoolReturnPacket(Packet *p) PACKET_RECYCLE(p); - if (pool == &thread_pkt_pool) { + if (pool == my_pool) { /* Push back onto this thread's own stack, so no locking. */ - p->next = thread_pkt_pool.head; - thread_pkt_pool.head = p; + p->next = my_pool->head; + my_pool->head = p; } else { - PktPool *pending_pool = thread_pkt_pool.pending_pool; + PktPool *pending_pool = my_pool->pending_pool; if (pending_pool == NULL) { /* No pending packet, so store the current packet. */ - thread_pkt_pool.pending_pool = pool; - thread_pkt_pool.pending_head = p; - thread_pkt_pool.pending_tail = p; - thread_pkt_pool.pending_count = 1; + my_pool->pending_pool = pool; + my_pool->pending_head = p; + my_pool->pending_tail = p; + my_pool->pending_count = 1; } else if (pending_pool == pool) { /* Another packet for the pending pool list. */ - p->next = thread_pkt_pool.pending_head; - thread_pkt_pool.pending_head->next = p; - thread_pkt_pool.pending_count++; - if (thread_pkt_pool.pending_count > MAX_PENDING_RETURN_PACKETS) { + p->next = my_pool->pending_head; + my_pool->pending_head->next = p; + my_pool->pending_count++; + if (my_pool->pending_count > MAX_PENDING_RETURN_PACKETS) { /* Return the entire list of pending packets. */ SCMutexLock(&pool->return_stack.mutex); - thread_pkt_pool.pending_tail->next = pool->return_stack.head; - pool->return_stack.head = thread_pkt_pool.pending_head; + my_pool->pending_tail->next = pool->return_stack.head; + pool->return_stack.head = my_pool->pending_head; SCMutexUnlock(&pool->return_stack.mutex); /* Clear the list of pending packets to return. */ - thread_pkt_pool.pending_pool = NULL; + my_pool->pending_pool = NULL; } } else { /* Push onto return stack for this pool */ @@ -189,7 +251,9 @@ void PacketPoolInit(void) { extern intmax_t max_pending_packets; - SCMutexInit(&thread_pkt_pool.return_stack.mutex, NULL); + PktPool *my_pool = ThreadPacketPoolCreate(); + + SCMutexInit(&my_pool->return_stack.mutex, NULL); /* pre allocate packets */ SCLogDebug("preallocating packets... packet size %" PRIuMAX "", diff --git a/src/tmqh-packetpool.h b/src/tmqh-packetpool.h index ddb7060e3c..729883cd31 100644 --- a/src/tmqh-packetpool.h +++ b/src/tmqh-packetpool.h @@ -30,7 +30,7 @@ /* Return stack, onto which other threads free packets. */ typedef struct PktPoolLockedStack_{ /* linked list of free packets. */ - SCSpinlock mutex; + SCMutex mutex; Packet *head; } __attribute__((aligned(CLS))) PktPoolLockedStack;