flow/manager: sleep handled by pthread_cond_t again

Use only in live mode to allow FM to respond quickly to time
increases in offline mode.

Bug #4379.
pull/7533/head
Victor Julien 3 years ago committed by Victor Julien
parent 39141a8836
commit e6ac2e4e8a

@ -590,6 +590,7 @@ static Flow *FlowGetNew(ThreadVars *tv, FlowLookupStruct *fls, Packet *p)
if (!(SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)) {
SC_ATOMIC_OR(flow_flags, FLOW_EMERGENCY);
FlowTimeoutsEmergency();
FlowWakeupFlowManagerThread();
}
f = FlowGetUsedFlow(tv, fls->dtv, &p->ts);

@ -91,6 +91,9 @@ SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);
SCCtrlCondT flow_manager_ctrl_cond;
SCCtrlMutex flow_manager_ctrl_mutex;
void FlowTimeoutsInit(void)
{
SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
@ -796,6 +799,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
if (suricata_ctl_flags != 0)
return TM_ECODE_OK;
}
const bool time_is_live = TimeModeIsLive();
SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
ftd->instance, min_timeout, pass_in_sec);
@ -863,7 +867,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
memset(&ts, 0, sizeof(ts));
TimeGet(&ts);
SCLogDebug("ts %" PRIdMAX "", (intmax_t)ts.tv_sec);
const uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
uint64_t ts_ms = ts.tv_sec * 1000 + ts.tv_usec / 1000;
const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) {
next_run_ms = 0;
@ -1005,7 +1009,30 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
memset(&sleep_startts, 0, sizeof(sleep_startts));
gettimeofday(&sleep_startts, NULL);
#endif
usleep(250);
if (emerg || !time_is_live) {
usleep(250);
} else {
struct timeval cond_tv;
gettimeofday(&cond_tv, NULL);
struct timeval add_tv;
add_tv.tv_sec = 0;
add_tv.tv_usec = (sleep_per_wu * 1000);
timeradd(&cond_tv, &add_tv, &cond_tv);
struct timespec cond_time = FROM_TIMEVAL(cond_tv);
SCCtrlMutexLock(&flow_manager_ctrl_mutex);
while (1) {
int rc = SCCtrlCondTimedwait(
&flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
if (rc == ETIMEDOUT || rc < 0)
break;
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
break;
}
}
SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}
#ifdef FM_PROFILE
struct timeval sleep_endts;
@ -1051,6 +1078,9 @@ void FlowManagerThreadSpawn()
}
flowmgr_number = (uint32_t)setting;
SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);
SCLogConfig("using %u flow manager threads", flowmgr_number);
StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);

@ -24,6 +24,11 @@
#ifndef __FLOW_MANAGER_H__
#define __FLOW_MANAGER_H__
/** flow manager scheduling condition */
extern SCCtrlCondT flow_manager_ctrl_cond;
extern SCCtrlMutex flow_manager_ctrl_mutex;
#define FlowWakeupFlowManagerThread() SCCtrlCondSignal(&flow_manager_ctrl_cond)
#define FlowTimeoutsReset() FlowTimeoutsInit()
void FlowTimeoutsInit(void);
void FlowTimeoutsEmergency(void);

Loading…
Cancel
Save