af-xdp: Add AF_XDP socket support

AF_XDP is a recently introduced technology that aims at improving
capture performance. With this update, Suricata now provides a new
capture source 'af-xdp' that attaches an eBPF program to the network
interface card. Packets received in the NIC queue are forwarded to
an RX ring in user-space, bypassing the Linux network stack.

Note, there is a configuration option (force-xdp-mode) that forces the
packet through the normal Linux network stack.

libxdp and libbpf are required for this feature, and support is
configured at compile time.

This capture source operates on single- and multi-queue NICs and is
configured via suricata.yaml, where various features can be enabled,
disabled or tuned as required by the use case.

This feature currently only supports receiving packets via AF_XDP;
no TX support has been developed.

Ticket: https://redmine.openinfosecfoundation.org/issues/3306

Additional reading:
https://www.kernel.org/doc/html/latest/networking/af_xdp.html
pull/8237/head
Richard McConnell 3 years ago committed by Victor Julien
parent 7d1a8cc335
commit 6e128f48a2

@ -1346,6 +1346,31 @@
[[#include <linux/net_tstamp.h>]])
])
# AF_XDP support - enabled by default and switched off with --disable-af-xdp
AC_ARG_ENABLE(af-xdp,
AS_HELP_STRING([--disable-af-xdp], [Disable AF_XDP support [default=enabled]]),
[enable_af_xdp=$enableval],[enable_af_xdp=yes])
AS_IF([test "x$enable_af_xdp" = "xyes"], [
# Check for the availability of elf
AC_CHECK_LIB(elf,elf_begin,,[enable_af_xdp=no])
# Each group below runs only while enable_af_xdp is still yes and a failed check sets it to no
AS_IF([test "x$enable_af_xdp" = "xyes"],
# Check for the availability of libxdp
AC_CHECK_HEADERS([xdp/xsk.h],,[enable_af_xdp=no])
AC_CHECK_LIB([xdp],[xsk_umem__create],,[enable_af_xdp=no]))
AS_IF([test "x$enable_af_xdp" = "xyes"],
# Check for the availability of libbpf
AC_CHECK_HEADERS([bpf/libbpf.h],,[enable_af_xdp=no])
AC_CHECK_LIB([bpf],[bpf_object__open],,[enable_af_xdp=no]))
# Define HAVE_AF_XDP only when every check above succeeded
AS_IF([test "x$enable_af_xdp" = "xyes"],
AC_DEFINE([HAVE_AF_XDP],[1],[AF_XDP support is available]))
])
# DPDK support
AC_ARG_ENABLE(dpdk,
AS_HELP_STRING([--enable-dpdk], [Enable DPDK support [default=no]]),
@ -2537,6 +2562,7 @@ AC_OUTPUT
SURICATA_BUILD_CONF="Suricata Configuration:
AF_PACKET support: ${enable_af_packet}
AF_XDP support: ${enable_af_xdp}
DPDK support: ${enable_dpdk}
eBPF support: ${enable_ebpf}
XDP support: ${have_xdp}

@ -445,6 +445,7 @@ noinst_HEADERS = \
respond-reject.h \
respond-reject-libnet11.h \
runmode-af-packet.h \
runmode-af-xdp.h \
runmode-dpdk.h \
runmode-erf-dag.h \
runmode-erf-file.h \
@ -463,6 +464,7 @@ noinst_HEADERS = \
rust-context.h \
rust.h \
source-af-packet.h \
source-af-xdp.h \
source-dpdk.h \
source-erf-dag.h \
source-erf-file.h \
@ -609,6 +611,7 @@ noinst_HEADERS = \
util-storage.h \
util-streaming-buffer.h \
util-syslog.h \
util-sysfs.h \
util-thash.h \
util-threshold-config.h \
util-time.h \
@ -1053,6 +1056,7 @@ libsuricata_c_a_SOURCES = \
respond-reject.c \
respond-reject-libnet11.c \
runmode-af-packet.c \
runmode-af-xdp.c \
runmode-dpdk.c \
runmode-erf-dag.c \
runmode-erf-file.c \
@ -1070,6 +1074,7 @@ libsuricata_c_a_SOURCES = \
runmode-windivert.c \
rust-context.c \
source-af-packet.c \
source-af-xdp.c \
source-dpdk.c \
source-erf-dag.c \
source-erf-file.c \
@ -1210,6 +1215,7 @@ libsuricata_c_a_SOURCES = \
util-strlcpyu.c \
util-strptime.c \
util-syslog.c \
util-sysfs.c \
util-thash.c \
util-threshold-config.c \
util-time.c \

@ -78,6 +78,9 @@ enum PktSrcEnum {
#ifdef HAVE_PF_RING_FLOW_OFFLOAD
#include "source-pfring.h"
#endif
#ifdef HAVE_AF_XDP
#include "source-af-xdp.h"
#endif
#include "decode-ethernet.h"
#include "decode-gre.h"
@ -497,6 +500,9 @@ typedef struct Packet_
#endif
#ifdef HAVE_NAPATECH
NapatechPacketVars ntpv;
#endif
#ifdef HAVE_AF_XDP
AFXDPPacketVars afxdp_v;
#endif
/* A chunk of memory that a plugin can use for its packet vars. */
uint8_t plugin_v[PLUGIN_VAR_SIZE];

@ -0,0 +1,396 @@
/* Copyright (C) 2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \ingroup afxdppacket
*
* @{
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*
* AF_XDP socket runmode
*
*/
#define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
#define SC_PCAP_DONT_INCLUDE_PCAP_H 1
#include "suricata-common.h"
#include "tm-threads.h"
#include "conf.h"
#include "runmodes.h"
#include "runmode-af-xdp.h"
#include "output.h"
#include "log-httplog.h"
#include "detect-engine-mpm.h"
#include "alert-fastlog.h"
#include "alert-debuglog.h"
#include "flow-bypass.h"
#include "util-conf.h"
#include "util-debug.h"
#include "util-time.h"
#include "util-cpu.h"
#include "util-affinity.h"
#include "util-device.h"
#include "util-runmodes.h"
#include "util-ioctl.h"
#include "util-ebpf.h"
#include "util-byte.h"
#include "source-af-xdp.h"
#ifdef HAVE_AF_XDP
#include <linux/if_xdp.h>
#include <linux/if_link.h>
#include <xdp/xsk.h>
#endif
/**
 * \brief Default runmode name for the AF_XDP capture source.
 *
 * \retval pointer to the constant string "workers"
 */
const char *RunModeAFXDPGetDefaultMode(void)
{
    static const char default_mode[] = "workers";

    return default_mode;
}
/**
 * \brief Register the "single" and "workers" runmodes for RUNMODE_AFXDP_DEV.
 */
void RunModeIdsAFXDPRegister(void)
{
    /* one capture thread */
    RunModeRegisterNewRunMode(
            RUNMODE_AFXDP_DEV, "single", "Single threaded af-xdp mode", RunModeIdsAFXDPSingle);

    /* several capture threads, each running the full pipeline */
    RunModeRegisterNewRunMode(RUNMODE_AFXDP_DEV, "workers",
            "Workers af-xdp mode, each thread does all tasks from acquisition to logging",
            RunModeIdsAFXDPWorkers);
}
#ifdef HAVE_AF_XDP
/* Values applied by ParseAFXDPConfig() before suricata.yaml is consulted.
 * NOTE(review): units are not stated in this file - confirm against the
 * socket option / sysfs documentation before changing them. */
/* default for the 'busy-poll-time' setting */
#define DEFAULT_BUSY_POLL_TIME 20
/* default for the 'busy-poll-budget' setting */
#define DEFAULT_BUSY_POLL_BUDGET 64
/* default for the 'gro-flush-timeout' setting */
#define DEFAULT_GRO_FLUSH_TIMEOUT 2000000
/* default for the 'napi-defer-hard-irq' setting */
#define DEFAULT_NAPI_HARD_IRQS 2
/**
 * \brief Drop one reference on an interface configuration.
 *
 * The configuration is released with SCFree() once the value returned by
 * the atomic decrement is 1 or less.
 *
 * \param conf pointer to the AFXDPIfaceConfig to dereference
 */
static void AFXDPDerefConfig(void *conf)
{
    AFXDPIfaceConfig *iface_conf = (AFXDPIfaceConfig *)conf;

    if (SC_ATOMIC_SUB(iface_conf->ref, 1) > 1) {
        /* other holders remain */
        return;
    }
    SCFree(iface_conf);
}
/**
 * \brief Work out how many capture threads an interface should use.
 *
 * In the "single" runmode the answer is always 1. Otherwise the value comes
 * from \a entry_str: "auto" selects the smaller of the online core count and
 * the interface queue count, and a number is used as given but capped at the
 * queue count.
 *
 * A queue count below 1 reported by GetIfaceRSSQueuesNum() is treated as a
 * single queue (presumably it means the count could not be determined -
 * confirm against that helper), so the result can no longer end up as zero
 * or negative because of it.
 *
 * \param aconf interface configuration; aconf->threads is written
 * \param entry_str value of the "threads" setting, may be NULL
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if the setting is missing, not numeric or negative
 */
static TmEcode ConfigSetThreads(AFXDPIfaceConfig *aconf, const char *entry_str)
{
    SCEnter();
    const char *active_runmode = RunmodeGetActive();

    if (active_runmode && !strcmp("single", active_runmode)) {
        aconf->threads = 1;
        SCReturnInt(TM_ECODE_OK);
    }

    if (entry_str == NULL) {
        SCLogError(SC_ERR_INVALID_VALUE, "Number of threads for interface \"%s\" not specified",
                aconf->iface);
        SCReturnInt(TM_ECODE_FAILED);
    }

    int nr_queues = GetIfaceRSSQueuesNum(aconf->iface);
    if (nr_queues < 1) {
        /* without this a zero/negative queue count would become the thread count */
        SCLogWarning(SC_WARN_AFXDP_CONF,
                "Unable to determine the number of queues for interface %s, assuming 1",
                aconf->iface);
        nr_queues = 1;
    }

    if (strcmp(entry_str, "auto") == 0) {
        const int nr_cores = (int)UtilCpuGetNumProcessorsOnline();
        const bool cores_are_limit = (nr_cores <= nr_queues);

        /* Threads limited to MIN(cores vs queues) */
        aconf->threads = cores_are_limit ? nr_cores : nr_queues;
        const char *sys_type = cores_are_limit ? "cores" : "queues";
        SCLogPerf("%d %s, so using %d threads", aconf->threads, sys_type, aconf->threads);
        SCReturnInt(TM_ECODE_OK);
    }

    if (StringParseInt32(&aconf->threads, 10, 0, entry_str) < 0) {
        SCLogError(SC_ERR_INVALID_VALUE,
                "Threads entry for interface %s contain non-numerical characters - \"%s\"",
                aconf->iface, entry_str);
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (aconf->threads < 0) {
        SCLogError(SC_ERR_INVALID_VALUE, "Interface %s has a negative number of threads",
                aconf->iface);
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (aconf->threads > nr_queues) {
        SCLogWarning(SC_WARN_AFXDP_CONF,
                "Selected threads greater than configured queues, using: %d thread(s)", nr_queues);
        aconf->threads = nr_queues;
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief extract information from config file
 *
 * Builds an AFXDPIfaceConfig for \a iface, starting from built-in defaults
 * and then applying the "af-xdp" section of the configuration: first the
 * entry for the interface itself, falling back to the "default" entry.
 *
 * The returned structure is reference counted: the counter is set to the
 * number of threads and holders release it through its DerefFunc.
 *
 * \param iface interface name, may be NULL
 *
 * \return a AFXDPIfaceConfig corresponding to the interface name, or NULL
 *         if \a iface is NULL, allocation fails or the "threads" setting
 *         is invalid
 */
static void *ParseAFXDPConfig(const char *iface)
{
    const char *confstr = NULL;
    ConfNode *if_root;
    ConfNode *if_default = NULL;
    ConfNode *af_xdp_node = NULL;
    int conf_val = 0;
    intmax_t conf_val_int = 0;
    /* must be an int: the config helper stores an int through this pointer,
     * so a bool here would be written past its end */
    int disable_promisc = 0;

    if (iface == NULL) {
        return NULL;
    }

    AFXDPIfaceConfig *aconf = SCCalloc(1, sizeof(*aconf));
    if (unlikely(aconf == NULL)) {
        return NULL;
    }

    /* default/basic config setup */
    strlcpy(aconf->iface, iface, sizeof(aconf->iface));
    aconf->DerefFunc = AFXDPDerefConfig;
    aconf->threads = 1;
    aconf->promisc = 1;
    aconf->enable_busy_poll = true;
    aconf->busy_poll_time = DEFAULT_BUSY_POLL_TIME;
    aconf->busy_poll_budget = DEFAULT_BUSY_POLL_BUDGET;
    aconf->mode = XDP_FLAGS_UPDATE_IF_NOEXIST;
    aconf->gro_flush_timeout = DEFAULT_GRO_FLUSH_TIMEOUT;
    aconf->napi_defer_hard_irqs = DEFAULT_NAPI_HARD_IRQS;
    aconf->mem_alignment = XSK_UMEM__DEFAULT_FLAGS;

    /* Find initial node */
    af_xdp_node = ConfGetNode("af-xdp");
    if (af_xdp_node == NULL) {
        SCLogInfo("unable to find af-xdp config using default values");
        goto finalize;
    }

    if_root = ConfFindDeviceConfig(af_xdp_node, iface);
    if_default = ConfFindDeviceConfig(af_xdp_node, "default");

    if (if_root == NULL && if_default == NULL) {
        SCLogInfo("unable to find af-xdp config for "
                  "interface \"%s\" or \"default\", using default values",
                iface);
        goto finalize;
    }

    /* If there is no setting for current interface use default one as main iface */
    if (if_root == NULL) {
        if_root = if_default;
        if_default = NULL;
    }

    /* Threading */
    confstr = "auto";
    (void)ConfGetChildValueWithDefault(if_root, if_default, "threads", &confstr);
    if (ConfigSetThreads(aconf, confstr) != TM_ECODE_OK) {
        aconf->DerefFunc(aconf);
        return NULL;
    }

    /* one reference per thread */
    SC_ATOMIC_RESET(aconf->ref);
    (void)SC_ATOMIC_ADD(aconf->ref, aconf->threads);

    /* Promisc Mode */
    (void)ConfGetChildValueBoolWithDefault(
            if_root, if_default, "disable-promisc", &disable_promisc);
    if (disable_promisc) {
        SCLogConfig("Disabling promiscuous mode on iface %s", aconf->iface);
        aconf->promisc = 0;
    }

#ifdef HAVE_AF_XDP
    /* AF_XDP socket mode options */
    if (ConfGetChildValueWithDefault(if_root, if_default, "force-xdp-mode", &confstr) == 1) {
        if (strncasecmp(confstr, "drv", 3) == 0) {
            aconf->mode |= XDP_FLAGS_DRV_MODE;
        } else if (strncasecmp(confstr, "skb", 3) == 0) {
            aconf->mode |= XDP_FLAGS_SKB_MODE;
        } else if (strncasecmp(confstr, "none", 4) == 0) {
            /* keep the default flags */
        } else {
            SCLogWarning(SC_WARN_AFXDP_CONF,
                    "Incorrect af-xdp xdp-mode setting, default (none) shall be applied");
        }
    }

    /* copy and zerocopy binding options */
    if (ConfGetChildValueWithDefault(if_root, if_default, "force-bind-mode", &confstr) == 1) {
        if (strncasecmp(confstr, "zero", 4) == 0) {
            aconf->bind_flags |= XDP_ZEROCOPY;
        } else if (strncasecmp(confstr, "copy", 4) == 0) {
            aconf->bind_flags |= XDP_COPY;
        } else if (strncasecmp(confstr, "none", 4) == 0) {
            /* keep the default flags */
        } else {
            SCLogWarning(SC_WARN_AFXDP_CONF,
                    "Incorrect af-xdp copy-mode setting, default (none) shall be applied");
        }
    }

    /* memory alignment mode selection */
    if (ConfGetChildValueWithDefault(if_root, if_default, "mem-unaligned", &confstr) == 1) {
        if (strncasecmp(confstr, "yes", 3) == 0) {
            aconf->mem_alignment = XDP_UMEM_UNALIGNED_CHUNK_FLAG;
        }
    }

    /* Busy polling options */
    if (ConfGetChildValueBoolWithDefault(if_root, if_default, "enable-busy-poll", &conf_val) == 1) {
        if (conf_val == 0) {
            aconf->enable_busy_poll = false;
        }
    }

    if (aconf->enable_busy_poll) {
        /* a value of 0 leaves the built-in default in place for these two */
        if (ConfGetChildValueIntWithDefault(if_root, if_default, "busy-poll-time", &conf_val_int) ==
                1) {
            if (conf_val_int) {
                aconf->busy_poll_time = conf_val_int;
            }
        }

        if (ConfGetChildValueIntWithDefault(
                    if_root, if_default, "busy-poll-budget", &conf_val_int) == 1) {
            if (conf_val_int) {
                aconf->busy_poll_budget = conf_val_int;
            }
        }

        /* 0 value is valid for these Linux tunable's */
        if (ConfGetChildValueIntWithDefault(
                    if_root, if_default, "gro-flush-timeout", &conf_val_int) == 1) {
            aconf->gro_flush_timeout = conf_val_int;
        }

        if (ConfGetChildValueIntWithDefault(
                    if_root, if_default, "napi-defer-hard-irq", &conf_val_int) == 1) {
            aconf->napi_defer_hard_irqs = conf_val_int;
        }
    }
#endif

finalize:
    /* warn about, or disable, NIC offloading depending on LiveGetOffload() */
    if (LiveGetOffload() == 0) {
        if (GetIfaceOffloading(iface, 0, 1) == 1) {
            SCLogWarning(SC_ERR_NIC_OFFLOADING,
                    "Using AF_XDP with offloading activated leads to capture problems");
        }
    } else {
        DisableIfaceOffloading(LiveGetDevice(iface), 0, 1);
    }

    return aconf;
}
/**
 * \brief Report the thread count stored in an interface configuration.
 *
 * \param conf pointer to an AFXDPIfaceConfig; NULL is reported via FatalError()
 *
 * \retval the configuration's threads value
 */
static int AFXDPConfigGetThreadsCount(void *conf)
{
    AFXDPIfaceConfig *afxdp_conf = (AFXDPIfaceConfig *)conf;

    if (afxdp_conf == NULL) {
        FatalError(SC_ERR_AFXDP_CONF, "Configuration file is NULL");
    }

    return afxdp_conf->threads;
}
#endif /* HAVE_AF_XDP */
/**
 * \brief Single thread version of the AF_XDP processing.
 *
 * Initialises the runmode, the queue protection state and the live capture
 * pipeline ("ReceiveAFXDP" + "DecodeAFXDP"). Failures are reported through
 * FatalError(). Without HAVE_AF_XDP nothing is set up.
 *
 * \retval 0
 */
int RunModeIdsAFXDPSingle(void)
{
    SCEnter();

#ifdef HAVE_AF_XDP
    const char *live_dev = NULL;
    int rc;

    RunModeInitialize();
    TimeModeSetLive();

    /* live_dev stays NULL when the key is absent */
    (void)ConfGet("af-xdp.live-interface", &live_dev);

    if (AFXDPQueueProtectionInit() != TM_ECODE_OK) {
        FatalError(SC_ERR_FATAL, "Unable to init AF_XDP queue protection.");
    }

    rc = RunModeSetLiveCaptureSingle(ParseAFXDPConfig, AFXDPConfigGetThreadsCount, "ReceiveAFXDP",
            "DecodeAFXDP", thread_name_single, live_dev);
    if (rc != 0) {
        FatalError(SC_ERR_FATAL, "Unable to start runmode");
    }

    SCLogDebug("RunModeIdsAFXDPSingle initialised");
#endif /* HAVE_AF_XDP */

    SCReturnInt(0);
}
/**
 * \brief Workers version of the AF_XDP processing.
 *
 * Start N threads with each thread doing all the work. Failures are
 * reported through FatalError(). Without HAVE_AF_XDP nothing is set up.
 *
 * \retval 0
 */
int RunModeIdsAFXDPWorkers(void)
{
    SCEnter();

#ifdef HAVE_AF_XDP
    const char *live_dev = NULL;
    int rc;

    RunModeInitialize();
    TimeModeSetLive();

    /* live_dev stays NULL when the key is absent */
    (void)ConfGet("af-xdp.live-interface", &live_dev);

    if (AFXDPQueueProtectionInit() != TM_ECODE_OK) {
        FatalError(SC_ERR_FATAL, "Unable to init AF_XDP queue protection.");
    }

    rc = RunModeSetLiveCaptureWorkers(ParseAFXDPConfig, AFXDPConfigGetThreadsCount, "ReceiveAFXDP",
            "DecodeAFXDP", thread_name_workers, live_dev);
    if (rc != 0) {
        FatalError(SC_ERR_FATAL, "Unable to start runmode");
    }

    SCLogDebug("RunModeIdsAFXDPWorkers initialised");
#endif /* HAVE_AF_XDP */

    SCReturnInt(0);
}
/**
* @}
*/

@ -0,0 +1,32 @@
/* Copyright (C) 2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*/
#ifndef __RUNMODE_AFXDP_H__
#define __RUNMODE_AFXDP_H__
int RunModeIdsAFXDPSingle(void);
int RunModeIdsAFXDPWorkers(void);
void RunModeIdsAFXDPRegister(void);
const char *RunModeAFXDPGetDefaultMode(void);
#endif /* __RUNMODE_AFXDP_H__ */

@ -37,6 +37,7 @@
#include "queue.h"
#include "runmodes.h"
#include "runmode-af-packet.h"
#include "runmode-af-xdp.h"
#include "runmode-dpdk.h"
#include "runmode-erf-dag.h"
#include "runmode-erf-file.h"
@ -157,6 +158,8 @@ static const char *RunModeTranslateModeToName(int runmode)
return "UNITTEST";
case RUNMODE_AFP_DEV:
return "AF_PACKET_DEV";
case RUNMODE_AFXDP_DEV:
return "AF_XDP_DEV";
case RUNMODE_NETMAP:
#ifdef HAVE_NETMAP
return "NETMAP";
@ -245,6 +248,7 @@ void RunModeRegisterRunModes(void)
RunModeErfDagRegister();
RunModeNapatechRegister();
RunModeIdsAFPRegister();
RunModeIdsAFXDPRegister();
RunModeIdsNetmapRegister();
RunModeIdsNflogRegister();
RunModeUnixSocketRegister();
@ -358,6 +362,9 @@ void RunModeDispatch(int runmode, const char *custom_mode,
case RUNMODE_AFP_DEV:
custom_mode = RunModeAFPGetDefaultMode();
break;
case RUNMODE_AFXDP_DEV:
custom_mode = RunModeAFXDPGetDefaultMode();
break;
case RUNMODE_NETMAP:
custom_mode = RunModeNetmapGetDefaultMode();
break;

@ -35,6 +35,7 @@ enum RunModes {
RUNMODE_ERF_FILE,
RUNMODE_DAG,
RUNMODE_AFP_DEV,
RUNMODE_AFXDP_DEV,
RUNMODE_NETMAP,
RUNMODE_DPDK,
RUNMODE_UNITTEST,
@ -98,7 +99,6 @@ bool IsRunModeSystem(enum RunModes run_mode_to_check);
void RunModeEnablesBypassManager(void);
int RunModeNeedsBypassManager(void);
extern int threading_set_cpu_affinity;
extern float threading_detect_ratio;
extern uint64_t threading_set_stack_size;

@ -0,0 +1,914 @@
/* Copyright (C) 2011-2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \defgroup afxdppacket AF_XDP running mode
*
* @{
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*
* AF_XDP socket acquisition support
*
*/
#define PCAP_DONT_INCLUDE_PCAP_BPF_H 1
#define SC_PCAP_DONT_INCLUDE_PCAP_H 1
#include "suricata-common.h"
#include "suricata.h"
#include "decode.h"
#include "packet-queue.h"
#include "threads.h"
#include "threadvars.h"
#include "tm-queuehandlers.h"
#include "tm-modules.h"
#include "tm-threads.h"
#include "tm-threads-common.h"
#include "conf.h"
#include "util-cpu.h"
#include "util-datalink.h"
#include "util-debug.h"
#include "util-device.h"
#include "util-ebpf.h"
#include "util-error.h"
#include "util-privs.h"
#include "util-optimize.h"
#include "util-checksum.h"
#include "util-ioctl.h"
#include "util-host-info.h"
#include "util-sysfs.h"
#include "tmqh-packetpool.h"
#include "source-af-xdp.h"
#include "runmodes.h"
#include "flow-storage.h"
#include "util-validate.h"
#ifdef HAVE_AF_XDP
#include <xdp/xsk.h>
#include <net/if.h>
#endif
#if HAVE_LINUX_IF_ETHER_H
#include <linux/if_ether.h>
#endif
#ifndef HAVE_AF_XDP
TmEcode NoAFXDPSupportExit(ThreadVars *, const void *, void **);

/**
 * \brief Registration Function for ReceiveAFXDP when AF_XDP support is
 *        not compiled in.
 *
 * ThreadInit is the only hook set; it points at NoAFXDPSupportExit().
 */
void TmModuleReceiveAFXDPRegister(void)
{
    /* identity */
    tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
    tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM;
    tmm_modules[TMM_RECEIVEAFXDP].cap_flags = 0;

    /* hooks */
    tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = NoAFXDPSupportExit;
    tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = NULL;
    tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = NULL;
    tmm_modules[TMM_RECEIVEAFXDP].Func = NULL;
}
/**
 * \brief Registration Function for DecodeAFXDP when AF_XDP support is
 *        not compiled in.
 *
 * ThreadInit is the only hook set; it points at NoAFXDPSupportExit().
 */
void TmModuleDecodeAFXDPRegister(void)
{
    /* identity */
    tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
    tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM;
    tmm_modules[TMM_DECODEAFXDP].cap_flags = 0;

    /* hooks */
    tmm_modules[TMM_DECODEAFXDP].ThreadInit = NoAFXDPSupportExit;
    tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = NULL;
    tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL;
    tmm_modules[TMM_DECODEAFXDP].Func = NULL;
}
/**
 * \brief Log that AF_XDP support is missing and terminate the process.
 *
 * \param tv thread vars, only the thread name is read
 * \param initdata not used
 * \param data not used
 *
 * Does not return: ends with exit(EXIT_FAILURE).
 */
TmEcode NoAFXDPSupportExit(ThreadVars *tv, const void *initdata, void **data)
{
    SCLogError(SC_ERR_NO_AF_XDP,
            "Error creating thread %s: you do not have support for AF_XDP enabled, "
            "on Linux host please recompile with --enable-af-xdp",
            tv->name);
    exit(EXIT_FAILURE);
}
#else /* We have AF_XDP support */
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the timeout handed to poll() in the receive loop - confirm use and units */
#define POLL_TIMEOUT 100
/* frame count and frame size taken from the xsk library defaults */
#define NUM_FRAMES XSK_RING_PROD__DEFAULT_NUM_DESCS
#define FRAME_SIZE XSK_UMEM__DEFAULT_FRAME_SIZE
/* size of the mmap'd UMEM region: room for NUM_FRAMES * 2 frames, the same
 * count that is pushed onto the fill ring */
#define MEM_BYTES (NUM_FRAMES * FRAME_SIZE * 2)
/* passed to usleep() before a socket reopen attempt */
#define RECONNECT_TIMEOUT 500000
/* Interface state */
enum state { AFXDP_STATE_DOWN, AFXDP_STATE_UP };
/* State shared by all capture threads while sockets are opened; the single
 * instance is xsk_protect. */
struct XskInitProtect {
    /* held while a thread picks its queue and creates its socket */
    SCMutex queue_protect;
    /* next queue number to hand out; read and incremented in AFXDPAssignQueueID() */
    SC_ATOMIC_DECLARE(uint8_t, queue_num);
} xsk_protect;
/* Per-thread UMEM state. */
struct UmemInfo {
    /* region obtained with mmap() in AcquireBuffer(), MEM_BYTES long */
    void *buf;
    /* handle filled in by xsk_umem__create() */
    struct xsk_umem *umem;
    /* fill ring */
    struct xsk_ring_prod fq;
    /* completion ring */
    struct xsk_ring_cons cq;
    /* settings passed to xsk_umem__create() */
    struct xsk_umem_config cfg;
    /* extra flag OR'ed into the mmap() flags (MAP_HUGETLB in unaligned chunk mode) */
    int mmap_alignment_flag;
};
/* Records which NIC queue a thread's socket binds to. */
struct QueueAssignment {
    /* queue number passed to xsk_socket__create() */
    uint32_t queue_num;
    /* true once queue_num has been chosen; it is then kept across reopens */
    bool assigned;
};
/* Per-thread AF_XDP socket state. */
struct XskSockInfo {
    /* rings handed to xsk_socket__create() */
    struct xsk_ring_cons rx;
    struct xsk_ring_prod tx;
    /* socket handle, NULL while closed */
    struct xsk_socket *xsk;
    /* Queue assignment structure */
    struct QueueAssignment queue;
    /* Configuration items */
    struct xsk_socket_config cfg;
    bool enable_busy_poll;
    /* values applied with SO_BUSY_POLL / SO_BUSY_POLL_BUDGET */
    uint32_t busy_poll_time;
    uint32_t busy_poll_budget;
    /* socket fd with events = POLLIN, set up in OpenXSKSocket() */
    struct pollfd fd;
};
/**
 * \brief Structure to hold thread specific variables.
 */
typedef struct AFXDPThreadVars_ {
    ThreadVars *tv;
    TmSlot *slot;
    LiveDevice *livedev;
    /* thread specific socket */
    /* non-zero: the interface is switched to promiscuous mode at thread init */
    int promisc;
    /* number of capture threads configured for the interface */
    int threads;
    char iface[AFXDP_IFACE_NAME_LENGTH];
    /* result of if_nametoindex() for iface */
    uint32_t ifindex;
    /* AF_XDP structure */
    struct UmemInfo umem;
    struct XskSockInfo xsk;
    /* values written to the interface's sysfs entries by WriteLinuxTunables() */
    uint32_t gro_flush_timeout;
    uint32_t napi_defer_hard_irqs;
    /* filled in by bpf_get_link_xdp_id() after the socket is created */
    uint32_t prog_id;
    /* Handle state */
    uint8_t afxdp_state;
    /* Stats parameters */
    /* packets since the last counter dump; reset in AFXDPDumpCounters() */
    uint64_t pkts;
    uint64_t bytes;
    /* counter ids used with the Stats* helpers */
    uint16_t capture_afxdp_packets;
    uint16_t capture_kernel_drops;
    /* NOTE(review): presumably counter ids as well - not used in the visible code */
    uint16_t capture_afxdp_poll;
    uint16_t capture_afxdp_poll_timeout;
    uint16_t capture_afxdp_poll_failed;
    uint16_t capture_afxdp_empty_reads;
    uint16_t capture_afxdp_failed_reads;
    uint16_t capture_afxdp_acquire_pkt_failed;
} AFXDPThreadVars;
static TmEcode ReceiveAFXDPThreadInit(ThreadVars *, const void *, void **);
static void ReceiveAFXDPThreadExitStats(ThreadVars *, void *);
static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *, void *);
static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot);
static TmEcode DecodeAFXDPThreadInit(ThreadVars *, const void *, void **);
static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data);
static TmEcode DecodeAFXDP(ThreadVars *, Packet *, void *);
/**
 * \brief Registration Function for ReceiveAFXDP.
 *
 * Packets are acquired through the PktAcqLoop hook, so Func stays NULL.
 *
 * \todo Unit tests are needed for this module.
 */
void TmModuleReceiveAFXDPRegister(void)
{
    /* identity */
    tmm_modules[TMM_RECEIVEAFXDP].name = "ReceiveAFXDP";
    tmm_modules[TMM_RECEIVEAFXDP].flags = TM_FLAG_RECEIVE_TM;
    tmm_modules[TMM_RECEIVEAFXDP].cap_flags = SC_CAP_NET_RAW;

    /* thread lifecycle */
    tmm_modules[TMM_RECEIVEAFXDP].ThreadInit = ReceiveAFXDPThreadInit;
    tmm_modules[TMM_RECEIVEAFXDP].ThreadDeinit = ReceiveAFXDPThreadDeinit;
    tmm_modules[TMM_RECEIVEAFXDP].ThreadExitPrintStats = ReceiveAFXDPThreadExitStats;

    /* packet acquisition */
    tmm_modules[TMM_RECEIVEAFXDP].PktAcqLoop = ReceiveAFXDPLoop;
    tmm_modules[TMM_RECEIVEAFXDP].PktAcqBreakLoop = NULL;
    tmm_modules[TMM_RECEIVEAFXDP].Func = NULL;
}
/**
 * \brief Registration Function for DecodeAFXDP.
 *
 * Decoding is done per packet through the Func hook.
 *
 * \todo Unit tests are needed for this module.
 */
void TmModuleDecodeAFXDPRegister(void)
{
    /* identity */
    tmm_modules[TMM_DECODEAFXDP].name = "DecodeAFXDP";
    tmm_modules[TMM_DECODEAFXDP].flags = TM_FLAG_DECODE_TM;
    tmm_modules[TMM_DECODEAFXDP].cap_flags = 0;

    /* thread lifecycle */
    tmm_modules[TMM_DECODEAFXDP].ThreadInit = DecodeAFXDPThreadInit;
    tmm_modules[TMM_DECODEAFXDP].ThreadDeinit = DecodeAFXDPThreadDeinit;
    tmm_modules[TMM_DECODEAFXDP].ThreadExitPrintStats = NULL;

    /* per packet work */
    tmm_modules[TMM_DECODEAFXDP].Func = DecodeAFXDP;
}
/**
 * \brief Push the socket's XDP statistics into the stats counters.
 *
 * Reads XDP_STATISTICS from the socket. On success the kernel drop counter
 * is advanced to the current drop total, the packets seen since the last
 * call are added to the packet counter and the live device, and the local
 * packet count is reset. If getsockopt() fails nothing is changed.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
static inline void AFXDPDumpCounters(AFXDPThreadVars *ptv)
{
    struct xdp_statistics xdp_stats;
    socklen_t optlen = sizeof(xdp_stats);
    const int sock_fd = xsk_socket__fd(ptv->xsk.xsk);

    if (getsockopt(sock_fd, SOL_XDP, XDP_STATISTICS, &xdp_stats, &optlen) < 0) {
        return;
    }

    /* every way the kernel can lose a frame counts as a drop */
    const uint64_t rx_dropped =
            xdp_stats.rx_dropped + xdp_stats.rx_invalid_descs + xdp_stats.rx_ring_full;

    /* the counter is additive, so only add what is new since the last dump */
    StatsAddUI64(ptv->tv, ptv->capture_kernel_drops,
            rx_dropped - StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));
    StatsAddUI64(ptv->tv, ptv->capture_afxdp_packets, ptv->pkts);

    (void)SC_ATOMIC_SET(ptv->livedev->drop, rx_dropped);
    (void)SC_ATOMIC_ADD(ptv->livedev->pkts, ptv->pkts);

    SCLogDebug("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
            ptv->tv->name, StatsGetLocalCounterValue(ptv->tv, ptv->capture_afxdp_packets),
            ptv->bytes, StatsGetLocalCounterValue(ptv->tv, ptv->capture_kernel_drops));

    ptv->pkts = 0;
}
/**
 * \brief Prepare the shared state used while sockets are created.
 *
 * A mutex synchronises initialisation - each socket opens a different
 * queue. The order in which the queues are opened is not important, but
 * it is vital that the queue numbers differ. This call initialises that
 * mutex and resets the next queue number to 0.
 *
 * \retval TM_ECODE_OK
 */
TmEcode AFXDPQueueProtectionInit(void)
{
    SCEnter();

    SC_ATOMIC_SET(xsk_protect.queue_num, 0);
    SCMutexInit(&xsk_protect.queue_protect, NULL);

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Destroy the mutex set up by AFXDPQueueProtectionInit().
 */
void AFXDPMutexClean(void)
{
    SCMutexDestroy(&xsk_protect.queue_protect);
}
/**
 * \brief Give the thread its queue number, once.
 *
 * The first call takes the current shared queue number and advances it;
 * later calls leave the assignment untouched.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK
 */
static TmEcode AFXDPAssignQueueID(AFXDPThreadVars *ptv)
{
    /* Queue only needs assigned once, on startup */
    if (ptv->xsk.queue.assigned) {
        SCReturnInt(TM_ECODE_OK);
    }

    ptv->xsk.queue.queue_num = SC_ATOMIC_GET(xsk_protect.queue_num);
    SC_ATOMIC_ADD(xsk_protect.queue_num, 1);
    ptv->xsk.queue.assigned = true;

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Emit a debug message from the thread holding the last queue.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
static void AFXDPAllThreadsRunning(AFXDPThreadVars *ptv)
{
    SCMutexLock(&xsk_protect.queue_protect);

    const int last_queue = ptv->threads - 1;
    if ((int)ptv->xsk.queue.queue_num == last_queue) {
        SCLogDebug("All AF_XDP capture threads are running.");
    }

    SCMutexUnlock(&xsk_protect.queue_protect);
}
/**
 * \brief Map the anonymous memory region that backs the UMEM.
 *
 * The region is MEM_BYTES long; the thread's mmap_alignment_flag is added
 * to the mapping flags. The mmap() result is stored in ptv->umem.buf.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if mmap() fails
 */
static TmEcode AcquireBuffer(AFXDPThreadVars *ptv)
{
    const int prot = PROT_READ | PROT_WRITE;
    const int flags = MAP_PRIVATE | MAP_ANONYMOUS | ptv->umem.mmap_alignment_flag;

    ptv->umem.buf = mmap(NULL, MEM_BYTES, prot, flags, -1, 0);
    if (ptv->umem.buf != MAP_FAILED) {
        SCReturnInt(TM_ECODE_OK);
    }

    SCLogError(SC_ERR_MEM_ALLOC, "mmap: failed to acquire memory");
    SCReturnInt(TM_ECODE_FAILED);
}
/**
 * \brief Create the UMEM on top of the buffer mapped by AcquireBuffer().
 *
 * \param ptv pointer to AFXDPThreadVars; umem.buf and umem.cfg must be set
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if xsk_umem__create() reports an error
 */
static TmEcode ConfigureXSKUmem(AFXDPThreadVars *ptv)
{
    const int ret = xsk_umem__create(&ptv->umem.umem, ptv->umem.buf, MEM_BYTES, &ptv->umem.fq,
            &ptv->umem.cq, &ptv->umem.cfg);
    if (ret != 0) {
        /* Report the code the call returned, as done for xsk_socket__create();
         * errno may be stale or unset at this point. */
        SCLogError(SC_ERR_AFXDP_CREATE, "failed to create umem: %s",
                strerror(ret < 0 ? -ret : ret));
        SCReturnInt(TM_ECODE_FAILED);
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Hand \a cnt frames to the kernel through the fill ring.
 *
 * Reserves \a cnt fill ring slots, stores consecutive frame offsets
 * (0, FRAME_SIZE, 2 * FRAME_SIZE, ...) in them and submits them.
 *
 * \param ptv pointer to AFXDPThreadVars
 * \param cnt number of frames to add
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if fewer than \a cnt slots could be reserved
 */
static TmEcode InitFillRing(AFXDPThreadVars *ptv, const uint32_t cnt)
{
    uint32_t slot = 0;
    const uint32_t reserved = xsk_ring_prod__reserve(&ptv->umem.fq, cnt, &slot);

    if (reserved != cnt) {
        SCLogError(SC_ERR_AFXDP_INIT, "Failed to initialise the fill ring.");
        SCReturnInt(TM_ECODE_FAILED);
    }

    uint32_t frame = 0;
    while (frame < cnt) {
        *xsk_ring_prod__fill_addr(&ptv->umem.fq, slot) = frame * FRAME_SIZE;
        slot++;
        frame++;
    }

    xsk_ring_prod__submit(&ptv->umem.fq, cnt);
    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Linux knobs are tuned to enable a NAPI polling context
 *
 * Writes the configured gro_flush_timeout and napi_defer_hard_irqs values
 * to the interface's sysfs entries. A path that does not fit the buffer is
 * treated as a failure instead of being used in truncated form.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK if both values were written
 * \retval TM_ECODE_FAILED if a path could not be built or a write failed
 */
static TmEcode WriteLinuxTunables(AFXDPThreadVars *ptv)
{
    char fname[SYSFS_MAX_FILENAME_SIZE];
    int len;

    len = snprintf(fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/gro_flush_timeout", ptv->iface);
    if (len < 0 || (size_t)len >= sizeof(fname)) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (SysFsWriteValue(fname, ptv->gro_flush_timeout) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    len = snprintf(
            fname, SYSFS_MAX_FILENAME_SIZE, "class/net/%s/napi_defer_hard_irqs", ptv->iface);
    if (len < 0 || (size_t)len >= sizeof(fname)) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (SysFsWriteValue(fname, ptv->napi_defer_hard_irqs) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Apply the busy polling settings to the socket.
 *
 * Does nothing when busy polling is disabled. Otherwise checks the kernel
 * version, writes the Linux tunables and sets SO_PREFER_BUSY_POLL,
 * SO_BUSY_POLL and SO_BUSY_POLL_BUDGET in that order, stopping at the first
 * failure.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK if busy polling is disabled or fully configured
 * \retval TM_ECODE_FAILED otherwise
 */
static TmEcode ConfigureBusyPolling(AFXDPThreadVars *ptv)
{
    const int sock_fd = xsk_socket__fd(ptv->xsk.xsk);

    if (!ptv->xsk.enable_busy_poll) {
        SCReturnInt(TM_ECODE_OK);
    }

    /* Kernel version must be >= 5.11 to avail of SO_PREFER_BUSY_POLL
     * see linux commit: 7fd3253a7de6a317a0683f83739479fb880bffc8
     */
    if (!SCKernelVersionIsAtLeast(5, 11)) {
        SCLogWarning(SC_WARN_AFXDP_CONF,
                "Kernel version older than required: v5.11,"
                " upgrade kernel version to use 'enable-busy-poll' option.");
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (WriteLinuxTunables(ptv) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    /* socket options, applied in this order */
    const struct {
        int name;
        int value;
    } opts[] = {
        { SO_PREFER_BUSY_POLL, 1 },
        { SO_BUSY_POLL, (int)ptv->xsk.busy_poll_time },
        { SO_BUSY_POLL_BUDGET, (int)ptv->xsk.busy_poll_budget },
    };

    for (size_t i = 0; i < sizeof(opts) / sizeof(opts[0]); i++) {
        int sock_opt = opts[i].value;
        if (setsockopt(sock_fd, SOL_SOCKET, opts[i].name, (void *)&sock_opt, sizeof(sock_opt)) <
                0) {
            SCReturnInt(TM_ECODE_FAILED);
        }
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Record the interface state in the thread vars.
 *
 * \param ptv pointer to AFXDPThreadVars
 * \param state new state, AFXDP_STATE_UP or AFXDP_STATE_DOWN
 */
static void AFXDPSwitchState(AFXDPThreadVars *ptv, int state)
{
    ptv->afxdp_state = (uint8_t)state;
}
/**
 * \brief Create the thread's AF_XDP socket on its assigned queue.
 *
 * Queue assignment and socket creation are done under
 * xsk_protect.queue_protect. The mutex is released on every path out of
 * this function, including the error paths, so that a failure in one
 * thread cannot block the others.
 *
 * On success the poll descriptor is set up and the state switches to
 * AFXDP_STATE_UP.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if no queue could be assigned or the socket
 *         could not be created
 */
static TmEcode OpenXSKSocket(AFXDPThreadVars *ptv)
{
    int ret;

    SCMutexLock(&xsk_protect.queue_protect);

    if (AFXDPAssignQueueID(ptv) != TM_ECODE_OK) {
        SCMutexUnlock(&xsk_protect.queue_protect);
        SCLogError(SC_ERR_SOCKET, "Failed to assign queue ID");
        SCReturnInt(TM_ECODE_FAILED);
    }

    if ((ret = xsk_socket__create(&ptv->xsk.xsk, ptv->livedev->dev, ptv->xsk.queue.queue_num,
                 ptv->umem.umem, &ptv->xsk.rx, &ptv->xsk.tx, &ptv->xsk.cfg))) {
        SCMutexUnlock(&xsk_protect.queue_protect);
        SCLogError(SC_ERR_SOCKET, "Failed to create socket: %s", strerror(-ret));
        SCReturnInt(TM_ECODE_FAILED);
    }
    SCLogDebug("bind to %s on queue %u", ptv->iface, ptv->xsk.queue.queue_num);

    /* For polling and socket options */
    ptv->xsk.fd.fd = xsk_socket__fd(ptv->xsk.xsk);
    ptv->xsk.fd.events = POLLIN;

    /* Set state */
    AFXDPSwitchState(ptv, AFXDP_STATE_UP);

    SCMutexUnlock(&xsk_protect.queue_protect);
    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Tear down the socket and the UMEM, if present.
 *
 * Both handles are set to NULL after deletion and the fill and completion
 * rings are zeroed. The mapped buffer itself is left alone.
 *
 * \param ptv pointer to AFXDPThreadVars
 */
static void AFXDPCloseSocket(AFXDPThreadVars *ptv)
{
    /* the socket goes first, then the UMEM it was created on */
    if (ptv->xsk.xsk != NULL) {
        xsk_socket__delete(ptv->xsk.xsk);
        ptv->xsk.xsk = NULL;
    }

    if (ptv->umem.umem != NULL) {
        xsk_umem__delete(ptv->umem.umem);
        ptv->umem.umem = NULL;
    }

    memset(&ptv->umem.fq, 0, sizeof(ptv->umem.fq));
    memset(&ptv->umem.cq, 0, sizeof(ptv->umem.cq));
}
/**
 * \brief Bring up UMEM, fill ring and socket for this thread.
 *
 * Steps run in order and stop at the first failure: create the UMEM, fill
 * the fill ring with NUM_FRAMES * 2 frames, open the socket. A busy polling
 * failure only produces a warning. Finally the XDP program id attached to
 * the interface is queried; failure there is an error.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED otherwise
 */
static TmEcode AFXDPSocketCreation(AFXDPThreadVars *ptv)
{
    /* short-circuit evaluation keeps the steps ordered; OpenXSKSocket()
     * opens the AF_XDP socket */
    if (ConfigureXSKUmem(ptv) != TM_ECODE_OK || InitFillRing(ptv, NUM_FRAMES * 2) != TM_ECODE_OK ||
            OpenXSKSocket(ptv) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (ConfigureBusyPolling(ptv) != TM_ECODE_OK) {
        SCLogWarning(SC_WARN_AFXDP_CONF, "Failed to configure busy polling"
                                         " performance may be reduced.");
    }

    /* Has the eBPF program successfully bound? */
    if (bpf_get_link_xdp_id(ptv->ifindex, &ptv->prog_id, ptv->xsk.cfg.xdp_flags) != 0) {
        SCLogError(SC_ERR_BPF, "Failed to attach eBPF program to interface: %s", ptv->livedev->dev);
        SCReturnInt(TM_ECODE_FAILED);
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Try to reopen AF_XDP socket
 *
 * Closes the current socket, sleeps for RECONNECT_TIMEOUT and recreates
 * the socket if the interface flags can be read and at least one of
 * IFF_UP / IFF_RUNNING is set.
 *
 * \param ptv pointer to AFXDPThreadVars
 *
 * \retval: TM_ECODE_OK in case of success
 * TM_ECODE_FAILED if error occurs or a condition is not met.
 */
static TmEcode AFXDPTryReopen(AFXDPThreadVars *ptv)
{
    AFXDPCloseSocket(ptv);
    usleep(RECONNECT_TIMEOUT);

    const int if_flags = GetIfaceFlags(ptv->iface);
    if (if_flags == -1) {
        SCLogDebug("Couldn't get flags for interface '%s'", ptv->iface);
        SCReturnInt(TM_ECODE_FAILED);
    }

    if ((if_flags & (IFF_UP | IFF_RUNNING)) == 0) {
        SCLogDebug("Interface '%s' is down", ptv->iface);
        SCReturnInt(TM_ECODE_FAILED);
    }

    if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    SCLogInfo("Interface '%s' is back", ptv->iface);
    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Write packet entry to the fill ring, freeing
 *        this slot for re/fill with inbound packet descriptor
 *
 * The frame address saved in the packet is stored at the packet's fill
 * ring index, then the packet is handed to PacketFreeOrRelease().
 *
 * \param p pointer to Packet
 * \retval: None
 */
static void AFXDPReleasePacket(Packet *p)
{
    struct xsk_ring_prod *fill_ring = (struct xsk_ring_prod *)p->afxdp_v.fq;

    *xsk_ring_prod__fill_addr(fill_ring, p->afxdp_v.fq_idx) = p->afxdp_v.orig;

    PacketFreeOrRelease(p);
}
/**
 * \brief Dump the counters when the wall clock second has changed.
 *
 * StatsSyncCountersIfSignalled() is called on every invocation.
 *
 * \param ptv pointer to AFXDPThreadVars
 * \param last_dump time of the previous dump; updated when a dump happens
 *
 * \retval 1 if the counters were dumped, 0 otherwise
 */
static inline int DumpStatsEverySecond(AFXDPThreadVars *ptv, time_t *last_dump)
{
    const time_t now = time(NULL);
    const int dump_due = (now != *last_dump);

    if (dump_due) {
        AFXDPDumpCounters(ptv);
        *last_dump = now;
    }

    StatsSyncCountersIfSignalled(ptv->tv);

    return dump_due;
}
/**
 * \brief Kick the kernel with a zero length, non blocking recvfrom().
 *
 * Done when busy polling is enabled or the fill ring asks for a wakeup.
 *
 * \param data pointer to AFXDPThreadVars
 *
 * \retval result of recvfrom(), or 0 if no wakeup was needed
 */
static inline ssize_t WakeupSocket(void *data)
{
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;

    /* Assuming kernel >= 5.11 in use if xdp_busy_poll is enabled */
    if (!ptv->xsk.enable_busy_poll && !xsk_ring_prod__needs_wakeup(&ptv->umem.fq)) {
        return 0;
    }

    return recvfrom(xsk_socket__fd(ptv->xsk.xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
}
/**
 * \brief Init function for ReceiveAFXDP.
 *
 * Allocates the per-thread AFXDPThreadVars, copies the interface
 * configuration into it, registers the capture counters, reserves the
 * umem buffer and creates the AF_XDP socket.
 *
 * \param tv pointer to ThreadVars
 * \param initdata pointer to the AFXDPIfaceConfig of the interface
 * \param data pointer gets populated with AFXDPThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED on error; *data is not written and the thread
 *         vars allocated here are freed again
 *
 * \todo Create a general AFP setup function.
 */
static TmEcode ReceiveAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();

    AFXDPIfaceConfig *afxdpconfig = (AFXDPIfaceConfig *)initdata;

    if (initdata == NULL) {
        SCLogError(SC_ERR_INVALID_ARGUMENT, "initdata == NULL");
        SCReturnInt(TM_ECODE_FAILED);
    }

    AFXDPThreadVars *ptv = SCMalloc(sizeof(AFXDPThreadVars));
    if (unlikely(ptv == NULL)) {
        afxdpconfig->DerefFunc(afxdpconfig);
        SCReturnInt(TM_ECODE_FAILED);
    }
    memset(ptv, 0, sizeof(AFXDPThreadVars));

    ptv->tv = tv;

    strlcpy(ptv->iface, afxdpconfig->iface, AFXDP_IFACE_NAME_LENGTH);
    ptv->iface[AFXDP_IFACE_NAME_LENGTH - 1] = '\0';
    ptv->ifindex = if_nametoindex(ptv->iface);

    ptv->livedev = LiveGetDevice(ptv->iface);
    if (ptv->livedev == NULL) {
        SCLogError(SC_ERR_INVALID_VALUE, "Unable to find Live device");
        goto error;
    }

    ptv->promisc = afxdpconfig->promisc;
    if (ptv->promisc != 0) {
        /* Force promiscuous mode */
        if (SetIfaceFlags(ptv->iface, IFF_PROMISC | IFF_UP) != 0) {
            SCLogError(SC_ERR_AFXDP_CREATE,
                    "Failed to switch interface (%s) to promiscuous, error %s", ptv->iface,
                    strerror(errno));
            goto error;
        }
    }

    ptv->threads = afxdpconfig->threads;

    /* Socket configuration */
    ptv->xsk.cfg.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
    ptv->xsk.cfg.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS;
    ptv->xsk.cfg.xdp_flags = afxdpconfig->mode;
    ptv->xsk.cfg.bind_flags = afxdpconfig->bind_flags;

    /* UMEM configuration */
    ptv->umem.cfg.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2;
    ptv->umem.cfg.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS;
    ptv->umem.cfg.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE;
    ptv->umem.cfg.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM;
    ptv->umem.cfg.flags = afxdpconfig->mem_alignment;

    /* Use hugepages if unaligned chunk mode */
    if (ptv->umem.cfg.flags == XDP_UMEM_UNALIGNED_CHUNK_FLAG) {
        ptv->umem.mmap_alignment_flag = MAP_HUGETLB;
    }

    /* Busy polling configuration */
    ptv->xsk.enable_busy_poll = afxdpconfig->enable_busy_poll;
    ptv->xsk.busy_poll_budget = afxdpconfig->busy_poll_budget;
    ptv->xsk.busy_poll_time = afxdpconfig->busy_poll_time;
    ptv->gro_flush_timeout = afxdpconfig->gro_flush_timeout;
    ptv->napi_defer_hard_irqs = afxdpconfig->napi_defer_hard_irqs;

    /* Stats registration */
    ptv->capture_afxdp_packets = StatsRegisterCounter("capture.afxdp_packets", ptv->tv);
    ptv->capture_kernel_drops = StatsRegisterCounter("capture.kernel_drops", ptv->tv);
    ptv->capture_afxdp_poll = StatsRegisterCounter("capture.afxdp.poll", ptv->tv);
    ptv->capture_afxdp_poll_timeout = StatsRegisterCounter("capture.afxdp.poll_timeout", ptv->tv);
    ptv->capture_afxdp_poll_failed = StatsRegisterCounter("capture.afxdp.poll_failed", ptv->tv);
    ptv->capture_afxdp_empty_reads = StatsRegisterCounter("capture.afxdp.empty_reads", ptv->tv);
    ptv->capture_afxdp_failed_reads = StatsRegisterCounter("capture.afxdp.failed_reads", ptv->tv);
    ptv->capture_afxdp_acquire_pkt_failed =
            StatsRegisterCounter("capture.afxdp.acquire_pkt_failed", ptv->tv);

    /* Reserve memory for umem */
    if (AcquireBuffer(ptv) != TM_ECODE_OK) {
        goto error;
    }

    /* NOTE(review): when this fails the umem buffer reserved above and any
     * partially created socket are not torn down here - confirm whether
     * AFXDPSocketCreation() cleans up after itself. */
    if (AFXDPSocketCreation(ptv) != TM_ECODE_OK) {
        goto error;
    }

    *data = (void *)ptv;
    afxdpconfig->DerefFunc(afxdpconfig);
    SCReturnInt(TM_ECODE_OK);

error:
    /* ptv was never handed to the caller, so nobody else can free it.
     * Drop this thread's config reference as the allocation-failure and
     * success paths do. */
    SCFree(ptv);
    afxdpconfig->DerefFunc(afxdpconfig);
    SCReturnInt(TM_ECODE_FAILED);
}
/**
 * \brief Main AF_XDP reading Loop function
 *
 * Runs until suricata_ctl_flags is set or a packet fails processing.
 * Each iteration reopens the socket if the state is AFXDP_STATE_DOWN,
 * optionally poll()s, peeks a batch from the RX ring, reserves the same
 * number of fill ring slots and hands every frame to the next slot as a
 * Packet. The fill ring slot of a frame is written when its Packet is
 * released (p->ReleasePacket).
 *
 * \param tv pointer to ThreadVars
 * \param data pointer that gets cast into AFXDPThreadVars for ptv
 * \param slot pointer that gets cast into TmSlot; its slot_next receives
 *             the packets
 *
 * \retval TM_ECODE_OK when the loop was stopped through suricata_ctl_flags
 * \retval TM_ECODE_FAILED when processing a packet failed
 */
static TmEcode ReceiveAFXDPLoop(ThreadVars *tv, void *data, void *slot)
{
    SCEnter();

    Packet *p;
    time_t last_dump = 0;
    struct timeval ts;
    uint32_t idx_rx = 0, idx_fq = 0, rcvd;
    int r;
    AFXDPThreadVars *ptv = (AFXDPThreadVars *)data;
    TmSlot *s = (TmSlot *)slot;

    ptv->slot = s->slot_next;

    AFXDPAllThreadsRunning(ptv);

    // Indicate that the thread is actually running its application level code (i.e., it can poll
    // packets)
    TmThreadsSetFlag(tv, THV_RUNNING);

    PacketPoolWait();
    while (1) {
        /* Start by checking the state of our interface */
        if (unlikely(ptv->afxdp_state == AFXDP_STATE_DOWN)) {
            do {
                usleep(RECONNECT_TIMEOUT);
                if (unlikely(suricata_ctl_flags != 0)) {
                    break;
                }
                r = AFXDPTryReopen(ptv);
            } while (r != TM_ECODE_OK);
        }

        if (unlikely(suricata_ctl_flags != 0)) {
            SCLogDebug("Stopping Suricata!");
            AFXDPDumpCounters(ptv);
            break;
        }

        /* Busy polling is not set, using poll() to maintain (relatively) decent
         * performance. xdp_busy_poll must be disabled for kernels < 5.11
         */
        if (!ptv->xsk.enable_busy_poll) {
            StatsIncr(ptv->tv, ptv->capture_afxdp_poll);

            r = poll(&ptv->xsk.fd, 1, POLL_TIMEOUT);

            /* Report poll results */
            if (r <= 0) {
                if (r == 0) {
                    StatsIncr(ptv->tv, ptv->capture_afxdp_poll_timeout);
                } else {
                    StatsIncr(ptv->tv, ptv->capture_afxdp_poll_failed);
                    SCLogWarning(SC_ERR_AFXDP_READ, "poll failed with retval %d", r);
                    AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
                }

                DumpStatsEverySecond(ptv, &last_dump);
                continue;
            }
        }

        rcvd = xsk_ring_cons__peek(&ptv->xsk.rx, ptv->xsk.busy_poll_budget, &idx_rx);
        if (!rcvd) {
            StatsIncr(ptv->tv, ptv->capture_afxdp_empty_reads);
            ssize_t ret = WakeupSocket(ptv);
            if (ret < 0) {
                SCLogWarning(SC_ERR_AFXDP_READ, "recv failed with retval %zd", ret);
                AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
            }
            DumpStatsEverySecond(ptv, &last_dump);
            continue;
        }

        /* One fill ring slot is needed per received frame; keep waking the
         * kernel until the full batch could be reserved. */
        bool wakeup_failed = false;
        uint32_t res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
        while (res != rcvd) {
            StatsIncr(ptv->tv, ptv->capture_afxdp_failed_reads);
            ssize_t ret = WakeupSocket(ptv);
            if (ret < 0) {
                SCLogWarning(SC_ERR_AFXDP_READ, "recv failed with retval %zd", ret);
                AFXDPSwitchState(ptv, AFXDP_STATE_DOWN);
                /* Leave the retry loop: a 'continue' here would only spin
                 * on the failing socket and never reach the reopen logic
                 * or the shutdown check at the top of the main loop. */
                wakeup_failed = true;
                break;
            }
            res = xsk_ring_prod__reserve(&ptv->umem.fq, rcvd, &idx_fq);
        }
        if (wakeup_failed) {
            /* The DOWN state makes the next iteration close and re-create
             * the socket through AFXDPTryReopen(). */
            DumpStatsEverySecond(ptv, &last_dump);
            continue;
        }

        gettimeofday(&ts, NULL);
        ptv->pkts += rcvd;
        for (uint32_t i = 0; i < rcvd; i++) {
            /* Consume the descriptor before acquiring a Packet so that an
             * allocation failure cannot leave idx_rx/idx_fq behind the
             * loop counter. */
            uint64_t addr = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx)->addr;
            uint32_t len = xsk_ring_cons__rx_desc(&ptv->xsk.rx, idx_rx++)->len;
            uint64_t orig = xsk_umem__extract_addr(addr);

            p = PacketGetFromQueueOrAlloc();
            if (unlikely(p == NULL)) {
                StatsIncr(ptv->tv, ptv->capture_afxdp_acquire_pkt_failed);
                /* No Packet will release this frame later, so write its
                 * address into the reserved fill ring slot right away. */
                *xsk_ring_prod__fill_addr(&ptv->umem.fq, idx_fq++) = orig;
                continue;
            }

            PKT_SET_SRC(p, PKT_SRC_WIRE);
            p->datalink = LINKTYPE_ETHERNET;
            p->livedev = ptv->livedev;
            p->ReleasePacket = AFXDPReleasePacket;
            p->flags |= PKT_IGNORE_CHECKSUM;

            p->ts = ts;

            addr = xsk_umem__add_offset_to_addr(addr);

            uint8_t *pkt_data = xsk_umem__get_data(ptv->umem.buf, addr);

            ptv->bytes += len;

            /* Remembered for AFXDPReleasePacket() */
            p->afxdp_v.fq_idx = idx_fq++;
            p->afxdp_v.orig = orig;
            p->afxdp_v.fq = &ptv->umem.fq;

            PacketSetData(p, pkt_data, len);

            if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
                TmqhOutputPacketpool(ptv->tv, p);
                SCReturnInt(TM_ECODE_FAILED);
            }
        }

        xsk_ring_prod__submit(&ptv->umem.fq, rcvd);
        xsk_ring_cons__release(&ptv->xsk.rx, rcvd);

        /* Trigger one dump of stats every second */
        DumpStatsEverySecond(ptv, &last_dump);
    }

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief DeInit function, releases the AF_XDP resources of a thread at exit.
 *
 * Deletes the socket and the umem when they exist, unmaps the umem
 * buffer and frees the thread vars.
 *
 * \param tv pointer to ThreadVars
 * \param data pointer that gets cast into AFXDPThreadVars for ptv
 *
 * \retval TM_ECODE_OK always
 */
static TmEcode ReceiveAFXDPThreadDeinit(ThreadVars *tv, void *data)
{
    AFXDPThreadVars *ptv = data;

    if (ptv->xsk.xsk != NULL) {
        xsk_socket__delete(ptv->xsk.xsk);
        ptv->xsk.xsk = NULL;
    }

    if (ptv->umem.umem != NULL) {
        xsk_umem__delete(ptv->umem.umem);
        ptv->umem.umem = NULL;
    }

    munmap(ptv->umem.buf, MEM_BYTES);

    SCFree(ptv);
    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Dump the counters one last time and log the capture totals at exit.
 *
 * The packet and drop totals are read from the thread-local stats
 * counters, the byte total from the thread vars.
 *
 * \param tv pointer to ThreadVars
 * \param data pointer that gets cast into AFXDPThreadVars for ptv
 */
static void ReceiveAFXDPThreadExitStats(ThreadVars *tv, void *data)
{
    SCEnter();

    AFXDPThreadVars *ptv = data;

    AFXDPDumpCounters(ptv);

    SCLogPerf("(%s) Kernel: Packets %" PRIu64 ", bytes %" PRIu64 ", dropped %" PRIu64 "",
            tv->name, StatsGetLocalCounterValue(tv, ptv->capture_afxdp_packets), ptv->bytes,
            StatsGetLocalCounterValue(tv, ptv->capture_kernel_drops));
}
/**
 * \brief Decode a packet received through AF_XDP.
 *
 * Updates the packet counters and passes the packet data to the link
 * layer decoder selected by p->datalink, then finalizes the decode.
 *
 * \param tv pointer to ThreadVars
 * \param p pointer to the current packet
 * \param data pointer that gets cast into DecodeThreadVars for dtv
 *
 * \retval TM_ECODE_OK always
 */
static TmEcode DecodeAFXDP(ThreadVars *tv, Packet *p, void *data)
{
    SCEnter();

    DecodeThreadVars *dtv = data;

    DEBUG_VALIDATE_BUG_ON(PKT_IS_PSEUDOPKT(p));

    /* update counters */
    DecodeUpdatePacketCounters(tv, dtv, p);

    /* If suri has set vlan during reading, we increase vlan counter */
    if (p->vlan_idx != 0) {
        StatsIncr(tv, dtv->counter_vlan);
    }

    /* call the decoder */
    uint8_t *pkt_data = GET_PKT_DATA(p);
    const uint32_t pkt_len = GET_PKT_LEN(p);
    DecodeLinkLayer(tv, dtv, p->datalink, p, pkt_data, pkt_len);

    PacketDecodeFinalize(tv, dtv, p);

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Allocate the decode thread vars and register their counters.
 *
 * \param tv pointer to ThreadVars
 * \param initdata unused
 * \param data pointer gets populated with DecodeThreadVars
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if the decode thread vars could not be allocated
 */
static TmEcode DecodeAFXDPThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();

    DecodeThreadVars *decode_vars = DecodeThreadVarsAlloc(tv);
    if (!decode_vars) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    DecodeRegisterPerfCounters(decode_vars, tv);
    *data = decode_vars;

    SCReturnInt(TM_ECODE_OK);
}
/**
 * \brief Free the decode thread vars, if any.
 *
 * \param tv pointer to ThreadVars
 * \param data decode thread vars set up by DecodeAFXDPThreadInit, may be NULL
 *
 * \retval TM_ECODE_OK always
 */
static TmEcode DecodeAFXDPThreadDeinit(ThreadVars *tv, void *data)
{
    if (data) {
        DecodeThreadVarsFree(tv, data);
    }

    SCReturnInt(TM_ECODE_OK);
}
#endif /* HAVE_AF_XDP */
/* eof */
/**
* @}
*/

@ -0,0 +1,69 @@
/* Copyright (C) 2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*/
#ifndef __SOURCE_AFXDP_H__
#define __SOURCE_AFXDP_H__
#define AFXDP_IFACE_NAME_LENGTH 48
/**
 * \brief per interface AF_XDP capture configuration
 *
 * ReceiveAFXDPThreadInit() copies these settings into its per-thread
 * state and calls DerefFunc on the config when it is done with it.
 */
typedef struct AFXDPIfaceConfig {
/* interface name */
char iface[AFXDP_IFACE_NAME_LENGTH];
/* number of threads */
int threads;
/* non-zero: the interface is switched to promiscuous mode at thread init */
int promisc;
/* misc use flags */
/* used as the xdp_flags of the socket configuration */
uint32_t mode;
/* used as the bind_flags of the socket configuration */
uint32_t bind_flags;
/* used as the umem configuration flags; XDP_UMEM_UNALIGNED_CHUNK_FLAG
 * makes the thread map its umem buffer with MAP_HUGETLB */
int mem_alignment;
/* busy polling settings, copied as-is into the thread vars */
bool enable_busy_poll;
uint32_t busy_poll_time;
uint32_t busy_poll_budget;
uint32_t gro_flush_timeout;
uint32_t napi_defer_hard_irqs;
/* NOTE(review): presumably the reference count DerefFunc decrements - confirm */
SC_ATOMIC_DECLARE(unsigned int, ref);
/* called with the config itself once a thread no longer needs it */
void (*DerefFunc)(void *);
} AFXDPIfaceConfig;
/**
 * \brief per packet AF_XDP vars
 *
 * This structure is used by the release data system: the receive loop
 * fills it in for every packet and the packet release callback reads it
 * to write the frame address back into the fill queue.
 */
typedef struct AFXDPPacketVars_ {
/* Fill queue used by kernel for inbound packets; the release callback
 * casts it to struct xsk_ring_prod * */
void *fq;
/* Indexed location within queue to release */
uint32_t fq_idx;
/* Origin address of packet, as extracted from the RX descriptor address */
uint64_t orig;
} AFXDPPacketVars;
void TmModuleReceiveAFXDPRegister(void);
void TmModuleDecodeAFXDPRegister(void);
TmEcode AFXDPQueueProtectionInit(void);
void AFXDPMutexClean(void);
#endif /* __SOURCE_AFXDP_H__ */

@ -86,6 +86,7 @@
#include "respond-reject.h"
#include "runmode-af-packet.h"
#include "runmode-af-xdp.h"
#include "runmode-netmap.h"
#include "runmode-unittests.h"
@ -101,6 +102,7 @@
#include "source-erf-dag.h"
#include "source-napatech.h"
#include "source-af-packet.h"
#include "source-af-xdp.h"
#include "source-netmap.h"
#include "source-dpdk.h"
#include "source-windivert.h"
@ -644,6 +646,10 @@ static void PrintUsage(const char *progname)
#ifdef HAVE_AF_PACKET
printf("\t--af-packet[=<dev>] : run in af-packet mode, no value select interfaces from suricata.yaml\n");
#endif
#ifdef HAVE_AF_XDP
printf("\t--af-xdp[=<dev>] : run in af-xdp mode, no value select "
"interfaces from suricata.yaml\n");
#endif
#ifdef HAVE_NETMAP
printf("\t--netmap[=<dev>] : run in netmap mode, no value select interfaces from suricata.yaml\n");
#endif
@ -903,6 +909,9 @@ void RegisterAllModules(void)
/* af-packet */
TmModuleReceiveAFPRegister();
TmModuleDecodeAFPRegister();
/* af-xdp */
TmModuleReceiveAFXDPRegister();
TmModuleDecodeAFXDPRegister();
/* netmap */
TmModuleReceiveNetmapRegister();
TmModuleDecodeNetmapRegister();
@ -1009,6 +1018,22 @@ static TmEcode ParseInterfacesList(const int runmode, char *pcap_dev)
}
}
#endif
#ifdef HAVE_AF_XDP
} else if (runmode == RUNMODE_AFXDP_DEV) {
/* iface has been set on command line */
if (strlen(pcap_dev)) {
if (ConfSetFinal("af-xdp.live-interface", pcap_dev) != 1) {
SCLogError(SC_ERR_INITIALIZATION, "Failed to set af-xdp.live-interface");
SCReturnInt(TM_ECODE_FAILED);
}
} else {
int ret = LiveBuildDeviceList("af-xdp");
if (ret == 0) {
SCLogError(SC_ERR_INITIALIZATION, "No interface found in config for af-xdp");
SCReturnInt(TM_ECODE_FAILED);
}
}
#endif
#ifdef HAVE_NETMAP
} else if (runmode == RUNMODE_NETMAP) {
/* iface has been set on command line */
@ -1171,6 +1196,37 @@ static int ParseCommandLineAfpacket(SCInstance *suri, const char *in_arg)
#endif
}
/**
 * \brief Handle one --af-xdp[=<dev>] command line option
 *
 * The first occurrence selects the AF_XDP run mode and, when a device is
 * given, registers it and stores it as the capture device. Further
 * occurrences only register additional devices. Any other run mode that
 * was already selected is an error.
 *
 * \param suri the Suricata instance being configured
 * \param in_arg device name given with the option, or NULL
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if another run mode was already chosen or
 *         AF_XDP support is not compiled in
 */
static int ParseCommandLineAfxdp(SCInstance *suri, const char *in_arg)
{
#ifdef HAVE_AF_XDP
    switch (suri->run_mode) {
        case RUNMODE_UNKNOWN:
            suri->run_mode = RUNMODE_AFXDP_DEV;
            if (in_arg != NULL) {
                LiveRegisterDeviceName(in_arg);
                memset(suri->pcap_dev, 0, sizeof(suri->pcap_dev));
                strlcpy(suri->pcap_dev, in_arg, sizeof(suri->pcap_dev));
            }
            break;

        case RUNMODE_AFXDP_DEV:
            if (in_arg != NULL) {
                LiveRegisterDeviceName(in_arg);
            } else {
                SCLogInfo("Multiple af-xdp options without interface on each is useless");
            }
            break;

        default:
            SCLogError(SC_ERR_MULTIPLE_RUN_MODE, "more than one run mode "
                                                 "has been specified");
            PrintUsage(suri->progname);
            return TM_ECODE_FAILED;
    }

    return TM_ECODE_OK;
#else
    SCLogError(SC_ERR_NO_AF_XDP, "AF_XDP not enabled. On Linux "
                                 "host, make sure correct libraries are installed,"
                                 " see documentation for information.");
    return TM_ECODE_FAILED;
#endif
}
static int ParseCommandLineDpdk(SCInstance *suri, const char *in_arg)
{
#ifdef HAVE_DPDK
@ -1279,6 +1335,7 @@ static TmEcode ParseCommandLine(int argc, char** argv, SCInstance *suri)
{"dpdk", 0, 0, 0},
#endif
{"af-packet", optional_argument, 0, 0},
{"af-xdp", optional_argument, 0, 0},
{"netmap", optional_argument, 0, 0},
{"pcap", optional_argument, 0, 0},
{"pcap-file-continuous", 0, 0, 0},
@ -1407,6 +1464,10 @@ static TmEcode ParseCommandLine(int argc, char** argv, SCInstance *suri)
if (ParseCommandLineAfpacket(suri, optarg) != TM_ECODE_OK) {
return TM_ECODE_FAILED;
}
} else if (strcmp((long_opts[option_index]).name, "af-xdp") == 0) {
if (ParseCommandLineAfxdp(suri, optarg) != TM_ECODE_OK) {
return TM_ECODE_FAILED;
}
} else if (strcmp((long_opts[option_index]).name, "netmap") == 0) {
#ifdef HAVE_NETMAP
if (suri->run_mode == RUNMODE_UNKNOWN) {
@ -2375,6 +2436,7 @@ static int ConfigGetCaptureValue(SCInstance *suri)
/* fall through */
case RUNMODE_PCAP_DEV:
case RUNMODE_AFP_DEV:
case RUNMODE_AFXDP_DEV:
case RUNMODE_PFRING:
nlive = LiveGetDeviceNameCount();
for (lthread = 0; lthread < nlive; lthread++) {
@ -2779,7 +2841,8 @@ static void SuricataMainLoop(SCInstance *suri)
* This can be used by fuzz targets.
*/
int InitGlobal(void) {
int InitGlobal(void)
{
rs_init(&suricata_context);
SC_ATOMIC_INIT(engine_stage);

@ -229,8 +229,10 @@ const char * TmModuleTmmIdToString(TmmId id)
CASE_CODE (TMM_RECEIVENAPATECH);
CASE_CODE (TMM_DECODENAPATECH);
CASE_CODE (TMM_RECEIVEAFP);
CASE_CODE(TMM_RECEIVEAFXDP);
CASE_CODE (TMM_ALERTPCAPINFO);
CASE_CODE (TMM_DECODEAFP);
CASE_CODE(TMM_DECODEAFXDP);
CASE_CODE (TMM_STATSLOGGER);
CASE_CODE (TMM_FLOWMANAGER);
CASE_CODE (TMM_FLOWRECYCLER);

@ -52,7 +52,9 @@ typedef enum {
TMM_RECEIVEERFDAG,
TMM_DECODEERFDAG,
TMM_RECEIVEAFP,
TMM_RECEIVEAFXDP,
TMM_DECODEAFP,
TMM_DECODEAFXDP,
TMM_RECEIVEDPDK,
TMM_DECODEDPDK,
TMM_RECEIVENETMAP,

@ -389,6 +389,12 @@ const char * SCErrorToString(SCError err)
CASE_CODE(SC_WARN_CHOWN);
CASE_CODE(SC_ERR_HASH_ADD);
CASE_CODE(SC_WARN_CLASSIFICATION_CONFIG);
CASE_CODE(SC_ERR_NO_AF_XDP);
CASE_CODE(SC_ERR_AFXDP_CONF);
CASE_CODE(SC_WARN_AFXDP_CONF);
CASE_CODE(SC_ERR_AFXDP_CREATE);
CASE_CODE(SC_ERR_AFXDP_INIT);
CASE_CODE(SC_ERR_AFXDP_READ);
CASE_CODE (SC_ERR_MAX);
}

@ -379,6 +379,12 @@ typedef enum {
SC_WARN_CHOWN,
SC_ERR_HASH_ADD,
SC_WARN_CLASSIFICATION_CONFIG,
SC_ERR_NO_AF_XDP,
SC_ERR_AFXDP_CONF,
SC_WARN_AFXDP_CONF,
SC_ERR_AFXDP_CREATE,
SC_ERR_AFXDP_INIT,
SC_ERR_AFXDP_READ,
SC_ERR_MAX
} SCError;

@ -62,6 +62,7 @@ void SCDropMainThreadCaps(uint32_t userid, uint32_t groupid)
switch (run_mode) {
case RUNMODE_PCAP_DEV:
case RUNMODE_AFP_DEV:
case RUNMODE_AFXDP_DEV:
capng_updatev(CAPNG_ADD, CAPNG_EFFECTIVE|CAPNG_PERMITTED,
CAP_NET_RAW, /* needed for pcap live mode */
CAP_SYS_NICE,

@ -0,0 +1,63 @@
/* Copyright (C) 2011-2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*
* Sysfs utility file
*/
#include "util-sysfs.h"
#define SYSFS_MAX_FILENAME_LEN (SYSFS_MAX_FILENAME_SIZE + 5)
/**
 * \brief Write an integer, formatted as a decimal string, to a file below /sys/
 *
 * On non-Linux builds the body is compiled out and the function only
 * returns TM_ECODE_OK.
 *
 * \param path file path relative to /sys/, at most SYSFS_MAX_FILENAME_SIZE
 *             characters long
 * \param value value to write
 *
 * \retval TM_ECODE_OK on success
 * \retval TM_ECODE_FAILED if path is NULL or too long, or the file could
 *         not be opened or completely written
 */
TmEcode SysFsWriteValue(const char *path, int64_t value)
{
#if defined(__linux__)
    /* "/sys/" plus a maximum length path fills SYSFS_MAX_FILENAME_LEN
     * characters; the extra byte holds the terminating NUL so strlcat()
     * can never silently truncate an accepted path. */
    char fname[SYSFS_MAX_FILENAME_LEN + 1] = "/sys/";
    char sentence[64];

    if (!path || strlen(path) > SYSFS_MAX_FILENAME_SIZE) {
        SCLogWarning(SC_ERR_ARG_LEN_LONG, "File path too long, max allowed: %d",
                SYSFS_MAX_FILENAME_SIZE);
        SCReturnInt(TM_ECODE_FAILED);
    }

    strlcat(fname, path, sizeof(fname));

    /* File must be present and process have correct capabilities to open */
    int fd = open(fname, O_WRONLY);
    if (fd < 0) {
        SCLogError(SC_ERR_FOPEN, "Could not open file: %s", fname);
        SCReturnInt(TM_ECODE_FAILED);
    }

    /* int64_t is not 'long' on every platform; convert explicitly so the
     * argument always matches the conversion specifier */
    snprintf(sentence, sizeof(sentence), "%lld", (long long)value);
    ssize_t len = strlen(sentence);

    if (write(fd, sentence, len) != len) {
        SCLogError(SC_ERR_FWRITE, "Could not write to file: %s", fname);
        close(fd);
        SCReturnInt(TM_ECODE_FAILED);
    }
    close(fd);
#endif /* __linux__ */

    SCReturnInt(TM_ECODE_OK);
}

@ -0,0 +1,36 @@
/* Copyright (C) 2011-2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Richard McConnell <richard_mcconnell@rapid7.com>
*
*/
#ifndef UTIL_SYSFS_H
#define UTIL_SYSFS_H
#include "util-error.h"
#include "util-debug.h"
/* /sys/ prepended as mount point 251 + 5 = 256 */
#define SYSFS_MAX_FILENAME_SIZE 251
TmEcode SysFsWriteValue(const char *path, int64_t value);
#endif /* UTIL_SYSFS_H */

@ -669,6 +669,56 @@ af-packet:
#use-mmap: no
#tpacket-v3: yes
# Linux high speed af-xdp capture support
af-xdp:
- interface: default
# Number of receive threads. "auto" uses the lesser of the number
# of cores and the number of RX queues
#threads: auto
#disable-promisc: false
# XDP_DRV mode can be chosen when the driver supports XDP
# XDP_SKB mode can be chosen when the driver does not support XDP
# Possible values are:
# - drv: enable XDP_DRV mode
# - skb: enable XDP_SKB mode
# - none: disable (kernel in charge of applying mode)
#force-xdp-mode: none
# During socket binding the kernel will attempt zero-copy; if this
# fails it will fall back to copy. If that also fails, the bind fails.
# The bind mode can be explicitly configured using the option below.
# If configured, the bind fails when that mode cannot be used (no fallback).
# Possible values are:
# - zero: enable zero-copy mode
# - copy: enable copy mode
# - none: disable (kernel in charge of applying mode)
#force-bind-mode: none
# Memory alignment can be one of two modes: aligned chunk mode or
# unaligned chunk mode. By default, aligned chunk mode is selected.
# Select 'yes' to enable unaligned chunk mode.
# Note: unaligned chunk mode uses hugepages, so the required number
# of pages must be available.
#mem-unaligned: no
# The following options configure the prefer-busy-polling socket
# options. The polling time and budget can be edited here.
# Possible values are:
# - yes: enable (default)
# - no: disable
#enable-busy-poll: yes
# busy-poll-time sets the approximate time in microseconds to busy
# poll on a blocking receive when there is no data.
#busy-poll-time: 20
# busy-poll-budget is the budget allowed for packet batches
#busy-poll-budget: 64
# These two tunables are used to configure the Linux OS's NAPI
# context. Their purpose is to defer enabling of interrupts and
# instead schedule the NAPI context from a watchdog timer.
# The softirq NAPI will exit early, allowing busy polling to be
# performed. Successfully setting these tunables alongside busy-polling
# should improve performance.
# Defaults are:
#gro-flush-timeout: 2000000
#napi-defer-hard-irq: 2
dpdk:
eal-params:
proc-type: primary

Loading…
Cancel
Save