diff --git a/src/flow-manager.c b/src/flow-manager.c index 2d86b81c6a..411dcbc88c 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -285,6 +285,10 @@ static uint32_t FlowManagerHashRowTimeout(Flow *f, struct timeval *ts, continue; } + /* before grabbing the flow lock, make sure we have at least + * 3 packets in the pool */ + PacketPoolWaitForN(3); + FLOWLOCK_WRLOCK(f); Flow *next_flow = f->hprev; @@ -377,6 +381,10 @@ static uint32_t FlowTimeoutHash(struct timeval *ts, uint32_t try_cnt, for (idx = hash_min; idx < hash_max; idx++) { FlowBucket *fb = &flow_hash[idx]; + /* before grabbing the row lock, make sure we have at least + * 9 packets in the pool */ + PacketPoolWaitForN(9); + if (FBLOCK_TRYLOCK(fb) != 0) continue; @@ -1220,7 +1228,10 @@ static int FlowMgrTest04 (void) static int FlowMgrTest05 (void) { int result = 0; + extern intmax_t max_pending_packets; + max_pending_packets = 128; + PacketPoolInit(); FlowInitConfig(FLOW_QUIET); FlowConfig backup; memcpy(&backup, &flow_config, sizeof(FlowConfig)); @@ -1258,7 +1269,7 @@ static int FlowMgrTest05 (void) memcpy(&flow_config, &backup, sizeof(FlowConfig)); FlowShutdown(); - + PacketPoolDestroy(); return result; } #endif /* UNITTESTS */ diff --git a/src/flow-timeout.c b/src/flow-timeout.c index f2eb089928..b18007939e 100644 --- a/src/flow-timeout.c +++ b/src/flow-timeout.c @@ -515,6 +515,7 @@ static inline void FlowForceReassemblyForHash(void) for (idx = 0; idx < flow_config.hash_size; idx++) { FlowBucket *fb = &flow_hash[idx]; + PacketPoolWaitForN(9); FBLOCK_LOCK(fb); /* get the topmost flow from the QUEUE */ @@ -522,6 +523,8 @@ static inline void FlowForceReassemblyForHash(void) /* we need to loop through all the flows in the queue */ while (f != NULL) { + PacketPoolWaitForN(3); + FLOWLOCK_WRLOCK(f); /* Get the tcp session for the flow */ diff --git a/src/tmqh-packetpool.c b/src/tmqh-packetpool.c index 79103a4a01..4c4bf40691 100644 --- a/src/tmqh-packetpool.c +++ b/src/tmqh-packetpool.c @@ -157,6 +157,57 @@ void PacketPoolWait(void) cc_barrier(); } +/** \brief Wait until we have the requested ammount of packets in the pool + * + * In some cases waiting for packets is undesirable. Especially when + * a wait would happen under a lock of some kind, other parts of the + * engine could have to wait. + * + * This function only returns when at least N packets are in our pool. + * + * \param n number of packets needed + */ +void PacketPoolWaitForN(int n) +{ + PktPool *my_pool = GetThreadPacketPool(); + Packet *p = NULL; + + while (1) { + int i = 0; + PacketPoolWait(); + + /* count packets in our stack */ + p = my_pool->head; + while (p != NULL) { + if (++i == n) + return; + + p = p->next; + } + + /* continue counting in the return stack */ + if (my_pool->return_stack.head != NULL) { + SCMutexLock(&my_pool->return_stack.mutex); + p = my_pool->return_stack.head; + while (p != NULL) { + if (++i == n) { + SCMutexUnlock(&my_pool->return_stack.mutex); + return; + } + p = p->next; + } + SCMutexUnlock(&my_pool->return_stack.mutex); + + /* or signal that we need packets and wait */ + } else { + SCMutexLock(&my_pool->return_stack.mutex); + SC_ATOMIC_ADD(my_pool->return_stack.sync_now, 1); + SCCondWait(&my_pool->return_stack.cond, &my_pool->return_stack.mutex); + SCMutexUnlock(&my_pool->return_stack.mutex); + } + } +} + /** \brief a initialized packet * * \warning Use *only* at init, not at packet runtime diff --git a/src/tmqh-packetpool.h b/src/tmqh-packetpool.h index 6da6fc6aa8..12a3f5cabd 100644 --- a/src/tmqh-packetpool.h +++ b/src/tmqh-packetpool.h @@ -69,6 +69,7 @@ void TmqhReleasePacketsToPacketPool(PacketQueue *); void TmqhPacketpoolRegister(void); Packet *PacketPoolGetPacket(void); void PacketPoolWait(void); +void PacketPoolWaitForN(int n); void PacketPoolReturnPacket(Packet *p); void PacketPoolInit(void); void PacketPoolInitEmpty(void);