ipfw: Add support for autofp and worker runmode

This patch convert ipfw code to the PcktAcqLoop API and
rework the running mode to use the running mode wrapper
already used by NFQ.
remotes/origin/master-1.2.x
Eric Leblond 14 years ago committed by Victor Julien
parent c1ad64b333
commit 6e7a8f38bf

@ -32,6 +32,7 @@
#include "threadvars.h" #include "threadvars.h"
#include "source-nfq.h" #include "source-nfq.h"
#include "source-ipfw.h"
#include "source-pcap.h" #include "source-pcap.h"
#include "action-globals.h" #include "action-globals.h"
@ -352,6 +353,10 @@ typedef struct Packet_
#ifdef NFQ #ifdef NFQ
NFQPacketVars nfq_v; NFQPacketVars nfq_v;
#endif /* NFQ */ #endif /* NFQ */
#ifdef IPFW
IPFWPacketVars ipfw_v;
#endif /* IPFW */
/** libpcap vars: shared by Pcap Live mode and Pcap File mode */ /** libpcap vars: shared by Pcap Live mode and Pcap File mode */
PcapPacketVars pcap_v; PcapPacketVars pcap_v;

@ -45,6 +45,8 @@
#include "util-time.h" #include "util-time.h"
#include "util-cpu.h" #include "util-cpu.h"
#include "util-affinity.h" #include "util-affinity.h"
#include "util-runmodes.h"
#include "source-ipfw.h"
static const char *default_mode; static const char *default_mode;
@ -60,264 +62,71 @@ void RunModeIpsIPFWRegister(void)
"Multi threaded IPFW IPS mode", "Multi threaded IPFW IPS mode",
RunModeIpsIPFWAuto); RunModeIpsIPFWAuto);
RunModeRegisterNewRunMode(RUNMODE_IPFW, "autofp",
"Multi threaded IPFW IPS mode with respect to flow",
RunModeIpsIPFWAutoFp);
RunModeRegisterNewRunMode(RUNMODE_IPFW, "worker",
"Multi queue IPFW IPS mode with one thread per queue",
RunModeIpsIPFWWorker);
return; return;
} }
/**
* \brief RunModeIpsIPFWAuto set up the following thread packet handlers:
* - Receive thread (from IPFW)
* - Decode thread
* - Stream thread
* - Detect: If we have only 1 cpu, it will setup one Detect thread
* If we have more than one, it will setup num_cpus - 1
* starting from the second cpu available.
* - Veredict thread (IPFW)
* - Respond/Reject thread
* - Outputs thread
* By default the threads will use the first cpu available
* except the Detection threads if we have more than one cpu.
*
* \param de_ctx Pointer to the Detection Engine.
*
* \retval 0 If all goes well. (If any problem is detected the engine will
* exit()).
*/
int RunModeIpsIPFWAuto(DetectEngineCtx *de_ctx) int RunModeIpsIPFWAuto(DetectEngineCtx *de_ctx)
{ {
SCEnter(); SCEnter();
char tname[12]; int ret = 0;
uint16_t cpu = 0; #ifdef IPFW
/* Available cpus */
uint16_t ncpus = UtilCpuGetNumProcessorsOnline();
RunModeInitialize(); RunModeInitialize();
TimeModeSetLive(); TimeModeSetLive();
/* create the threads */ ret = RunModeSetIPSAuto(de_ctx,
ThreadVars *tv_receiveipfw = IPFWGetThread,
TmThreadCreatePacketHandler("ReceiveIPFW", "ReceiveIPFW",
"packetpool", "packetpool", "VerdictIPFW",
"pickup-queue", "simple", "DecodeIPFW");
"1slot_noinout"); #endif /* IPFW */
if (tv_receiveipfw == NULL) { return ret;
printf("ERROR: TmThreadsCreate failed\n"); }
exit(EXIT_FAILURE);
}
TmModule *tm_module = TmModuleGetByName("ReceiveIPFW");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName failed for ReceiveIPFW\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_receiveipfw, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_receiveipfw, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_receiveipfw, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_receiveipfw) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_decode1 =
TmThreadCreatePacketHandler("Decode1",
"pickup-queue", "simple",
"decode-queue1", "simple",
"1slot");
if (tv_decode1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Decode1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("DecodeIPFW");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName DecodeIPFW failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_decode1, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_decode1, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_decode1, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_decode1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
ThreadVars *tv_stream1 =
TmThreadCreatePacketHandler("Stream1",
"decode-queue1", "simple",
"stream-queue1", "simple",
"1slot");
if (tv_stream1 == NULL) {
printf("ERROR: TmThreadsCreate failed for Stream1\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("StreamTcp");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName StreamTcp failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_stream1, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_stream1, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_stream1, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_stream1) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
/* start with cpu 1 so that if we're creating an odd number of detect
* threads we're not creating the most on CPU0. */
if (ncpus > 0)
cpu = 1;
/* always create at least one thread */
int thread_max = TmThreadGetNbThreads(DETECT_CPU_SET);
if (thread_max == 0)
thread_max = ncpus * threading_detect_ratio;
if (thread_max < 1)
thread_max = 1;
int thread;
for (thread = 0; thread < thread_max; thread++) {
snprintf(tname, sizeof(tname), "Detect%"PRIu16, thread+1);
char *thread_name = SCStrdup(tname);
SCLogDebug("Assigning %s affinity to cpu %u", thread_name, cpu);
ThreadVars *tv_detect_ncpu =
TmThreadCreatePacketHandler(thread_name,
"stream-queue1", "simple",
"verdict-queue", "simple",
"1slot");
if (tv_detect_ncpu == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("Detect");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName Detect failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, (void *)de_ctx);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_detect_ncpu, (int)cpu);
/* If we have more than one core/cpu, the first Detect thread
* (at cpu 0) will have less priority (higher 'nice' value)
* In this case we will set the thread priority to +10 (default is 0)
*/
if (cpu == 0 && ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_LOW);
} else if (ncpus > 1) {
TmThreadSetThreadPriority(tv_detect_ncpu, PRIO_MEDIUM);
}
}
char *thread_group_name = SCStrdup("Detect");
if (thread_group_name == NULL) {
printf("Error allocating memory\n");
exit(EXIT_FAILURE);
}
tv_detect_ncpu->thread_group_name = thread_group_name;
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
if ((cpu + 1) == ncpus)
cpu = 0;
else
cpu++;
}
ThreadVars *tv_verdict =
TmThreadCreatePacketHandler("Verdict",
"verdict-queue", "simple",
"respond-queue", "simple",
"1slot");
if (tv_verdict == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("VerdictIPFW");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName VerdictIPFW failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_verdict, tm_module, NULL);
if (threading_set_cpu_affinity) {
TmThreadSetCPUAffinity(tv_verdict, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_verdict, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_verdict) != TM_ECODE_OK) { int RunModeIpsIPFWAutoFp(DetectEngineCtx *de_ctx)
printf("ERROR: TmThreadSpawn failed\n"); {
exit(EXIT_FAILURE); SCEnter();
} int ret = 0;
#ifdef IPFW
ThreadVars *tv_rreject = RunModeInitialize();
TmThreadCreatePacketHandler("RespondReject",
"respond-queue", "simple",
"alert-queue1", "simple",
"1slot");
if (tv_rreject == NULL) {
printf("ERROR: TmThreadsCreate failed\n");
exit(EXIT_FAILURE);
}
tm_module = TmModuleGetByName("RespondReject");
if (tm_module == NULL) {
printf("ERROR: TmModuleGetByName for RespondReject failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv_rreject, tm_module, NULL);
if (threading_set_cpu_affinity) { TimeModeSetLive();
TmThreadSetCPUAffinity(tv_rreject, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_rreject, PRIO_MEDIUM);
}
if (TmThreadSpawn(tv_rreject) != TM_ECODE_OK) { ret = RunModeSetIPSAutoFp(de_ctx,
printf("ERROR: TmThreadSpawn failed\n"); IPFWGetThread,
exit(EXIT_FAILURE); "ReceiveIPFW",
} "VerdictIPFW",
"DecodeIPFW");
#endif /* IPFW */
return ret;
}
ThreadVars *tv_outputs = int RunModeIpsIPFWWorker(DetectEngineCtx *de_ctx)
TmThreadCreatePacketHandler("Outputs", {
"alert-queue1", "simple", SCEnter();
"packetpool", "packetpool", int ret = 0;
"varslot"); #ifdef IPFW
if (tv_outputs == NULL) { RunModeInitialize();
printf("ERROR: TmThreadCreatePacketHandler for Outputs failed\n");
exit(EXIT_FAILURE);
}
if (threading_set_cpu_affinity) { TimeModeSetLive();
TmThreadSetCPUAffinity(tv_outputs, 0);
if (ncpus > 1)
TmThreadSetThreadPriority(tv_outputs, PRIO_MEDIUM);
}
SetupOutputs(tv_outputs);
if (TmThreadSpawn(tv_outputs) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
return 0; ret = RunModeSetIPSWorker(de_ctx,
IPFWGetThread,
"ReceiveIPFW",
"VerdictIPFW",
"DecodeIPFW");
#endif /* IPFW */
return ret;
} }

@ -24,6 +24,8 @@
#define __RUNMODE_IPFW_H__ #define __RUNMODE_IPFW_H__
int RunModeIpsIPFWAuto(DetectEngineCtx *); int RunModeIpsIPFWAuto(DetectEngineCtx *);
int RunModeIpsIPFWAutoFp(DetectEngineCtx *);
int RunModeIpsIPFWWorker(DetectEngineCtx *);
void RunModeIpsIPFWRegister(void); void RunModeIpsIPFWRegister(void);
const char *RunModeIpsIPFWGetDefaultMode(void); const char *RunModeIpsIPFWGetDefaultMode(void);

@ -19,6 +19,7 @@
* \file * \file
* *
* \author Nick Rogness <nick@rogness.net> * \author Nick Rogness <nick@rogness.net>
* \author Eric Leblond <eric@regit.org>
* *
* IPFW packet acquisition support * IPFW packet acquisition support
*/ */
@ -34,7 +35,9 @@
#include "source-ipfw.h" #include "source-ipfw.h"
#include "util-debug.h" #include "util-debug.h"
#include "conf.h" #include "conf.h"
#include "util-byte.h"
#include "util-privs.h" #include "util-privs.h"
#include "util-device.h"
#define IPFW_ACCEPT 0 #define IPFW_ACCEPT 0
#define IPFW_DROP 1 #define IPFW_DROP 1
@ -53,6 +56,7 @@
TmEcode NoIPFWSupportExit(ThreadVars *, void *, void **); TmEcode NoIPFWSupportExit(ThreadVars *, void *, void **);
void TmModuleReceiveIPFWRegister (void) { void TmModuleReceiveIPFWRegister (void) {
tmm_modules[TMM_RECEIVEIPFW].name = "ReceiveIPFW"; tmm_modules[TMM_RECEIVEIPFW].name = "ReceiveIPFW";
tmm_modules[TMM_RECEIVEIPFW].ThreadInit = NoIPFWSupportExit; tmm_modules[TMM_RECEIVEIPFW].ThreadInit = NoIPFWSupportExit;
tmm_modules[TMM_RECEIVEIPFW].Func = NULL; tmm_modules[TMM_RECEIVEIPFW].Func = NULL;
@ -100,6 +104,11 @@ typedef struct IPFWThreadVars_
/* data link type for the thread, probably not needed */ /* data link type for the thread, probably not needed */
int datalink; int datalink;
/* this one should be not changing after init */
uint16_t port_num;
/* position into the NFQ queue var array */
uint16_t ipfw_index;
/* counters */ /* counters */
uint32_t pkts; uint32_t pkts;
uint64_t bytes; uint64_t bytes;
@ -108,15 +117,16 @@ typedef struct IPFWThreadVars_
uint32_t dropped; uint32_t dropped;
} IPFWThreadVars; } IPFWThreadVars;
/* Global socket handler for the divert socket */ static IPFWThreadVars ipfw_t[IPFW_MAX_QUEUE];
struct sockaddr_in ipfw_sin; static IPFWQueueVars ipfw_q[IPFW_MAX_QUEUE];
socklen_t ipfw_sinlen; static uint16_t receive_port_num = 0;
int ipfw_sock; static SCMutex ipfw_init_lock;
static SCMutex ipfw_socket_lock;
/* IPFW Prototypes */ /* IPFW Prototypes */
void *IPFWGetQueue(int number);
TmEcode ReceiveIPFWThreadInit(ThreadVars *, void *, void **); TmEcode ReceiveIPFWThreadInit(ThreadVars *, void *, void **);
TmEcode ReceiveIPFW(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode ReceiveIPFW(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot);
void ReceiveIPFWThreadExitStats(ThreadVars *, void *); void ReceiveIPFWThreadExitStats(ThreadVars *, void *);
TmEcode ReceiveIPFWThreadDeinit(ThreadVars *, void *); TmEcode ReceiveIPFWThreadDeinit(ThreadVars *, void *);
@ -134,9 +144,12 @@ TmEcode DecodeIPFW(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *)
* \todo Unit tests are needed for this module. * \todo Unit tests are needed for this module.
*/ */
void TmModuleReceiveIPFWRegister (void) { void TmModuleReceiveIPFWRegister (void) {
SCMutexInit(&ipfw_init_lock, NULL);
tmm_modules[TMM_RECEIVEIPFW].name = "ReceiveIPFW"; tmm_modules[TMM_RECEIVEIPFW].name = "ReceiveIPFW";
tmm_modules[TMM_RECEIVEIPFW].ThreadInit = ReceiveIPFWThreadInit; tmm_modules[TMM_RECEIVEIPFW].ThreadInit = ReceiveIPFWThreadInit;
tmm_modules[TMM_RECEIVEIPFW].Func = ReceiveIPFW; tmm_modules[TMM_RECEIVEIPFW].Func = ReceiveIPFW;
tmm_modules[TMM_RECEIVEIPFW].PktAcqLoop = ReceiveIPFWLoop;
tmm_modules[TMM_RECEIVEIPFW].ThreadExitPrintStats = ReceiveIPFWThreadExitStats; tmm_modules[TMM_RECEIVEIPFW].ThreadExitPrintStats = ReceiveIPFWThreadExitStats;
tmm_modules[TMM_RECEIVEIPFW].ThreadDeinit = ReceiveIPFWThreadDeinit; tmm_modules[TMM_RECEIVEIPFW].ThreadDeinit = ReceiveIPFWThreadDeinit;
tmm_modules[TMM_RECEIVEIPFW].cap_flags = SC_CAP_NET_ADMIN | SC_CAP_NET_RAW | tmm_modules[TMM_RECEIVEIPFW].cap_flags = SC_CAP_NET_ADMIN | SC_CAP_NET_RAW |
@ -189,7 +202,8 @@ void TmModuleDecodeIPFWRegister (void) {
TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq) TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, PacketQueue *postpq)
{ {
IPFWThreadVars *ptv = (IPFWThreadVars *)data; IPFWThreadVars *ptv = (IPFWThreadVars *)data;
char pkt[IP_MAXPACKET]; IPFWQueueVars *nq = IPFWGetQueue(ptv->ipfw_index);
uint8_t pkt[IP_MAXPACKET];
int pktlen=0; int pktlen=0;
int r = 0; int r = 0;
struct pollfd IPFWpoll; struct pollfd IPFWpoll;
@ -198,8 +212,8 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
//printf("Entering RecieveIPFW\n"); //printf("Entering RecieveIPFW\n");
IPFWpoll.fd=ipfw_sock; IPFWpoll.fd = nq->fd;
IPFWpoll.events= POLLRDNORM; IPFWpoll.events = POLLRDNORM;
/* Read packets from divert socket */ /* Read packets from divert socket */
while (r == 0) { while (r == 0) {
@ -218,18 +232,15 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
} /* end while */ } /* end while */
SCMutexLock(&ipfw_socket_lock); if ((pktlen = recvfrom(nq->fd, pkt, sizeof(pkt), 0,(struct sockaddr *)&nq->ipfw_sin, &nq->ipfw_sinlen)) == -1) {
if ((pktlen = recvfrom(ipfw_sock, pkt, sizeof(pkt), 0,(struct sockaddr *)&ipfw_sin, &ipfw_sinlen)) == -1) {
/* We received an error on socket read */ /* We received an error on socket read */
if (errno == EINTR || errno == EWOULDBLOCK) { if (errno == EINTR || errno == EWOULDBLOCK) {
/* Nothing for us to process */ /* Nothing for us to process */
SCMutexUnlock(&ipfw_socket_lock);
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} else { } else {
SCLogWarning(SC_WARN_IPFW_RECV,"Read from IPFW divert socket failed: %s",strerror(errno)); SCLogWarning(SC_WARN_IPFW_RECV,"Read from IPFW divert socket failed: %s",strerror(errno));
SCMutexUnlock(&ipfw_socket_lock);
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
@ -240,8 +251,6 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
r++; r++;
} }
SCMutexUnlock(&ipfw_socket_lock);
SCLogDebug("Received Packet Len: %d",pktlen); SCLogDebug("Received Packet Len: %d",pktlen);
/* Setup packet */ /* Setup packet */
@ -254,6 +263,9 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
ptv->bytes += pktlen; ptv->bytes += pktlen;
p->datalink = ptv->datalink; p->datalink = ptv->datalink;
p->ipfw_v.ipfw_index = ptv->ipfw_index;
PacketCopyData(p, pkt, pktlen); PacketCopyData(p, pkt, pktlen);
SCLogDebug("Packet info: pkt_len: %" PRIu32 " (pkt %02x, pkt_data %02x)", GET_PKT_LEN(p), *pkt, GET_PKT_DATA(p)); SCLogDebug("Packet info: pkt_len: %" PRIu32 " (pkt %02x, pkt_data %02x)", GET_PKT_LEN(p), *pkt, GET_PKT_DATA(p));
@ -264,6 +276,110 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
} }
TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
{
IPFWThreadVars *ptv = (IPFWThreadVars *)data;
IPFWQueueVars *nq = NULL;
uint8_t pkt[IP_MAXPACKET];
int pktlen=0;
struct pollfd IPFWpoll;
struct timeval IPFWts;
Packet *p = NULL;
uint16_t packet_q_len = 0;
SCEnter();
if (ptv == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT, "Null data pointer");
SCReturnInt(TM_ECODE_FAILED);
}
nq = IPFWGetQueue(ptv->ipfw_index);
if (nq == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT, "Can't get thread variable");
SCReturnInt(TM_ECODE_FAILED);
}
SCLogInfo("Thread '%s' will run on port %d (item %d)",
tv->name,
nq->port_num,
ptv->ipfw_index);
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
SCReturnInt(TM_ECODE_OK);
}
IPFWpoll.fd = nq->fd;
IPFWpoll.events = POLLRDNORM;
/* Poll the socket for status */
if ( (poll(&IPFWpoll, 1, IPFW_SOCKET_POLL_MSEC)) > 0) {
if (!(IPFWpoll.revents & (POLLRDNORM | POLLERR)))
continue;
}
if ((pktlen = recvfrom(nq->fd, pkt, sizeof(pkt), 0,
(struct sockaddr *)&nq->ipfw_sin,
&nq->ipfw_sinlen)) == -1) {
/* We received an error on socket read */
if (errno == EINTR || errno == EWOULDBLOCK) {
/* Nothing for us to process */
continue;
} else {
SCLogWarning(SC_WARN_IPFW_RECV,
"Read from IPFW divert socket failed: %s",
strerror(errno));
SCReturnInt(TM_ECODE_FAILED);
}
} else {
/* We have a packet to process */
memset (&IPFWts, 0, sizeof(struct timeval));
gettimeofday(&IPFWts, NULL);
}
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
do {
packet_q_len = PacketPoolSize();
if (unlikely(packet_q_len == 0)) {
PacketPoolWait();
}
} while (packet_q_len == 0);
p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
SCReturnInt(TM_ECODE_FAILED);
}
SCLogDebug("Received Packet Len: %d", pktlen);
p->ts.tv_sec = IPFWts.tv_sec;
p->ts.tv_usec = IPFWts.tv_usec;
ptv->pkts++;
ptv->bytes += pktlen;
p->datalink = ptv->datalink;
p->ipfw_v.ipfw_index = ptv->ipfw_index;
PacketCopyData(p, pkt, pktlen);
SCLogDebug("Packet info: pkt_len: %" PRIu32 " (pkt %02x, pkt_data %02x)",
GET_PKT_LEN(p), *pkt, GET_PKT_DATA(p));
if (TmThreadsSlotProcessPkt(tv, ((TmSlot *) slot)->slot_next, p)
!= TM_ECODE_OK) {
TmqhOutputPacketpool(tv, p);
SCReturnInt(TM_ECODE_FAILED);
}
SCPerfSyncCountersIfSignalled(tv, 0);
}
SCReturnInt(TM_ECODE_OK);
}
/** /**
* \brief Init function for RecieveIPFW. * \brief Init function for RecieveIPFW.
* *
@ -279,9 +395,8 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
TmEcode ReceiveIPFWThreadInit(ThreadVars *tv, void *initdata, void **data) TmEcode ReceiveIPFWThreadInit(ThreadVars *tv, void *initdata, void **data)
{ {
struct timeval timev; struct timeval timev;
IPFWThreadVars *ntv = (IPFWThreadVars *) initdata;
uint16_t divert_port=0; IPFWQueueVars *nq = IPFWGetQueue(ntv->ipfw_index);
char *tmpdivertport;
sigset_t sigs; sigset_t sigs;
sigfillset(&sigs); sigfillset(&sigs);
@ -289,31 +404,9 @@ TmEcode ReceiveIPFWThreadInit(ThreadVars *tv, void *initdata, void **data)
SCEnter(); SCEnter();
/* divert socket port to listen/send on */ SCMutexInit(&nq->socket_lock, NULL);
if ((ConfGet("ipfw.ipfw_divert_port", &tmpdivertport)) != 1) {
SCLogError(SC_ERR_IPFW_NOPORT,"Please supply an IPFW divert port");
SCReturnInt(TM_ECODE_FAILED);
} else {
if (atoi(tmpdivertport) > 0 && atoi(tmpdivertport) <= 65535) {
divert_port = (uint16_t)atoi(tmpdivertport);
SCLogInfo("Using IPFW divert port %u",divert_port);
} else {
SCLogError(SC_ERR_IPFW_BIND,"Divert port: %s is invalid",tmpdivertport);
SCReturnInt(TM_ECODE_FAILED);
}
}
/* Setup Threadvars */
IPFWThreadVars *ptv = SCMalloc(sizeof(IPFWThreadVars));
if (ptv == NULL)
SCReturnInt(TM_ECODE_FAILED);
memset(ptv, 0, sizeof(IPFWThreadVars));
SCMutexInit(&ipfw_socket_lock, NULL);
/* We need a divert socket to play with */ /* We need a divert socket to play with */
if ((ipfw_sock = socket(PF_INET, SOCK_RAW, IPPROTO_DIVERT)) == -1) { if ((nq->fd = socket(PF_INET, SOCK_RAW, IPPROTO_DIVERT)) == -1) {
SCLogError(SC_ERR_IPFW_SOCK,"Can't create divert socket: %s", strerror(errno)); SCLogError(SC_ERR_IPFW_SOCK,"Can't create divert socket: %s", strerror(errno));
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
@ -323,26 +416,26 @@ TmEcode ReceiveIPFWThreadInit(ThreadVars *tv, void *initdata, void **data)
timev.tv_sec = 1; timev.tv_sec = 1;
timev.tv_usec = 0; timev.tv_usec = 0;
if (setsockopt(ipfw_sock, SOL_SOCKET, SO_RCVTIMEO, &timev, sizeof(timev)) == -1) { if (setsockopt(nq->fd, SOL_SOCKET, SO_RCVTIMEO, &timev, sizeof(timev)) == -1) {
SCLogWarning(SC_WARN_IPFW_SETSOCKOPT,"Can't set IPFW divert socket timeout: %s", strerror(errno)); SCLogWarning(SC_WARN_IPFW_SETSOCKOPT,"Can't set IPFW divert socket timeout: %s", strerror(errno));
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
ipfw_sinlen=sizeof(ipfw_sin); nq->ipfw_sinlen=sizeof(nq->ipfw_sin);
memset(&ipfw_sin, 0, ipfw_sinlen); memset(&nq->ipfw_sin, 0, nq->ipfw_sinlen);
ipfw_sin.sin_family = PF_INET; nq->ipfw_sin.sin_family = PF_INET;
ipfw_sin.sin_addr.s_addr = INADDR_ANY; nq->ipfw_sin.sin_addr.s_addr = INADDR_ANY;
ipfw_sin.sin_port = htons(divert_port); nq->ipfw_sin.sin_port = htons(nq->port_num);
/* Bind that SOB */ /* Bind that SOB */
if (bind(ipfw_sock, (struct sockaddr *)&ipfw_sin, ipfw_sinlen) == -1) { if (bind(nq->fd, (struct sockaddr *)&nq->ipfw_sin, nq->ipfw_sinlen) == -1) {
SCLogError(SC_ERR_IPFW_BIND,"Can't bind divert socket on port %d: %s",divert_port,strerror(errno)); SCLogError(SC_ERR_IPFW_BIND,"Can't bind divert socket on port %d: %s",nq->port_num,strerror(errno));
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
ptv->datalink = DLT_RAW; ntv->datalink = DLT_RAW;
*data = (void *)ptv; *data = (void *)ntv;
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} }
@ -373,11 +466,12 @@ void ReceiveIPFWThreadExitStats(ThreadVars *tv, void *data)
TmEcode ReceiveIPFWThreadDeinit(ThreadVars *tv, void *data) TmEcode ReceiveIPFWThreadDeinit(ThreadVars *tv, void *data)
{ {
IPFWThreadVars *ptv = (IPFWThreadVars *)data; IPFWThreadVars *ptv = (IPFWThreadVars *)data;
IPFWQueueVars *nq = IPFWGetQueue(ptv->ipfw_index);
SCEnter(); SCEnter();
/* Attempt to shut the socket down...close instead? */ /* Attempt to shut the socket down...close instead? */
if (shutdown(ipfw_sock,SHUT_RD) < 0) { if (shutdown(nq->fd, SHUT_RD) < 0) {
SCLogWarning(SC_WARN_IPFW_UNBIND,"Unable to disable ipfw socket: %s",strerror(errno)); SCLogWarning(SC_WARN_IPFW_UNBIND,"Unable to disable ipfw socket: %s",strerror(errno));
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
@ -465,11 +559,23 @@ TmEcode IPFWSetVerdict(ThreadVars *tv, IPFWThreadVars *ptv, Packet *p)
{ {
uint32_t verdict; uint32_t verdict;
struct pollfd IPFWpoll; struct pollfd IPFWpoll;
IPFWQueueVars *nq = NULL;
SCEnter(); SCEnter();
IPFWpoll.fd=ipfw_sock; if (p == NULL) {
IPFWpoll.events= POLLWRNORM; SCLogWarning(SC_ERR_INVALID_ARGUMENT, "Packet is NULL");
SCReturnInt(TM_ECODE_FAILED);
}
nq = IPFWGetQueue(p->ipfw_v.ipfw_index);
if (nq == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT, "No thread found");
SCReturnInt(TM_ECODE_FAILED);
}
IPFWpoll.fd = nq->fd;
IPFWpoll.events = POLLWRNORM;
if (p->action & ACTION_DROP) { if (p->action & ACTION_DROP) {
verdict = IPFW_DROP; verdict = IPFW_DROP;
@ -485,7 +591,7 @@ TmEcode IPFWSetVerdict(ThreadVars *tv, IPFWThreadVars *ptv, Packet *p)
/* For divert sockets, accepting means writing the /* For divert sockets, accepting means writing the
* packet back to the socket for ipfw to pick up * packet back to the socket for ipfw to pick up
*/ */
SCLogDebug("IPFWSetVerdict writing to socket %d, %p, %u", ipfw_sock,GET_PKT_DATA(p),GET_PKT_LEN(p)); SCLogDebug("IPFWSetVerdict writing to socket %d, %p, %u", nq->fd, GET_PKT_DATA(p),GET_PKT_LEN(p));
while ( (poll(&IPFWpoll,1,IPFW_SOCKET_POLL_MSEC)) < 1) { while ( (poll(&IPFWpoll,1,IPFW_SOCKET_POLL_MSEC)) < 1) {
@ -496,14 +602,14 @@ TmEcode IPFWSetVerdict(ThreadVars *tv, IPFWThreadVars *ptv, Packet *p)
} }
} }
SCMutexLock(&ipfw_socket_lock); SCMutexLock(&nq->socket_lock);
if (sendto(ipfw_sock, GET_PKT_DATA(p), GET_PKT_LEN(p), 0,(struct sockaddr *)&ipfw_sin, ipfw_sinlen) == -1) { if (sendto(nq->fd, GET_PKT_DATA(p), GET_PKT_LEN(p), 0,(struct sockaddr *)&nq->ipfw_sin, nq->ipfw_sinlen) == -1) {
SCLogWarning(SC_WARN_IPFW_XMIT,"Write to ipfw divert socket failed: %s",strerror(errno)); SCLogWarning(SC_WARN_IPFW_XMIT,"Write to ipfw divert socket failed: %s",strerror(errno));
SCMutexUnlock(&ipfw_socket_lock); SCMutexUnlock(&nq->socket_lock);
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
SCMutexUnlock(&ipfw_socket_lock); SCMutexUnlock(&nq->socket_lock);
SCLogDebug("Sent Packet back into IPFW Len: %d",GET_PKT_LEN(p)); SCLogDebug("Sent Packet back into IPFW Len: %d",GET_PKT_LEN(p));
@ -569,7 +675,7 @@ TmEcode VerdictIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
/* don't verdict if we are not ready */ /* don't verdict if we are not ready */
if (verdict == 1) { if (verdict == 1) {
SCLogDebug("Setting verdict on tunnel"); SCLogDebug("Setting verdict on tunnel");
retval=IPFWSetVerdict(tv, ptv, p->root ? p->root : p); retval = IPFWSetVerdict(tv, ptv, p->root ? p->root : p);
} else { } else {
TUNNEL_INCR_PKT_RTV(p); TUNNEL_INCR_PKT_RTV(p);
@ -577,7 +683,7 @@ TmEcode VerdictIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
} else { } else {
/* no tunnel, verdict normally */ /* no tunnel, verdict normally */
SCLogDebug("Setting verdict on non-tunnel"); SCLogDebug("Setting verdict on non-tunnel");
retval=IPFWSetVerdict(tv, ptv, p); retval = IPFWSetVerdict(tv, ptv, p);
} /* IS_TUNNEL_PKT end */ } /* IS_TUNNEL_PKT end */
SCReturnInt(retval); SCReturnInt(retval);
@ -640,6 +746,85 @@ void VerdictIPFWThreadExitStats(ThreadVars *tv, void *data)
SCLogInfo("IPFW Processing: - (%s) Pkts accepted %" PRIu32 ", dropped %" PRIu32 "", tv->name, ptv->accepted, ptv->dropped); SCLogInfo("IPFW Processing: - (%s) Pkts accepted %" PRIu32 ", dropped %" PRIu32 "", tv->name, ptv->accepted, ptv->dropped);
} }
/**
* \brief Add an IPFW divert
*
* \param string with the queue name
*
* \retval 0 on success.
* \retval -1 on failure.
*/
int IPFWRegisterQueue(char *queue)
{
IPFWThreadVars *ntv = NULL;
IPFWQueueVars *nq = NULL;
/* Extract the queue number from the specified command line argument */
uint16_t port_num = 0;
if ((ByteExtractStringUint16(&port_num, 10, strlen(queue), queue)) < 0)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "specified queue number %s is not "
"valid", queue);
return -1;
}
SCMutexLock(&ipfw_init_lock);
if (receive_port_num >= IPFW_MAX_QUEUE) {
SCLogError(SC_ERR_INVALID_ARGUMENT,
"too much IPFW divert port registered (%d)",
receive_port_num);
SCMutexUnlock(&ipfw_init_lock);
return -1;
}
if (receive_port_num == 0) {
memset(&ipfw_t, 0, sizeof(ipfw_t));
memset(&ipfw_q, 0, sizeof(ipfw_q));
}
ntv = &ipfw_t[receive_port_num];
ntv->ipfw_index = receive_port_num;
nq = &ipfw_q[receive_port_num];
nq->port_num = port_num;
receive_port_num++;
SCMutexUnlock(&ipfw_init_lock);
LiveRegisterDevice(queue);
SCLogDebug("Queue \"%s\" registered.", queue);
return 0;
}
/**
* \brief Get a pointer to the IPFW queue at index
*
* \param number idx of the queue in our array
*
* \retval ptr pointer to the IPFWThreadVars at index
* \retval NULL on error
*/
void *IPFWGetQueue(int number) {
if (number >= receive_port_num)
return NULL;
return (void *)&ipfw_q[number];
}
/**
* \brief Get a pointer to the IPFW thread at index
*
* This function is temporary used as configuration parser.
*
* \param number idx of the queue in our array
*
* \retval ptr pointer to the IPFWThreadVars at index
* \retval NULL on error
*/
void *IPFWGetThread(int number) {
if (number >= receive_port_num)
return NULL;
return (void *)&ipfw_t[number];
}
#endif /* End ifdef IPFW */ #endif /* End ifdef IPFW */
/* eof */ /* eof */

@ -27,14 +27,45 @@
#include <pthread.h> #include <pthread.h>
#define IPFW_MAX_QUEUE 16
/* per packet IPFW vars (Not used) */ /* per packet IPFW vars (Not used) */
typedef struct IPFWPacketVars_ typedef struct IPFWPacketVars_
{ {
int ipfw_index;
} IPFWPacketVars; } IPFWPacketVars;
typedef struct IPFWQueueVars_
{
int fd;
SCMutex socket_lock;
/* this one should be not changing after init */
uint16_t port_num;
/* position into the ipfw queue var array */
uint16_t ipfw_index;
struct sockaddr_in ipfw_sin;
socklen_t ipfw_sinlen;
#ifdef DBG_PERF
int dbg_maxreadsize;
#endif /* DBG_PERF */
/* counters */
uint32_t pkts;
uint64_t bytes;
uint32_t errs;
uint32_t accepted;
uint32_t dropped;
uint32_t replaced;
} IPFWQueueVars;
void *IPFWGetThread(int number);
int IPFWRegisterQueue(char *queue);
void TmModuleReceiveIPFWRegister (void); void TmModuleReceiveIPFWRegister (void);
void TmModuleVerdictIPFWRegister (void); void TmModuleVerdictIPFWRegister (void);
void TmModuleDecodeIPFWRegister (void); void TmModuleDecodeIPFWRegister (void);
#endif /* __SOURCE_IPFW_H__ */ #endif /* __SOURCE_IPFW_H__ */

@ -993,16 +993,17 @@ int main(int argc, char **argv)
if (run_mode == RUNMODE_UNKNOWN) { if (run_mode == RUNMODE_UNKNOWN) {
run_mode = RUNMODE_IPFW; run_mode = RUNMODE_IPFW;
SET_ENGINE_MODE_IPS(engine_mode); SET_ENGINE_MODE_IPS(engine_mode);
if (IPFWRegisterQueue(optarg) == -1)
exit(EXIT_FAILURE);
} else if (run_mode == RUNMODE_IPFW) {
if (IPFWRegisterQueue(optarg) == -1)
exit(EXIT_FAILURE);
} else { } else {
SCLogError(SC_ERR_MULTIPLE_RUN_MODE, "more than one run mode " SCLogError(SC_ERR_MULTIPLE_RUN_MODE, "more than one run mode "
"has been specified"); "has been specified");
usage(argv[0]); usage(argv[0]);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
if (ConfSet("ipfw.ipfw_divert_port", optarg, 0) != 1) {
fprintf(stderr, "ERROR: Failed to set ipfw_divert_port\n");
exit(EXIT_FAILURE);
}
#else #else
SCLogError(SC_ERR_IPFW_NOSUPPORT,"IPFW not enabled. Make sure to pass --enable-ipfw to configure when building."); SCLogError(SC_ERR_IPFW_NOSUPPORT,"IPFW not enabled. Make sure to pass --enable-ipfw to configure when building.");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);

Loading…
Cancel
Save