From a48a767efcf77656b8dfd174ed849f63bc161974 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Thu, 27 May 2010 00:31:49 +0200 Subject: [PATCH] Lockfree ringbuffer wip. --- src/Makefile.am | 2 + src/runmodes.c | 6 +-- src/tm-queuehandlers.c | 2 + src/tm-queuehandlers.h | 2 + src/tm-threads.c | 16 ++++-- src/tmqh-ringbuffer.c | 65 +++++++++++++++++++++++++ src/tmqh-ringbuffer.h | 29 +++++++++++ src/util-ringbuffer.c | 107 +++++++++++++++++++++++++++++++++++++++++ src/util-ringbuffer.h | 42 ++++++++++++++++ 9 files changed, 264 insertions(+), 7 deletions(-) create mode 100644 src/tmqh-ringbuffer.c create mode 100644 src/tmqh-ringbuffer.h create mode 100644 src/util-ringbuffer.c create mode 100644 src/util-ringbuffer.h diff --git a/src/Makefile.am b/src/Makefile.am index 93ab1c6e6e..f66a62fcf3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -161,6 +161,7 @@ util-cuda.c util-cuda.h \ util-cuda-handlers.c util-cuda-handlers.h \ util-privs.c util-privs.h \ util-decode-asn1.c util-decode-asn1.h \ +util-ringbuffer.c util-ringbuffer.h \ tm-modules.c tm-modules.h \ tm-queues.c tm-queues.h \ tm-queuehandlers.c tm-queuehandlers.h \ @@ -169,6 +170,7 @@ tmqh-simple.c tmqh-simple.h \ tmqh-nfq.c tmqh-nfq.h \ tmqh-packetpool.c tmqh-packetpool.h \ tmqh-flow.c tmqh-flow.h \ +tmqh-ringbuffer.c tmqh-ringbuffer.h \ alert-fastlog.c alert-fastlog.h \ alert-debuglog.c alert-debuglog.h \ alert-prelude.c alert-prelude.h \ diff --git a/src/runmodes.c b/src/runmodes.c index 274ec10480..34c0dc550f 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -2068,7 +2068,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { TimeModeSetOffline(); /* create the threads */ - ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","simple","1slot"); + ThreadVars *tv_receivepcap = TmThreadCreatePacketHandler("ReceivePcapFile","packetpool","packetpool","pickup-queue","ringbuffer","1slot"); if (tv_receivepcap == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); @@ -2089,7 +2089,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { exit(EXIT_FAILURE); } - ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","simple","stream-queue1","simple","varslot"); + ThreadVars *tv_decode1 = TmThreadCreatePacketHandler("Decode & Stream","pickup-queue","ringbuffer","stream-queue1","ringbuffer","varslot"); if (tv_decode1 == NULL) { printf("ERROR: TmThreadsCreate failed for Decode1\n"); exit(EXIT_FAILURE); @@ -2146,7 +2146,7 @@ int RunModeFilePcapAuto(DetectEngineCtx *de_ctx, char *file) { char *thread_name = SCStrdup(tname); SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); - ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","simple","alert-queue1","simple","1slot"); + ThreadVars *tv_detect_ncpu = TmThreadCreatePacketHandler(thread_name,"stream-queue1","ringbuffer","alert-queue1","simple","1slot"); if (tv_detect_ncpu == NULL) { printf("ERROR: TmThreadsCreate failed\n"); exit(EXIT_FAILURE); diff --git a/src/tm-queuehandlers.c b/src/tm-queuehandlers.c index ac2095051d..b61a6c9e59 100644 --- a/src/tm-queuehandlers.c +++ b/src/tm-queuehandlers.c @@ -34,6 +34,7 @@ #include "tmqh-nfq.h" #include "tmqh-packetpool.h" #include "tmqh-flow.h" +#include "tmqh-ringbuffer.h" void TmqhSetup (void) { memset(&tmqh_table, 0, sizeof(tmqh_table)); @@ -42,6 +43,7 @@ void TmqhSetup (void) { TmqhNfqRegister(); TmqhPacketpoolRegister(); TmqhFlowRegister(); + TmqhRingBufferRegister(); } Tmqh* TmqhGetQueueHandlerByName(char *name) { diff --git a/src/tm-queuehandlers.h b/src/tm-queuehandlers.h index 8f1e80db3f..23532dfeec 100644 --- a/src/tm-queuehandlers.h +++ b/src/tm-queuehandlers.h @@ -29,6 +29,7 @@ enum { TMQH_NFQ, TMQH_PACKETPOOL, TMQH_FLOW, + TMQH_RINGBUFFER, TMQH_SIZE, }; @@ -48,3 +49,4 @@ void TmqhSetup (void); Tmqh* TmqhGetQueueHandlerByName(char *name); #endif /* __TM_QUEUEHANDLERS_H__ */ + diff --git a/src/tm-threads.c b/src/tm-threads.c index 6595bab51a..d23279ad5f 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -782,33 +782,41 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, /* set the incoming queue */ if (inq_name != NULL && strcmp(inq_name,"packetpool") != 0) { + SCLogDebug("inq_name \"%s\"", inq_name); + tmq = TmqGetQueueByName(inq_name); if (tmq == NULL) { tmq = TmqCreateQueue(inq_name); if (tmq == NULL) goto error; } + SCLogDebug("tmq %p", tmq); tv->inq = tmq; tv->inq->reader_cnt++; - //printf("TmThreadCreate: tv->inq->id %" PRIu32 "\n", tv->inq->id); + SCLogDebug("tv->inq %p", tv->inq); } if (inqh_name != NULL) { + SCLogDebug("inqh_name \"%s\"", inqh_name); + tmqh = TmqhGetQueueHandlerByName(inqh_name); if (tmqh == NULL) goto error; tv->tmqh_in = tmqh->InHandler; - //printf("TmThreadCreate: tv->tmqh_in %p\n", tv->tmqh_in); + SCLogDebug("tv->tmqh_in %p", tv->tmqh_in); } /* set the outgoing queue */ if (outqh_name != NULL) { + SCLogDebug("outqh_name \"%s\"", outqh_name); + tmqh = TmqhGetQueueHandlerByName(outqh_name); if (tmqh == NULL) goto error; tv->tmqh_out = tmqh->OutHandler; - //printf("TmThreadCreate: tv->tmqh_out %p\n", tv->tmqh_out); if (outq_name != NULL && strcmp(outq_name,"packetpool") != 0) { + SCLogDebug("outq_name \"%s\"", outq_name); + if (tmqh->OutHandlerCtxSetup != NULL) { tv->outctx = tmqh->OutHandlerCtxSetup(outq_name); tv->outq = NULL; @@ -818,12 +826,12 @@ ThreadVars *TmThreadCreate(char *name, char *inq_name, char *inqh_name, tmq = TmqCreateQueue(outq_name); if (tmq == NULL) goto error; } + SCLogDebug("tmq %p", tmq); tv->outq = tmq; tv->outctx = NULL; tv->outq->writer_cnt++; } - //printf("TmThreadCreate: tv->outq->id %" PRIu32 "\n", tv->outq->id); } } diff --git a/src/tmqh-ringbuffer.c b/src/tmqh-ringbuffer.c new file mode 100644 index 0000000000..9ab274fff0 --- /dev/null +++ b/src/tmqh-ringbuffer.c @@ -0,0 +1,65 @@ +/* Copyright (C) 2007-2010 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien + * + * RingBuffer queue handler + */ + +#include "suricata.h" +#include "packet-queue.h" +#include "decode.h" +#include "threads.h" +#include "threadvars.h" + +#include "tm-queuehandlers.h" + +#include "util-ringbuffer.h" + +static RingBufferMrSw *ringbuffers[256]; + +Packet *TmqhInputRingBuffer(ThreadVars *t); +void TmqhOutputRingBuffer(ThreadVars *t, Packet *p); + +void TmqhRingBufferRegister (void) { + tmqh_table[TMQH_RINGBUFFER].name = "ringbuffer"; + tmqh_table[TMQH_RINGBUFFER].InHandler = TmqhInputRingBuffer; + tmqh_table[TMQH_RINGBUFFER].OutHandler = TmqhOutputRingBuffer; + + int i = 0; + for (i = 0; i < 256; i++) { + ringbuffers[i] = RingBufferMrSwInit(); + } +} + +Packet *TmqhInputRingBuffer(ThreadVars *t) +{ + RingBufferMrSw *rb = ringbuffers[t->inq->id]; + + Packet *p = (Packet *)RingBufferMrSwGet(rb); + return p; +} + +void TmqhOutputRingBuffer(ThreadVars *t, Packet *p) +{ + RingBufferMrSw *rb = ringbuffers[t->outq->id]; + RingBufferMrSwPut(rb, (void *)p); +} + diff --git a/src/tmqh-ringbuffer.h b/src/tmqh-ringbuffer.h new file mode 100644 index 0000000000..039f17fa9c --- /dev/null +++ b/src/tmqh-ringbuffer.h @@ -0,0 +1,29 @@ +/* Copyright (C) 2007-2010 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien + */ + +#ifndef __TMQH_RINGBUFFER_H__ +#define __TMQH_RINGBUFFER_H__ + +void TmqhRingBufferRegister (void); + +#endif /* __TMQH_RINGBUFFER_H__ */ diff --git a/src/util-ringbuffer.c b/src/util-ringbuffer.c new file mode 100644 index 0000000000..02336c6b07 --- /dev/null +++ b/src/util-ringbuffer.c @@ -0,0 +1,107 @@ +#include "suricata-common.h" +#include "util-ringbuffer.h" + +/* Multi Reader, Single Writer */ + +RingBufferMrSw *RingBufferMrSwInit(void) { + RingBufferMrSw *rb = SCMalloc(sizeof(RingBufferMrSw)); + if (rb == NULL) { + return NULL; + } + + memset(rb, 0x00, sizeof(RingBufferMrSw)); + + return rb; +} + +void RingBufferMrSwDestroy(RingBufferMrSw *rb) { + if (rb == NULL) { + SCFree(rb); + } +} + +void *RingBufferMrSwGet(RingBufferMrSw *rb) { + void *ptr; + /* counter for data races. If __sync_bool_compare_and_swap (CAS) fails, + * we increase cnt, get a new ptr and try to do CAS again. We init it to + * -1 so it's 0 when first used the do { } while() loop. */ + unsigned short readp = -1; + /* buffer is empty, wait... */ +retry: + while (rb->read == rb->write) { + usleep(1); + } + + /* atomically update rb->read */ + readp += rb->read; + do { + /* with multiple readers we can get in the situation that we exitted + * from the wait loop but the rb is empty again once we get here. */ + if (rb->read == rb->write) + goto retry; + + readp++; + ptr = rb->array[readp]; + } while (!(__sync_bool_compare_and_swap(&rb->read, readp, (readp + 1)))); + + SCLogDebug("ptr %p", ptr); + return ptr; +} + +/** + * \brief put a ptr in the RingBuffer + */ +void RingBufferMrSwPut(RingBufferMrSw *rb, void *ptr) { + SCLogDebug("ptr %p", ptr); + + /* buffer is full, wait... */ + while ((rb->write + 1) == rb->read) { + usleep(1); + } + + rb->array[rb->write] = ptr; + __sync_fetch_and_add(&rb->write, 1); +} + + +/* Single Reader, Single Writer */ + +RingBufferSrSw *RingBufferSrSwInit(void) { + RingBufferSrSw *rb = SCMalloc(sizeof(RingBufferSrSw)); + if (rb == NULL) { + return NULL; + } + + memset(rb, 0x00, sizeof(RingBufferSrSw)); + + return rb; +} + +void RingBufferSrSwDestroy(RingBufferSrSw *rb) { + if (rb == NULL) { + SCFree(rb); + } +} + +void *RingBufferSrSwGet(RingBufferSrSw *rb) { + void *ptr = NULL; + + /* buffer is empty, wait... */ + while (rb->read == rb->write) { + } + + ptr = rb->array[rb->read]; + __sync_fetch_and_add(&rb->read, 1); + + return ptr; +} + +void RingBufferSrSwPut(RingBufferSrSw *rb, void *ptr) { + /* buffer is full, wait... */ + while ((rb->write + 1) == rb->read) { + } + + rb->array[rb->write] = ptr; + __sync_fetch_and_add(&rb->write, 1); +} + diff --git a/src/util-ringbuffer.h b/src/util-ringbuffer.h new file mode 100644 index 0000000000..acc71cb699 --- /dev/null +++ b/src/util-ringbuffer.h @@ -0,0 +1,42 @@ +#ifndef __UTIL_RINGBUFFER_H__ + +/** \brief ring buffer api + * + * Ring buffer api for a single writer and a single reader. It uses a + * read and write pointer. Only the read ptr needs atomic updating. + */ + +#define RING_BUFFER_MRSW_SIZE 65536 + +/** Multiple Reader, Single Writer ring buffer, fixed at + * 65536 items so we can use unsigned shorts that just + * wrap around */ +typedef struct RingBufferMrSw_ { + unsigned short write; /**< idx where we put data */ + unsigned short read; /**< idx where we read data */ + void *array[RING_BUFFER_MRSW_SIZE]; +} RingBufferMrSw; + +void *RingBufferMrSwGet(RingBufferMrSw *); +void RingBufferMrSwPut(RingBufferMrSw *, void *); +RingBufferMrSw *RingBufferMrSwInit(void); +void RingBufferMrSwDestroy(RingBufferMrSw *); + +#define RING_BUFFER_SRSW_SIZE 65536 + +/** Single Reader, Single Writer ring buffer, fixed at + * 65536 items so we can use unsigned shorts that just + * wrap around */ +typedef struct RingBufferSrSw_ { + unsigned short write; /**< idx where we put data */ + unsigned short read; /**< idx where we read data */ + void *array[RING_BUFFER_SRSW_SIZE]; +} RingBufferSrSw; + +void *RingBufferSrSwGet(RingBufferSrSw *); +void RingBufferSrSwPut(RingBufferSrSw *, void *); +RingBufferSrSw *RingBufferSrSwInit(void); +void RingBufferSrSwDestroy(RingBufferSrSw *); + +#endif /* __UTIL_RINGBUFFER_H__ */ +