Add initial support for reading packets from a DAG card, we only support reading from a single stream at this time.

Use the --dag <dagname> cmd line option to specify from which DAG card to read pkts
from.

Issue at the moment with pkts being ejected during shutdown -- at the moment we
ignore any packets that are not of link type Ethernet.
remotes/origin/master-1.0.x
Jason MacLulich 15 years ago committed by Victor Julien
parent 49d68169ea
commit 835630efbd

@ -1,5 +1,14 @@
#!/bin/sh
# Run this to generate all the initial makefiles, etc.
libtoolize -c
if which libtoolize > /dev/null; then
echo "Found libtoolize"
libtoolize -c
elif which glibtoolize > /dev/null; then
echo "Found glibtoolize"
glibtoolize -c
else
echo "Failed to find libtoolize or glibtoolize, please ensure it is installed and accessible via your PATH env variable"
exit 1
fi;
autoreconf -fv --install
echo "You can now run \"./configure\" and then \"make\"."

@ -746,6 +746,47 @@ AC_CHECK_HEADER(pcap.h,,[AC_ERROR(pcap.h not found ...)])
CFLAGS="${CFLAGS} -DPROFILING"
])
# Check for DAG support.
AC_ARG_ENABLE(dag,
[ --enable-dag Enable DAG capture],
[ enable_dag=yes ],
[ enable_dag=no])
AC_ARG_WITH(dag_includes,
[ --with-dag-includes=DIR dagapi include directory],
[with_dag_includes="$withval"],[with_dag_includes="no"])
AC_ARG_WITH(dag_libraries,
[ --with-dag-libraries=DIR dagapi library directory],
[with_dag_libraries="$withval"],[with_dag_libraries="no"])
if test "$enable_dag" = "yes"; then
if test "$with_dag_includes" != "no"; then
CPPFLAGS="${CPPFLAGS} -I${with_dag_includes}"
fi
if test "$with_dag_libraries" != "no"; then
LDFLAGS="${LDFLAGS} -I${with_dag_libraries}"
fi
AC_CHECK_HEADER(dagapi.h,DAG="yes",DAG="no")
if test "$DAG" != "no"; then
DAG=""
AC_CHECK_LIB(dag,dag_open,DAG="yes",DAG="no")
fi
if test "$DAG" != "no"; then
CFLAGS="${CFLAGS} -DHAVE_DAG"
fi
if test "$DAG" = "no"; then
echo
echo " ERROR! libdag library not found"
echo
exit 1
fi
fi
AC_SUBST(CFLAGS)
AC_SUBST(LDFLAGS)
AC_SUBST(CPPFLAGS)
@ -761,6 +802,7 @@ Suricata Configuration:
Unit tests enabled: ${enable_unittests}
Debug output enabled: ${enable_debug}
CUDA enabled: ${enable_cuda}
DAG enabled: ${enable_dag}
Profiling enabled: ${enable_profiling}
GCC Protect enabled: ${enable_gccprotect}
GCC march native enabled: ${enable_gccmarch_native}

@ -14,6 +14,7 @@ source-pcap-file.c source-pcap-file.h \
source-pfring.c source-pfring.h \
source-ipfw.c source-ipfw.h \
source-erf-file.c source-erf-file.h \
source-erf-dag.c source-erf-dag.h \
decode.c decode.h \
decode-ethernet.c decode-ethernet.h \
decode-vlan.c decode-vlan.h \

@ -3155,3 +3155,166 @@ int RunModeErfFileAuto(DetectEngineCtx *de_ctx, char *file)
return 0;
}
/**
*
* \brief Sets up support for reading from a DAG card.
*
* \param de_ctx
* \param file
* \notes Currently only supports a single interface.
*/
int RunModeErfDagAuto(DetectEngineCtx *de_ctx, char *file)
{
SCEnter();
char tname[12];
uint16_t cpu = 0;
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
RunModeInitialize();
SCLogDebug("file %s", file);
TimeModeSetOffline();
/* @TODO/JNM: We need to create a separate processing pipeliine for each
* interface supported by the
*/
ThreadVars *tv_receiveerf =
TmThreadCreatePacketHandler("ReceiveErfDag",
"packetpool","packetpool",
"pickup-queue","simple",
"1slot");
if (tv_receiveerf == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceiveErfDag");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveErfDag\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_receiveerf, tm_module, file);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receiveerf, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_receiveerf, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_receiveerf) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode & Stream",
"pickup-queue","simple",
"stream-queue1","simple",
"varslot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodeErfDag");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeErfDag failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL);
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmVarSlotSetFuncAppend(tv_decode1,tm_module,NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_decode1, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
cpu = 1;
/* always create at least one thread */
int thread_max = ncpus * threading_detect_ratio;
if (thread_max < 1)
thread_max = 1;
int thread;
for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname),"Detect%"PRIu16, thread+1);
if (tname == NULL)
break;
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name,
"stream-queue1","simple",
"alert-queue1","simple",
"1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
Tm1SlotSetFunc(tv_detect_ncpu,tm_module,(void *)de_ctx);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
* (at cpu 0) will have less priority (higher 'nice' value)
* In this case we will set the thread priority to +10 (default is 0)
*/
if (cpu == 0 && ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
} else if (ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
}
}
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
}
ThreadVars *tv_outputs =
TmThreadCreatePacketHandler("Outputs",
"alert-queue1", "simple",
"packetpool", "packetpool",
"varslot");
SetupOutputs(tv_outputs);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
return 0;
}

@ -48,6 +48,7 @@ int RunModeIpsIPFW(DetectEngineCtx *);
int RunModeIpsIPFWAuto(DetectEngineCtx *);
int RunModeErfFileAuto(DetectEngineCtx *, char *);
int RunModeErfDagAuto(DetectEngineCtx *, char *);
void RunModeShutDown(void);

@ -0,0 +1,643 @@
/* Copyright (C) 2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Endace Technology Limited.
* \author Jason MacLulich <jason.maclulich@eendace.com>
*
* Support for reading ERF records from a DAG card.
*
* Only ethernet supported at this time.
*/
#include "suricata-common.h"
#include "suricata.h"
#include "tm-modules.h"
#include "util-privs.h"
#ifndef HAVE_DAG
TmEcode NoErfDagSupportExit(ThreadVars *, void *, void **);
void TmModuleReceiveErfDagRegister (void) {
tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = NoErfDagSupportExit;
tmm_modules[TMM_RECEIVEERFDAG].Func = NULL;
tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats = NULL;
tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL;
tmm_modules[TMM_RECEIVEERFDAG].cap_flags = SC_CAP_NET_ADMIN;
}
void TmModuleDecodeErfDagRegister (void) {
tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag";
tmm_modules[TMM_DECODEERFDAG].ThreadInit = NoErfDagSupportExit;
tmm_modules[TMM_DECODEERFDAG].Func = NULL;
tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL;
tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL;
tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL;
tmm_modules[TMM_DECODEERFDAG].cap_flags = 0;
}
TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data)
{
SCLogError(SC_ERR_DAG_NOSUPPORT,
"Error creating thread %s: you do not have support for DAG cards "
"enabled please recompile with --enable-dag", tv->name);
exit(EXIT_FAILURE);
}
#else /* Implied we do have DAG support */
#define DAG_MAX_READ_PKTS 256
#include "source-erf-dag.h"
#include <dagapi.h>
// #include <dagutil.h>
extern int max_pending_packets;
extern uint8_t suricata_ctl_flags;
typedef struct ErfDagThreadVars_ {
ThreadVars *tv;
int dagfd;
int dagstream;
char dagname[DAGNAME_BUFSIZE];
uint32_t dag_max_read_packets;
struct timeval maxwait, poll; /* Could possibly be made static */
uint32_t pkts;
uint64_t bytes;
/* Track current location in the DAG stream input buffer
*/
uint8_t* top; /* We track top as well so we don't have to
call dag_advance_stream again if there
are still pkts to process.
*/
uint8_t* btm;
} ErfDagThreadVars;
TmEcode ReceiveErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceiveErfDagThreadInit(ThreadVars *, void *, void **);
void ReceiveErfDagThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *, void *);
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn, Packet *p, uint8_t* top,
PacketQueue *postpq, uint32_t *pkts_read);
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p);
TmEcode DecodeErfDagThreadInit(ThreadVars *, void *, void **);
TmEcode DecodeErfDag(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
/**
* \brief Register the ERF file receiver (reader) module.
*/
void
TmModuleReceiveErfDagRegister(void)
{
tmm_modules[TMM_RECEIVEERFDAG].name = "ReceiveErfDag";
tmm_modules[TMM_RECEIVEERFDAG].ThreadInit = ReceiveErfDagThreadInit;
tmm_modules[TMM_RECEIVEERFDAG].Func = ReceiveErfDag;
tmm_modules[TMM_RECEIVEERFDAG].ThreadExitPrintStats =
ReceiveErfDagThreadExitStats;
tmm_modules[TMM_RECEIVEERFDAG].ThreadDeinit = NULL;
tmm_modules[TMM_RECEIVEERFDAG].RegisterTests = NULL;
tmm_modules[TMM_RECEIVEERFDAG].cap_flags = 0;
}
/**
* \brief Register the ERF file decoder module.
*/
void
TmModuleDecodeErfDagRegister(void)
{
tmm_modules[TMM_DECODEERFDAG].name = "DecodeErfDag";
tmm_modules[TMM_DECODEERFDAG].ThreadInit = DecodeErfDagThreadInit;
tmm_modules[TMM_DECODEERFDAG].Func = DecodeErfDag;
tmm_modules[TMM_DECODEERFDAG].ThreadExitPrintStats = NULL;
tmm_modules[TMM_DECODEERFDAG].ThreadDeinit = NULL;
tmm_modules[TMM_DECODEERFDAG].RegisterTests = NULL;
tmm_modules[TMM_DECODEERFDAG].cap_flags = 0;
}
/**
* \brief Initialize the ERF receiver thread, generate a single
* ErfDagThreadVar structure for each thread, this will
* contain a DAG file descriptor which is read when the
* thread executes.
*
* \param tv Thread variable to ThreadVars
* \param initdata Initial data to the interface passed from the user,
* this is processed by the user.
*
* We assume that we have only a single name for the DAG
* interface.
*
* \param data data pointer gets populated with
*
*/
TmEcode
ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
{
SCEnter();
int stream_count = 0;
if (initdata == NULL) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "Error: No DAG interface provided.");
SCReturnInt(TM_ECODE_FAILED);
}
ErfDagThreadVars *ewtn = SCMalloc(sizeof(ErfDagThreadVars));
if (ewtn == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for ERF DAG thread vars.");
exit(EXIT_FAILURE);
}
memset(ewtn, 0, sizeof(*ewtn));
/* Use max_pending_packets as our maximum number of packets read
from the DAG buffer.
*/
ewtn->dag_max_read_packets = (DAG_MAX_READ_PKTS < max_pending_packets) ?
DAG_MAX_READ_PKTS : max_pending_packets;
/* dag_parse_name will return a DAG device name and stream number
* to open for this thread.
*/
if (dag_parse_name(initdata, ewtn->dagname, DAGNAME_BUFSIZE,
&ewtn->dagstream) < 0)
{
SCLogError(SC_ERR_INVALID_ARGUMENT,
"Failed to parse DAG interface: %s",
(char*)initdata);
SCFree(ewtn);
exit(EXIT_FAILURE);
}
SCLogInfo("Opening DAG: %s on stream: %d for processing",
ewtn->dagname, ewtn->dagstream);
if ((ewtn->dagfd = dag_open(ewtn->dagname)) < 0)
{
SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED, "Failed to open DAG: %s",
ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
/* Check to make sure the card has enough available streams to
* support reading from the one specified.
*/
if ((stream_count = dag_rx_get_stream_count(ewtn->dagfd)) < 0)
{
SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED,
"Failed to open stream: %d, DAG: %s, could not query stream count",
ewtn->dagstream, ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
/* Check to make sure we have enough rx streams to open the stream
* the user is asking for.
*/
if (ewtn->dagstream > stream_count*2)
{
SCLogError(SC_ERR_ERF_DAG_OPEN_FAILED,
"Failed to open stream: %d, DAG: %s, insufficient streams: %d",
ewtn->dagstream, ewtn->dagname, stream_count);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
/* If we are transmitting into a soft DAG card then set the stream
* to act in reverse mode.
*/
if (0 != (ewtn->dagstream & 0x01))
{
/* Setting reverse mode for using with soft dag from daemon side */
if(dag_set_mode(ewtn->dagfd, ewtn->dagstream, DAG_REVERSE_MODE)) {
SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
"Failed to set mode to DAG_REVERSE_MODE on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
}
if (dag_attach_stream(ewtn->dagfd, ewtn->dagstream, 0, 0) < 0)
{
SCLogError(SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
"Failed to open DAG stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
if (dag_start_stream(ewtn->dagfd, ewtn->dagstream) < 0)
{
SCLogError(SC_ERR_ERF_DAG_STREAM_START_FAILED,
"Failed to start DAG stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
SCLogInfo("Attached and started stream: %d on DAG: %s",
ewtn->dagstream, ewtn->dagname);
/*
* Initialise DAG Polling parameters.
*/
timerclear(&ewtn->maxwait);
ewtn->maxwait.tv_usec = 100 * 1000; /* 100ms timeout */
timerclear(&ewtn->poll);
ewtn->poll.tv_usec = 10 * 1000; /* 10ms poll interval */
/* 32kB minimum data to return -- we still restrict the number of
* pkts that are processed to a maximum of dag_max_read_packets.
*/
if (dag_set_stream_poll(ewtn->dagfd, ewtn->dagstream, 32*1024, &(ewtn->maxwait), &(ewtn->poll)) < 0)
{
SCLogError(SC_ERR_ERF_DAG_STREAM_SET_FAILED,
"Failed to set poll parameters for stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCFree(ewtn);
SCReturnInt(TM_ECODE_FAILED);
}
ewtn->tv = tv;
*data = (void *)ewtn;
SCLogInfo("Starting processing packets from stream: %d on DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_OK);
}
/**
* \brief Thread entry function for reading ERF records from a DAG card.
*
* Reads a new ERF record the DAG input buffer and copies it to
* an internal Suricata packet buffer -- similar to the way the
* pcap packet handler works.
*
* We create new packet structures using PacketGetFromQueueOrAlloc
* for each packet between the top and btm pointers except for
* the first packet for which a Packet buffer is provided
* from the packetpool.
*
* We always read up to dag_max_read_packets ERF packets from the
* DAG buffer, but we might read less. This differs from the
* ReceivePcap handler -- it will only read pkts up to a maximum
* of either the packetpool count or the pcap_max_read_packets.
*
* \param tv pointer to ThreadVars
* \param p data pointer
* \param data
* \param pq pointer to the PacketQueue (not used here)
* \param postpq
* \retval TM_ECODE_FAILED on failure and TM_ECODE_OK on success.
* \note We also use the packetpool hack first used in the source-pcap
* handler so we don't keep producing packets without any dying.
* This implies that if we are in this situation we run the risk
* of dropping packets at the interface.
*/
TmEcode
ReceiveErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
PacketQueue *postpq)
{
SCEnter();
uint16_t packet_q_len = 0;
uint32_t diff = 0;
int err;
uint8_t *top = NULL;
uint32_t pkts_read = 0;
assert(p);
assert(pq);
assert(postpq);
ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
/* NOTE/JNM: Hack copied from source-pcap.c
*
* Make sure we have at least one packet in the packet pool, to
* prevent us from alloc'ing packets at line rate
*/
while (packet_q_len == 0) {
SCMutexLock(&packet_q.mutex_q);
packet_q_len = packet_q.len;
if (packet_q.len == 0) {
SCondWait(&packet_q.cond_q, &packet_q.mutex_q);
}
packet_q_len = packet_q.len;
SCMutexUnlock(&packet_q.mutex_q);
}
if (postpq == NULL) {
ewtn->dag_max_read_packets = 1;
}
while(pkts_read == 0)
{
if (suricata_ctl_flags != 0) {
break;
}
/* NOTE/JNM: This might not work well if we start restricting the
* number of ERF records processed per call to a small number as
* the over head required here could exceed the time it takes to
* process a small number of ERF records.
*
* XXX/JNM: Possibly process the DAG stream buffer first if there
* are ERF packets or else call dag_advance_stream and then process
* the DAG stream buffer.
*/
top = dag_advance_stream(ewtn->dagfd, ewtn->dagstream, &(ewtn->btm));
if (NULL == top)
{
if((ewtn->dagstream & 0x1) && (errno == EAGAIN)) {
usleep(10 * 1000);
ewtn->btm = ewtn->top;
continue;
}
else {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
}
diff = top - ewtn->btm;
if (diff == 0)
{
continue;
}
assert(diff >= dag_record_size);
err = ProcessErfDagRecords(ewtn, p, top, postpq, &pkts_read);
if (err == TM_ECODE_FAILED) {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(err);
}
}
SCLogDebug("Read %d records from stream: %d, DAG: %s",
pkts_read, ewtn->dagstream, ewtn->dagname);
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
}
SCReturnInt(err);
}
TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
Packet *p,
uint8_t* top,
PacketQueue *postpq,
uint32_t *pkts_read)
{
SCEnter();
int err = 0;
dag_record_t* dr = NULL;
char *prec = NULL;
int rlen;
*pkts_read = 0;
while(((top-(ewtn->btm))>=dag_record_size) &&
((*pkts_read)<(ewtn->dag_max_read_packets)))
{
prec = (char*)ewtn->btm;
dr = (dag_record_t*)prec;
rlen = htons(dr->rlen);
if (rlen == 20) {
rlen = 28;
SCLogWarning(SC_WARN_ERF_DAG_REC_LEN_CHANGED,
"Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
}
/* If we don't have enough data to finsih processing this ERF record
* return and maybe next time we will.
*/
if ((top-(ewtn->btm)) < rlen)
SCReturnInt(TM_ECODE_OK);
p = p ? p : PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate a Packet on stream: %d, DAG: %s",
ewtn->dagstream, ewtn->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
err = ProcessErfDagRecord(ewtn, prec, p);
if (err != TM_ECODE_OK)
SCReturnInt(err);
ewtn->btm += rlen;
/* XXX/JNM: Hack to get around the fact that the first Packet from
* Suricata is added explicitly by the Slot code and shouldn't go
* onto the post queue -- else it is added twice to the next queue.
*/
if (*pkts_read) {
PacketEnqueue(postpq, p);
}
(*pkts_read)++;
p = NULL;
}
SCReturnInt(TM_ECODE_OK);
}
/**
* \brief Process a DAG record into a TM packet buffer.
* \param prec pointer to a DAG record.
* \param
*/
TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
{
SCEnter();
int wlen = 0;
dag_record_t *dr = (dag_record_t*)prec;
erf_payload_t *pload;
assert(prec);
assert(p);
if (p == NULL) SCReturnInt(TM_ECODE_OK);
/* Only support ethernet at this time. */
if (dr->type != TYPE_ETH) {
SCLogError(SC_ERR_UNIMPLEMENTED,
"Processing of DAG record type: %d not implemented.", dr->type);
SCReturnInt(TM_ECODE_FAILED);
}
wlen = htons(dr->wlen);
pload = &(dr->rec);
p->pktlen = wlen - 4; /* Trim the FCS... */
p->datalink = LINKTYPE_ETHERNET;
/* Take into account for link type Ethernet ETH frame starts
* after ther ERF header + pad.
*/
memcpy(p->pkt, pload->eth.dst, p->pktlen);
SCLogDebug("p->pktlen: %" PRIu32 " (pkt %02x, p->pkt %02x)",
p->pktlen, *p, *p->pkt);
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr->ts;
p->ts.tv_sec = ts >> 32;
ts = (ts & 0xffffffffULL) * 1000000;
ts += 0x80000000; /* rounding */
p->ts.tv_usec = ts >> 32;
if (p->ts.tv_usec >= 1000000) {
p->ts.tv_usec -= 1000000;
p->ts.tv_sec++;
}
ewtn->pkts++;
ewtn->bytes += wlen;
SCReturnInt(TM_ECODE_OK);
}
/**
* \brief Print some stats to the log at program exit.
*
* \param tv Pointer to ThreadVars.
* \param data Pointer to data, ErfFileThreadVars.
*/
void
ReceiveErfDagThreadExitStats(ThreadVars *tv, void *data)
{
ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
SCLogInfo("Packets: %"PRIu32"; Bytes: %"PRIu64, ewtn->pkts, ewtn->bytes);
}
/**
* \brief Deinitializes the DAG card.
* \param tv pointer to ThreadVars
* \param data pointer that gets cast into PcapThreadVars for ptv
*/
TmEcode ReceiveErfDagThreadDeinit(ThreadVars *tv, void *data) {
SCEnter();
ErfDagThreadVars *ewtn = (ErfDagThreadVars *)data;
dag_stop_stream(ewtn->dagfd, ewtn->dagstream);
dag_detach_stream(ewtn->dagfd, ewtn->dagstream);
dag_close(ewtn->dagfd);
SCReturnInt(TM_ECODE_OK);
}
/** Decode ErfDag */
/**
* \brief This function passes off to link type decoders.
*
* DecodeErfDag reads packets from the PacketQueue and passes
* them off to the proper link type decoder.
*
* \param t pointer to ThreadVars
* \param p pointer to the current packet
* \param data pointer that gets cast into PcapThreadVars for ptv
* \param pq pointer to the current PacketQueue
*/
TmEcode DecodeErfDag(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
PacketQueue *postpq)
{
SCEnter();
DecodeThreadVars *dtv = (DecodeThreadVars *)data;
/* update counters */
SCPerfCounterIncr(dtv->counter_pkts, tv->sc_perf_pca);
SCPerfCounterIncr(dtv->counter_pkts_per_sec, tv->sc_perf_pca);
SCPerfCounterAddUI64(dtv->counter_bytes, tv->sc_perf_pca, p->pktlen);
SCPerfCounterAddDouble(dtv->counter_bytes_per_sec, tv->sc_perf_pca, p->pktlen);
SCPerfCounterAddDouble(dtv->counter_mbit_per_sec, tv->sc_perf_pca,
(p->pktlen * 8)/1000000.0);
SCPerfCounterAddUI64(dtv->counter_avg_pkt_size, tv->sc_perf_pca, p->pktlen);
SCPerfCounterSetUI64(dtv->counter_max_pkt_size, tv->sc_perf_pca, p->pktlen);
/* call the decoder */
switch(p->datalink) {
case LINKTYPE_ETHERNET:
DecodeEthernet(tv, dtv, p, p->pkt, p->pktlen, pq);
break;
default:
SCLogError(SC_ERR_DATALINK_UNIMPLEMENTED,
"Error: datalink type %" PRId32 " not yet supported in module DecodeErfDag",
p->datalink);
break;
}
SCReturnInt(TM_ECODE_OK);
}
TmEcode DecodeErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
{
SCEnter();
DecodeThreadVars *dtv = NULL;
if ( (dtv = SCMalloc(sizeof(DecodeThreadVars))) == NULL)
SCReturnInt(TM_ECODE_FAILED);
memset(dtv, 0, sizeof(DecodeThreadVars));
DecodeRegisterPerfCounters(dtv, tv);
*data = (void *)dtv;
SCReturnInt(TM_ECODE_OK);
}
#endif /* HAVE_DAG */

@ -0,0 +1,32 @@
/* Copyright (C) 2010 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Endace Technology Limited
* \author Jason MacLulich <jason.maclulich@endace.com>
*/
#ifndef __SOURCE_ERR_DAG_H__
#define __SOURCE_ERF_DAG_H__
void TmModuleReceiveErfDagRegister(void);
void TmModuleDecodeErfDagRegister(void);
#endif /* __SOURCE_ERF_DAG_H__ */

@ -84,6 +84,7 @@
#include "source-pfring.h"
#include "source-erf-file.h"
#include "source-erf-dag.h"
#include "respond-reject.h"
@ -321,6 +322,9 @@ void usage(const char *progname)
printf("\t--group <group> : run suricata as this group after init\n");
#endif /* HAVE_LIBCAP_NG */
printf("\t--erf-in <path> : process an ERF file\n");
#ifdef HAVE_DAG
printf("\t--dag <dag0,dag1,...> : process ERF records from 0,1,...,n DAG input streams\n");
#endif
printf("\n");
printf("\nTo run the engine with default configuration on "
"interface eth0 with signature file \"signatures.rules\", run the "
@ -351,6 +355,7 @@ int main(int argc, char **argv)
uint32_t userid = 0;
uint32_t groupid = 0;
char *erf_file = NULL;
char *dag_input = NULL;
char *log_dir;
struct stat buf;
@ -410,6 +415,7 @@ int main(int argc, char **argv)
{"user", required_argument, 0, 0},
{"group", required_argument, 0, 0},
{"erf-in", required_argument, 0, 0},
{"dag", required_argument, 0, 0},
{NULL, 0, NULL, 0}
};
@ -531,6 +537,16 @@ int main(int argc, char **argv)
run_mode = MODE_ERF_FILE;
erf_file = optarg;
}
else if (strcmp((long_opts[option_index]).name, "dag") == 0) {
#ifdef HAVE_DAG
run_mode = MODE_DAG;
dag_input = optarg;
#else
SCLogError(SC_ERR_DAG_REQUIRED, "libdag and a DAG card are required"
" to receieve packets using --dag.");
exit(EXIT_FAILURE);
#endif /* HAVE_DAG */
}
else if(strcmp((long_opts[option_index]).name, "pcap-buffer-size") == 0) {
#ifdef HAVE_PCAP_SET_BUFF
if (ConfSet("pcap.buffer-size", optarg, 0) != 1) {
@ -775,6 +791,8 @@ int main(int argc, char **argv)
#endif
TmModuleReceiveErfFileRegister();
TmModuleDecodeErfFileRegister();
TmModuleReceiveErfDagRegister();
TmModuleDecodeErfDagRegister();
TmModuleDebugList();
/** \todo we need an api for these */
@ -1015,6 +1033,9 @@ int main(int argc, char **argv)
else if (run_mode == MODE_ERF_FILE) {
RunModeErfFileAuto(de_ctx, erf_file);
}
else if (run_mode == MODE_DAG) {
RunModeErfDagAuto(de_ctx, dag_input);
}
else {
SCLogError(SC_ERR_UNKNOWN_RUN_MODE, "Unknown runtime mode. Aborting");
exit(EXIT_FAILURE);

@ -60,6 +60,7 @@ enum {
MODE_IPFW,
MODE_UNITTEST,
MODE_ERF_FILE,
MODE_DAG,
};
/* preallocated packet structures here

@ -81,6 +81,8 @@ enum {
#endif
TMM_RECEIVEERFFILE,
TMM_DECODEERFFILE,
TMM_RECEIVEERFDAG,
TMM_DECODEERFDAG,
TMM_SIZE,
};

@ -181,6 +181,12 @@ const char * SCErrorToString(SCError err)
CASE_CODE (SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG);
CASE_CODE (SC_WARN_FLOW_EMERGENCY);
CASE_CODE (SC_ERR_SVC);
CASE_CODE (SC_ERR_ERF_DAG_OPEN_FAILED);
CASE_CODE (SC_ERR_ERF_DAG_STREAM_OPEN_FAILED);
CASE_CODE (SC_ERR_ERF_DAG_STREAM_START_FAILED);
CASE_CODE (SC_ERR_ERF_DAG_STREAM_SET_FAILED);
CASE_CODE (SC_ERR_ERF_DAG_STREAM_READ_FAILED);
CASE_CODE (SC_WARN_ERF_DAG_REC_LEN_CHANGED);
default:
return "UNKNOWN_ERROR";

@ -189,6 +189,14 @@ typedef enum {
SC_ERR_LIBNET11_INCOMPATIBLE_WITH_LIBCAP_NG,
SC_WARN_FLOW_EMERGENCY,
SC_ERR_SVC,
SC_ERR_ERF_DAG_OPEN_FAILED,
SC_ERR_ERF_DAG_STREAM_OPEN_FAILED,
SC_ERR_ERF_DAG_STREAM_START_FAILED,
SC_ERR_ERF_DAG_STREAM_SET_FAILED,
SC_ERR_ERF_DAG_STREAM_READ_FAILED,
SC_WARN_ERF_DAG_REC_LEN_CHANGED,
SC_ERR_DAG_REQUIRED,
SC_ERR_DAG_NOSUPPORT, /**< no ERF/DAG support compiled in */
SC_ERR_FATAL,
} SCError;

Loading…
Cancel
Save