flow worker: profiling

Previously the detect and stream code lived in their own thread
modules. This meant profiling showed their cost as part of the
thread module profiling logic. Now that only the flow worker is
a thread module this no longer works.

This patch introduces profiling for the 3 current flow worker
steps: flow, stream, detect.
pull/2118/head
Victor Julien 10 years ago
parent 48771c1acf
commit e09643c396

@ -30,6 +30,7 @@
#include "suricata-common.h"
#include "threadvars.h"
#include "decode-events.h"
#include "flow-worker.h"
#ifdef __SC_CUDA_SUPPORT__
#include "util-cuda-buffer.h"
@ -325,6 +326,11 @@ typedef struct PktProfilingTmmData_ {
#endif
} PktProfilingTmmData;
typedef struct PktProfilingData_ {
uint64_t ticks_start;
uint64_t ticks_end;
} PktProfilingData;
typedef struct PktProfilingDetectData_ {
uint64_t ticks_start;
uint64_t ticks_end;
@ -341,6 +347,7 @@ typedef struct PktProfiling_ {
uint64_t ticks_end;
PktProfilingTmmData tmm[TMM_SIZE];
PktProfilingData flowworker[PROFILE_FLOWWORKER_SIZE];
PktProfilingAppData app[ALPROTO_MAX];
PktProfilingDetectData detect[PROF_DETECT_SIZE];
uint64_t proto_detect;

@ -158,6 +158,8 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac
/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
FlowHandlePacket(tv, fw->dtv, p);
if (likely(p->flow != NULL)) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
@ -165,6 +167,8 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac
}
/* Flow is now LOCKED */
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);
/* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
* pseudo packet created by the flow manager. */
} else if (p->flags & PKT_HAS_FLOW) {
@ -178,7 +182,9 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac
SCLogDebug("packet %"PRIu64" is TCP", p->pcap_cnt);
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
StreamTcp(tv, p, fw->stream_thread, &fw->pq, NULL);
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);
/* Packets here can safely access p->flow as it's locked */
SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
@ -188,8 +194,11 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac
// TODO do we need to call StreamTcp on these pseudo packets or not?
//StreamTcp(tv, x, fw->stream_thread, &fw->pq, NULL);
if (detect_thread != NULL)
if (detect_thread != NULL) {
FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
Detect(tv, x, detect_thread, NULL, NULL);
FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
}
#if 0
// Outputs
#endif
@ -204,7 +213,9 @@ TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *preq, Pac
SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
if (detect_thread != NULL) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
Detect(tv, p, detect_thread, NULL, NULL);
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
}
#if 0
// Outputs
@ -234,6 +245,22 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
return SC_ATOMIC_GET(fw->detect_thread);
}
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {
case PROFILE_FLOWWORKER_FLOW:
return "flow";
case PROFILE_FLOWWORKER_STREAM:
return "stream";
case PROFILE_FLOWWORKER_DETECT:
return "detect";
case PROFILE_FLOWWORKER_SIZE:
return "size";
}
return "error";
}
void TmModuleFlowWorkerRegister (void)
{
tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";

@ -18,6 +18,14 @@
#ifndef __FLOW_WORKER_H__
#define __FLOW_WORKER_H__
enum ProfileFlowWorkerId {
PROFILE_FLOWWORKER_FLOW = 0,
PROFILE_FLOWWORKER_STREAM,
PROFILE_FLOWWORKER_DETECT,
PROFILE_FLOWWORKER_SIZE
};
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);

@ -30,6 +30,7 @@
#include "decode.h"
#include "detect.h"
#include "conf.h"
#include "flow-worker.h"
#include "tm-threads.h"
@ -86,6 +87,13 @@ SCProfilePacketData packet_profile_app_pd_data6[257];
SCProfilePacketData packet_profile_detect_data4[PROF_DETECT_SIZE][257];
SCProfilePacketData packet_profile_detect_data6[PROF_DETECT_SIZE][257];
struct ProfileProtoRecords {
SCProfilePacketData records4[257];
SCProfilePacketData records6[257];
};
struct ProfileProtoRecords packet_profile_flowworker_data[PROFILE_FLOWWORKER_SIZE];
int profiling_packets_enabled = 0;
int profiling_packets_csv_enabled = 0;
@ -159,6 +167,7 @@ SCProfilingInit(void)
memset(&packet_profile_app_pd_data6, 0, sizeof(packet_profile_app_pd_data6));
memset(&packet_profile_detect_data4, 0, sizeof(packet_profile_detect_data4));
memset(&packet_profile_detect_data6, 0, sizeof(packet_profile_detect_data6));
memset(&packet_profile_flowworker_data, 0, sizeof(packet_profile_flowworker_data));
const char *filename = ConfNodeLookupChildValue(conf, "filename");
if (filename != NULL) {
@ -302,6 +311,55 @@ SCProfilingDump(void)
SCLogInfo("Done dumping profiling data.");
}
static void DumpFlowWorkerIP(FILE *fp, int ipv, uint64_t total)
{
char totalstr[256];
enum ProfileFlowWorkerId fwi;
for (fwi = 0; fwi < PROFILE_FLOWWORKER_SIZE; fwi++) {
struct ProfileProtoRecords *r = &packet_profile_flowworker_data[fwi];
for (int p = 0; p < 257; p++) {
SCProfilePacketData *pd = ipv == 4 ? &r->records4[p] : &r->records6[p];
if (pd->cnt == 0) {
continue;
}
FormatNumber(pd->tot, totalstr, sizeof(totalstr));
double percent = (long double)pd->tot /
(long double)total * 100;
fprintf(fp, "%-20s IPv%d %3d %12"PRIu64" %12"PRIu64" %12"PRIu64" %12"PRIu64" %12s %-6.2f\n",
ProfileFlowWorkerIdToString(fwi), ipv, p, pd->cnt,
pd->min, pd->max, (uint64_t)(pd->tot / pd->cnt), totalstr, percent);
}
}
}
static void DumpFlowWorker(FILE *fp)
{
uint64_t total = 0;
enum ProfileFlowWorkerId fwi;
for (fwi = 0; fwi < PROFILE_FLOWWORKER_SIZE; fwi++) {
struct ProfileProtoRecords *r = &packet_profile_flowworker_data[fwi];
for (int p = 0; p < 257; p++) {
SCProfilePacketData *pd = &r->records4[p];
total += pd->tot;
pd = &r->records6[p];
total += pd->tot;
}
}
fprintf(fp, "\n%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n",
"Flow Worker", "IP ver", "Proto", "cnt", "min", "max", "avg");
fprintf(fp, "%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n",
"--------------------", "------", "-----", "----------", "------------", "------------", "-----------");
DumpFlowWorkerIP(fp, 4, total);
DumpFlowWorkerIP(fp, 6, total);
fprintf(fp, "Note: %s includes app-layer for TCP\n",
ProfileFlowWorkerIdToString(PROFILE_FLOWWORKER_STREAM));
}
void SCProfilingDumpPacketStats(void)
{
int i;
@ -451,6 +509,8 @@ void SCProfilingDumpPacketStats(void)
}
}
DumpFlowWorker(fp);
fprintf(fp, "\nPer App layer parser stats:\n");
fprintf(fp, "\n%-20s %-6s %-5s %-12s %-12s %-12s %-12s\n",
@ -881,6 +941,49 @@ void SCProfilingUpdatePacketTmmRecords(Packet *p)
}
}
static inline void SCProfilingUpdatePacketGenericRecord(PktProfilingData *pdt,
SCProfilePacketData *pd)
{
if (pdt == NULL || pd == NULL) {
return;
}
uint64_t delta = pdt->ticks_end - pdt->ticks_start;
if (pd->min == 0 || delta < pd->min) {
pd->min = delta;
}
if (pd->max < delta) {
pd->max = delta;
}
pd->tot += delta;
pd->cnt ++;
}
static void SCProfilingUpdatePacketGenericRecords(Packet *p, PktProfilingData *pd,
struct ProfileProtoRecords *store, int size)
{
int i;
for (i = 0; i < size; i++) {
PktProfilingData *pdt = &pd[i];
if (pdt->ticks_start == 0 || pdt->ticks_end == 0 || pdt->ticks_start > pdt->ticks_end) {
continue;
}
struct ProfileProtoRecords *r = &store[i];
SCProfilePacketData *store = NULL;
if (PKT_IS_IPV4(p)) {
store = &(r->records4[p->proto]);
} else {
store = &(r->records6[p->proto]);
}
SCProfilingUpdatePacketGenericRecord(pdt, store);
}
}
void SCProfilingAddPacket(Packet *p)
{
if (p == NULL || p->profile == NULL ||
@ -922,6 +1025,9 @@ void SCProfilingAddPacket(Packet *p)
pd->cnt ++;
}
SCProfilingUpdatePacketGenericRecords(p, p->profile->flowworker,
packet_profile_flowworker_data, PROFILE_FLOWWORKER_SIZE);
SCProfilingUpdatePacketTmmRecords(p);
SCProfilingUpdatePacketAppRecords(p);
SCProfilingUpdatePacketDetectRecords(p);
@ -954,6 +1060,9 @@ void SCProfilingAddPacket(Packet *p)
pd->cnt ++;
}
SCProfilingUpdatePacketGenericRecords(p, p->profile->flowworker,
packet_profile_flowworker_data, PROFILE_FLOWWORKER_SIZE);
SCProfilingUpdatePacketTmmRecords(p);
SCProfilingUpdatePacketAppRecords(p);
SCProfilingUpdatePacketDetectRecords(p);

@ -157,6 +157,20 @@ PktProfiling *SCProfilePacketStart(void);
} \
}
#define FLOWWORKER_PROFILING_START(p, id) \
if (profiling_packets_enabled && (p)->profile != NULL) { \
if ((id) < PROFILE_FLOWWORKER_SIZE) { \
(p)->profile->flowworker[(id)].ticks_start = UtilCpuGetTicks();\
} \
}
#define FLOWWORKER_PROFILING_END(p, id) \
if (profiling_packets_enabled && (p)->profile != NULL) { \
if ((id) < PROFILE_FLOWWORKER_SIZE) { \
(p)->profile->flowworker[(id)].ticks_end = UtilCpuGetTicks(); \
} \
}
#define PACKET_PROFILING_RESET(p) \
if (profiling_packets_enabled && (p)->profile != NULL) { \
SCFree((p)->profile); \
@ -292,6 +306,9 @@ void SCProfilingDump(void);
#define SGH_PROFILING_RECORD(det_ctx, sgh)
#define FLOWWORKER_PROFILING_START(p, id)
#define FLOWWORKER_PROFILING_END(p, id)
#endif /* PROFILING */
#endif /* ! __UTIL_PROFILE_H__ */

Loading…
Cancel
Save