Add per packet profiling.
Per packet profiling uses tick based accounting (a rough sketch follows after
this message). It has two outputs: a summary and a CSV file that contains
per-packet stats.
Stats per packet include:
1) total ticks spent
2) ticks spent per individual thread module
3) "threading overhead" which is simply calculated by subtracting (2) of (1).
A number of changes were made to integrate the new code in a clean way:
a number of generic enums are now placed in tm-threads-common.h so we can
include them from any part of the engine.
Code depends on --enable-profiling just like the rule profiling code.
New yaml parameters:
profiling:
  # packet profiling
  packets:
    # Profiling can be disabled here, but it will still have a
    # performance impact if compiled in.
    enabled: yes
    filename: packet_stats.log
    append: yes

    # per packet csv output
    csv:
      # Output can be disabled here, but it will still have a
      # performance impact if compiled in.
      enabled: no
      filename: packet_stats.csv
Example output of summary stats:
IP ver  Proto   cnt     min     max         avg
------  -----   ------  ------  ----------  -------
 IPv4   6       19436   11448   5404365     32993
 IPv4   256     4       11511   49968       30575

Per Thread module stats:

Thread Module              IP ver  Proto   cnt     min     max         avg
------------------------   ------  -----   ------  ------  ----------  -------
TMM_DECODEPCAPFILE          IPv4   6       19434   1242    47889       1770
TMM_DETECT                  IPv4   6       19436   1107    137241      1504
TMM_ALERTFASTLOG            IPv4   6       19436   90      1323        155
TMM_ALERTUNIFIED2ALERT      IPv4   6       19436   108     1359        138
TMM_ALERTDEBUGLOG           IPv4   6       19436   90      1134        154
TMM_LOGHTTPLOG              IPv4   6       19436   414     5392089     7944
TMM_STREAMTCP               IPv4   6       19434   828     1299159     19438
The proto 256 is a counter for handling of pseudo/tunnel packets.
Example output of csv:
pcap_cnt,ipver,ipproto,total,TMM_DECODENFQ,TMM_VERDICTNFQ,TMM_RECEIVENFQ,TMM_RECEIVEPCAP,TMM_RECEIVEPCAPFILE,TMM_DECODEPCAP,TMM_DECODEPCAPFILE,TMM_RECEIVEPFRING,TMM_DECODEPFRING,TMM_DETECT,TMM_ALERTFASTLOG,TMM_ALERTFASTLOG4,TMM_ALERTFASTLOG6,TMM_ALERTUNIFIEDLOG,TMM_ALERTUNIFIEDALERT,TMM_ALERTUNIFIED2ALERT,TMM_ALERTPRELUDE,TMM_ALERTDEBUGLOG,TMM_ALERTSYSLOG,TMM_LOGDROPLOG,TMM_ALERTSYSLOG4,TMM_ALERTSYSLOG6,TMM_RESPONDREJECT,TMM_LOGHTTPLOG,TMM_LOGHTTPLOG4,TMM_LOGHTTPLOG6,TMM_PCAPLOG,TMM_STREAMTCP,TMM_DECODEIPFW,TMM_VERDICTIPFW,TMM_RECEIVEIPFW,TMM_RECEIVEERFFILE,TMM_DECODEERFFILE,TMM_RECEIVEERFDAG,TMM_DECODEERFDAG,threading
1,4,6,172008,0,0,0,0,0,0,47889,0,0,48582,1323,0,0,0,0,1359,0,1134,0,0,0,0,0,8028,0,0,0,49356,0,0,0,0,0,0,0,14337
The first line of the file contains the labels.
Two example gnuplot scripts were added to plot the data.
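As a rough illustration of what the tick based accounting amounts to, the sketch
below wraps a thread module call with a cycle-counter read and accumulates the
delta per module. The names (PktProfileSketch, SketchProfileModule) are
hypothetical, and rdtsc is just one possible tick source; this is not the actual
engine implementation.

#include <stdint.h>
#include <x86intrin.h>   /* __rdtsc(), x86 only */

#define SKETCH_TMM_SIZE 64

/* hypothetical per-packet record: total ticks plus per-module ticks */
typedef struct PktProfileSketch_ {
    uint64_t ticks_total;
    uint64_t ticks_per_tmm[SKETCH_TMM_SIZE];
} PktProfileSketch;

/* wrap a single thread module call; 'tmm_id' indexes the module */
static inline void SketchProfileModule(PktProfileSketch *prof, int tmm_id,
        void (*ModuleFunc)(void *pkt), void *pkt)
{
    uint64_t start = __rdtsc();
    ModuleFunc(pkt);
    prof->ticks_per_tmm[tmm_id] += __rdtsc() - start;
}

/* "threading overhead" = total ticks minus the sum of the module ticks */
static inline uint64_t SketchThreadingOverhead(const PktProfileSketch *prof)
{
    uint64_t sum = 0;
    for (int i = 0; i < SKETCH_TMM_SIZE; i++)
        sum += prof->ticks_per_tmm[i];
    return prof->ticks_total - sum;
}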
/* Copyright (C) 2007-2011 Open Information Security Foundation
 *
 * You can copy, redistribute or modify this Program under the terms of
 * the GNU General Public License version 2 as published by the Free
 * Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/**
 * \file
 *
 * \author Victor Julien <victor@inliniac.net>
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
 */

#ifndef __TM_THREADS_H__
#define __TM_THREADS_H__

#include "tmqh-packetpool.h"
#include "tm-threads-common.h"
#include "tm-modules.h"
#include "flow.h" // for the FlowQueue

#ifdef OS_WIN32
static inline void SleepUsec(uint64_t usec)
{
    uint64_t msec = 1;
    if (usec > 1000) {
        msec = usec / 1000;
    }
    Sleep(msec);
}
#define SleepMsec(msec) Sleep((msec))
#else
#define SleepUsec(usec) usleep((usec))
#define SleepMsec(msec) usleep((msec) * 1000)
#endif

#define TM_QUEUE_NAME_MAX 16
#define TM_THREAD_NAME_MAX 16

typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *);

typedef struct TmSlot_ {
    /* function pointers */
    union {
        TmSlotFunc SlotFunc;
        TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);
        TmEcode (*Management)(ThreadVars *, void *);
    };
    /** linked list of slots, used when a pipeline has multiple slots
     *  in a single thread. */
    struct TmSlot_ *slot_next;

    SC_ATOMIC_DECLARE(void *, slot_data);
detect: inspect all packets in multi-layer tunneling
When the decoders encounter multiple layers of tunneling, multiple tunnel
packets are created. These are then stored in ThreadVars::decode_pq, where
they are processed after the current thread "slot" is done. However, due
to a logic error, the tunnel packets after the first were not run from
the correct position in the packet pipeline. This would lead to these
packets not going through the FlowWorker module, thus skipping everything
from flow tracking and detection to logging.
This would only happen for the 'single' and 'workers' runmodes, due to how
the pipelines are constructed.
The "slot" holding the decoder would contain 2 packets in
ThreadVars::decode_pq. It would then call the pipeline on the first
packet with the next slot of the pipeline through an indirect call to
TmThreadsSlotVarRun(), so it would be called for the FlowWorker.
However, when that first (innermost) packet was done, the call
to TmThreadsSlotVarRun() would again service ThreadVars::decode_pq
and process it, again moving the slot pointer forward, past the
FlowWorker.
This patch addresses the issue by making sure only a "decode" thread
slot will service ThreadVars::decode_pq, thus never moving the
slot past the FlowWorker.
Bug: #6402.
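A minimal sketch of the idea behind the fix, not the actual patch: only a slot
whose module carries the decode flag drains ThreadVars::decode_pq, and each
drained tunnel packet re-enters the pipeline at the slot following the decoder,
so it still passes through the FlowWorker. SketchServiceDecodePQ is a
hypothetical name and TM_FLAG_DECODE_TM is assumed to mark decode modules.

/* Sketch only: drain decode_pq from the decode slot and run each tunnel
 * packet from the slot *after* the decoder, so the FlowWorker is not skipped. */
static inline void SketchServiceDecodePQ(ThreadVars *tv, TmSlot *s)
{
    if ((s->tm_flags & TM_FLAG_DECODE_TM) == 0)
        return; /* only the decode slot may service the queue */

    while (1) {
        Packet *tp = PacketDequeueNoLock(&tv->decode_pq);
        if (unlikely(tp == NULL))
            break;
        /* run the tunnel packet from the slot after the decoder */
        TmEcode r = TmThreadsSlotVarRun(tv, tp, s->slot_next);
        if (r == TM_ECODE_FAILED) {
            TmThreadsSlotProcessPktFail(tv, s->slot_next, tp);
            break;
        }
        tv->tmqh_out(tv, tp);
    }
}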
    /** copy of the TmModule::flags */
    uint8_t tm_flags;

    /* store the thread module id */
    int tm_id;

    TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
    void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
    TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

    /* data storage */
    const void *slot_initdata;

} TmSlot;

extern ThreadVars *tv_root[TVT_MAX];

extern SCMutex tv_root_lock;

void TmSlotSetFuncAppend(ThreadVars *, TmModule *, const void *);

ThreadVars *TmThreadCreate(const char *, const char *, const char *, const char *, const char *, const char *,
        void *(fn_p)(void *), int);
ThreadVars *TmThreadCreatePacketHandler(const char *, const char *, const char *, const char *, const char *,
        const char *);
ThreadVars *TmThreadCreateMgmtThread(const char *name, void *(fn_p)(void *), int);
ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
        int mucond);
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
        int mucond);
TmEcode TmThreadSpawn(ThreadVars *);
unix-manager: add unix command socket and associated script
This patch introduces a unix command socket. JSON formatted messages
can be exchanged between suricata and a program connecting to a
dedicated socket.
The protocol is the following:
* Client connects to the socket
* It sends a version message: { "version": "$VERSION_ID" }
* Server answers with { "return": "OK|NOK" }
If the server returns OK, the client is then allowed to send commands.
The format of a command is the following:
{
  "command": "pcap-file",
  "arguments": { "filename": "smtp-clean.pcap", "output-dir": "/tmp/out" }
}
The server will try to execute the "command" specified with the
(optional) provided "arguments".
The answer from the server is the following:
{
  "return": "OK|NOK",
  "message": JSON_OBJECT or information string
}
A simple script is provided and is available under scripts/suricatasc. It
is not intended to be an enterprise-grade tool but rather proof of
concept/example code. The first command line argument of suricatasc is
used to specify the socket to connect to.
Configuration of the feature is done in the YAML under the 'unix-command'
section:
unix-command:
  enabled: yes
  filename: custom.socket
The path specified in 'filename' is not absolute and is relative to the
state directory.
A new running mode called 'unix-socket' is also added.
When starting in this mode, only a unix socket manager is started. When
it receives a 'pcap-file' command, the manager starts a 'pcap-file'
running mode which does not shut down at the end of the file but simply
exits. The manager is then able to start a new running mode with a new
file.
To start this mode, Suricata must be started with the --unix-socket
option, which takes an optional argument that sets the file name of the
socket. The path is not absolute and is relative to the state directory.
The 'pcap-file' command adds a file to the list of files to process.
For each pcap file, a pcap file running mode is started and the output
directory is changed to what is specified in the command. The running
mode specified in the 'runmode' YAML setting is used to select which
running mode must be used for the pcap file processing.
This requires modifications in the suricata.c file, where initialisation
code is now conditional on 'unix-socket' mode not being used.
Two other commands exist to get info on the remaining tasks:
* pcap-file-number: return the number of files in the waiting queue
* pcap-file-list: return the list of waiting files
'pcap-file-list' returns a structured object as message. The
structure is the following:
{
  'count': 2,
  'files': ['file1.pcap', 'file2.pcap']
}
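To make the handshake concrete, here is a minimal C client sketch (this is not
the suricatasc script): it connects to an assumed socket path, sends the
version message and prints the server's reply. The socket path and the version
string are placeholders.

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/un.h>

int main(void)
{
    /* placeholder path: the real socket lives under the state directory */
    const char *path = "/var/run/suricata/suricata-command.socket";

    int fd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (fd < 0) {
        perror("socket");
        return 1;
    }

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);

    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
        perror("connect");
        close(fd);
        return 1;
    }

    /* handshake: send the version message, expect { "return": "OK" } */
    const char *version_msg = "{\"version\": \"0.1\"}";
    if (write(fd, version_msg, strlen(version_msg)) < 0) {
        perror("write");
        close(fd);
        return 1;
    }

    char buf[1024];
    ssize_t n = read(fd, buf, sizeof(buf) - 1);
    if (n > 0) {
        buf[n] = '\0';
        printf("server: %s\n", buf);
    }

    close(fd);
    return 0;
}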
void TmThreadKillThreadsFamily(int family);
void TmThreadKillThreads(void);
void TmThreadClearThreadsFamily(int family);
void TmThreadAppend(ThreadVars *, int);
void TmThreadSetGroupName(ThreadVars *tv, const char *name);

TmEcode TmThreadSetCPUAffinity(ThreadVars *, uint16_t);
TmEcode TmThreadSetThreadPriority(ThreadVars *, int);
TmEcode TmThreadSetCPU(ThreadVars *, uint8_t);
TmEcode TmThreadSetupOptions(ThreadVars *);
void TmThreadSetPrio(ThreadVars *);
int TmThreadGetNbThreads(uint8_t type);

void TmThreadInitMC(ThreadVars *);
void TmThreadTestThreadUnPaused(ThreadVars *);
void TmThreadContinue(ThreadVars *);
void TmThreadContinueThreads(void);
void TmThreadCheckThreadState(void);
TmEcode TmThreadWaitOnThreadInit(void);

int TmThreadsCheckFlag(ThreadVars *, uint32_t);
void TmThreadsSetFlag(ThreadVars *, uint32_t);
void TmThreadsUnsetFlag(ThreadVars *, uint32_t);
void TmThreadWaitForFlag(ThreadVars *, uint32_t);

TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);

void TmThreadDisablePacketThreads(void);
void TmThreadDisableReceiveThreads(void);

uint32_t TmThreadCountThreadsByTmmFlags(uint8_t flags);

TmEcode TmThreadWaitOnThreadRunning(void);

TmEcode TmThreadsProcessDecodePseudoPackets(
        ThreadVars *tv, PacketQueueNoLock *decode_pq, TmSlot *slot);

static inline void TmThreadsCleanDecodePQ(PacketQueueNoLock *pq)
{
    while (1) {
        Packet *p = PacketDequeueNoLock(pq);
        if (unlikely(p == NULL))
            break;
        TmqhOutputPacketpool(NULL, p);
    }
}

static inline void TmThreadsSlotProcessPktFail(ThreadVars *tv, TmSlot *s, Packet *p)
{
    if (p != NULL) {
        TmqhOutputPacketpool(tv, p);
    }
    TmThreadsCleanDecodePQ(&tv->decode_pq);
    if (tv->stream_pq_local) {
        SCMutexLock(&tv->stream_pq_local->mutex_q);
        TmqhReleasePacketsToPacketPool(tv->stream_pq_local);
        SCMutexUnlock(&tv->stream_pq_local->mutex_q);
    }
    TmThreadsSetFlag(tv, THV_FAILED);
}
/**
 *  \brief Handle timeout from the capture layer. Checks
 *         stream_pq which may have been filled by the flow
 *         manager.
 *  \param tv thread vars holding the pipeline to run on these packets.
 */
static inline bool TmThreadsHandleInjectedPackets(ThreadVars *tv)
{
    PacketQueue *pq = tv->stream_pq_local;
    if (pq && pq->len > 0) {
        while (1) {
            SCMutexLock(&pq->mutex_q);
            Packet *extra_p = PacketDequeue(pq);
            SCMutexUnlock(&pq->mutex_q);
            if (extra_p == NULL)
                break;
#ifdef DEBUG_VALIDATION
            BUG_ON(extra_p->flow != NULL);
#endif
            TmEcode r = TmThreadsSlotVarRun(tv, extra_p, tv->tm_flowworker);
            if (r == TM_ECODE_FAILED) {
                TmThreadsSlotProcessPktFail(tv, tv->tm_flowworker, extra_p);
                break;
            }
            tv->tmqh_out(tv, extra_p);
        }
        return true;
    } else {
        return false;
    }
}

/**
 *  \brief Process the rest of the functions (if any) and queue.
 */
static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet *p)
{
    if (s == NULL) {
        tv->tmqh_out(tv, p);
        return TM_ECODE_OK;
    }

    TmEcode r = TmThreadsSlotVarRun(tv, p, s);
    if (unlikely(r == TM_ECODE_FAILED)) {
        TmThreadsSlotProcessPktFail(tv, s, p);
        return TM_ECODE_FAILED;
    }

    tv->tmqh_out(tv, p);

    TmThreadsHandleInjectedPackets(tv);

    return TM_ECODE_OK;
}
/** \brief inject packet if THV_CAPTURE_INJECT_PKT is set.
 *  Allow caller to supply their own packet.
 *
 *  Meant for the detect reload process that interrupts a sleeping capture
 *  thread to force a packet through the engine to complete a reload. */
static inline void TmThreadsCaptureInjectPacket(ThreadVars *tv, Packet *p)
{
    TmThreadsUnsetFlag(tv, THV_CAPTURE_INJECT_PKT);
    if (p == NULL)
        p = PacketGetFromQueueOrAlloc();
    if (p != NULL) {
        p->flags |= PKT_PSEUDO_STREAM_END;
        PKT_SET_SRC(p, PKT_SRC_CAPTURE_TIMEOUT);
        if (TmThreadsSlotProcessPkt(tv, tv->tm_flowworker, p) != TM_ECODE_OK) {
            TmqhOutputPacketpool(tv, p);
        }
    }
}

/** \brief handle capture timeout
 *  When a capture method times out we check for house keeping
 *  tasks in the capture thread.
 *
 *  \param p packet. Capture method may have taken a packet from
 *           the pool prior to the timing out call. We will then
 *           use that packet. Otherwise we can get our own.
 */
static inline void TmThreadsCaptureHandleTimeout(ThreadVars *tv, Packet *p)
{
    if (TmThreadsCheckFlag(tv, THV_CAPTURE_INJECT_PKT)) {
        TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
        return;

    } else {
        if (TmThreadsHandleInjectedPackets(tv) == false) {
            /* see if we have to do some house keeping */
            if (tv->flow_queue && SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) {
                TmThreadsCaptureInjectPacket(tv, p); /* consumes 'p' */
                return;
            }
        }
    }

    /* packet could have been passed to us that we won't use
     * return it to the pool. */
    if (p != NULL)
        tv->tmqh_out(tv, p);
}

static inline void TmThreadsCaptureBreakLoop(ThreadVars *tv)
{
    if ((tv->tmm_flags & TM_FLAG_RECEIVE_TM) == 0) {
        return;
    }
    /* find the correct slot */
    TmSlot *s = tv->tm_slots;
    TmModule *tm = TmModuleGetById(s->tm_id);
    if (tm->flags & TM_FLAG_RECEIVE_TM) {
        /* if the method supports it, BreakLoop. Otherwise we rely on
         * the capture method's recv timeout */
        if (tm->PktAcqLoop && tm->PktAcqBreakLoop) {
            tm->PktAcqBreakLoop(tv, SC_ATOMIC_GET(s->slot_data));
        }
        TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
    }
}

void TmThreadsListThreads(void);
int TmThreadsRegisterThread(ThreadVars *tv, const int type);
void TmThreadsUnregisterThread(const int id);
flow: redesign of flow timeout handling
Goals:
- reduce locking
- take advantage of 'hot' caches
- better locality
Locking reduction
New flow spare pool. The global pool is implemented as a list of blocks,
where each block has 100 spare flows. Worker threads fetch a block at
a time, storing the block in local thread storage.
The Flow Recycler now returns flows to the pool in blocks as well.
The Flow Recycler fetches all flows to be processed in one step instead
of one at a time.
Cache 'hot'ness
Worker threads now check the timeout of flows they evaluate during lookup.
The worker will have to read the flow into cache anyway, so the added
overhead of checking the timeout value is minimal. When a flow is considered
timed out, one of 2 things happens:
- if the flow is 'owned' by the thread it is handled locally. Handling means
  checking if the flow needs 'timeout' work.
- otherwise, the flow is added to a special 'evicted' list in the flow
  bucket where it will be picked up by the flow manager.
Flow Manager timing
By default the flow manager now tries to do passes of the flow hash in
smaller steps, where the goal is to do a full pass in 8 x the lowest timeout
value it has to enforce. So if the lowest timeout value is 30s, a full pass
will take 4 minutes. The goal here is to reduce locking overhead and not
get in the way of the workers (see the pacing sketch after this message).
In emergency mode each pass is full, and lower timeouts are used.
The timing of the flow manager also no longer relies on pthread condition
variables, as these generally cause wake-ups much quicker than the desired
timeout. Instead a simple (u)sleep loop is used.
Both changes reduce the number of hash passes a lot.
Emergency behavior
In emergency mode there are a number of changes to the workers. In this
scenario the flow memcap is fully used up and it is unavoidable that some
flows won't be tracked.
1. flow spare pool fetches are reduced to once a second. This avoids locking
   overhead, as the chance of success was very low anyway.
2. getting an active flow directly from the hash skips flows that had very
   recent activity, to avoid the scenario where all flows get only into the
   NEW state before getting reused. Rather, allow some to have a chance of
   completing.
3. TCP packets that are not SYN packets will not get a used flow, unless
   stream.midstream is enabled. The goal here is again to avoid evicting
   active flows unnecessarily.
Better Locality
The Flow Manager now injects flows into the worker threads, instead of one
or two packets. The advantage of this is that the worker threads can get
packets from their local packet pools, avoiding the constant overhead of
packets returning to 'foreign' pools.
Counters
A lot of flow counters have been added and some have been renamed.
Overall the worker threads increment 'flow.wrk.*' counters, while the flow
manager increments 'flow.mgr.*'.
Additionally, none of the counters are snapshots anymore, they all increment
over time. The flow.memuse and flow.spare counters are exceptions.
Misc
FlowQueue has been split into a FlowQueuePrivate (unlocked) and FlowQueue.
Flow no longer has 'prev' pointers and uses a unified 'next' pointer for
both hash and queue use.
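A rough sketch of the pass pacing described under 'Flow Manager timing', using
hypothetical names rather than the actual flow manager code: the hash is walked
in chunks so that one full pass takes roughly 8 x the lowest timeout, and the
thread sleeps between chunks with usleep() instead of waiting on a condition
variable.

#include <stdint.h>
#include <unistd.h>

/* hypothetical constants/names, for illustration only */
#define PASSES_PER_LOWEST_TIMEOUT 8
#define CHUNKS_PER_PASS           100

static void SketchFlowManagerLoop(uint32_t hash_size, uint32_t lowest_timeout_s,
        void (*ScanRows)(uint32_t start, uint32_t end), int (*EngineRunning)(void))
{
    /* aim: one full hash pass in (8 * lowest timeout) seconds */
    const uint32_t pass_seconds = PASSES_PER_LOWEST_TIMEOUT * lowest_timeout_s;
    uint32_t rows_per_chunk = hash_size / CHUNKS_PER_PASS;
    if (rows_per_chunk == 0)
        rows_per_chunk = 1;
    const uint64_t sleep_usec = (uint64_t)pass_seconds * 1000000ULL / CHUNKS_PER_PASS;

    uint32_t pos = 0;
    while (EngineRunning()) {
        uint32_t end = pos + rows_per_chunk;
        if (end > hash_size)
            end = hash_size;
        ScanRows(pos, end);                 /* evict/handle timed out flows */
        pos = (end >= hash_size) ? 0 : end;

        usleep((useconds_t)sleep_usec);     /* simple sleep, no condvar wake-ups */
    }
}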
void TmThreadsInjectFlowById(Flow *f, const int id);

void TmThreadsInitThreadsTimestamp(const SCTime_t ts);
void TmThreadsSetThreadTimestamp(const int id, const SCTime_t ts);
void TmThreadsGetMinimalTimestamp(struct timeval *ts);
uint16_t TmThreadsGetWorkerThreadMax(void);
bool TmThreadsTimeSubsysIsReady(void);

#endif /* __TM_THREADS_H__ */