@ -68,8 +68,6 @@ TmEcode NoErfDagSupportExit(ThreadVars *tv, void *initdata, void **data)
# else /* Implied we do have DAG support */
# define DAG_MAX_READ_PKTS 256
# include "source-erf-dag.h"
# include <dagapi.h>
@ -83,32 +81,26 @@ typedef struct ErfDagThreadVars_ {
int dagfd ;
int dagstream ;
char dagname [ DAGNAME_BUFSIZE ] ;
uint32_t dag_max_read_packets ;
struct timeval maxwait , poll ; /* Could possibly be made static */
uint32_t pkts ;
uint64_t bytes ;
/* Track current location in the DAG stream input buffer
/* Current location in the DAG stream input buffer.
*/
uint8_t * top ; /* We track top as well so we don't have to
call dag_advance_stream again if there
are still pkts to process .
JNM : Currently not used .
*/
uint8_t * btm ;
uint8_t * top ;
uint8_t * btm ;
} ErfDagThreadVars ;
static inline TmEcode ProcessErfDagRecords ( ErfDagThreadVars * ewtn , uint8_t * top ,
uint32_t * pkts_read ) ;
static inline TmEcode ProcessErfDagRecord ( ErfDagThreadVars * ewtn , char * prec ) ;
TmEcode ReceiveErfDagLoop ( ThreadVars * , void * data , void * slot ) ;
TmEcode ReceiveErfDagThreadInit ( ThreadVars * , void * , void * * ) ;
void ReceiveErfDagThreadExitStats ( ThreadVars * , void * ) ;
TmEcode ReceiveErfDagThreadDeinit ( ThreadVars * , void * ) ;
TmEcode ProcessErfDagRecords ( ErfDagThreadVars * ewtn , uint8_t * top ,
uint32_t * pkts_read ) ;
TmEcode ProcessErfDagRecord ( ErfDagThreadVars * ewtn , char * prec , Packet * p ) ;
TmEcode DecodeErfDagThreadInit ( ThreadVars * , void * , void * * ) ;
TmEcode DecodeErfDag ( ThreadVars * , Packet * , void * , PacketQueue * , PacketQueue * ) ;
@ -182,13 +174,6 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
memset ( ewtn , 0 , sizeof ( * ewtn ) ) ;
/* Use max_pending_packets as our maximum number of packets read
from the DAG buffer .
*/
ewtn - > dag_max_read_packets = ( DAG_MAX_READ_PKTS < max_pending_packets ) ?
DAG_MAX_READ_PKTS : max_pending_packets ;
/* dag_parse_name will return a DAG device name and stream number
* to open for this thread .
*/
@ -277,9 +262,9 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
* Initialise DAG Polling parameters .
*/
timerclear ( & ewtn - > maxwait ) ;
ewtn - > maxwait . tv_usec = 100 * 1000 ; /* 10 0ms timeout */
ewtn - > maxwait . tv_usec = 20 * 1000 ; /* 2 0ms timeout */
timerclear ( & ewtn - > poll ) ;
ewtn - > poll . tv_usec = 1 0 * 1000 ; /* 1 0 ms poll interval */
ewtn - > poll . tv_usec = 1 * 1000 ; /* 1 ms poll interval */
/* 32kB minimum data to return -- we still restrict the number of
* pkts that are processed to a maximum of dag_max_read_packets .
@ -317,15 +302,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
ErfDagThreadVars * dtv = ( ErfDagThreadVars * ) data ;
TmSlot * s = ( TmSlot * ) slot ;
dtv - > slot = s - > slot_next ;
SCEnter ( ) ;
uint16_t packet_q_len = 0 ;
uint32_t diff = 0 ;
int err ;
uint8_t * top = NULL ;
uint32_t pkts_read = 0 ;
SCEnter ( ) ;
while ( 1 )
{
if ( suricata_ctl_flags & SURICATA_STOP | |
@ -333,31 +316,13 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt ( TM_ECODE_FAILED ) ;
}
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc ' ing packets at line rate . */
do {
packet_q_len = PacketPoolSize ( ) ;
if ( unlikely ( packet_q_len = = 0 ) ) {
PacketPoolWait ( ) ;
}
} while ( packet_q_len = = 0 ) ;
/* NOTE/JNM: This might not work well if we start restricting the
* number of ERF records processed per call to a small number as
* the over head required here could exceed the time it takes to
* process a small number of ERF records .
*
* XXX / JNM : Possibly process the DAG stream buffer first if there
* are ERF packets or else call dag_advance_stream and then process
* the DAG stream buffer .
*/
top = dag_advance_stream ( dtv - > dagfd , dtv - > dagstream , & ( dtv - > btm ) ) ;
if ( NULL = = top )
{
if ( ( dtv - > dagstream & 0x1 ) & & ( errno = = EAGAIN ) ) {
usleep ( 10 * 1000 ) ;
dtv - > btm = dtv - > top ;
if ( top = = NULL ) {
if ( errno = = EAGAIN ) {
if ( dtv - > dagstream & 0x1 ) {
usleep ( 10 * 1000 ) ;
dtv - > btm = dtv - > top ;
}
continue ;
}
else {
@ -369,8 +334,7 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
}
diff = top - dtv - > btm ;
if ( diff = = 0 )
{
if ( diff = = 0 ) {
continue ;
}
@ -385,10 +349,10 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
ReceiveErfDagCloseStream ( dtv - > dagfd , dtv - > dagstream ) ;
SCReturnInt ( err ) ;
}
}
SCLogDebug ( " Read %d records from stream: %d, DAG: %s " ,
pkts_read , dtv - > dagstream , dtv - > dagname ) ;
SCLogDebug ( " Read %d records from stream: %d, DAG: %s " ,
pkts_read , dtv - > dagstream , dtv - > dagname ) ;
}
if ( suricata_ctl_flags ! = 0 ) {
SCReturnInt ( TM_ECODE_FAILED ) ;
@ -403,61 +367,66 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
* This function takes a pointer to buffer read from the DAG interface
* and processes it individual records .
*/
TmEcode ProcessErfDagRecords ( ErfDagThreadVars * ewtn ,
uint8_t * top ,
uint32_t * pkts_read )
static inline TmEcode ProcessErfDagRecords ( ErfDagThreadVars * ewtn , uint8_t * top ,
uint32_t * pkts_read )
{
SCEnter ( ) ;
Packet * p ;
int err = 0 ;
dag_record_t * dr = NULL ;
char * prec = NULL ;
int rlen ;
int err = 0 ;
dag_record_t * dr = NULL ;
char * prec = NULL ;
int rlen ;
char hdr_type = 0 ;
int processed = 0 ;
int packet_q_len = 0 ;
* pkts_read = 0 ;
while ( ( ( top - ( ewtn - > btm ) ) > = dag_record_size ) & &
( ( * pkts_read ) < ( ewtn - > dag_max_read_packets ) ) )
{
prec = ( char * ) ewtn - > btm ;
dr = ( dag_record_t * ) prec ;
while ( ( ( top - ewtn - > btm ) > = dag_record_size ) & &
( ( processed + dag_record_size ) < 4 * 1024 * 1024 ) ) {
rlen = ntohs ( dr - > rlen ) ;
/* Make sure we have at least one packet in the packet pool,
* to prevent us from alloc ' ing packets at line rate . */
do {
packet_q_len = PacketPoolSize ( ) ;
if ( unlikely ( packet_q_len = = 0 ) ) {
PacketPoolWait ( ) ;
}
} while ( packet_q_len = = 0 ) ;
if ( rlen = = 20 ) {
rlen = 28 ;
SCLogWarning ( SC_WARN_ERF_DAG_REC_LEN_CHANGED ,
" Warning, adjusted the length of ERF from 20 to 28 on stream: %d, DAG: %s " ,
ewtn - > dagstream , ewtn - > dagname ) ;
}
prec = ( char * ) ewtn - > btm ;
dr = ( dag_record_t * ) prec ;
rlen = ntohs ( dr - > rlen ) ;
hdr_type = dr - > type ;
/* If we don't have enough data to finsih processing this ERF record
* return and maybe next time we will .
*/
if ( ( top - ( ewtn - > btm ) ) < rlen )
if ( ( top - ewtn - > btm ) < rlen )
SCReturnInt ( TM_ECODE_OK ) ;
p = PacketGetFromQueueOrAlloc ( ) ;
if ( p = = NULL ) {
SCLogError ( SC_ERR_MEM_ALLOC ,
" Failed to allocate a Packet on stream: %d, DAG: %s " ,
ewtn - > dagstream , ewtn - > dagname ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
err = ProcessErfDagRecord ( ewtn , prec , p ) ;
ewtn - > btm + = rlen ;
processed + = rlen ;
if ( err ! = TM_ECODE_OK ) {
TmqhOutputPacketpool ( ewtn - > tv , p ) ;
SCReturnInt ( err ) ;
/* Only support ethernet at this time. */
switch ( hdr_type & 0x7f ) {
case TYPE_PAD :
/* Skip. */
continue ;
case TYPE_ETH :
case TYPE_DSM_COLOR_ETH :
case TYPE_COLOR_ETH :
case TYPE_COLOR_HASH_ETH :
break ;
default :
SCLogError ( SC_ERR_UNIMPLEMENTED ,
" Processing of DAG record type: %d not implemented. " , dr - > type ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
ewtn - > btm + = rlen ;
err = TmThreadsSlotProcessPkt ( ewtn - > tv , ewtn - > slot , p ) ;
err = ProcessErfDagRecord ( ewtn , prec ) ;
if ( err ! = TM_ECODE_OK ) {
return err ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
( * pkts_read ) + + ;
@ -471,32 +440,49 @@ TmEcode ProcessErfDagRecords(ErfDagThreadVars *ewtn,
* \ param prec pointer to a DAG record .
* \ param
*/
TmEcode ProcessErfDagRecord ( ErfDagThreadVars * ewtn , char * prec , Packet * p )
static inline TmEcode ProcessErfDagRecord ( ErfDagThreadVars * ewtn , char * prec )
{
SCEnter ( ) ;
int wlen = 0 ;
int rlen = 0 ;
int hdr_num = 0 ;
char hdr_type = 0 ;
dag_record_t * dr = ( dag_record_t * ) prec ;
erf_payload_t * pload ;
Packet * p ;
assert ( prec ) ;
assert ( p ) ;
hdr_type = dr - > type ;
wlen = ntohs ( dr - > wlen ) ;
rlen = ntohs ( dr - > rlen ) ;
if ( p = = NULL ) SCReturnInt ( TM_ECODE_OK ) ;
/* count extension headers */
while ( hdr_type & 0x80 ) {
if ( rlen < ( dag_record_size + ( hdr_num * 8 ) ) ) {
SCLogError ( SC_ERR_UNIMPLEMENTED ,
" Insufficient captured packet length. " ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
hdr_type = prec [ ( dag_record_size + ( hdr_num * 8 ) ) ] ;
hdr_num + + ;
}
/* Only support ethernet at this time. */
if ( dr - > type ! = TYPE_ETH & &
dr - > type ! = TYPE_DSM_COLOR_ETH & &
dr - > type ! = TYPE_COLOR_ETH & &
dr - > type ! = TYPE_COLOR_HASH_ETH ) {
SCLogError ( SC_ERR_UNIMPLEMENTED ,
" Processing of DAG record type: %d not implemented. " , dr - > type ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
/* Check that the whole frame was captured */
if ( rlen < ( dag_record_size + ( 8 * hdr_num ) + 2 + wlen ) ) {
SCLogInfo ( " Incomplete frame captured. " ) ;
SCReturnInt ( TM_ECODE_OK ) ;
}
wlen = ntohs ( dr - > wlen ) ;
/* skip over extension headers */
pload = ( erf_payload_t * ) ( prec + dag_record_size + ( 8 * hdr_num ) ) ;
pload = & ( dr - > rec ) ;
p = PacketGetFromQueueOrAlloc ( ) ;
if ( p = = NULL ) {
SCLogError ( SC_ERR_MEM_ALLOC ,
" Failed to allocate a Packet on stream: %d, DAG: %s " ,
ewtn - > dagstream , ewtn - > dagname ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
SET_PKT_LEN ( p , wlen - 4 ) ; /* Trim the FCS... */
p - > datalink = LINKTYPE_ETHERNET ;
@ -504,7 +490,10 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
/* Take into account for link type Ethernet ETH frame starts
* after ther ERF header + pad .
*/
PacketCopyData ( p , pload - > eth . dst , GET_PKT_LEN ( p ) ) ;
if ( unlikely ( PacketCopyData ( p , pload - > eth . dst , GET_PKT_LEN ( p ) ) ) ) {
TmqhOutputPacketpool ( ewtn - > tv , p ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
/* Convert ERF time to timeval - from libpcap. */
uint64_t ts = dr - > ts ;
@ -520,6 +509,11 @@ TmEcode ProcessErfDagRecord(ErfDagThreadVars *ewtn, char *prec, Packet *p)
ewtn - > pkts + + ;
ewtn - > bytes + = wlen ;
if ( TmThreadsSlotProcessPkt ( ewtn - > tv , ewtn - > slot , p ) ! = TM_ECODE_OK ) {
TmqhOutputPacketpool ( ewtn - > tv , p ) ;
SCReturnInt ( TM_ECODE_FAILED ) ;
}
SCReturnInt ( TM_ECODE_OK ) ;
}