Lockfree ringbuffer wip.

remotes/origin/master-1.0.x
Victor Julien 15 years ago
parent 7f29166aa8
commit a48a767efc

@ -161,6 +161,7 @@ util-cuda.c util-cuda.h \
util-cuda-handlers.c util-cuda-handlers.h \
util-privs.c util-privs.h \
util-decode-asn1.c util-decode-asn1.h \
util-ringbuffer.c util-ringbuffer.h \
tm-modules.c tm-modules.h \
tm-queues.c tm-queues.h \
tm-queuehandlers.c tm-queuehandlers.h \
@ -169,6 +170,7 @@ tmqh-simple.c tmqh-simple.h \
tmqh-nfq.c tmqh-nfq.h \
tmqh-packetpool.c tmqh-packetpool.h \
tmqh-flow.c tmqh-flow.h \
tmqh-ringbuffer.c tmqh-ringbuffer.h \
alert-fastlog.c alert-fastlog.h \
alert-debuglog.c alert-debuglog.h \
alert-prelude.c alert-prelude.h \

@ -2068,7 +2068,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
TimeModeSetOffline();
/* create the threads */
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot");
ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer","1slot");
if (tv_receivepcap == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
@ -2089,7 +2089,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","simple","stream-queue1","simple","varslot");
ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer","stream-queue1","ringbuffer","varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
@ -2146,7 +2146,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) {
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","simple","alert-queue1","simple","1slot");
ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","simple","1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);

@ -34,6 +34,7 @@
#include "tmqh-nfq.h"
#include "tmqh-packetpool.h"
#include "tmqh-flow.h"
#include "tmqh-ringbuffer.h"
void TmqhSetup (void) {
memset(&tmqh_table, 0, sizeof(tmqh_table));
@ -42,6 +43,7 @@ void TmqhSetup (void) {
TmqhNfqRegister();
TmqhPacketpoolRegister();
TmqhFlowRegister();
TmqhRingBufferRegister();
}
Tmqh* TmqhGetQueueHandlerByName(char *name) {

@ -29,6 +29,7 @@ enum {
TMQH_NFQ,
TMQH_PACKETPOOL,
TMQH_FLOW,
TMQH_RINGBUFFER,
TMQH_SIZE,
};
@ -48,3 +49,4 @@ void TmqhSetup (void);
Tmqh* TmqhGetQueueHandlerByName(char *name);
#endif /* __TM_QUEUEHANDLERS_H__ */

@ -782,33 +782,41 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
/* set the incoming queue */
if (inq_name != NULL && strcmp(inq_name,"packetpool") != 0) {
SCLogDebug("inq_name \"%s\"", inq_name);
tmq = TmqGetQueueByName(inq_name);
if (tmq == NULL) {
tmq = TmqCreateQueue(inq_name);
if (tmq == NULL) goto error;
}
SCLogDebug("tmq %p", tmq);
tv->inq = tmq;
tv->inq->reader_cnt++;
//printf("TmThreadCreate: tv->inq->id %" PRIu32 "\n", tv->inq->id);
SCLogDebug("tv->inq %p", tv->inq);
}
if (inqh_name != NULL) {
SCLogDebug("inqh_name \"%s\"", inqh_name);
tmqh = TmqhGetQueueHandlerByName(inqh_name);
if (tmqh == NULL) goto error;
tv->tmqh_in = tmqh->InHandler;
//printf("TmThreadCreate: tv->tmqh_in %p\n", tv->tmqh_in);
SCLogDebug("tv->tmqh_in %p", tv->tmqh_in);
}
/* set the outgoing queue */
if (outqh_name != NULL) {
SCLogDebug("outqh_name \"%s\"", outqh_name);
tmqh = TmqhGetQueueHandlerByName(outqh_name);
if (tmqh == NULL) goto error;
tv->tmqh_out = tmqh->OutHandler;
//printf("TmThreadCreate: tv->tmqh_out %p\n", tv->tmqh_out);
if (outq_name != NULL && strcmp(outq_name,"packetpool") != 0) {
SCLogDebug("outq_name \"%s\"", outq_name);
if (tmqh->OutHandlerCtxSetup != NULL) {
tv->outctx = tmqh->OutHandlerCtxSetup(outq_name);
tv->outq = NULL;
@ -818,12 +826,12 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name,
tmq = TmqCreateQueue(outq_name);
if (tmq == NULL) goto error;
}
SCLogDebug("tmq %p", tmq);
tv->outq = tmq;
tv->outctx = NULL;
tv->outq->writer_cnt++;
}
//printf("TmThreadCreate: tv->outq->id %" PRIu32 "\n", tv->outq->id);
}
}

@ -0,0 +1,65 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*
* RingBuffer queue handler
*/
#include "suricata.h"
#include "packet-queue.h"
#include "decode.h"
#include "threads.h"
#include "threadvars.h"
#include "tm-queuehandlers.h"
#include "util-ringbuffer.h"
static RingBufferMrSw *ringbuffers[256];
Packet *TmqhInputRingBuffer(ThreadVars *t);
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p);
void TmqhRingBufferRegister (void) {
tmqh_table[TMQH_RINGBUFFER].name = "ringbuffer";
tmqh_table[TMQH_RINGBUFFER].InHandler = TmqhInputRingBuffer;
tmqh_table[TMQH_RINGBUFFER].OutHandler = TmqhOutputRingBuffer;
int i = 0;
for (i = 0; i < 256; i++) {
ringbuffers[i] = RingBufferMrSwInit();
}
}
Packet *TmqhInputRingBuffer(ThreadVars *t)
{
RingBufferMrSw *rb = ringbuffers[t->inq->id];
Packet *p = (Packet *)RingBufferMrSwGet(rb);
return p;
}
void TmqhOutputRingBuffer(ThreadVars *t, Packet *p)
{
RingBufferMrSw *rb = ringbuffers[t->outq->id];
RingBufferMrSwPut(rb, (void *)p);
}

@ -0,0 +1,29 @@
/* Copyright (C) 2007-2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*/
#ifndef __TMQH_RINGBUFFER_H__
#define __TMQH_RINGBUFFER_H__
void TmqhRingBufferRegister (void);
#endif /* __TMQH_RINGBUFFER_H__ */

@ -0,0 +1,107 @@
#include "suricata-common.h"
#include "util-ringbuffer.h"
/* Multi Reader, Single Writer */
RingBufferMrSw *RingBufferMrSwInit(void) {
RingBufferMrSw *rb = SCMalloc(sizeof(RingBufferMrSw));
if (rb == NULL) {
return NULL;
}
memset(rb, 0x00, sizeof(RingBufferMrSw));
return rb;
}
void RingBufferMrSwDestroy(RingBufferMrSw *rb) {
if (rb == NULL) {
SCFree(rb);
}
}
void *RingBufferMrSwGet(RingBufferMrSw *rb) {
void *ptr;
/* counter for data races. If __sync_bool_compare_and_swap (CAS) fails,
* we increase cnt, get a new ptr and try to do CAS again. We init it to
* -1 so it's 0 when first used the do { } while() loop. */
unsigned short readp = -1;
/* buffer is empty, wait... */
retry:
while (rb->read == rb->write) {
usleep(1);
}
/* atomically update rb->read */
readp += rb->read;
do {
/* with multiple readers we can get in the situation that we exitted
* from the wait loop but the rb is empty again once we get here. */
if (rb->read == rb->write)
goto retry;
readp++;
ptr = rb->array[readp];
} while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1))));
SCLogDebug("ptr %p", ptr);
return ptr;
}
/**
* \brief put a ptr in the RingBuffer
*/
void RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) {
SCLogDebug("ptr %p", ptr);
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
usleep(1);
}
rb->array[rb->write] = ptr;
__sync_fetch_and_add(&rb->write, 1);
}
/* Single Reader, Single Writer */
RingBufferSrSw *RingBufferSrSwInit(void) {
RingBufferSrSw *rb = SCMalloc(sizeof(RingBufferSrSw));
if (rb == NULL) {
return NULL;
}
memset(rb, 0x00, sizeof(RingBufferSrSw));
return rb;
}
void RingBufferSrSwDestroy(RingBufferSrSw *rb) {
if (rb == NULL) {
SCFree(rb);
}
}
void *RingBufferSrSwGet(RingBufferSrSw *rb) {
void *ptr = NULL;
/* buffer is empty, wait... */
while (rb->read == rb->write) {
}
ptr = rb->array[rb->read];
__sync_fetch_and_add(&rb->read, 1);
return ptr;
}
void RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) {
/* buffer is full, wait... */
while ((rb->write + 1) == rb->read) {
}
rb->array[rb->write] = ptr;
__sync_fetch_and_add(&rb->write, 1);
}

@ -0,0 +1,42 @@
#ifndef __UTIL_RINGBUFFER_H__
/** \brief ring buffer api
*
* Ring buffer api for a single writer and a single reader. It uses a
* read and write pointer. Only the read ptr needs atomic updating.
*/
#define RING_BUFFER_MRSW_SIZE 65536
/** Multiple Reader, Single Writer ring buffer, fixed at
* 65536 items so we can use unsigned shorts that just
* wrap around */
typedef struct RingBufferMrSw_ {
unsigned short write; /**< idx where we put data */
unsigned short read; /**< idx where we read data */
void *array[RING_BUFFER_MRSW_SIZE];
} RingBufferMrSw;
void *RingBufferMrSwGet(RingBufferMrSw *);
void RingBufferMrSwPut(RingBufferMrSw *, void *);
RingBufferMrSw *RingBufferMrSwInit(void);
void RingBufferMrSwDestroy(RingBufferMrSw *);
#define RING_BUFFER_SRSW_SIZE 65536
/** Single Reader, Single Writer ring buffer, fixed at
* 65536 items so we can use unsigned shorts that just
* wrap around */
typedef struct RingBufferSrSw_ {
unsigned short write; /**< idx where we put data */
unsigned short read; /**< idx where we read data */
void *array[RING_BUFFER_SRSW_SIZE];
} RingBufferSrSw;
void *RingBufferSrSwGet(RingBufferSrSw *);
void RingBufferSrSwPut(RingBufferSrSw *, void *);
RingBufferSrSw *RingBufferSrSwInit(void);
void RingBufferSrSwDestroy(RingBufferSrSw *);
#endif /* __UTIL_RINGBUFFER_H__ */
Loading…
Cancel
Save