packet pool: fix memleaks

Don't kill flow manager and recyclers before the rest of the threads. The
packet threads may still have packets from their pools. As the flow threads
would destroy their pools the packets would be lost.

This patch doesn't kill the threads, it just pulls them out of their run
loop and into a wait loop. The packet pools won't be cleared until all
threads are killed.

Wait for flow management threads to close before moving on to the
next steps in the shutdown process.

Don't destroy flow force reassembly packet pool too early. Worker
threads may still want to return packets to it.
pull/1416/merge
Victor Julien 11 years ago
parent 22142d9b8b
commit 94321b8a2f

@ -98,12 +98,12 @@ typedef struct FlowTimeoutCounters_ {
} FlowTimeoutCounters;
/**
* \brief Used to kill flow manager thread(s).
* \brief Used to disable flow manager thread(s).
*
* \todo Kinda hackish since it uses the tv name to identify flow manager
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowManagerThread(void)
void FlowDisableFlowManagerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
@ -121,8 +121,24 @@ void FlowKillFlowManagerThread(void)
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
/* value in microseconds */
#define WAIT_TIME 100
double total_wait_time = 0;
while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
usleep(WAIT_TIME);
total_wait_time += WAIT_TIME / 1000000.0;
if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
SCLogError(SC_ERR_FATAL, "Engine unable to "
"disable detect thread - \"%s\". "
"Killing engine", tv->name);
exit(EXIT_FAILURE);
}
}
}
tv = tv->next;
}
@ -131,24 +147,6 @@ void FlowKillFlowManagerThread(void)
for (u = 0; u < flowmgr_number; u++)
SCCtrlCondSignal(&flow_manager_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */
@ -913,14 +911,14 @@ void FlowRecyclerThreadSpawn()
}
/**
* \brief Used to kill flow recycler thread(s).
* \brief Used to disable flow recycler thread(s).
*
* \note this should only be called when the flow manager is already gone
*
* \todo Kinda hackish since it uses the tv name to identify flow recycler
* thread. We need an all weather identification scheme.
*/
void FlowKillFlowRecyclerThread(void)
void FlowDisableFlowRecyclerThread(void)
{
ThreadVars *tv = NULL;
int cnt = 0;
@ -947,8 +945,24 @@ void FlowKillFlowRecyclerThread(void)
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
TmThreadsSetFlag(tv, THV_KILL);
TmThreadsSetFlag(tv, THV_DEINIT);
cnt++;
/* value in seconds */
#define THREAD_KILL_MAX_WAIT_TIME 60
/* value in microseconds */
#define WAIT_TIME 100
double total_wait_time = 0;
while (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
usleep(WAIT_TIME);
total_wait_time += WAIT_TIME / 1000000.0;
if (total_wait_time > THREAD_KILL_MAX_WAIT_TIME) {
SCLogError(SC_ERR_FATAL, "Engine unable to "
"disable detect thread - \"%s\". "
"Killing engine", tv->name);
exit(EXIT_FAILURE);
}
}
}
tv = tv->next;
}
@ -957,24 +971,6 @@ void FlowKillFlowRecyclerThread(void)
for (u = 0; u < flowrec_number; u++)
SCCtrlCondSignal(&flow_recycler_ctrl_cond);
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowRecyclerThread") == 0) {
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
}
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (cnt == 0) {
SCMutexUnlock(&tv_root_lock);
abort();
}
SCMutexUnlock(&tv_root_lock);
/* reset count, so we can kill and respawn (unix socket) */

@ -30,7 +30,7 @@ SCCtrlMutex flow_manager_ctrl_mutex;
#define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond)
void FlowManagerThreadSpawn(void);
void FlowKillFlowManagerThread(void);
void FlowDisableFlowManagerThread(void);
void FlowMgrRegisterTests (void);
/** flow recycler scheduling condition */
@ -40,7 +40,7 @@ SCCtrlMutex flow_recycler_ctrl_mutex;
SCCtrlCondSignal(&flow_recycler_ctrl_cond)
void FlowRecyclerThreadSpawn(void);
void FlowKillFlowRecyclerThread(void);
void FlowDisableFlowRecyclerThread(void);
void TmModuleFlowManagerRegister (void);
void TmModuleFlowRecyclerRegister (void);

@ -553,11 +553,8 @@ static inline void FlowForceReassemblyForHash(void)
*/
void FlowForceReassembly(void)
{
/* called by 'main()' which has no packet pool */
PacketPoolInit();
/* Carry out flow reassembly for unattended flows */
FlowForceReassemblyForHash();
PacketPoolDestroy();
return;
}

@ -283,13 +283,16 @@ TmEcode UnixSocketPcapFilesCheck(void *data)
}
this->currentfile = NULL;
/* needed by FlowForceReassembly */
PacketPoolInit();
/* handle graceful shutdown of the flow engine, it's helper
* threads and the packet threads */
FlowKillFlowManagerThread();
FlowDisableFlowManagerThread();
TmThreadDisableReceiveThreads();
FlowForceReassembly();
TmThreadDisablePacketThreads();
FlowKillFlowRecyclerThread();
FlowDisableFlowRecyclerThread();
/* kill the stats threads */
TmThreadKillThreadsFamily(TVT_MGMT);
@ -299,6 +302,8 @@ TmEcode UnixSocketPcapFilesCheck(void *data)
TmThreadKillThreadsFamily(TVT_PPT);
TmThreadClearThreadsFamily(TVT_PPT);
PacketPoolDestroy();
/* mgt and ppt threads killed, we can run non thread-safe
* shutdown functions */
SCPerfReleaseResources();

@ -2407,14 +2407,18 @@ int main(int argc, char **argv)
UnixSocketKillSocketThread();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
/* First we need to kill the flow manager thread */
FlowKillFlowManagerThread();
/* First we need to disable the flow manager thread */
FlowDisableFlowManagerThread();
}
/* Disable packet acquisition first */
TmThreadDisableReceiveThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
/* we need a packet pool for FlowForceReassembly */
PacketPoolInit();
FlowForceReassembly();
/* kill receive threads when they have processed all
* flow timeout packets */
@ -2426,13 +2430,18 @@ int main(int argc, char **argv)
/* before TmThreadKillThreads, as otherwise that kills it
* but more slowly */
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
FlowKillFlowRecyclerThread();
FlowDisableFlowRecyclerThread();
}
/* kill remaining threads */
TmThreadKillThreads();
if (suri.run_mode != RUNMODE_UNIX_SOCKET) {
/* destroy the packet pool for flow reassembly after all
* the other threads are gone. */
PacketPoolDestroy();
SCPerfReleaseResources();
IPPairShutdown();
FlowShutdown();

@ -352,11 +352,11 @@ void *TmThreadsSlotPktAcqLoop(void *td)
/* process all pseudo packets the flow timeout may throw at us */
TmThreadTimeoutLoop(tv, s);
PacketPoolDestroy();
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
PacketPoolDestroy();
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));

Loading…
Cancel
Save