diff --git a/autogen.sh b/autogen.sh index c83d11ad12..b7a4df05c9 100755 --- a/autogen.sh +++ b/autogen.sh @@ -1,5 +1,14 @@ #!/bin/sh # Run this to generate all the initial makefiles, etc. -libtoolize -c +if which libtoolize > /dev/null; then + echo "Found libtoolize" + libtoolize -c +elif which glibtoolize > /dev/null; then + echo "Found glibtoolize" + glibtoolize -c +else + echo "Failed to find libtoolize or glibtoolize, please ensure it is installed and accessible via your PATH env variable" + exit 1 +fi; autoreconf -fv --install echo "You can now run \"./configure\" and then \"make\"." diff --git a/configure.in b/configure.in index c278f6c225..350ec93b9d 100644 --- a/configure.in +++ b/configure.in @@ -746,6 +746,47 @@ AC_CHECK_HEADER(pcap.h,,[AC_ERROR(pcap.h not found ...)]) CFLAGS="${CFLAGS} -DPROFILING" ]) +# Check for DAG support. + + AC_ARG_ENABLE(dag, + [ --enable-dag Enable DAG capture], + [ enable_dag=yes ], + [ enable_dag=no]) + AC_ARG_WITH(dag_includes, + [ --with-dag-includes=DIR dagapi include directory], + [with_dag_includes="$withval"],[with_dag_includes="no"]) + AC_ARG_WITH(dag_libraries, + [ --with-dag-libraries=DIR dagapi library directory], + [with_dag_libraries="$withval"],[with_dag_libraries="no"]) + + if test "$enable_dag" = "yes"; then + + if test "$with_dag_includes" != "no"; then + CPPFLAGS="${CPPFLAGS} -I${with_dag_includes}" + fi + + if test "$with_dag_libraries" != "no"; then + LDFLAGS="${LDFLAGS} -I${with_dag_libraries}" + fi + + AC_CHECK_HEADER(dagapi.h,DAG="yes",DAG="no") + if test "$DAG" != "no"; then + DAG="" + AC_CHECK_LIB(dag,dag_open,DAG="yes",DAG="no") + fi + + if test "$DAG" != "no"; then + CFLAGS="${CFLAGS} -DHAVE_DAG" + fi + + if test "$DAG" = "no"; then + echo + echo " ERROR! libdag library not found" + echo + exit 1 + fi + fi + AC_SUBST(CFLAGS) AC_SUBST(LDFLAGS) AC_SUBST(CPPFLAGS) @@ -761,6 +802,7 @@ Suricata Configuration: Unit tests enabled: ${enable_unittests} Debug output enabled: ${enable_debug} CUDA enabled: ${enable_cuda} + DAG enabled: ${enable_dag} Profiling enabled: ${enable_profiling} GCC Protect enabled: ${enable_gccprotect} GCC march native enabled: ${enable_gccmarch_native} diff --git a/src/Makefile.am b/src/Makefile.am index a0380cfabf..487b4301ef 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -14,6 +14,7 @@ source-pcap-file.c source-pcap-file.h \ source-pfring.c source-pfring.h \ source-ipfw.c source-ipfw.h \ source-erf-file.c source-erf-file.h \ +source-erf-dag.c source-erf-dag.h \ decode.c decode.h \ decode-ethernet.c decode-ethernet.h \ decode-vlan.c decode-vlan.h \ diff --git a/src/runmodes.c b/src/runmodes.c index 36f9881392..2fcbe5aa67 100644 --- a/src/runmodes.c +++ b/src/runmodes.c @@ -3155,3 +3155,166 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx, char *file) return 0; } + +/** + * + * \brief Sets up support for reading from a DAG card. + * + * \param de_ctx + * \param file + * \notes Currently only supports a single interface. + */ +int RunModeErfDagAuto(DetectEngineCtx *de_ctx, char *file) +{ + SCEnter(); + char tname[12]; + uint16_t cpu = 0; + + /* Available cpus */ + uint16_t ncpus = UtilCpuGetNumProcessorsOnline(); + + RunModeInitialize(); + + SCLogDebug("file %s", file); + TimeModeSetOffline(); + + /* @TODO/JNM: We need to create a separate processing pipeliine for each + * interface supported by the + */ + + ThreadVars *tv_receiveerf = + TmThreadCreatePacketHandler("ReceiveErfDag", + "packetpool","packetpool", + "pickup-queue","simple", + "1slot"); + if (tv_receiveerf == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + TmModule *tm_module = TmModuleGetByName("ReceiveErfDag"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n"); + exit(EXIT_FAILURE); + } + Tm1SlotSetFunc(tv_receiveerf, tm_module, file); + + if (threading_set_cpu_affinity) { + TmThreadSetCPUAffinity(tv_receiveerf, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM); + } + + if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + ThreadVars *tv_decode1 = + TmThreadCreatePacketHandler("Decode & Stream", + "pickup-queue","simple", + "stream-queue1","simple", + "varslot"); + if (tv_decode1 == NULL) { + printf("ERROR: TmThreadsCreate failed for Decode1\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("DecodeErfDag"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName DecodeErfDag failed\n"); + exit(EXIT_FAILURE); + } + TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL); + + tm_module = TmModuleGetByName("StreamTcp"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName StreamTcp failed\n"); + exit(EXIT_FAILURE); + } + TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL); + + if (threading_set_cpu_affinity) { + TmThreadSetCPUAffinity(tv_decode1, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM); + } + + if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + /* start with cpu 1 so that if we're creating an odd number of detect + * threads we're not creating the most on CPU0. */ + if (ncpus > 0) + cpu = 1; + + /* always create at least one thread */ + int thread_max = ncpus * threading_detect_ratio; + if (thread_max < 1) + thread_max = 1; + + int thread; + for (thread = 0; thread < thread_max; thread++) { + snprintf(tname, sizeof(tname),"Detect%"PRIu16, thread+1); + if (tname == NULL) + break; + + char *thread_name = SCStrdup(tname); + SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu); + + ThreadVars *tv_detect_ncpu = + TmThreadCreatePacketHandler(thread_name, + "stream-queue1","simple", + "alert-queue1","simple", + "1slot"); + if (tv_detect_ncpu == NULL) { + printf("ERROR: TmThreadsCreate failed\n"); + exit(EXIT_FAILURE); + } + tm_module = TmModuleGetByName("Detect"); + if (tm_module == NULL) { + printf("ERROR: TmModuleGetByName Detect failed\n"); + exit(EXIT_FAILURE); + } + Tm1SlotSetFunc(tv_detect_ncpu,tm_module,(void *)de_ctx); + + if (threading_set_cpu_affinity) { + TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu); + + /* If we have more than one core/cpu, the first Detect thread + * (at cpu 0) will have less priority (higher 'nice' value) + * In this case we will set the thread priority to +10 (default is 0) + */ + if (cpu == 0 && ncpus > 1) { + TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW); + } else if (ncpus > 1) { + TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM); + } + } + + if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + } + + ThreadVars *tv_outputs = + TmThreadCreatePacketHandler("Outputs", + "alert-queue1", "simple", + "packetpool", "packetpool", + "varslot"); + SetupOutputs(tv_outputs); + + if (threading_set_cpu_affinity) { + TmThreadSetCPUAffinity(tv_outputs, 0); + if (ncpus > 1) + TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM); + } + + if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) { + printf("ERROR: TmThreadSpawn failed\n"); + exit(EXIT_FAILURE); + } + + return 0; +} diff --git a/src/runmodes.h b/src/runmodes.h index 16ebab800d..4c20fdae8e 100644 --- a/src/runmodes.h +++ b/src/runmodes.h @@ -48,6 +48,7 @@ int RunModeIpsIPFW(DetectEngineCtx *); int RunModeIpsIPFWAuto(DetectEngineCtx *); int RunModeErfFileAuto(DetectEngineCtx *, char *); +int RunModeErfDagAuto(DetectEngineCtx *, char *); void RunModeShutDown(void); diff --git a/src/source-erf-dag.c b/src/source-erf-dag.c new file mode 100644 index 0000000000..7c953affb3 --- /dev/null +++ b/src/source-erf-dag.c @@ -0,0 +1,643 @@ +/* Copyright (C) 2010 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Endace Technology Limited. + * \author Jason MacLulich + * + * Support for reading ERF records from a DAG card. + * + * Only ethernet supported at this time. + */ + +#include "suricata-common.h" +#include "suricata.h" +#include "tm-modules.h" + +#include "util-privs.h" + +#ifndef HAVE_DAG + +TmEcode NoErfDagSupportExit(ThreadVars *, void *, void **); + +void TmModuleReceiveErfDagRegister (void) { + tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag"; + tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = NoErfDagSupportExit; + tmm_modules[TMM_RECEIVEERFDAG].Func = NULL; + tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats = NULL; + tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL; + tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL; + tmm_modules[TMM_RECEIVEERFDAG].cap_flags = SC_CAP_NET_ADMIN; +} + +void TmModuleDecodeErfDagRegister (void) { + tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag"; + tmm_modules[TMM_DECODEERFDAG].ThreadInit = NoErfDagSupportExit; + tmm_modules[TMM_DECODEERFDAG].Func = NULL; + tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL; + tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL; + tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL; + tmm_modules[TMM_DECODEERFDAG].cap_flags = 0; +} + +TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data) +{ + SCLogError(SC_ERR_DAG_NOSUPPORT, + "Error creating thread %s: you do not have support for DAG cards " + "enabled please recompile with --enable-dag", tv->name); + exit(EXIT_FAILURE); +} + +#else /* Implied we do have DAG support */ + +#define DAG_MAX_READ_PKTS 256 + +#include "source-erf-dag.h" +#include +// #include + +extern int max_pending_packets; +extern uint8_t suricata_ctl_flags; + +typedef struct ErfDagThreadVars_ { + ThreadVars *tv; + int dagfd; + int dagstream; + char dagname[DAGNAME_BUFSIZE]; + uint32_t dag_max_read_packets; + + struct timeval maxwait, poll; /* Could possibly be made static */ + + uint32_t pkts; + uint64_t bytes; + + /* Track current location in the DAG stream input buffer + */ + uint8_t* top; /* We track top as well so we don't have to + call dag_advance_stream again if there + are still pkts to process. + */ + uint8_t* btm; + +} ErfDagThreadVars; + +TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); +TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **); +void ReceiveErfDagThreadExitStats(ThreadVars *, void *); +TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *); +TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top, + PacketQueue *postpq, uint32_t *pkts_read); +TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p); + +TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **); +TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); + +/** + * \brief Register the ERF file receiver (reader) module. + */ +void +TmModuleReceiveErfDagRegister(void) +{ + tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag"; + tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit; + tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag; + tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats = + ReceiveErfDagThreadExitStats; + tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL; + tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL; + tmm_modules[TMM_RECEIVEERFDAG].cap_flags = 0; +} + +/** + * \brief Register the ERF file decoder module. + */ +void +TmModuleDecodeErfDagRegister(void) +{ + tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag"; + tmm_modules[TMM_DECODEERFDAG].ThreadInit = DecodeErfDagThreadInit; + tmm_modules[TMM_DECODEERFDAG].Func = DecodeErfDag; + tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL; + tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL; + tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL; + tmm_modules[TMM_DECODEERFDAG].cap_flags = 0; +} + +/** + * \brief Initialize the ERF receiver thread, generate a single + * ErfDagThreadVar structure for each thread, this will + * contain a DAG file descriptor which is read when the + * thread executes. + * + * \param tv Thread variable to ThreadVars + * \param initdata Initial data to the interface passed from the user, + * this is processed by the user. + * + * We assume that we have only a single name for the DAG + * interface. + * + * \param data data pointer gets populated with + * + */ +TmEcode +ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + SCEnter(); + int stream_count = 0; + + if (initdata == NULL) { + SCLogError(SC_ERR_INVALID_ARGUMENT, "Error: No DAG interface provided."); + SCReturnInt(TM_ECODE_FAILED); + } + + ErfDagThreadVars *ewtn = SCMalloc(sizeof(ErfDagThreadVars)); + if (ewtn == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, + "Failed to allocate memory for ERF DAG thread vars."); + exit(EXIT_FAILURE); + } + + memset(ewtn, 0, sizeof(*ewtn)); + + /* Use max_pending_packets as our maximum number of packets read + from the DAG buffer. + */ + ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ? + DAG_MAX_READ_PKTS : max_pending_packets; + + + /* dag_parse_name will return a DAG device name and stream number + * to open for this thread. + */ + if (dag_parse_name(initdata, ewtn->dagname, DAGNAME_BUFSIZE, + &ewtn->dagstream) < 0) + { + SCLogError(SC_ERR_INVALID_ARGUMENT, + "Failed to parse DAG interface: %s", + (char*)initdata); + SCFree(ewtn); + exit(EXIT_FAILURE); + } + + SCLogInfo("Opening DAG: %s on stream: %d for processing", + ewtn->dagname, ewtn->dagstream); + + if ((ewtn->dagfd = dag_open(ewtn->dagname)) < 0) + { + SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED, "Failed to open DAG: %s", + ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + /* Check to make sure the card has enough available streams to + * support reading from the one specified. + */ + if ((stream_count = dag_rx_get_stream_count(ewtn->dagfd)) < 0) + { + SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED, + "Failed to open stream: %d, DAG: %s, could not query stream count", + ewtn->dagstream, ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + /* Check to make sure we have enough rx streams to open the stream + * the user is asking for. + */ + if (ewtn->dagstream > stream_count*2) + { + SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED, + "Failed to open stream: %d, DAG: %s, insufficient streams: %d", + ewtn->dagstream, ewtn->dagname, stream_count); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + /* If we are transmitting into a soft DAG card then set the stream + * to act in reverse mode. + */ + if (0 != (ewtn->dagstream & 0x01)) + { + /* Setting reverse mode for using with soft dag from daemon side */ + if(dag_set_mode(ewtn->dagfd, ewtn->dagstream, DAG_REVERSE_MODE)) { + SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED, + "Failed to set mode to DAG_REVERSE_MODE on stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + } + + if (dag_attach_stream(ewtn->dagfd, ewtn->dagstream, 0, 0) < 0) + { + SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED, + "Failed to open DAG stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + if (dag_start_stream(ewtn->dagfd, ewtn->dagstream) < 0) + { + SCLogError(SC_ERR_ERF_DAG_STREAM_START_FAILED, + "Failed to start DAG stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + SCLogInfo("Attached and started stream: %d on DAG: %s", + ewtn->dagstream, ewtn->dagname); + + /* + * Initialise DAG Polling parameters. + */ + timerclear(&ewtn->maxwait); + ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */ + timerclear(&ewtn->poll); + ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */ + + /* 32kB minimum data to return -- we still restrict the number of + * pkts that are processed to a maximum of dag_max_read_packets. + */ + if (dag_set_stream_poll(ewtn->dagfd, ewtn->dagstream, 32*1024, &(ewtn->maxwait), &(ewtn->poll)) < 0) + { + SCLogError(SC_ERR_ERF_DAG_STREAM_SET_FAILED, + "Failed to set poll parameters for stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCFree(ewtn); + SCReturnInt(TM_ECODE_FAILED); + } + + ewtn->tv = tv; + *data = (void *)ewtn; + + SCLogInfo("Starting processing packets from stream: %d on DAG: %s", + ewtn->dagstream, ewtn->dagname); + + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief Thread entry function for reading ERF records from a DAG card. + * + * Reads a new ERF record the DAG input buffer and copies it to + * an internal Suricata packet buffer -- similar to the way the + * pcap packet handler works. + * + * We create new packet structures using PacketGetFromQueueOrAlloc + * for each packet between the top and btm pointers except for + * the first packet for which a Packet buffer is provided + * from the packetpool. + * + * We always read up to dag_max_read_packets ERF packets from the + * DAG buffer, but we might read less. This differs from the + * ReceivePcap handler -- it will only read pkts up to a maximum + * of either the packetpool count or the pcap_max_read_packets. + * + * \param tv pointer to ThreadVars + * \param p data pointer + * \param data + * \param pq pointer to the PacketQueue (not used here) + * \param postpq + * \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success. + * \note We also use the packetpool hack first used in the source-pcap + * handler so we don't keep producing packets without any dying. + * This implies that if we are in this situation we run the risk + * of dropping packets at the interface. + */ +TmEcode +ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, + PacketQueue *postpq) +{ + SCEnter(); + + uint16_t packet_q_len = 0; + uint32_t diff = 0; + int err; + uint8_t *top = NULL; + uint32_t pkts_read = 0; + + assert(p); + assert(pq); + assert(postpq); + + ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data; + + /* NOTE/JNM: Hack copied from source-pcap.c + * + * Make sure we have at least one packet in the packet pool, to + * prevent us from alloc'ing packets at line rate + */ + while (packet_q_len == 0) { + SCMutexLock(&packet_q.mutex_q); + packet_q_len = packet_q.len; + if (packet_q.len == 0) { + SCondWait(&packet_q.cond_q, &packet_q.mutex_q); + } + packet_q_len = packet_q.len; + SCMutexUnlock(&packet_q.mutex_q); + } + + if (postpq == NULL) { + ewtn->dag_max_read_packets = 1; + } + + while(pkts_read == 0) + { + if (suricata_ctl_flags != 0) { + break; + } + + /* NOTE/JNM: This might not work well if we start restricting the + * number of ERF records processed per call to a small number as + * the over head required here could exceed the time it takes to + * process a small number of ERF records. + * + * XXX/JNM: Possibly process the DAG stream buffer first if there + * are ERF packets or else call dag_advance_stream and then process + * the DAG stream buffer. + */ + top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm)); + + if (NULL == top) + { + if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) { + usleep(10 * 1000); + ewtn->btm = ewtn->top; + continue; + } + else { + SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, + "Failed to read from stream: %d, DAG: %s when using dag_advance_stream", + ewtn->dagstream, ewtn->dagname); + SCReturnInt(TM_ECODE_FAILED); + } + } + + diff = top - ewtn->btm; + if (diff == 0) + { + continue; + } + + assert(diff >= dag_record_size); + + err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read); + + if (err == TM_ECODE_FAILED) { + SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED, + "Failed to read from stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCReturnInt(err); + } + } + + SCLogDebug("Read %d records from stream: %d, DAG: %s", + pkts_read, ewtn->dagstream, ewtn->dagname); + + if (suricata_ctl_flags != 0) { + SCReturnInt(TM_ECODE_FAILED); + } + + SCReturnInt(err); +} + +TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, + Packet *p, + uint8_t* top, + PacketQueue *postpq, + uint32_t *pkts_read) +{ + SCEnter(); + + int err = 0; + dag_record_t* dr = NULL; + char *prec = NULL; + int rlen; + + *pkts_read = 0; + + while(((top-(ewtn->btm))>=dag_record_size) && + ((*pkts_read)<(ewtn->dag_max_read_packets))) + { + prec = (char*)ewtn->btm; + dr = (dag_record_t*)prec; + + rlen = htons(dr->rlen); + + if (rlen == 20) { + rlen = 28; + SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED, + "Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + } + + /* If we don't have enough data to finsih processing this ERF record + * return and maybe next time we will. + */ + if ((top-(ewtn->btm)) < rlen) + SCReturnInt(TM_ECODE_OK); + + p = p ? p : PacketGetFromQueueOrAlloc(); + + if (p == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, + "Failed to allocate a Packet on stream: %d, DAG: %s", + ewtn->dagstream, ewtn->dagname); + SCReturnInt(TM_ECODE_FAILED); + } + + err = ProcessErfDagRecord(ewtn, prec, p); + + if (err != TM_ECODE_OK) + SCReturnInt(err); + + ewtn->btm += rlen; + + /* XXX/JNM: Hack to get around the fact that the first Packet from + * Suricata is added explicitly by the Slot code and shouldn't go + * onto the post queue -- else it is added twice to the next queue. + */ + if (*pkts_read) { + PacketEnqueue(postpq, p); + } + + (*pkts_read)++; + + p = NULL; + } + + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief Process a DAG record into a TM packet buffer. + * \param prec pointer to a DAG record. + * \param + */ +TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p) +{ + SCEnter(); + + int wlen = 0; + dag_record_t *dr = (dag_record_t*)prec; + erf_payload_t *pload; + + assert(prec); + assert(p); + + if (p == NULL) SCReturnInt(TM_ECODE_OK); + + /* Only support ethernet at this time. */ + if (dr->type != TYPE_ETH) { + SCLogError(SC_ERR_UNIMPLEMENTED, + "Processing of DAG record type: %d not implemented.", dr->type); + SCReturnInt(TM_ECODE_FAILED); + } + + wlen = htons(dr->wlen); + + pload = &(dr->rec); + + p->pktlen = wlen - 4; /* Trim the FCS... */ + p->datalink = LINKTYPE_ETHERNET; + + /* Take into account for link type Ethernet ETH frame starts + * after ther ERF header + pad. + */ + memcpy(p->pkt, pload->eth.dst, p->pktlen); + + SCLogDebug("p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)", + p->pktlen, *p, *p->pkt); + + /* Convert ERF time to timeval - from libpcap. */ + uint64_t ts = dr->ts; + p->ts.tv_sec = ts >> 32; + ts = (ts & 0xffffffffULL) * 1000000; + ts += 0x80000000; /* rounding */ + p->ts.tv_usec = ts >> 32; + if (p->ts.tv_usec >= 1000000) { + p->ts.tv_usec -= 1000000; + p->ts.tv_sec++; + } + + ewtn->pkts++; + ewtn->bytes += wlen; + + SCReturnInt(TM_ECODE_OK); +} + +/** + * \brief Print some stats to the log at program exit. + * + * \param tv Pointer to ThreadVars. + * \param data Pointer to data, ErfFileThreadVars. + */ +void +ReceiveErfDagThreadExitStats(ThreadVars *tv, void *data) +{ + ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data; + + SCLogInfo("Packets: %"PRIu32"; Bytes: %"PRIu64, ewtn->pkts, ewtn->bytes); +} + +/** + * \brief Deinitializes the DAG card. + * \param tv pointer to ThreadVars + * \param data pointer that gets cast into PcapThreadVars for ptv + */ +TmEcode ReceiveErfDagThreadDeinit(ThreadVars *tv, void *data) { + + SCEnter(); + + ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data; + + dag_stop_stream(ewtn->dagfd, ewtn->dagstream); + dag_detach_stream(ewtn->dagfd, ewtn->dagstream); + dag_close(ewtn->dagfd); + + SCReturnInt(TM_ECODE_OK); +} + + +/** Decode ErfDag */ + +/** + * \brief This function passes off to link type decoders. + * + * DecodeErfDag reads packets from the PacketQueue and passes + * them off to the proper link type decoder. + * + * \param t pointer to ThreadVars + * \param p pointer to the current packet + * \param data pointer that gets cast into PcapThreadVars for ptv + * \param pq pointer to the current PacketQueue + */ +TmEcode DecodeErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, + PacketQueue *postpq) +{ + SCEnter(); + DecodeThreadVars *dtv = (DecodeThreadVars *)data; + + /* update counters */ + SCPerfCounterIncr(dtv->counter_pkts, tv->sc_perf_pca); + SCPerfCounterIncr(dtv->counter_pkts_per_sec, tv->sc_perf_pca); + + SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, p->pktlen); + SCPerfCounterAddDouble(dtv->counter_bytes_per_sec, tv->sc_perf_pca, p->pktlen); + SCPerfCounterAddDouble(dtv->counter_mbit_per_sec, tv->sc_perf_pca, + (p->pktlen * 8)/1000000.0); + + SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, p->pktlen); + SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, p->pktlen); + + /* call the decoder */ + switch(p->datalink) { + case LINKTYPE_ETHERNET: + DecodeEthernet(tv, dtv, p, p->pkt, p->pktlen, pq); + break; + default: + SCLogError(SC_ERR_DATALINK_UNIMPLEMENTED, + "Error: datalink type %" PRId32 " not yet supported in module DecodeErfDag", + p->datalink); + break; + } + + SCReturnInt(TM_ECODE_OK); +} + +TmEcode DecodeErfDagThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + SCEnter(); + DecodeThreadVars *dtv = NULL; + + if ( (dtv = SCMalloc(sizeof(DecodeThreadVars))) == NULL) + SCReturnInt(TM_ECODE_FAILED); + memset(dtv, 0, sizeof(DecodeThreadVars)); + + DecodeRegisterPerfCounters(dtv, tv); + + *data = (void *)dtv; + + SCReturnInt(TM_ECODE_OK); +} + +#endif /* HAVE_DAG */ diff --git a/src/source-erf-dag.h b/src/source-erf-dag.h new file mode 100644 index 0000000000..2b718e1fa2 --- /dev/null +++ b/src/source-erf-dag.h @@ -0,0 +1,32 @@ +/* Copyright (C) 2010 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Endace Technology Limited + * \author Jason MacLulich + */ + +#ifndef __SOURCE_ERR_DAG_H__ +#define __SOURCE_ERF_DAG_H__ + +void TmModuleReceiveErfDagRegister(void); +void TmModuleDecodeErfDagRegister(void); + +#endif /* __SOURCE_ERF_DAG_H__ */ + diff --git a/src/suricata.c b/src/suricata.c index 5341ca352c..6f540c11f8 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -84,6 +84,7 @@ #include "source-pfring.h" #include "source-erf-file.h" +#include "source-erf-dag.h" #include "respond-reject.h" @@ -321,6 +322,9 @@ void usage(const char *progname) printf("\t--group : run suricata as this group after init\n"); #endif /* HAVE_LIBCAP_NG */ printf("\t--erf-in : process an ERF file\n"); +#ifdef HAVE_DAG + printf("\t--dag : process ERF records from 0,1,...,n DAG input streams\n"); +#endif printf("\n"); printf("\nTo run the engine with default configuration on " "interface eth0 with signature file \"signatures.rules\", run the " @@ -351,6 +355,7 @@ int main(int argc, char **argv) uint32_t userid = 0; uint32_t groupid = 0; char *erf_file = NULL; + char *dag_input = NULL; char *log_dir; struct stat buf; @@ -410,6 +415,7 @@ int main(int argc, char **argv) {"user", required_argument, 0, 0}, {"group", required_argument, 0, 0}, {"erf-in", required_argument, 0, 0}, + {"dag", required_argument, 0, 0}, {NULL, 0, NULL, 0} }; @@ -531,6 +537,16 @@ int main(int argc, char **argv) run_mode = MODE_ERF_FILE; erf_file = optarg; } + else if (strcmp((long_opts[option_index]).name, "dag") == 0) { +#ifdef HAVE_DAG + run_mode = MODE_DAG; + dag_input = optarg; +#else + SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required" + " to receieve packets using --dag."); + exit(EXIT_FAILURE); +#endif /* HAVE_DAG */ + } else if(strcmp((long_opts[option_index]).name, "pcap-buffer-size") == 0) { #ifdef HAVE_PCAP_SET_BUFF if (ConfSet("pcap.buffer-size", optarg, 0) != 1) { @@ -775,6 +791,8 @@ int main(int argc, char **argv) #endif TmModuleReceiveErfFileRegister(); TmModuleDecodeErfFileRegister(); + TmModuleReceiveErfDagRegister(); + TmModuleDecodeErfDagRegister(); TmModuleDebugList(); /** \todo we need an api for these */ @@ -1015,6 +1033,9 @@ int main(int argc, char **argv) else if (run_mode == MODE_ERF_FILE) { RunModeErfFileAuto(de_ctx, erf_file); } + else if (run_mode == MODE_DAG) { + RunModeErfDagAuto(de_ctx, dag_input); + } else { SCLogError(SC_ERR_UNKNOWN_RUN_MODE, "Unknown runtime mode. Aborting"); exit(EXIT_FAILURE); diff --git a/src/suricata.h b/src/suricata.h index ff9d835f3b..f6e7495da8 100644 --- a/src/suricata.h +++ b/src/suricata.h @@ -60,6 +60,7 @@ enum { MODE_IPFW, MODE_UNITTEST, MODE_ERF_FILE, + MODE_DAG, }; /* preallocated packet structures here diff --git a/src/tm-modules.h b/src/tm-modules.h index aecad59c9d..cdb394369a 100644 --- a/src/tm-modules.h +++ b/src/tm-modules.h @@ -81,6 +81,8 @@ enum { #endif TMM_RECEIVEERFFILE, TMM_DECODEERFFILE, + TMM_RECEIVEERFDAG, + TMM_DECODEERFDAG, TMM_SIZE, }; diff --git a/src/util-error.c b/src/util-error.c index cfe6568bc5..ffc168b759 100644 --- a/src/util-error.c +++ b/src/util-error.c @@ -181,6 +181,12 @@ const char * SCErrorToString(SCError err) CASE_CODE (SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG); CASE_CODE (SC_WARN_FLOW_EMERGENCY); CASE_CODE (SC_ERR_SVC); + CASE_CODE (SC_ERR_ERF_DAG_OPEN_FAILED); + CASE_CODE (SC_ERR_ERF_DAG_STREAM_OPEN_FAILED); + CASE_CODE (SC_ERR_ERF_DAG_STREAM_START_FAILED); + CASE_CODE (SC_ERR_ERF_DAG_STREAM_SET_FAILED); + CASE_CODE (SC_ERR_ERF_DAG_STREAM_READ_FAILED); + CASE_CODE (SC_WARN_ERF_DAG_REC_LEN_CHANGED); default: return "UNKNOWN_ERROR"; diff --git a/src/util-error.h b/src/util-error.h index 0490c430a9..4a36ae2ea8 100644 --- a/src/util-error.h +++ b/src/util-error.h @@ -189,6 +189,14 @@ typedef enum { SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG, SC_WARN_FLOW_EMERGENCY, SC_ERR_SVC, + SC_ERR_ERF_DAG_OPEN_FAILED, + SC_ERR_ERF_DAG_STREAM_OPEN_FAILED, + SC_ERR_ERF_DAG_STREAM_START_FAILED, + SC_ERR_ERF_DAG_STREAM_SET_FAILED, + SC_ERR_ERF_DAG_STREAM_READ_FAILED, + SC_WARN_ERF_DAG_REC_LEN_CHANGED, + SC_ERR_DAG_REQUIRED, + SC_ERR_DAG_NOSUPPORT, /**< no ERF/DAG support compiled in */ SC_ERR_FATAL, } SCError;