|
|
|
@ -731,8 +731,6 @@ static uint32_t FlowTimeoutsMin(void)
|
|
|
|
|
return m;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//#define FM_PROFILE
|
|
|
|
|
|
|
|
|
|
static void GetWorkUnitSizing(const uint32_t pass_in_sec, const uint32_t rows, const uint32_t mp,
|
|
|
|
|
const bool emergency, uint64_t *wu_sleep, uint32_t *wu_rows)
|
|
|
|
|
{
|
|
|
|
@ -779,18 +777,10 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
{
|
|
|
|
|
FlowManagerThreadData *ftd = thread_data;
|
|
|
|
|
struct timeval ts;
|
|
|
|
|
uint32_t established_cnt = 0, new_cnt = 0, closing_cnt = 0;
|
|
|
|
|
bool emerg = false;
|
|
|
|
|
bool prev_emerg = false;
|
|
|
|
|
uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
|
|
|
|
|
/* VJ leaving disabled for now, as hosts are only used by tags and the numbers
|
|
|
|
|
* are really low. Might confuse ppl
|
|
|
|
|
uint16_t flow_mgr_host_prune = StatsRegisterCounter("hosts.pruned", th_v);
|
|
|
|
|
uint16_t flow_mgr_host_active = StatsRegisterCounter("hosts.active", th_v);
|
|
|
|
|
uint16_t flow_mgr_host_spare = StatsRegisterCounter("hosts.spare", th_v);
|
|
|
|
|
*/
|
|
|
|
|
memset(&ts, 0, sizeof(ts));
|
|
|
|
|
|
|
|
|
|
const uint32_t min_timeout = FlowTimeoutsMin();
|
|
|
|
|
const uint32_t pass_in_sec = min_timeout ? min_timeout * 8 : 60;
|
|
|
|
|
|
|
|
|
@ -804,21 +794,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
SCLogDebug("FM %s/%d starting. min_timeout %us. Full hash pass in %us", th_v->name,
|
|
|
|
|
ftd->instance, min_timeout, pass_in_sec);
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval endts;
|
|
|
|
|
struct timeval active;
|
|
|
|
|
struct timeval paused;
|
|
|
|
|
struct timeval sleeping;
|
|
|
|
|
memset(&endts, 0, sizeof(endts));
|
|
|
|
|
memset(&active, 0, sizeof(active));
|
|
|
|
|
memset(&paused, 0, sizeof(paused));
|
|
|
|
|
memset(&sleeping, 0, sizeof(sleeping));
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
struct timeval startts;
|
|
|
|
|
memset(&startts, 0, sizeof(startts));
|
|
|
|
|
gettimeofday(&startts, NULL);
|
|
|
|
|
|
|
|
|
|
uint32_t emerg_over_cnt = 0;
|
|
|
|
|
uint64_t next_run_ms = 0;
|
|
|
|
|
const uint32_t rows = ftd->max - ftd->min;
|
|
|
|
@ -837,32 +812,13 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
{
|
|
|
|
|
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
|
|
|
|
|
TmThreadsSetFlag(th_v, THV_PAUSED);
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval pause_startts;
|
|
|
|
|
memset(&pause_startts, 0, sizeof(pause_startts));
|
|
|
|
|
gettimeofday(&pause_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
TmThreadTestThreadUnPaused(th_v);
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval pause_endts;
|
|
|
|
|
memset(&pause_endts, 0, sizeof(pause_endts));
|
|
|
|
|
gettimeofday(&pause_endts, NULL);
|
|
|
|
|
struct timeval pause_time;
|
|
|
|
|
memset(&pause_time, 0, sizeof(pause_time));
|
|
|
|
|
timersub(&pause_endts, &pause_startts, &pause_time);
|
|
|
|
|
timeradd(&paused, &pause_time, &paused);
|
|
|
|
|
#endif
|
|
|
|
|
TmThreadsUnsetFlag(th_v, THV_PAUSED);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
|
|
|
|
|
emerg = true;
|
|
|
|
|
}
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval run_startts;
|
|
|
|
|
memset(&run_startts, 0, sizeof(run_startts));
|
|
|
|
|
gettimeofday(&run_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
/* Get the time */
|
|
|
|
|
memset(&ts, 0, sizeof(ts));
|
|
|
|
|
TimeGet(&ts);
|
|
|
|
@ -904,13 +860,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
StatsAddUI64(th_v, flow_mgr_host_prune, (uint64_t)hosts_pruned);
|
|
|
|
|
uint32_t hosts_active = HostGetActiveCount();
|
|
|
|
|
StatsSetUI64(th_v, flow_mgr_host_active, (uint64_t)hosts_active);
|
|
|
|
|
uint32_t hosts_spare = HostGetSpareCount();
|
|
|
|
|
StatsSetUI64(th_v, flow_mgr_host_spare, (uint64_t)hosts_spare);
|
|
|
|
|
*/
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_clo, (uint64_t)counters.clo);
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_new, (uint64_t)counters.new);
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_cnt_est, (uint64_t)counters.est);
|
|
|
|
@ -920,7 +869,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters.flows_notimeout);
|
|
|
|
|
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters.flows_timeout);
|
|
|
|
|
//StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_removed, (uint64_t)counters.flows_removed);
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout_inuse, (uint64_t)counters.flows_timeout_inuse);
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters.flows_aside);
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, (uint64_t)counters.flows_aside_needs_work);
|
|
|
|
@ -930,10 +878,7 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters.bypassed_bytes);
|
|
|
|
|
|
|
|
|
|
StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters.rows_maxlen);
|
|
|
|
|
// TODO AVG MAXLEN
|
|
|
|
|
// TODO LOOKUP STEPS MAXLEN and AVG LEN
|
|
|
|
|
/* Don't fear, FlowManagerThread is here...
|
|
|
|
|
* clear emergency bit if we have at least xx flows pruned. */
|
|
|
|
|
|
|
|
|
|
uint32_t len = FlowSpareGetPoolSize();
|
|
|
|
|
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)len);
|
|
|
|
|
|
|
|
|
@ -981,7 +926,6 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
if (other_last_sec == 0 || other_last_sec < (uint32_t)ts.tv_sec) {
|
|
|
|
|
if (ftd->instance == 0) {
|
|
|
|
|
DefragTimeoutHash(&ts);
|
|
|
|
|
// uint32_t hosts_pruned =
|
|
|
|
|
HostTimeoutHash(&ts);
|
|
|
|
|
IPPairTimeoutHash(&ts);
|
|
|
|
|
HttpRangeContainersTimeoutHash(&ts);
|
|
|
|
@ -989,27 +933,11 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval run_endts;
|
|
|
|
|
memset(&run_endts, 0, sizeof(run_endts));
|
|
|
|
|
gettimeofday(&run_endts, NULL);
|
|
|
|
|
struct timeval run_time;
|
|
|
|
|
memset(&run_time, 0, sizeof(run_time));
|
|
|
|
|
timersub(&run_endts, &run_startts, &run_time);
|
|
|
|
|
timeradd(&active, &run_time, &active);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (TmThreadsCheckFlag(th_v, THV_KILL)) {
|
|
|
|
|
StatsSyncCounters(th_v);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval sleep_startts;
|
|
|
|
|
memset(&sleep_startts, 0, sizeof(sleep_startts));
|
|
|
|
|
gettimeofday(&sleep_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (emerg || !time_is_live) {
|
|
|
|
|
usleep(250);
|
|
|
|
|
} else {
|
|
|
|
@ -1034,35 +962,10 @@ static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval sleep_endts;
|
|
|
|
|
memset(&sleep_endts, 0, sizeof(sleep_endts));
|
|
|
|
|
gettimeofday(&sleep_endts, NULL);
|
|
|
|
|
|
|
|
|
|
struct timeval sleep_time;
|
|
|
|
|
memset(&sleep_time, 0, sizeof(sleep_time));
|
|
|
|
|
timersub(&sleep_endts, &sleep_startts, &sleep_time);
|
|
|
|
|
timeradd(&sleeping, &sleep_time, &sleeping);
|
|
|
|
|
#endif
|
|
|
|
|
SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");
|
|
|
|
|
|
|
|
|
|
StatsSyncCountersIfSignalled(th_v);
|
|
|
|
|
}
|
|
|
|
|
SCLogPerf("%" PRIu32 " new flows, %" PRIu32 " established flows were "
|
|
|
|
|
"timed out, %"PRIu32" flows in closed state", new_cnt,
|
|
|
|
|
established_cnt, closing_cnt);
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
gettimeofday(&endts, NULL);
|
|
|
|
|
struct timeval total_run_time;
|
|
|
|
|
timersub(&endts, &startts, &total_run_time);
|
|
|
|
|
|
|
|
|
|
SCLogNotice("FM: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
|
|
|
|
|
(uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
|
|
|
|
|
(uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
|
|
|
|
|
(uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
|
|
|
|
|
(uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
|
|
|
|
|
#endif
|
|
|
|
|
return TM_ECODE_OK;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1146,49 +1049,14 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
memset(&ts, 0, sizeof(ts));
|
|
|
|
|
uint32_t fr_passes = 0;
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval endts;
|
|
|
|
|
struct timeval active;
|
|
|
|
|
struct timeval paused;
|
|
|
|
|
struct timeval sleeping;
|
|
|
|
|
memset(&endts, 0, sizeof(endts));
|
|
|
|
|
memset(&active, 0, sizeof(active));
|
|
|
|
|
memset(&paused, 0, sizeof(paused));
|
|
|
|
|
memset(&sleeping, 0, sizeof(sleeping));
|
|
|
|
|
#endif
|
|
|
|
|
struct timeval startts;
|
|
|
|
|
memset(&startts, 0, sizeof(startts));
|
|
|
|
|
gettimeofday(&startts, NULL);
|
|
|
|
|
|
|
|
|
|
while (1)
|
|
|
|
|
{
|
|
|
|
|
if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
|
|
|
|
|
TmThreadsSetFlag(th_v, THV_PAUSED);
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval pause_startts;
|
|
|
|
|
memset(&pause_startts, 0, sizeof(pause_startts));
|
|
|
|
|
gettimeofday(&pause_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
TmThreadTestThreadUnPaused(th_v);
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval pause_endts;
|
|
|
|
|
memset(&pause_endts, 0, sizeof(pause_endts));
|
|
|
|
|
gettimeofday(&pause_endts, NULL);
|
|
|
|
|
|
|
|
|
|
struct timeval pause_time;
|
|
|
|
|
memset(&pause_time, 0, sizeof(pause_time));
|
|
|
|
|
timersub(&pause_endts, &pause_startts, &pause_time);
|
|
|
|
|
timeradd(&paused, &pause_time, &paused);
|
|
|
|
|
#endif
|
|
|
|
|
TmThreadsUnsetFlag(th_v, THV_PAUSED);
|
|
|
|
|
}
|
|
|
|
|
fr_passes++;
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval run_startts;
|
|
|
|
|
memset(&run_startts, 0, sizeof(run_startts));
|
|
|
|
|
gettimeofday(&run_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
SC_ATOMIC_ADD(flowrec_busy,1);
|
|
|
|
|
FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);
|
|
|
|
|
|
|
|
|
@ -1206,55 +1074,17 @@ static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
|
|
|
|
|
}
|
|
|
|
|
SC_ATOMIC_SUB(flowrec_busy,1);
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval run_endts;
|
|
|
|
|
memset(&run_endts, 0, sizeof(run_endts));
|
|
|
|
|
gettimeofday(&run_endts, NULL);
|
|
|
|
|
|
|
|
|
|
struct timeval run_time;
|
|
|
|
|
memset(&run_time, 0, sizeof(run_time));
|
|
|
|
|
timersub(&run_endts, &run_startts, &run_time);
|
|
|
|
|
timeradd(&active, &run_time, &active);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (bail) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval sleep_startts;
|
|
|
|
|
memset(&sleep_startts, 0, sizeof(sleep_startts));
|
|
|
|
|
gettimeofday(&sleep_startts, NULL);
|
|
|
|
|
#endif
|
|
|
|
|
usleep(250);
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
struct timeval sleep_endts;
|
|
|
|
|
memset(&sleep_endts, 0, sizeof(sleep_endts));
|
|
|
|
|
gettimeofday(&sleep_endts, NULL);
|
|
|
|
|
struct timeval sleep_time;
|
|
|
|
|
memset(&sleep_time, 0, sizeof(sleep_time));
|
|
|
|
|
timersub(&sleep_endts, &sleep_startts, &sleep_time);
|
|
|
|
|
timeradd(&sleeping, &sleep_time, &sleeping);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
SCLogDebug("woke up...");
|
|
|
|
|
|
|
|
|
|
StatsSyncCountersIfSignalled(th_v);
|
|
|
|
|
}
|
|
|
|
|
StatsSyncCounters(th_v);
|
|
|
|
|
#ifdef FM_PROFILE
|
|
|
|
|
gettimeofday(&endts, NULL);
|
|
|
|
|
struct timeval total_run_time;
|
|
|
|
|
timersub(&endts, &startts, &total_run_time);
|
|
|
|
|
SCLogNotice("FR: active %u.%us out of %u.%us; sleeping %u.%us, paused %u.%us",
|
|
|
|
|
(uint32_t)active.tv_sec, (uint32_t)active.tv_usec,
|
|
|
|
|
(uint32_t)total_run_time.tv_sec, (uint32_t)total_run_time.tv_usec,
|
|
|
|
|
(uint32_t)sleeping.tv_sec, (uint32_t)sleeping.tv_usec,
|
|
|
|
|
(uint32_t)paused.tv_sec, (uint32_t)paused.tv_usec);
|
|
|
|
|
|
|
|
|
|
SCLogNotice("FR passes %u passes/s %u", fr_passes,
|
|
|
|
|
(uint32_t)fr_passes/((uint32_t)active.tv_sec?(uint32_t)active.tv_sec:1));
|
|
|
|
|
#endif
|
|
|
|
|
SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
|
|
|
|
|
return TM_ECODE_OK;
|
|
|
|
|
}
|
|
|
|
|