Add an option to PF_RING to run multiple reader threads. Improve the general performance of the PF_RING module.

remotes/origin/master-1.1.x
Victor Julien 14 years ago
parent edeec290f6
commit 91f28afef4

@@ -48,6 +48,8 @@
#include "cuda-packet-batcher.h"
#include "source-pfring.h"
/**
* A list of output modules that will be active for the run mode.
*/
@@ -3537,6 +3539,145 @@ int RunModeIdsPfringAuto(DetectEngineCtx *de_ctx, char *iface) {
return 0;
}
int RunModeIdsPfringAutoFp(DetectEngineCtx *de_ctx, char *iface) {
SCEnter();
char tname[12];
char qname[12];
uint16_t cpu = 0;
char queues[2048] = "";
RunModeInitialize();
TimeModeSetLive();
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
cpu = 1;
/* always create at least one thread */
int thread_max = TmThreadGetNbThreads(DETECT_CPU_SET);
if (thread_max == 0)
thread_max = ncpus * threading_detect_ratio;
if (thread_max < 1)
thread_max = 1;
int thread;
for (thread = 0; thread < thread_max; thread++) {
if (strlen(queues) > 0)
strlcat(queues, ",", sizeof(queues));
snprintf(qname, sizeof(qname),"pickup%"PRIu16, thread+1);
strlcat(queues, qname, sizeof(queues));
}
SCLogDebug("queues %s", queues);
int pfring_threads = PfringConfGetThreads();
/* create the threads */
for (thread = 0; thread < pfring_threads; thread++) {
snprintf(tname, sizeof(tname),"RecvPfring%"PRIu16, thread+1);
char *thread_name = SCStrdup(tname);
ThreadVars *tv_receive = TmThreadCreatePacketHandler(thread_name,"packetpool","packetpool",queues,"flow","varslot");
if (tv_receive == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceivePfring");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceivePfring\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_receive,tm_module,iface);
tm_module = TmModuleGetByName("DecodePfring");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodePfring failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_receive,tm_module,NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receive, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_receive, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_receive) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
}
for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname),"Detect%"PRIu16, thread+1);
snprintf(qname, sizeof(qname),"pickup%"PRIu16, thread+1);
SCLogDebug("tname %s, qname %s", tname, qname);
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name, qname, "flow","packetpool","packetpool","varslot");
//ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name, qname, "flow","alert-queue1","simple","varslot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_detect_ncpu,tm_module,NULL);
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_detect_ncpu,tm_module,(void *)de_ctx);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
* (at cpu 0) will have less priority (higher 'nice' value)
* In this case we will set the thread priority to +10 (default is 0)
*/
if (cpu == 0 && ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
} else if (ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
}
}
char *thread_group_name = SCStrdup("Detect");
if (thread_group_name == NULL) {
printf("Error allocating memory\n");
exit(EXIT_FAILURE);
}
tv_detect_ncpu->thread_group_name = thread_group_name;
/* add outputs as well */
SetupOutputs(tv_detect_ncpu);
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
if ((cpu + 1) == ncpus)
cpu = 0;
else
cpu++;
}
return 0;
}
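A minimal standalone sketch (not part of this commit) of what the comma-separated pickup queue list built above ends up looking like: every RecvPfring thread publishes to the full list through the "flow" queue handler, while detect thread N consumes only its own "pickupN" queue. The example uses plain strncat instead of Suricata's strlcat so it compiles on its own, and thread_max = 3 is an assumed value.

#include <stdio.h>
#include <string.h>

int main(void) {
    char queues[2048] = "";
    char qname[12];
    int thread_max = 3;          /* assumed: three detect threads */
    int thread;

    for (thread = 0; thread < thread_max; thread++) {
        snprintf(qname, sizeof(qname), "pickup%d", thread + 1);
        if (queues[0] != '\0')
            strncat(queues, ",", sizeof(queues) - strlen(queues) - 1);
        strncat(queues, qname, sizeof(queues) - strlen(queues) - 1);
    }

    /* each RecvPfring thread gets this full list as its outqueues,
     * while detect thread N reads only "pickupN" */
    printf("%s\n", queues);      /* prints: pickup1,pickup2,pickup3 */
    return 0;
}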
int RunModeErfFileAuto(DetectEngineCtx *de_ctx, char *file)
{
SCEnter();

@@ -54,6 +54,7 @@ void RunModeShutDown(void);
int RunModeFilePcapAutoFp(DetectEngineCtx *de_ctx, char *file);
int RunModeIdsPfringAutoFp(DetectEngineCtx *de_ctx, char *iface);
int threading_set_cpu_affinity;
#endif /* __RUNMODES_H__ */

@@ -97,6 +97,15 @@ TmEcode NoPfringSupportExit(ThreadVars *tv, void *initdata, void **data)
#else /* implied we do have PF_RING support */
/* XXX replace with user configurable options */
#define LIBPFRING_SNAPLEN 1518
#define LIBPFRING_PROMISC 1
#define LIBPFRING_REENTRANT 0
#define LIBPFRING_WAIT_FOR_INCOMING 1
int g_pfring_threads;
/**
* \brief Structure to hold thread specific variables.
*/
@@ -105,17 +114,15 @@ typedef struct PfringThreadVars_
/* thread specific handle */
pfring *pd;
uint8_t cluster_id;
char *interface;
/* counters */
uint32_t pkts;
uint64_t bytes;
uint32_t pkts;
#ifdef HAVE_PFRING_CLUSTER_TYPE
cluster_type ctype;
#endif /* HAVE_PFRING_CLUSTER_TYPE */
ThreadVars *tv;
uint8_t cluster_id;
char *interface;
} PfringThreadVars;
/**
@@ -144,6 +151,24 @@ void TmModuleDecodePfringRegister (void) {
tmm_modules[TMM_DECODEPFRING].RegisterTests = NULL;
}
int PfringConfGetThreads(void) {
return g_pfring_threads;
}
void PfringLoadConfig(void) {
char *threadsstr = NULL;
if (ConfGet("pfring.threads", &threadsstr) != 1) {
g_pfring_threads = 1;
} else {
if (threadsstr != NULL) {
g_pfring_threads = (uint8_t)atoi(threadsstr);
SCLogInfo("Going to use %" PRId32 " PF_RING receive threads",
g_pfring_threads);
}
}
}
/**
* \brief Pfring Packet Process function.
*
@@ -152,26 +177,23 @@ void TmModuleDecodePfringRegister (void) {
*
* \param user pointer to PfringThreadVars
* \param h pointer to pfring packet header
* \param pkt pointer to raw packet data
* \param p pointer to the current packet
*/
void PfringProcessPacket(void *user, struct pfring_pkthdr *h, u_char *pkt, Packet *p) {
static inline void PfringProcessPacket(void *user, struct pfring_pkthdr *h, Packet *p) {
PfringThreadVars *ptv = (PfringThreadVars *)user;
//printf("PfringProcessPacket: user %p, h %p, pkt %p, p %p\n", user, h, pkt, p);
//TmqDebugList();
//printf("PfringProcessPacket: pending %" PRIu32 "\n", pending);
ptv->bytes += h->caplen;
ptv->pkts++;
p->ts.tv_sec = h->ts.tv_sec;
p->ts.tv_usec = h->ts.tv_usec;
ptv->pkts++;
ptv->bytes += h->caplen;
/* PF_RING all packets are marked as a link type of ethernet so that is what we do here. */
/* PF_RING all packets are marked as a link type of ethernet
* so that is what we do here. */
p->datalink = LINKTYPE_ETHERNET;
PacketCopyData(p, pkt, h->caplen);
p->pktlen = h->caplen;
}
/**
@@ -190,7 +212,6 @@ TmEcode ReceivePfring(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pa
PfringThreadVars *ptv = (PfringThreadVars *)data;
struct pfring_pkthdr hdr;
u_char buffer[MAX_CAPLEN];
int r;
if (suricata_ctl_flags & SURICATA_STOP ||
@@ -199,12 +220,12 @@ TmEcode ReceivePfring(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pa
}
/* Depending on what compile time options are used for pfring we either return 0 or -1 on error and always 1 for success */
r = pfring_recv(ptv->pd, (char *)buffer , sizeof(buffer), &hdr, LIBPFRING_WAIT_FOR_INCOMING);
if(r == 1){
r = pfring_recv(ptv->pd, (char *)p->pkt , (u_int)default_packet_size, &hdr, LIBPFRING_WAIT_FOR_INCOMING);
if (r == 1) {
//printf("RecievePfring src %" PRIu32 " sport %" PRIu32 " dst %" PRIu32 " dstport %" PRIu32 "\n",
// hdr.parsed_pkt.ipv4_src,hdr.parsed_pkt.l4_src_port, hdr.parsed_pkt.ipv4_dst,hdr.parsed_pkt.l4_dst_port);
PfringProcessPacket(ptv, &hdr, buffer,p);
}else{
PfringProcessPacket(ptv, &hdr, p);
} else {
SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error %" PRId32 "", r);
return TM_ECODE_FAILED;
}
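A side note on the hunk above: the old path received into a stack buffer and then copied it into the Packet via PacketCopyData(), while the new path lets pfring_recv() write straight into p->pkt, saving one memcpy per packet. Below is a standalone sketch of that difference using simplified stand-in types and a fake_recv() stub instead of PF_RING's real API (editor's illustration only, not from the commit).

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define DEFAULT_PACKET_SIZE 1514       /* stand-in for default_packet_size */

typedef struct {                       /* simplified stand-in for Packet */
    uint8_t pkt[DEFAULT_PACKET_SIZE];
    uint32_t pktlen;
} FakePacket;

/* stand-in for pfring_recv(): fills the caller-supplied buffer */
static int fake_recv(uint8_t *buf, size_t buflen, uint32_t *caplen) {
    static const uint8_t frame[] = { 0xde, 0xad, 0xbe, 0xef };
    size_t n = sizeof(frame) < buflen ? sizeof(frame) : buflen;
    memcpy(buf, frame, n);
    *caplen = (uint32_t)n;
    return 1;
}

int main(void) {
    FakePacket p;
    uint32_t caplen;

    /* old path: temporary buffer plus an extra copy into the Packet */
    uint8_t buffer[DEFAULT_PACKET_SIZE];
    if (fake_recv(buffer, sizeof(buffer), &caplen) == 1) {
        memcpy(p.pkt, buffer, caplen);  /* the per-packet copy removed here */
        p.pktlen = caplen;
    }

    /* new path: receive directly into the Packet's own buffer */
    if (fake_recv(p.pkt, sizeof(p.pkt), &caplen) == 1)
        p.pktlen = caplen;

    printf("captured %" PRIu32 " bytes\n", p.pktlen);
    return 0;
}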
@@ -237,42 +258,36 @@ TmEcode ReceivePfringThreadInit(ThreadVars *tv, void *initdata, void **data) {
return TM_ECODE_FAILED;
memset(ptv, 0, sizeof(PfringThreadVars));
ptv->tv = tv;
if (ConfGet("pfring.cluster-id", &tmpclusterid) != 1){
if (ConfGet("pfring.cluster-id", &tmpclusterid) != 1) {
SCLogError(SC_ERR_PF_RING_GET_CLUSTERID_FAILED,"could not get pfring.cluster-id");
return TM_ECODE_FAILED;
}else{
} else {
ptv->cluster_id = (uint8_t)atoi(tmpclusterid);
SCLogInfo("Going to use cluster-id %" PRId32 "", ptv->cluster_id);
SCLogDebug("Going to use cluster-id %" PRId32, ptv->cluster_id);
}
if (ConfGet("pfring.interface", &ptv->interface) != 1){
SCLogError(SC_ERR_PF_RING_GET_INTERFACE_FAILED,"Could not get pfring.interface");
return TM_ECODE_FAILED;
}else{
SCLogInfo("going to use interface %s",ptv->interface);
if (ConfGet("pfring.interface", &ptv->interface) != 1) {
SCLogError(SC_ERR_PF_RING_GET_INTERFACE_FAILED,"Could not get pfring.interface");
return TM_ECODE_FAILED;
} else {
SCLogDebug("going to use interface %s",ptv->interface);
}
ptv->pd = pfring_open(ptv->interface, LIBPFRING_PROMISC, LIBPFRING_SNAPLEN, LIBPFRING_REENTRANT);
if(ptv->pd == NULL) {
ptv->pd = pfring_open(ptv->interface, LIBPFRING_PROMISC, (uint32_t)default_packet_size, LIBPFRING_REENTRANT);
if (ptv->pd == NULL) {
SCLogError(SC_ERR_PF_RING_OPEN,"pfring_open error");
return TM_ECODE_FAILED;
} else {
pfring_set_application_name(ptv->pd, PROG_NAME);
pfring_version(ptv->pd, &version);
SCLogInfo("Using PF_RING v.%d.%d.%d",
(version & 0xFFFF0000) >> 16,
(version & 0x0000FF00) >> 8,
version & 0x000000FF);
}
#ifdef HAVE_PFRING_CLUSTER_TYPE
if (ConfGet("pfring.cluster-type", &tmpctype) != 1) {
SCLogError(SC_ERR_GET_CLUSTER_TYPE_FAILED,"Could not get pfring.cluster-type");
return TM_ECODE_FAILED;
} else if (strncmp(tmpctype, "cluster_round_robin", 19) == 0 || strncmp(tmpctype, "cluster_flow", 12) == 0) {
SCLogInfo("pfring cluster type %s",tmpctype);
} else if (strcmp(tmpctype, "cluster_round_robin") == 0 || strcmp(tmpctype, "cluster_flow") == 0) {
ptv->ctype = (cluster_type)tmpctype;
rc = pfring_set_cluster(ptv->pd, ptv->cluster_id, ptv->ctype);
} else {
@@ -283,12 +298,16 @@ TmEcode ReceivePfringThreadInit(ThreadVars *tv, void *initdata, void **data) {
rc = pfring_set_cluster(ptv->pd, ptv->cluster_id);
#endif /* HAVE_PFRING_CLUSTER_TYPE */
if(rc != 0){
SCLogError(SC_ERR_PF_RING_SET_CLUSTER_FAILED,"pfring_set_cluster returned %d for cluster-id: %d", rc, ptv->cluster_id);
if (rc != 0) {
SCLogError(SC_ERR_PF_RING_SET_CLUSTER_FAILED, "pfring_set_cluster "
"returned %d for cluster-id: %d", rc, ptv->cluster_id);
return TM_ECODE_FAILED;
}else{
SCLogInfo("pfring_set_cluster-id %d set successfully",ptv->cluster_id);
}
SCLogInfo("(%s) Using PF_RING v.%d.%d.%d, interface %s, cluster-id %d",
tv->name, (version & 0xFFFF0000) >> 16, (version & 0x0000FF00) >> 8,
version & 0x000000FF, ptv->interface, ptv->cluster_id);
*data = (void *)ptv;
return TM_ECODE_OK;
}
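For reference, pfring_version() fills in a packed integer and the shifts in the SCLogInfo call above split it into major.minor.patch. A tiny standalone demonstration with a made-up version word (the value 0x00040601 is hypothetical, standing for 4.6.1):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint32_t version = 0x00040601;    /* hypothetical: PF_RING 4.6.1 */
    printf("Using PF_RING v.%" PRIu32 ".%" PRIu32 ".%" PRIu32 "\n",
           (version & 0xFFFF0000) >> 16,
           (version & 0x0000FF00) >> 8,
           version & 0x000000FF);
    return 0;
}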
@@ -305,16 +324,12 @@ void ReceivePfringThreadExitStats(ThreadVars *tv, void *data) {
if(pfring_stats(ptv->pd, &pfring_s) < 0) {
SCLogError(SC_ERR_STAT,"(%s) Failed to get pfring stats", tv->name);
SCLogInfo("(%s) Packets %" PRIu32 ", bytes %" PRIu64 "", tv->name, ptv->pkts, ptv->bytes);
return;
} else {
SCLogInfo("(%s) Packets %" PRIu32 ", bytes %" PRIu64 "", tv->name, ptv->pkts, ptv->bytes);
SCLogInfo("(%s) Pfring Total:%" PRIu64 " Recv:%" PRIu64 " Drop:%" PRIu64 " (%02.1f%%).", tv->name,
(uint64_t)pfring_s.recv + (uint64_t)pfring_s.drop, (uint64_t)pfring_s.recv,
(uint64_t)pfring_s.drop, ((float)pfring_s.drop/(float)(pfring_s.drop + pfring_s.recv))*100);
return;
}
}
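A quick worked example of the drop percentage computed in the SCLogInfo call above, using made-up counter values (recv and drop are hypothetical numbers, not measurements):

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    uint64_t recv = 980000;    /* hypothetical packets received */
    uint64_t drop = 20000;     /* hypothetical packets dropped */

    printf("Pfring Total:%" PRIu64 " Recv:%" PRIu64 " Drop:%" PRIu64 " (%02.1f%%).\n",
           recv + drop, recv, drop,
           ((float)drop / (float)(drop + recv)) * 100);   /* prints 2.0% */
    return 0;
}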
@@ -341,7 +356,11 @@ TmEcode ReceivePfringThreadDeinit(ThreadVars *tv, void *data) {
* \param p pointer to the current packet
* \param data pointer that gets cast into PfringThreadVars for ptv
* \param pq pointer to the current PacketQueue
*
* \todo Verify that PF_RING only deals with ethernet traffic
*
* \warning This function bypasses the pkt buf and len macro's
*
* \retval TM_ECODE_OK is always returned
*/
TmEcode DecodePfring(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
@@ -352,17 +371,19 @@ TmEcode DecodePfring(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pac
SCPerfCounterIncr(dtv->counter_pkts, tv->sc_perf_pca);
SCPerfCounterIncr(dtv->counter_pkts_per_sec, tv->sc_perf_pca);
SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, GET_PKT_LEN(p));
SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, p->pktlen);
#if 0
SCPerfCounterAddDouble(dtv->counter_bytes_per_sec, tv->sc_perf_pca, GET_PKT_LEN(p));
SCPerfCounterAddDouble(dtv->counter_mbit_per_sec, tv->sc_perf_pca,
(GET_PKT_LEN(p) * 8)/1000000.0 );
#endif
SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, GET_PKT_LEN(p));
SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, GET_PKT_LEN(p));
SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, p->pktlen);
SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, p->pktlen);
DecodeEthernet(tv, dtv, p,GET_PKT_DATA(p), GET_PKT_LEN(p), pq);
/* Bypassing the pkt buffer and size macro's as we know the size of
* our packet never to be bigger than default_packet_size */
DecodeEthernet(tv, dtv, p, p->pkt, p->pktlen, pq);
return TM_ECODE_OK;
}

@@ -27,9 +27,7 @@
void TmModuleReceivePfringRegister (void);
void TmModuleDecodePfringRegister (void);
/* XXX replace with user configurable options */
#define LIBPFRING_SNAPLEN 1518
#define LIBPFRING_PROMISC 1
#define LIBPFRING_REENTRANT 0
#define LIBPFRING_WAIT_FOR_INCOMING 1
int PfringConfGetThreads(void);
void PfringLoadConfig(void);
#endif /* __SOURCE_PFRING_H__ */

@@ -1186,11 +1186,16 @@ int main(int argc, char **argv)
//RunModeFilePcapAuto2(de_ctx, pcap_file);
}
else if (run_mode == MODE_PFRING) {
PfringLoadConfig();
//RunModeIdsPfring3(de_ctx, pfring_dev);
//RunModeIdsPfring2(de_ctx, pfring_dev);
//RunModeIdsPfring(de_ctx, pfring_dev);
//RunModeIdsPfring4(de_ctx, pfring_dev);
RunModeIdsPfringAuto(de_ctx, pfring_dev);
if (PfringConfGetThreads() == 1) {
RunModeIdsPfringAuto(de_ctx, pfring_dev);
} else {
RunModeIdsPfringAutoFp(de_ctx, pfring_dev);
}
}
else if (run_mode == MODE_NFQ) {
//RunModeIpsNFQ(de_ctx, nfq_id);

@@ -448,6 +448,9 @@ logging:
# PF_RING configuration. for use with native PF_RING support
# for more info see http://www.ntop.org/PF_RING.html
pfring:
# Number of receive threads (>1 will enable experimental flow pinned
# runmode)
threads: 1
# Default interface we will listen on.
interface: eth0
