napatech: simplify integration with Napatech cards

- There is now an option to automatically create streams on the
  correct NUMA node when using cpu affinity.

- When not using cpu affinity the user can specify streams to be
  created in the suricata.yaml file.  It is no longer required to
  use NTPL to create streams before running suricata.

- The legacy usage model of running NTPL to create streams is still
  available. This can be used for legacy configurations and complex
  configurations that cannot be satisfied by the auto-config option.
pull/3788/head
Phil Young 6 years ago committed by Victor Julien
parent fd9f64d00f
commit 05271bfbe5

@ -1973,6 +1973,19 @@
echo " ERROR! libntapi library not found"
echo
exit 1
else
AC_CHECK_LIB(numa, numa_available,, LIBNUMA="no")
if test "$LIBNUMA" = "no"; then
echo
echo " WARNING: libnuma is required to use Napatech auto-config"
echo " libnuma is not found. Go get it"
echo " from http://github.com/numactl/numactl or your distribution:"
echo " Ubuntu: apt-get install libnuma-dev"
echo " Fedora: dnf install numactl-devel"
echo " CentOS/RHEL: yum install numactl-devel"
echo
exit 1
fi
fi
AC_DEFINE([HAVE_NAPATECH],[1],(Napatech capture card support))

@ -40,16 +40,36 @@
#define NT_RUNMODE_AUTOFP 1
#define NT_RUNMODE_WORKERS 2
static const char *default_mode = NULL;
static const char *default_mode = "workers";
#ifdef HAVE_NAPATECH
#define MAX_STREAMS 256
static uint16_t num_configured_streams = 0;
static uint16_t first_stream = 0xffff;
static uint16_t last_stream = 0xffff;
static int auto_config = 0;
uint16_t GetNumConfiguredStreams(void) {
uint16_t NapatechGetNumConfiguredStreams(void)
{
return num_configured_streams;
}
uint16_t NapatechGetNumFirstStream(void)
{
return first_stream;
}
uint16_t NapatechGetNumLastStream(void)
{
return last_stream;
}
bool NapatechIsAutoConfigEnabled(void)
{
return (auto_config != 0);
}
#endif
const char *RunModeNapatechGetDefaultMode(void)
@ -60,12 +80,6 @@ const char *RunModeNapatechGetDefaultMode(void)
void RunModeNapatechRegister(void)
{
#ifdef HAVE_NAPATECH
default_mode = "autofp";
RunModeRegisterNewRunMode(RUNMODE_NAPATECH, "autofp",
"Multi threaded Napatech mode. Packets from "
"each flow are assigned to a single detect "
"thread instead of any detect thread",
RunModeNapatechAutoFp);
RunModeRegisterNewRunMode(RUNMODE_NAPATECH, "workers",
"Workers Napatech mode, each thread does all"
" tasks from acquisition to logging",
@ -77,25 +91,27 @@ void RunModeNapatechRegister(void)
#ifdef HAVE_NAPATECH
static int NapatechRegisterDeviceStreams(void)
{
/* Display the configuration mode */
int use_all_streams;
if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
exit(EXIT_FAILURE);
SCLogInfo("Could not find napatech.use-all-streams in config file. Defaulting to \"no\".");
use_all_streams = 0;
}
if (ConfGetBool("napatech.auto-config", &auto_config) == 0) {
SCLogInfo("napatech.auto-config not found in config file. Defaulting to disabled.");
}
if (use_all_streams) {
SCLogInfo("Using All Napatech Streams");
} else {
SCLogInfo("Using Selected Napatech Streams");
if (use_all_streams && auto_config) {
SCLogError(SC_ERR_RUNMODE, "auto-config cannot be used with use-all-streams.");
}
/* Get the stream ID's either from the conf or by querying Napatech */
NapatechStreamConfig stream_config[MAX_STREAMS];
uint16_t stream_cnt = NapatechGetStreamConfig(stream_config);
num_configured_streams = stream_cnt;
SCLogDebug("Configuring %d Napatech Streams...", stream_cnt);
@ -103,13 +119,31 @@ static int NapatechRegisterDeviceStreams(void)
for (uint16_t inst = 0; inst < stream_cnt; ++inst) {
char *plive_dev_buf = SCCalloc(1, 9);
if (unlikely(plive_dev_buf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
snprintf(plive_dev_buf, 9, "nt%d", stream_config[inst].stream_id);
SCLogInfo("registering Napatech device: %s - active stream%sfound.",
plive_dev_buf, stream_config[inst].is_active ? " " : " NOT ");
if (auto_config) {
if (stream_config[inst].is_active) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Registering Napatech device: %s - active stream found.",
plive_dev_buf);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Delete the stream or disable auto-config before running.");
exit(EXIT_FAILURE);
}
} else {
SCLogInfo("Registering Napatech device: %s - active stream%sfound.",
plive_dev_buf, stream_config[inst].is_active ? " " : " NOT ");
}
LiveRegisterDevice(plive_dev_buf);
if (first_stream == 0xffff) {
first_stream = stream_config[inst].stream_id;
}
last_stream = stream_config[inst].stream_id;
}
/* Napatech stats come from a separate thread. This will surpress
@ -124,13 +158,15 @@ static void *NapatechConfigParser(const char *device)
/* Expect device to be of the form nt%d where %d is the stream id to use */
int dev_len = strlen(device);
if (dev_len < 3 || dev_len > 5) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG, "Could not parse config for device: %s - invalid length", device);
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Could not parse config for device: %s - invalid length", device);
return NULL;
}
struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
if (unlikely(conf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device name.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH device name.");
return NULL;
}
@ -155,54 +191,50 @@ static int NapatechGetThreadsCount(void *conf __attribute__((unused)))
static int NapatechInit(int runmode)
{
int ret;
char error_buf[100];
int status;
RunModeInitialize();
TimeModeSetLive();
/* Initialize the API and check version compatibility */
if ((ret = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) {
NT_ExplainError(ret, error_buf, sizeof (error_buf));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_Init failed. Code 0x%X = %s", ret, error_buf);
if ((status = NT_Init(NTAPI_VERSION)) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_INIT_FAILED, status);
exit(EXIT_FAILURE);
}
ret = NapatechRegisterDeviceStreams();
if (ret < 0 || num_configured_streams <= 0) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Unable to setup up Napatech Streams");
status = NapatechRegisterDeviceStreams();
if (status < 0 || num_configured_streams <= 0) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Unable to find existing Napatech Streams");
exit(EXIT_FAILURE);
}
struct NapatechStreamDevConf *conf = SCCalloc(1, sizeof (struct NapatechStreamDevConf));
struct NapatechStreamDevConf *conf =
SCCalloc(1, sizeof (struct NapatechStreamDevConf));
if (unlikely(conf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH device.");
exit(EXIT_FAILURE);
}
if ( (ConfGetInt("napatech.hba", &conf->hba) != 0) && (conf->hba > 0)){
SCLogInfo("Host Buffer Allowance: %d", (int)conf->hba);
if ((ConfGetInt("napatech.hba", &conf->hba) != 0) && (conf->hba > 0)) {
SCLogInfo("Host Buffer Allowance: %d", (int) conf->hba);
}
/* Start a thread to process the statistics */
NapatechStartStats();
switch (runmode) {
case NT_RUNMODE_AUTOFP:
ret = RunModeSetLiveCaptureAutoFp(NapatechConfigParser, NapatechGetThreadsCount,
"NapatechStream", "NapatechDecode",
thread_name_autofp, NULL);
break;
case NT_RUNMODE_WORKERS:
ret = RunModeSetLiveCaptureWorkers(NapatechConfigParser, NapatechGetThreadsCount,
status = RunModeSetLiveCaptureWorkers(NapatechConfigParser,
NapatechGetThreadsCount,
"NapatechStream", "NapatechDecode",
thread_name_workers, NULL);
break;
default:
ret = -1;
status = -1;
}
if (ret != 0) {
if (status != 0) {
SCLogError(SC_ERR_RUNMODE, "Runmode start failed");
exit(EXIT_FAILURE);
}

@ -38,7 +38,11 @@ int RunModeNapatechWorkers(void);
void RunModeNapatechRegister(void);
const char *RunModeNapatechGetDefaultMode(void);
uint16_t GetNumConfiguredStreams(void);
uint16_t NapatechGetNumConfiguredStreams(void);
uint16_t NapatechGetNumFirstStream(void);
uint16_t NapatechGetNumLastStream(void);
bool NapatechIsAutoConfigEnabled(void);

@ -25,6 +25,7 @@
* Requires libntapi from Napatech A/S.
*
*/
#include "suricata-common.h"
#include "suricata.h"
#include "threadvars.h"
@ -41,7 +42,8 @@
TmEcode NoNapatechSupportExit(ThreadVars *, const void *, void **);
void TmModuleNapatechStreamRegister(void) {
void TmModuleNapatechStreamRegister(void)
{
tmm_modules[TMM_RECEIVENAPATECH].name = "NapatechStream";
tmm_modules[TMM_RECEIVENAPATECH].ThreadInit = NoNapatechSupportExit;
tmm_modules[TMM_RECEIVENAPATECH].Func = NULL;
@ -51,7 +53,8 @@ void TmModuleNapatechStreamRegister(void) {
tmm_modules[TMM_RECEIVENAPATECH].cap_flags = SC_CAP_NET_ADMIN;
}
void TmModuleNapatechDecodeRegister(void) {
void TmModuleNapatechDecodeRegister(void)
{
tmm_modules[TMM_DECODENAPATECH].name = "NapatechDecode";
tmm_modules[TMM_DECODENAPATECH].ThreadInit = NoNapatechSupportExit;
tmm_modules[TMM_DECODENAPATECH].Func = NULL;
@ -62,7 +65,8 @@ void TmModuleNapatechDecodeRegister(void) {
tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
}
TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data) {
TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data)
{
SCLogError(SC_ERR_NAPATECH_NOSUPPORT,
"Error creating thread %s: you do not have support for Napatech adapter "
"enabled please recompile with --enable-napatech", tv->name);
@ -71,7 +75,7 @@ TmEcode NoNapatechSupportExit(ThreadVars *tv, const void *initdata, void **data)
#else /* Implied we do have NAPATECH support */
#include <numa.h>
#include <nt.h>
#define MAX_STREAMS 256
@ -102,6 +106,15 @@ SC_ATOMIC_DECLARE(uint64_t, total_packets);
SC_ATOMIC_DECLARE(uint64_t, total_drops);
SC_ATOMIC_DECLARE(uint16_t, total_tallied);
/* Streams are counted as they are instantiated in order to know when all threads
* are running*/
SC_ATOMIC_DECLARE(uint16_t, stream_count);
SC_ATOMIC_DECLARE(uint16_t, numa0_count);
SC_ATOMIC_DECLARE(uint16_t, numa1_count);
SC_ATOMIC_DECLARE(uint16_t, numa2_count);
SC_ATOMIC_DECLARE(uint16_t, numa3_count);
/**
* \brief Register the Napatech receiver (reader) module.
*/
@ -121,6 +134,12 @@ void TmModuleNapatechStreamRegister(void)
SC_ATOMIC_INIT(total_packets);
SC_ATOMIC_INIT(total_drops);
SC_ATOMIC_INIT(total_tallied);
SC_ATOMIC_INIT(stream_count);
SC_ATOMIC_INIT(numa0_count);
SC_ATOMIC_INIT(numa1_count);
SC_ATOMIC_INIT(numa2_count);
SC_ATOMIC_INIT(numa3_count);
}
/**
@ -138,13 +157,6 @@ void TmModuleNapatechDecodeRegister(void)
tmm_modules[TMM_DECODENAPATECH].flags = TM_FLAG_DECODE_TM;
}
/*
*-----------------------------------------------------------------------------
*-----------------------------------------------------------------------------
* Statistics code
*-----------------------------------------------------------------------------
*/
/**
* \brief Initialize the Napatech receiver thread, generate a single
* NapatechThreadVar structure for each thread, this will
@ -192,6 +204,66 @@ static void NapatechReleasePacket(struct Packet_ *p)
PacketEnqueue(&packets_to_release[p->ntpv.stream_id], p);
}
static int GetNumaNode(void)
{
int cpu = 0;
int node = 0;
#if defined(__linux__)
cpu = sched_getcpu();
node = numa_node_of_cpu(cpu);
#else
SCLogWarning(SC_ERR_NAPATECH_NOSUPPORT,
"Auto configuration of NUMA node is not supported on this OS.");
#endif
return node;
}
static void RecommendNUMAConfig(SCLogLevel log_level)
{
char string0[16];
char string1[16];
char string2[16];
char string3[16];
int set_cpu_affinity = 0;
if (ConfGetBool("threading.set-cpu-affinity", &set_cpu_affinity) != 1) {
set_cpu_affinity = 0;
}
if (set_cpu_affinity) {
SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
"Minimum host buffers that should be defined in ntservice.ini:");
SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
" NUMA Node 0: %d", (SC_ATOMIC_GET(numa0_count)));
if (numa_max_node() >= 1) SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
" NUMA Node 1: %d ", (SC_ATOMIC_GET(numa1_count)));
if (numa_max_node() >= 2) SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
" NUMA Node 2: %d ", (SC_ATOMIC_GET(numa2_count)));
if (numa_max_node() >= 3) SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
" NUMA Node 3: %d ", (SC_ATOMIC_GET(numa3_count)));
snprintf(string0, 16, "[%d, 16, 0]", SC_ATOMIC_GET(numa0_count));
snprintf(string1, 16, (numa_max_node() >= 1 ? ",[%d, 16, 1]" : ""),
SC_ATOMIC_GET(numa1_count));
snprintf(string2, 16, (numa_max_node() >= 2 ? ",[%d, 16, 2]" : ""),
SC_ATOMIC_GET(numa2_count));
snprintf(string3, 16, (numa_max_node() >= 3 ? ",[%d, 16, 3]" : ""),
SC_ATOMIC_GET(numa3_count));
SCLog(log_level, __FILE__, __FUNCTION__, __LINE__,
"E.g.: HostBuffersRx=%s%s%s%s", string0, string1, string2, string3);
} else if (log_level == SC_LOG_ERROR) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Or, try running /opt/napatech3/bin/ntpl -e \"delete=all\" to clean-up stream NUMA config.");
}
}
TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
{
int32_t status;
@ -202,14 +274,74 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
uint64_t hba_pkt_drops = 0;
uint64_t hba_byte_drops = 0;
uint16_t hba_pkt = 0;
uint32_t filter_id = 0;
uint32_t hash_id = 0;
uint32_t numa_node = 0;
int set_cpu_affinity = 0;
/* This just keeps the startup output more orderly. */
usleep(200000 * ntv->stream_id);
if (NapatechIsAutoConfigEnabled()) {
numa_node = GetNumaNode();
switch (numa_node) {
case 0: SC_ATOMIC_ADD(numa0_count, 1);
break;
case 1: SC_ATOMIC_ADD(numa1_count, 1);
break;
case 2: SC_ATOMIC_ADD(numa2_count, 1);
break;
case 3: SC_ATOMIC_ADD(numa3_count, 1);
break;
default: break;
}
if (ConfGetBool("threading.set-cpu-affinity", &set_cpu_affinity) != 1) {
set_cpu_affinity = 0;
}
if (set_cpu_affinity) {
NapatechSetupNuma(ntv->stream_id, numa_node);
}
}
if (NapatechIsAutoConfigEnabled()) {
numa_node = GetNumaNode();
SC_ATOMIC_ADD(stream_count, 1);
if (SC_ATOMIC_GET(stream_count) == NapatechGetNumConfiguredStreams()) {
/* The last thread to run sets up the streams */
status = NapatechSetupTraffic(NapatechGetNumFirstStream(),
NapatechGetNumLastStream(),
&filter_id, &hash_id);
if (filter_id == 0) {
if (status == 0x20002061) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Check host buffer configuration in ntservice.ini.");
RecommendNUMAConfig(SC_LOG_ERROR);
} else if (filter_id == 0x20000008) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Check napatech.ports in the suricata config file.");
}
exit(EXIT_FAILURE);
}
RecommendNUMAConfig(SC_LOG_INFO);
}
}
SCLogInfo("Napatech Packet Loop Started - cpu: %3d, cpu_numa: %3d stream: %3u ",
sched_getcpu(), numa_node, ntv->stream_id);
if (ntv->hba > 0) {
char *s_hbad_pkt = SCCalloc(1, 32);
if (unlikely(s_hbad_pkt == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
snprintf(s_hbad_pkt, 32, "nt%d.hba_drop", ntv->stream_id);
@ -219,20 +351,14 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
}
SCLogDebug("Opening NAPATECH Stream: %lu for processing", ntv->stream_id);
if ((status = NT_NetRxOpen(&(ntv->rx_stream), "SuricataStream", NT_NET_INTERFACE_PACKET, ntv->stream_id, ntv->hba)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_OPEN_FAILED, "Failed to open NAPATECH Stream: %u - %s", ntv->stream_id, error_buffer);
if ((status = NT_NetRxOpen(&(ntv->rx_stream), "SuricataStream",
NT_NET_INTERFACE_PACKET, ntv->stream_id, ntv->hba)) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_OPEN_FAILED, status);
SCFree(ntv);
SCReturnInt(TM_ECODE_FAILED);
}
#if defined(__linux__)
SCLogInfo("Napatech Packet Loop Started - cpu: %3d, stream: %3u (numa: %u)",
sched_getcpu(), ntv->stream_id, NapatechGetNumaNode(ntv->stream_id));
#else
SCLogInfo("Napatech Packet Loop Started - stream: %lu ", ntv->stream_id);
#endif
TmSlot *s = (TmSlot *) slot;
ntv->slot = s->slot_next;
@ -246,8 +372,7 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
if (unlikely(status == NT_STATUS_TIMEOUT || status == NT_STATUS_TRYAGAIN)) {
continue;
} else if (unlikely(status != NT_SUCCESS)) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
NAPATECH_ERROR(SC_ERR_NAPATECH_OPEN_FAILED, status);
SCLogInfo("Failed to read from Napatech Stream%d: %s",
ntv->stream_id, error_buffer);
SCReturnInt(TM_ECODE_FAILED);
@ -263,7 +388,8 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
/*
* Handle the different timestamp forms that the napatech cards could use
* - NT_TIMESTAMP_TYPE_NATIVE is not supported due to having an base of 0 as opposed to NATIVE_UNIX which has a base of 1/1/1970
* - NT_TIMESTAMP_TYPE_NATIVE is not supported due to having an base
* of 0 as opposed to NATIVE_UNIX which has a base of 1/1/1970
*/
switch (NT_NET_GET_PKT_TIMESTAMP_TYPE(packet_buffer)) {
case NT_TIMESTAMP_TYPE_NATIVE_UNIX:
@ -276,12 +402,14 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
break;
case NT_TIMESTAMP_TYPE_PCAP_NANOTIME:
p->ts.tv_sec = pkt_ts >> 32;
p->ts.tv_usec = ((pkt_ts & 0xFFFFFFFF) / 1000) + (pkt_ts % 1000) > 500 ? 1 : 0;
p->ts.tv_usec = ( (pkt_ts & 0xFFFFFFFF) / 1000)
+ (pkt_ts % 1000) > 500 ? 1 : 0;
break;
case NT_TIMESTAMP_TYPE_NATIVE_NDIS:
/* number of seconds between 1/1/1601 and 1/1/1970 */
p->ts.tv_sec = (pkt_ts / 100000000) - 11644473600;
p->ts.tv_usec = ((pkt_ts % 100000000) / 100) + (pkt_ts % 100) > 50 ? 1 : 0;
p->ts.tv_usec = ( (pkt_ts % 100000000) / 100)
+ (pkt_ts % 100) > 50 ? 1 : 0;
break;
default:
SCLogError(SC_ERR_NAPATECH_TIMESTAMP_TYPE_NOT_SUPPORTED,
@ -294,9 +422,9 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
if (unlikely(ntv->hba > 0)) {
NtNetRx_t stat_cmd;
stat_cmd.cmd = NT_NETRX_READ_CMD_STREAM_DROP;
// Update drop counter
/* Update drop counter */
if (unlikely((status = NT_NetRxRead(ntv->rx_stream, &stat_cmd)) != NT_SUCCESS)) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
NAPATECH_ERROR(SC_ERR_NAPATECH_OPEN_FAILED, status);
SCLogInfo("Couldn't retrieve drop statistics from the RX stream: %u - %s",
ntv->stream_id, error_buffer);
} else {
@ -312,7 +440,10 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
p->ntpv.stream_id = ntv->stream_id;
p->datalink = LINKTYPE_ETHERNET;
if (unlikely(PacketSetData(p, (uint8_t *)NT_NET_GET_PKT_L2_PTR(packet_buffer), NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
if (unlikely(PacketSetData(p,
(uint8_t *) NT_NET_GET_PKT_L2_PTR(packet_buffer),
NT_NET_GET_PKT_WIRE_LENGTH(packet_buffer)))) {
TmqhOutputPacketpool(ntv->tv, p);
NT_NetRxRelease(ntv->rx_stream, packet_buffer);
SCReturnInt(TM_ECODE_FAILED);
@ -333,14 +464,22 @@ TmEcode NapatechPacketLoopZC(ThreadVars *tv, void *data, void *slot)
StatsSyncCountersIfSignalled(tv);
}
if (filter_id) {
NapatechDeleteFilter(filter_id);
}
if (hash_id) {
NapatechDeleteFilter(hash_id);
}
if (unlikely(ntv->hba > 0)) {
SCLogInfo("Host Buffer Allowance Drops - pkts: %ld, bytes: %ld", hba_pkt_drops, hba_byte_drops);
SCLogInfo("Host Buffer Allowance Drops - pkts: %ld, bytes: %ld",
hba_pkt_drops, hba_byte_drops);
}
SCReturnInt(TM_ECODE_OK);
}
/**
* \brief Print some stats to the log at program exit.
*
@ -355,24 +494,24 @@ void NapatechStreamThreadExitStats(ThreadVars *tv, void *data)
double percent = 0;
if (stat.current_drops > 0)
percent = (((double) stat.current_drops)
/ (stat.current_packets + stat.current_drops)) * 100;
/ (stat.current_packets + stat.current_drops)) * 100;
SCLogInfo("nt%lu - pkts: %lu; drop: %lu (%5.2f%%); bytes: %lu",
(uint64_t) ntv->stream_id, stat.current_packets,
stat.current_drops, percent, stat.current_bytes);
(uint64_t) ntv->stream_id, stat.current_packets,
stat.current_drops, percent, stat.current_bytes);
SC_ATOMIC_ADD(total_packets, stat.current_packets);
SC_ATOMIC_ADD(total_drops, stat.current_drops);
SC_ATOMIC_ADD(total_tallied, 1);
if (SC_ATOMIC_GET(total_tallied) == GetNumConfiguredStreams()) {
if (SC_ATOMIC_GET(total_tallied) == NapatechGetNumConfiguredStreams()) {
if (SC_ATOMIC_GET(total_drops) > 0)
percent = (((double) SC_ATOMIC_GET(total_drops)) / (SC_ATOMIC_GET(total_packets)
+ SC_ATOMIC_GET(total_drops))) * 100;
+ SC_ATOMIC_GET(total_drops))) * 100;
SCLogInfo(" ");
SCLogInfo("--- Total Packets: %ld Total Dropped: %ld (%5.2f%%)",
SC_ATOMIC_GET(total_packets), SC_ATOMIC_GET(total_drops), percent);
SC_ATOMIC_GET(total_packets), SC_ATOMIC_GET(total_drops), percent);
}
}
@ -424,7 +563,7 @@ TmEcode NapatechDecode(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq,
break;
default:
SCLogError(SC_ERR_DATALINK_UNIMPLEMENTED,
"Error: datalink type %" PRId32 " not yet supported in module NapatechDecode",
"Error: datalink type %" PRId32 " not yet supported in module NapatechDecode",
p->datalink);
break;
}
@ -450,6 +589,7 @@ TmEcode NapatechDecodeThreadDeinit(ThreadVars *tv, void *data)
{
if (data != NULL)
DecodeThreadVarsFree(tv, data);
SCReturnInt(TM_ECODE_OK); }
SCReturnInt(TM_ECODE_OK);
}
#endif /* HAVE_NAPATECH */

@ -24,15 +24,14 @@
#ifndef __SOURCE_NAPATECH_H__
#define __SOURCE_NAPATECH_H__
void TmModuleNapatechStreamRegister (void);
void TmModuleNapatechStreamRegister(void);
TmEcode NapatechStreamThreadDeinit(ThreadVars *tv, void *data);
void TmModuleNapatechDecodeRegister (void);
void TmModuleNapatechDecodeRegister(void);
#ifdef HAVE_NAPATECH
#include <nt.h>
struct NapatechStreamDevConf
{
struct NapatechStreamDevConf {
uint16_t stream_id;
intmax_t hba;
};

@ -22,55 +22,21 @@
* \author Phil Young <py@napatech.com>
*
*
*/
*/
#include "suricata-common.h"
#ifdef HAVE_NAPATECH
#include "suricata.h"
#include "util-device.h"
#include "util-cpu.h"
#include "threadvars.h"
#include "tm-threads.h"
uint16_t NapatechGetNumaNode(uint16_t stream_id)
{
int status;
char buffer[80]; // Error buffer
NtInfoStream_t info_stream;
NtInfo_t info;
uint16_t numa_node;
if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
NT_ExplainError(status, buffer, sizeof (buffer) - 1);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", buffer);
exit(EXIT_FAILURE);
}
// Read the info on this specific stream
info.cmd = NT_INFO_CMD_READ_STREAMID;
info.u.streamID.streamId = stream_id;
if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
NT_ExplainError(status, buffer, sizeof (buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", buffer);
exit(EXIT_FAILURE);
}
numa_node = info.u.streamID.data.numaNode;
if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
NT_ExplainError(status, buffer, sizeof (buffer) - 1);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoClose failed: %s", buffer);
exit(EXIT_FAILURE);
}
return numa_node;
}
/*-----------------------------------------------------------------------------
*-----------------------------------------------------------------------------
* Statistics code
*-----------------------------------------------------------------------------
*/
*/
typedef struct StreamCounters_ {
uint16_t pkts;
@ -87,6 +53,13 @@ NapatechCurrentStats NapatechGetCurrentStats(uint16_t id)
return current_stats[id];
}
enum CONFIG_SPECIFIER {
CONFIG_SPECIFIER_UNDEFINED = 0,
CONFIG_SPECIFIER_RANGE,
CONFIG_SPECIFIER_INDIVIDUAL
};
#define MAX_HOSTBUFFERS 8
static uint16_t TestStreamConfig(
NtInfoStream_t hInfo,
@ -112,7 +85,8 @@ static uint16_t TestStreamConfig(
if ((status = NT_StatRead(hStatStream, &stat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, buffer, sizeof (buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead():2 failed: %s\n", buffer);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatRead():2 failed: %s\n", buffer);
return 0;
}
@ -148,12 +122,14 @@ static uint32_t UpdateStreamStats(ThreadVars *tv,
hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_InfoRead() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
uint16_t num_active;
if ((num_active = TestStreamConfig(hInfo, hStatStream, stream_config, num_streams)) == 0) {
if ((num_active = TestStreamConfig(hInfo, hStatStream,
stream_config, num_streams)) == 0) {
/* None of the configured streams are active */
return 0;
}
@ -164,7 +140,7 @@ static uint32_t UpdateStreamStats(ThreadVars *tv,
for (stream_cnt = 0; stream_cnt < num_streams; ++stream_cnt) {
while(inst_id < num_streams) {
while (inst_id < num_streams) {
if (stream_config[inst_id].is_active) {
break;
} else {
@ -182,7 +158,8 @@ static uint32_t UpdateStreamStats(ThreadVars *tv,
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatRead() failed: %s\n", error_buffer);
return 0;
}
@ -249,7 +226,8 @@ static void *NapatechStatsLoop(void *arg)
for (int i = 0; i < stream_cnt; ++i) {
char *pkts_buf = SCCalloc(1, 32);
if (unlikely(pkts_buf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
@ -258,7 +236,8 @@ static void *NapatechStatsLoop(void *arg)
char *byte_buf = SCCalloc(1, 32);
if (unlikely(byte_buf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
snprintf(byte_buf, 32, "nt%d.bytes", stream_config[i].stream_id);
@ -266,7 +245,8 @@ static void *NapatechStatsLoop(void *arg)
char *drop_buf = SCCalloc(1, 32);
if (unlikely(drop_buf == NULL)) {
SCLogError(SC_ERR_MEM_ALLOC, "Failed to allocate memory for NAPATECH stream counter.");
SCLogError(SC_ERR_MEM_ALLOC,
"Failed to allocate memory for NAPATECH stream counter.");
exit(EXIT_FAILURE);
}
snprintf(drop_buf, 32, "nt%d.drop", stream_config[i].stream_id);
@ -284,7 +264,7 @@ static void *NapatechStatsLoop(void *arg)
uint32_t num_active = UpdateStreamStats(tv, hInfo, hStatStream,
stream_cnt, stream_config, streamCounters);
if (num_active < stream_cnt) {
if (!NapatechIsAutoConfigEnabled() && (num_active < stream_cnt)) {
SCLogInfo("num_active: %d, stream_cnt: %d", num_active, stream_cnt);
SCLogWarning(SC_ERR_NAPATECH_CONFIG_STREAM,
"Some or all of the configured streams are not created. Proceeding with active streams.");
@ -333,7 +313,7 @@ static void *NapatechStatsLoop(void *arg)
#define HB_HIGHWATER 2048 //1982
static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered,
NapatechStreamConfig registered_streams[])
NapatechStreamConfig registered_streams[])
{
for (uint16_t reg_id = 0; reg_id < num_registered; ++reg_id) {
if (stream_id == registered_streams[reg_id].stream_id) {
@ -343,7 +323,76 @@ static bool RegisteredStream(uint16_t stream_id, uint16_t num_registered,
return false;
}
uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
static uint32_t CountWorkerThreads(void)
{
int worker_count = 0;
ConfNode *affinity;
ConfNode *root = ConfGetNode("threading.cpu-affinity");
if (root != NULL) {
TAILQ_FOREACH(affinity, &root->head, next)
{
if (strcmp(affinity->val, "decode-cpu-set") == 0 ||
strcmp(affinity->val, "stream-cpu-set") == 0 ||
strcmp(affinity->val, "reject-cpu-set") == 0 ||
strcmp(affinity->val, "output-cpu-set") == 0) {
continue;
}
if (strcmp(affinity->val, "worker-cpu-set") == 0) {
ConfNode *node = ConfNodeLookupChild(affinity->head.tqh_first, "cpu");
ConfNode *lnode;
enum CONFIG_SPECIFIER cpu_spec = CONFIG_SPECIFIER_UNDEFINED;
TAILQ_FOREACH(lnode, &node->head, next)
{
uint8_t start, end;
if (strncmp(lnode->val, "all", 4) == 0) {
/* check that the sting in the config file is correctly specified */
if (cpu_spec != CONFIG_SPECIFIER_UNDEFINED) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Only one Napatech port specifier type allowed.");
exit(EXIT_FAILURE);
}
cpu_spec = CONFIG_SPECIFIER_RANGE;
worker_count = UtilCpuGetNumProcessorsConfigured();
} else if (strchr(lnode->val, '-')) {
/* check that the sting in the config file is correctly specified */
if (cpu_spec != CONFIG_SPECIFIER_UNDEFINED) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Only one Napatech port specifier type allowed.");
exit(EXIT_FAILURE);
}
cpu_spec = CONFIG_SPECIFIER_RANGE;
char copystr[16];
strlcpy(copystr, lnode->val, 16);
start = atoi(copystr);
end = atoi(strchr(copystr, '-') + 1);
worker_count = end - start + 1;
} else {
/* check that the sting in the config file is correctly specified */
if (cpu_spec == CONFIG_SPECIFIER_RANGE) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Napatech port range specifiers cannot be combined with individual stream specifiers.");
exit(EXIT_FAILURE);
}
cpu_spec = CONFIG_SPECIFIER_INDIVIDUAL;
++worker_count;
}
}
break;
}
}
}
return worker_count;
}
int NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
{
int status;
char error_buffer[80]; // Error buffer
@ -352,8 +401,12 @@ uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
NtInfoStream_t info_stream;
NtInfo_t info;
uint16_t instance_cnt = 0;
int use_all_streams;
int use_all_streams = 0;
int set_cpu_affinity = 0;
ConfNode *ntstreams;
uint16_t stream_id = 0;
uint16_t start = 0;
uint16_t end = 0;
for (uint16_t i = 0; i < MAX_STREAMS; ++i) {
stream_config[i].stream_id = 0;
@ -362,32 +415,33 @@ uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
}
if (ConfGetBool("napatech.use-all-streams", &use_all_streams) == 0) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.use-all-streams from Conf");
exit(EXIT_FAILURE);
/* default is "no" */
use_all_streams = 0;
}
if ((status = NT_InfoOpen(&info_stream, "SuricataStreamInfo")) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoOpen failed: %s", error_buffer);
exit(EXIT_FAILURE);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"NT_InfoOpen failed: %s", error_buffer);
return -1;
}
if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_RUNMODE, "NT_StatOpen() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
return -1;
}
if (use_all_streams) {
info.cmd = NT_INFO_CMD_READ_STREAM;
if ((status = NT_InfoRead(info_stream, &info)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "NT_InfoRead failed: %s", error_buffer);
exit(EXIT_FAILURE);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"NT_InfoRead failed: %s", error_buffer);
return -1;
}
uint16_t stream_id = 0;
while (instance_cnt < info.u.stream.data.count) {
/*
@ -405,8 +459,9 @@ uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
return 0;
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatRead() failed: %s\n", error_buffer);
return -1;
}
if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
@ -421,61 +476,89 @@ uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
}
} else {
/* When not using the default streams we need to parse the array of streams from the conf */
if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.streams from Conf");
exit(EXIT_FAILURE);
}
ConfGetBool("threading.set-cpu-affinity", &set_cpu_affinity);
if (NapatechIsAutoConfigEnabled() && (set_cpu_affinity == 1)) {
start = 0;
end = CountWorkerThreads() - 1;
} else {
/* When not using the default streams we need to
* parse the array of streams from the conf */
if ((ntstreams = ConfGetNode("napatech.streams")) == NULL) {
SCLogError(SC_ERR_RUNMODE,
"Failed retrieving napatech.streams from Config");
if (NapatechIsAutoConfigEnabled() && (set_cpu_affinity == 0)) {
SCLogError(SC_ERR_RUNMODE,
"if set-cpu-affinity: no in conf then napatech.streams must be defined");
}
exit(EXIT_FAILURE);
}
/* Loop through all stream numbers in the array and register the devices */
ConfNode *stream;
instance_cnt = 0;
/* Loop through all stream numbers in the array and register the devices */
ConfNode *stream;
enum CONFIG_SPECIFIER stream_spec = CONFIG_SPECIFIER_UNDEFINED;
instance_cnt = 0;
TAILQ_FOREACH(stream, &ntstreams->head, next) {
uint16_t stream_id = 0;
TAILQ_FOREACH(stream, &ntstreams->head, next)
{
if (stream == NULL) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, "Couldn't Parse Stream Configuration");
exit(EXIT_FAILURE);
}
if (stream == NULL) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Couldn't Parse Stream Configuration");
return -1;
}
uint16_t start, end;
if (strchr(stream->val, '-')) {
char copystr[16];
strlcpy(copystr, stream->val, 16);
if (strchr(stream->val, '-')) {
if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Only one Napatech stream range specifier allowed.");
return -1;
}
stream_spec = CONFIG_SPECIFIER_RANGE;
start = atoi(copystr);
end = atoi(strchr(copystr, '-')+1);
} else {
stream_config[instance_cnt].stream_id = atoi(stream->val);
start = stream_config[instance_cnt].stream_id;
end = stream_config[instance_cnt].stream_id;
char copystr[16];
strlcpy(copystr, stream->val, 16);
start = atoi(copystr);
end = atoi(strchr(copystr, '-') + 1);
} else {
if (stream_spec == CONFIG_SPECIFIER_RANGE) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Napatech range and individual specifiers cannot be combined.");
exit(EXIT_FAILURE);
}
stream_spec = CONFIG_SPECIFIER_INDIVIDUAL;
stream_config[instance_cnt].stream_id = atoi(stream->val);
start = stream_config[instance_cnt].stream_id;
end = stream_config[instance_cnt].stream_id;
}
}
SCLogInfo("%s start: %d end: %d", stream->val, start, end);
}
for (stream_id = start; stream_id <= end; ++stream_id) {
/* if we get here it is configured in the .yaml file */
stream_config[instance_cnt].stream_id = stream_id;
for (stream_id = start; stream_id <= end; ++stream_id) {
/* if we get here it is configured in the .yaml file */
stream_config[instance_cnt].stream_id = stream_id;
/* Check to see if it is an active stream */
memset(&hStat, 0, sizeof (NtStatistics_t));
/* Check to see if it is an active stream */
memset(&hStat, 0, sizeof (NtStatistics_t));
/* Read usage data for the chosen stream ID */
hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
hStat.u.usageData_v0.streamid = (uint8_t) stream_config[instance_cnt].stream_id;
/* Read usage data for the chosen stream ID */
hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
hStat.u.usageData_v0.streamid =
(uint8_t) stream_config[instance_cnt].stream_id;
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
return 0;
}
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatRead() failed: %s\n", error_buffer);
return -1;
}
if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) {
stream_config[instance_cnt].is_active = true;
}
instance_cnt++;
if (hStat.u.usageData_v0.data.numHostBufferUsed > 0) {
stream_config[instance_cnt].is_active = true;
}
instance_cnt++;
}
}
@ -484,20 +567,19 @@ uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[])
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_RUNMODE, "NT_StatClose() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
return -1;
}
if ((status = NT_InfoClose(info_stream)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"NT_InfoClose failed: %s", error_buffer);
exit(EXIT_FAILURE);
"NT_InfoClose failed: %s", error_buffer);
return -1;
}
return instance_cnt;
}
static void *NapatechBufMonitorLoop(void *arg)
{
ThreadVars *tv = (ThreadVars *) arg;
@ -523,27 +605,33 @@ static void *NapatechBufMonitorLoop(void *arg)
/* Open the info and Statistics */
if ((status = NT_InfoOpen(&hInfo, "InfoStream")) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoOpen() failed: %s\n", error_buffer);
exit(1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_InfoOpen() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatOpen() failed: %s\n", error_buffer);
exit(1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatOpen() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
/* Read the info on all streams instantiated in the system */
hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_InfoRead() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
NapatechStreamConfig registered_streams[MAX_STREAMS];
uint16_t num_registered = NapatechGetStreamConfig(registered_streams);
int num_registered = NapatechGetStreamConfig(registered_streams);
if (num_registered == -1) {
exit(EXIT_FAILURE);
}
TmThreadsSetFlag(tv, THV_INIT_DONE);
while (1) {
@ -558,7 +646,8 @@ static void *NapatechBufMonitorLoop(void *arg)
hStreamInfo.cmd = NT_INFO_CMD_READ_STREAM;
if ((status = NT_InfoRead(hInfo, &hStreamInfo)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoRead() failed: %s\n", error_buffer);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_InfoRead() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
@ -580,8 +669,9 @@ static void *NapatechBufMonitorLoop(void *arg)
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatRead() failed: %s\n", error_buffer);
exit(1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED,
"NT_StatRead() failed: %s\n", error_buffer);
exit(EXIT_FAILURE);
}
if (hStat.u.usageData_v0.data.numHostBufferUsed == 0) {
@ -594,7 +684,9 @@ static void *NapatechBufMonitorLoop(void *arg)
ave_OB_fill_level[stream_id] = 0;
ave_HB_fill_level[stream_id] = 0;
for (uint32_t hb_count = 0; hb_count < hStat.u.usageData_v0.data.numHostBufferUsed; hb_count++) {
for (uint32_t hb_count = 0;
hb_count < hStat.u.usageData_v0.data.numHostBufferUsed;
hb_count++) {
OB_fill_level[hb_count] =
((100 * hStat.u.usageData_v0.data.hb[hb_count].onboardBuffering.used) /
@ -623,11 +715,13 @@ static void *NapatechBufMonitorLoop(void *arg)
/* Host Buffer Fill Level warnings... */
if (ave_HB_fill_level[stream_id] >= (HB_alert_level[stream_id] + alertInterval)) {
while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id] + alertInterval) {
while (ave_HB_fill_level[stream_id] >= HB_alert_level[stream_id]
+ alertInterval) {
HB_alert_level[stream_id] += alertInterval;
}
SCLogInfo("nt%d - Increasing Host Buffer Fill Level : %4d%%",
stream_id, ave_HB_fill_level[stream_id]);
stream_id, ave_HB_fill_level[stream_id] - 1);
}
if (HB_alert_level[stream_id] > 0) {
@ -673,7 +767,7 @@ static void *NapatechBufMonitorLoop(void *arg)
if ((status = NT_InfoClose(hInfo)) != NT_SUCCESS) {
NT_ExplainError(status, error_buffer, sizeof (error_buffer) - 1);
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_InfoClose() failed: %s\n", error_buffer);
exit(1);
exit(EXIT_FAILURE);
}
/* Close the statistics stream */
@ -681,7 +775,7 @@ static void *NapatechBufMonitorLoop(void *arg)
/* Get the status code as text */
NT_ExplainError(status, error_buffer, sizeof (error_buffer));
SCLogError(SC_ERR_NAPATECH_INIT_FAILED, "NT_StatClose() failed: %s\n", error_buffer);
exit(1);
exit(EXIT_FAILURE);
}
SCLogDebug("Exiting NapatechStatsLoop");
@ -733,4 +827,253 @@ void NapatechStartStats(void)
return;
}
bool NapatechSetupNuma(uint32_t stream, uint32_t numa)
{
uint32_t status = 0;
static NtConfigStream_t hconfig;
char ntpl_cmd[64];
snprintf(ntpl_cmd, 64, "setup[numanode=%d] = streamid == %d", numa, stream);
NtNtplInfo_t ntpl_info;
if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, status);
return false;
}
if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info, NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
status = ntpl_info.ntplId;
} else {
NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
return false;
}
return status;
}
static bool NapatechSetHashmode(uint32_t *filter_id)
{
uint32_t status = 0;
const char *hash_mode;
static NtConfigStream_t hconfig;
char ntpl_cmd[64];
NtNtplInfo_t ntpl_info;
*filter_id = 0;
/* Get the hashmode from the conf file. */
ConfGetValue("napatech.hashmode", &hash_mode);
snprintf(ntpl_cmd, 64, "hashmode = %s", hash_mode);
/* Issue the NTPL command */
if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, status);
return false;
}
if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
*filter_id = ntpl_info.ntplId;
SCLogInfo("Napatech hashmode: %s ID: %d", hash_mode, status);
} else {
NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
status = 0;
}
return status;
}
static uint32_t GetStreamNUMAs(uint32_t stream_id, int stream_numas[])
{
NtStatistics_t hStat; // Stat handle.
NtStatStream_t hStatStream;
int status; // Status variable
for (int i = 0; i < MAX_HOSTBUFFERS; ++i)
stream_numas[i] = -1;
if ((status = NT_StatOpen(&hStatStream, "StatsStream")) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_INIT_FAILED, status);
exit(EXIT_FAILURE);
}
char pktCntStr[4096];
memset(pktCntStr, 0, sizeof (pktCntStr));
/* Read usage data for the chosen stream ID */
hStat.cmd = NT_STATISTICS_READ_CMD_USAGE_DATA_V0;
hStat.u.usageData_v0.streamid = (uint8_t) stream_id;
if ((status = NT_StatRead(hStatStream, &hStat)) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_INIT_FAILED, status);
exit(EXIT_FAILURE);
}
for (uint32_t hb_id = 0; hb_id < hStat.u.usageData_v0.data.numHostBufferUsed; ++hb_id) {
stream_numas[hb_id] = hStat.u.usageData_v0.data.hb[hb_id].numaNode;
}
return hStat.u.usageData_v0.data.numHostBufferUsed;
}
uint32_t NapatechSetupTraffic(uint32_t first_stream, uint32_t last_stream,
uint32_t *filter_id, uint32_t *hash_id)
{
#define PORTS_SPEC_SIZE 64
char ports_spec[PORTS_SPEC_SIZE];
ConfNode *ntports;
bool first_iteration = true;
int status = 0;
static NtConfigStream_t hconfig;
char ntpl_cmd[128];
NapatechSetHashmode(hash_id);
/* When not using the default streams we need to parse
* the array of streams from the conf
*/
if ((ntports = ConfGetNode("napatech.ports")) == NULL) {
SCLogError(SC_ERR_RUNMODE, "Failed retrieving napatech.ports from Conf");
exit(EXIT_FAILURE);
}
/* Loop through all ports in the array */
ConfNode *port;
enum CONFIG_SPECIFIER stream_spec = CONFIG_SPECIFIER_UNDEFINED;
/* Build the NTPL command using values in the config file. */
TAILQ_FOREACH(port, &ntports->head, next)
{
if (port == NULL) {
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED,
"Couldn't Parse Port Configuration");
exit(EXIT_FAILURE);
}
uint8_t start, end;
if (strncmp(port->val, "all", 3) == 0) {
/* check that the sting in the config file is correctly specified */
if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Only one Napatech port specifier type allowed.");
exit(EXIT_FAILURE);
}
stream_spec = CONFIG_SPECIFIER_RANGE;
snprintf(ports_spec, sizeof(ports_spec), "all");
} else if (strchr(port->val, '-')) {
/* check that the sting in the config file is correctly specified */
if (stream_spec != CONFIG_SPECIFIER_UNDEFINED) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Only one Napatech port specifier type allowed.");
exit(EXIT_FAILURE);
}
stream_spec = CONFIG_SPECIFIER_RANGE;
char copystr[16];
strlcpy(copystr, port->val, sizeof(copystr));
start = atoi(copystr);
end = atoi(strchr(copystr, '-') + 1);
snprintf(ports_spec, sizeof(ports_spec), "port == (%d..%d)", start, end);
} else {
/* check that the sting in the config file is correctly specified */
if (stream_spec == CONFIG_SPECIFIER_RANGE) {
SCLogError(SC_ERR_NAPATECH_PARSE_CONFIG,
"Napatech port range specifiers cannot be combined with individual stream specifiers.");
exit(EXIT_FAILURE);
}
stream_spec = CONFIG_SPECIFIER_INDIVIDUAL;
/* Determine the ports to use on the NTPL assign statement*/
if (first_iteration) {
snprintf(ports_spec, sizeof(ports_spec), "port==%s", port->val);
first_iteration = false;
} else {
char temp[PORTS_SPEC_SIZE];
snprintf(temp, sizeof(temp), "%s,%s",ports_spec,port->val);
snprintf(ports_spec, sizeof(ports_spec), "%s", temp);
}
}
}
/* Build the NTPL command */
snprintf(ntpl_cmd, sizeof(ntpl_cmd), "assign[streamid=(%d..%d)] = %s",
first_stream, last_stream, ports_spec);
NtNtplInfo_t ntpl_info;
if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_INIT_FAILED, status);
exit(EXIT_FAILURE);
}
if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
*filter_id = ntpl_info.ntplId;
status = ntpl_info.u.errorData.errCode;
SCLogInfo("NTPL filter assignment \"%s\" returned filter id %4d",
ntpl_cmd, *filter_id);
} else {
NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
status = ntpl_info.u.errorData.errCode;
return false;
}
SCLogInfo("Host-buffer NUMA assignments: ");
int numa_nodes[MAX_HOSTBUFFERS];
uint32_t stream_id;
for (stream_id = first_stream; stream_id < last_stream; ++stream_id) {
char temp1[256];
char temp2[256];
uint32_t num_host_buffers = GetStreamNUMAs(stream_id, numa_nodes);
snprintf(temp1, 256, " stream %d:", stream_id);
for (uint32_t hb_id = 0; hb_id < num_host_buffers; ++hb_id) {
snprintf(temp2, 256, "%s %d ", temp1, numa_nodes[hb_id]);
snprintf(temp1, 256, "%s", temp2);
}
SCLogInfo("%s", temp1);
}
return status;
}
bool NapatechDeleteFilter(uint32_t filter_id)
{
uint32_t status = 0;
static NtConfigStream_t hconfig;
char ntpl_cmd[64];
NtNtplInfo_t ntpl_info;
/* issue an NTPL command to delete the filter */
snprintf(ntpl_cmd, 64, "delete = %d", filter_id);
if ((status = NT_ConfigOpen(&hconfig, "ConfigStream")) != NT_SUCCESS) {
NAPATECH_ERROR(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, status);
exit(EXIT_FAILURE);
}
if ((status = NT_NTPL(hconfig, ntpl_cmd, &ntpl_info,
NT_NTPL_PARSER_VALIDATE_NORMAL)) == NT_SUCCESS) {
status = ntpl_info.ntplId;
SCLogInfo("Removed Napatech filter %d. ", filter_id);
} else {
NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status);
status = 0;
}
return status;
}
#endif // HAVE_NAPATECH

@ -28,16 +28,13 @@
#ifdef HAVE_NAPATECH
#include <nt.h>
typedef struct NapatechPacketVars_
{
typedef struct NapatechPacketVars_ {
uint64_t stream_id;
NtNetBuf_t nt_packet_buf;
ThreadVars *tv;
} NapatechPacketVars;
typedef struct NapatechStreamConfig_
{
typedef struct NapatechStreamConfig_ {
uint16_t stream_id;
bool is_active;
bool initialized;
@ -52,9 +49,37 @@ typedef struct NapatechCurrentStats_ {
#define MAX_STREAMS 256
extern void NapatechStartStats(void);
uint16_t NapatechGetNumaNode(uint16_t stream_id);
NapatechCurrentStats NapatechGetCurrentStats(uint16_t id);
uint16_t NapatechGetStreamConfig(NapatechStreamConfig stream_config[]);
#define NAPATECH_ERROR(err_type, status) { \
char errorBuffer[1024]; \
NT_ExplainError((status), errorBuffer, sizeof (errorBuffer) - 1); \
SCLogError((err_type), "Napatech Error: %s", errorBuffer); \
}
#define NAPATECH_NTPL_ERROR(ntpl_cmd, ntpl_info, status) { \
char errorBuffer[1024]; \
NT_ExplainError(status, errorBuffer, sizeof (errorBuffer) - 1); \
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, \
" NTPL failed: %s", errorBuffer); \
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, \
" cmd: %s", ntpl_cmd); \
if (strncmp(ntpl_info.u.errorData.errBuffer[0], "", 256) != 0) \
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, \
" %s", ntpl_info.u.errorData.errBuffer[0]); \
if (strncmp(ntpl_info.u.errorData.errBuffer[1], "", 256) != 0) \
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, \
" %s", ntpl_info.u.errorData.errBuffer[1]); \
if (strncmp(ntpl_info.u.errorData.errBuffer[2], "", 256) != 0) \
SCLogError(SC_ERR_NAPATECH_STREAMS_REGISTER_FAILED, \
" %s", ntpl_info.u.errorData.errBuffer[2]); \
}
NapatechCurrentStats NapatechGetCurrentStats(uint16_t id);
int NapatechGetStreamConfig(NapatechStreamConfig stream_config[]);
bool NapatechSetupNuma(uint32_t stream, uint32_t numa);
uint32_t NapatechSetupTraffic(uint32_t first_stream, uint32_t last_stream, uint32_t *filter_id, uint32_t *hash_id);
bool NapatechDeleteFilter(uint32_t filter_id);
#endif //HAVE_NAPATECH
#endif /* __UTIL_NAPATECH_H__ */

@ -1795,19 +1795,63 @@ napatech:
# (-1 = OFF, 1 - 100 = percentage of the host buffer that can be held back)
# This may be enabled when sharing streams with another application.
# Otherwise, it should be turned off.
hba: -1
# use_all_streams set to "yes" will query the Napatech service for all configured
# streams and listen on all of them. When set to "no" the streams config array
# will be used.
use-all-streams: yes
# The streams to listen on. This can be either:
# a list of individual streams (e.g. streams: [0,1,2,3])
#hba: -1
# When use_all_streams is set to "yes" the initialization code will query
# the Napatech service for all configured streams and listen on all of them.
# When set to "no" the streams config array will be used.
#
# This option necessitates running the appropriate NTPL commands to create
# the desired streams prior to running suricata.
#use-all-streams: no
# The streams to listen on when auto-config is disabled or when and threading
# cpu-affinity is disabled. This can be either:
# an individual stream (e.g. streams: [0])
# or
# a range of streams (e.g. streams: ["0-3"])
#
streams: ["0-3"]
# When auto-config is enabled the streams will be created and assigned
# automatically to the NUMA node where the thread resides. If cpu-affinity
# is enabled in the threading section. Then the streams will be created
# according to the number of worker threads specified in the worker cpu set.
# Otherwise, the streams array is used to define the streams.
#
# This option cannot be used simultaneous with "use-all-streams".
#
auto-config: yes
# Ports indicates which napatech ports are to be used in auto-config mode.
# these are the port ID's of the ports that will be merged prior to the
# traffic being distributed to the streams.
#
# This can be specified in any of the following ways:
#
# a list of individual ports (e.g. ports: [0,1,2,3])
#
# a range of ports (e.g. ports: [0-3])
#
# "all" to indicate that all ports are to be merged together
# (e.g. ports: [all])
#
# This has no effect if auto-config is disabled.
#
ports: [all]
# When auto-config is enabled the hashmode specifies the algorithm for
# determining to which stream a given packet is to be delivered.
# This can be any valid Napatech NTPL hashmode command.
#
# The most common hashmode commands are: hash2tuple, hash2tuplesorted,
# hash5tuple, hash5tuplesorted and roundrobin.
#
# See Napatech NTPL documentation other hashmodes and details on their use.
#
# This has no effect if auto-config is disabled.
#
hashmode: hash5tuplesorted
##
## Configure Suricata to load Suricata-Update managed rules.
##

Loading…
Cancel
Save