libsuricata: add library runmode

Add library source and runmode modules. Reorganized
library example to create a worker thread and replay a pcap
file using the library mode.
No API layer is added at this stage.

Edits by Jason Ish:
- fix guard
- add copyright/license headers

Ticket: #7240
pull/12891/head
Angelo Mirabella 1 year ago committed by Victor Julien
parent d8c6a56a62
commit ee9714e593

@ -1,12 +1,16 @@
LIBSURICATA_CONFIG ?= @CONFIGURE_PREFIX@/bin/libsuricata-config
SURICATA_LIBS = `$(LIBSURICATA_CONFIG) --libs`
SURICATA_LIBS = `$(LIBSURICATA_CONFIG) --libs --static`
SURICATA_CFLAGS := `$(LIBSURICATA_CONFIG) --cflags`
# Currently the Suricata logging system requires this to be even for
# plugins.
CPPFLAGS += "-D__SCFILENAME__=\"$(*F)\""
all: simple
simple: main.c
$(CC) -o $@ $^ $(CFLAGS) $(SURICATA_CFLAGS) $(SURICATA_LIBS)
$(CC) -o $@ $^ $(CPPFLAGS) $(CFLAGS) $(SURICATA_CFLAGS) $(SURICATA_LIBS)
clean:
rm -f simple

@ -16,14 +16,62 @@
*/
#include "suricata.h"
#include "conf.h"
#include "pcap.h"
#include "runmode-lib.h"
#include "source-lib.h"
#include "threadvars.h"
/* Suricata worker thread in library mode.
The functions should be wrapped in an API layer. */
static void *SimpleWorker(void *arg)
{
char *pcap_file = (char *)arg;
/* Create worker. */
ThreadVars *tv = RunModeCreateWorker();
if (!tv) {
pthread_exit(NULL);
}
/* Start worker. */
if (RunModeSpawnWorker(tv) != 0) {
pthread_exit(NULL);
}
/* Replay pcap. */
pcap_t *fp = pcap_open_offline(pcap_file, NULL);
if (fp == NULL) {
pthread_exit(NULL);
}
int datalink = pcap_datalink(fp);
struct pcap_pkthdr pkthdr;
const u_char *packet;
while ((packet = pcap_next(fp, &pkthdr)) != NULL) {
if (TmModuleLibHandlePacket(tv, packet, datalink, pkthdr.ts, pkthdr.len, 0, 0, NULL) != 0) {
pthread_exit(NULL);
}
}
pcap_close(fp);
/* Cleanup. */
RunModeDestroyWorker(tv);
pthread_exit(NULL);
}
int main(int argc, char **argv)
{
SuricataPreInit(argv[0]);
/* Parse command line options. This is optional, you could
* directly configure Suricata through the Conf API. */
SCParseCommandLine(argc, argv);
* directly configure Suricata through the Conf API.
The last argument is the PCAP file to replay. */
SCParseCommandLine(argc - 1, argv);
/* Set lib runmode. There is currently no way to set it via
the Conf API. */
SuricataSetLibRunmode();
/* Validate/finalize the runmode. */
if (SCFinalizeRunMode() != TM_ECODE_OK) {
@ -47,12 +95,27 @@ int main(int argc, char **argv)
exit(EXIT_FAILURE);
}
/* Set "offline" runmode to replay a pcap in library mode. */
if (!ConfSetFromString("runmode=offline", 1)) {
exit(EXIT_FAILURE);
}
SuricataInit();
SuricataPostInit();
/* Suricata is now running, but we enter a loop to keep it running
* until it shouldn't be running anymore. */
SuricataMainLoop();
/* Create and start worker on its own thread, passing the PCAP file
as argument. This needs to be done in between SuricataInit and
SuricataPostInit. */
pthread_t worker;
if (pthread_create(&worker, NULL, SimpleWorker, argv[argc - 1]) != 0) {
exit(EXIT_FAILURE);
}
/* Need to introduce a little sleep to allow the worker thread to
initialize before SuricataPostInit invokes TmThreadContinueThreads().
This should be handle at the API level. */
usleep(100);
SuricataPostInit();
/* Shutdown engine. */
SuricataShutdown();

@ -405,6 +405,7 @@ noinst_HEADERS = \
runmode-erf-dag.h \
runmode-erf-file.h \
runmode-ipfw.h \
runmode-lib.h \
runmode-netmap.h \
runmode-nflog.h \
runmode-nfq.h \
@ -422,6 +423,7 @@ noinst_HEADERS = \
source-erf-dag.h \
source-erf-file.h \
source-ipfw.h \
source-lib.h \
source-netmap.h \
source-nflog.h \
source-nfq.h \
@ -983,6 +985,7 @@ libsuricata_c_a_SOURCES = \
runmode-erf-dag.c \
runmode-erf-file.c \
runmode-ipfw.c \
runmode-lib.c \
runmode-netmap.c \
runmode-nflog.c \
runmode-nfq.c \
@ -999,6 +1002,7 @@ libsuricata_c_a_SOURCES = \
source-erf-dag.c \
source-erf-file.c \
source-ipfw.c \
source-lib.c \
source-netmap.c \
source-nflog.c \
source-nfq.c \

@ -0,0 +1,154 @@
/* Copyright (C) 2023-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/** \file
*
* \author Angelo Mirabella <angelo.mirabella@broadcom.com>
*
* Library runmode.
*/
#include "suricata-common.h"
#include "runmode-lib.h"
#include "runmodes.h"
#include "tm-threads.h"
#include "util-device.h"
static int g_thread_id = 0;
/** \brief register runmodes for suricata as a library */
void RunModeIdsLibRegister(void)
{
RunModeRegisterNewRunMode(RUNMODE_LIB, "offline", "Library offline mode (pcap replaying)",
RunModeIdsLibOffline, NULL);
RunModeRegisterNewRunMode(RUNMODE_LIB, "live", "Library live mode", RunModeIdsLibLive, NULL);
return;
}
/** \brief runmode for offline packet processing (pcap files) */
int RunModeIdsLibOffline(void)
{
TimeModeSetOffline();
return 0;
}
/** \brief runmode for live packet processing */
int RunModeIdsLibLive(void)
{
TimeModeSetLive();
return 0;
}
const char *RunModeLibGetDefaultMode(void)
{
return "live";
}
/** \brief create a "fake" worker thread in charge of processing the packets.
*
* This method just creates a context representing the worker, which is handled from the library
* client. No actual thread (pthread_t) is created.
*
* \return Pointer to ThreadVars structure representing the worker thread */
void *RunModeCreateWorker(void)
{
char tname[TM_THREAD_NAME_MAX];
TmModule *tm_module = NULL;
snprintf(tname, sizeof(tname), "%s#%02d", thread_name_workers, ++g_thread_id);
ThreadVars *tv = TmThreadCreatePacketHandler(
tname, "packetpool", "packetpool", "packetpool", "packetpool", "lib");
if (tv == NULL) {
SCLogError("TmThreadsCreate failed");
return NULL;
}
tm_module = TmModuleGetByName("DecodeLib");
if (tm_module == NULL) {
SCLogError("TmModuleGetByName DecodeLib failed");
return NULL;
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
tm_module = TmModuleGetByName("FlowWorker");
if (tm_module == NULL) {
SCLogError("TmModuleGetByName for FlowWorker failed");
return NULL;
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
TmThreadAppend(tv, tv->type);
return tv;
}
/** \brief start the "fake" worker.
*
* This method performs all the initialization tasks.
*/
int RunModeSpawnWorker(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
if (TmThreadLibSpawn(tv) != TM_ECODE_OK) {
SCLogError("TmThreadLibSpawn failed");
return -1;
}
TmThreadsSetFlag(tv, THV_RUNNING);
return 0;
}
/** \brief destroy a worker thread */
void RunModeDestroyWorker(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r;
TmSlot *slot = NULL;
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
/* process all pseudo packets the flow timeout may throw at us */
TmThreadTimeoutLoop(tv, s);
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
PacketPoolDestroy();
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
}
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
break;
}
}
}
tv->stream_pq = NULL;
--g_thread_id;
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
}

@ -0,0 +1,57 @@
/* Copyright (C) 2023-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/** \file
*
* \author Angelo Mirabella <angelo.mirabella@broadcom.com>
*
* Library runmode.
*/
#ifndef SURICATA_RUNMODE_LIB_H
#define SURICATA_RUNMODE_LIB_H
/** \brief register runmodes for suricata as a library */
void RunModeIdsLibRegister(void);
/** \brief runmode for live packet processing */
int RunModeIdsLibLive(void);
/** \brief runmode for offline packet processing (pcap files) */
int RunModeIdsLibOffline(void);
/** \brief runmode default mode (live) */
const char *RunModeLibGetDefaultMode(void);
/** \brief create a "fake" worker thread in charge of processing the packets.
*
* This method just creates a context representing the worker, which is handled from the library
* client. No actual thread (pthread_t) is created.
*
* \return Pointer to ThreadVars structure representing the worker thread */
void *RunModeCreateWorker(void);
/** \brief start the "fake" worker.
*
* This method performs all the initialization tasks.
*/
int RunModeSpawnWorker(void *);
/** \brief destroy a worker thread */
void RunModeDestroyWorker(void *);
#endif /* SURICATA_RUNMODE_LIB_H */

@ -36,6 +36,7 @@
#include "runmode-erf-dag.h"
#include "runmode-erf-file.h"
#include "runmode-ipfw.h"
#include "runmode-lib.h"
#include "runmode-netmap.h"
#include "runmode-nflog.h"
#include "runmode-nfq.h"
@ -160,6 +161,8 @@ static const char *RunModeTranslateModeToName(int runmode)
#else
return "DPDK(DISABLED)";
#endif
case RUNMODE_LIB:
return "LIB";
default:
FatalError("Unknown runtime mode. Aborting");
@ -232,6 +235,7 @@ void RunModeRegisterRunModes(void)
RunModeUnixSocketRegister();
RunModeIpsWinDivertRegister();
RunModeDpdkRegister();
RunModeIdsLibRegister();
#ifdef UNITTESTS
UtRunModeRegister();
#endif
@ -344,6 +348,9 @@ static const char *RunModeGetConfOrDefault(int capture_mode, const char *capture
custom_mode = RunModeDpdkGetDefaultMode();
break;
#endif
case RUNMODE_LIB:
custom_mode = RunModeLibGetDefaultMode();
break;
default:
return NULL;
}

@ -37,6 +37,7 @@ enum RunModes {
RUNMODE_AFXDP_DEV,
RUNMODE_NETMAP,
RUNMODE_DPDK,
RUNMODE_LIB,
RUNMODE_UNITTEST,
RUNMODE_UNIX_SOCKET,
RUNMODE_WINDIVERT,

@ -0,0 +1,180 @@
/* Copyright (C) 2023-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/** \file
*
* \author Angelo Mirabella <angelo.mirabella@broadcom.com>
*
* LIB packet and stream decoding support
*
*/
#include "suricata-common.h"
#include "source-lib.h"
#include "util-device.h"
static TmEcode DecodeLibThreadInit(ThreadVars *tv, const void *initdata, void **data);
static TmEcode DecodeLibThreadDeinit(ThreadVars *tv, void *data);
static TmEcode DecodeLib(ThreadVars *tv, Packet *p, void *data);
/* Set time to the first packet timestamp when replaying a PCAP. */
static bool time_set = false;
/** \brief register a "Decode" module for suricata as a library.
*
* The "Decode" module is the first module invoked when processing a packet */
void TmModuleDecodeLibRegister(void)
{
tmm_modules[TMM_DECODELIB].name = "DecodeLib";
tmm_modules[TMM_DECODELIB].ThreadInit = DecodeLibThreadInit;
tmm_modules[TMM_DECODELIB].Func = DecodeLib;
tmm_modules[TMM_DECODELIB].ThreadExitPrintStats = NULL;
tmm_modules[TMM_DECODELIB].ThreadDeinit = DecodeLibThreadDeinit;
tmm_modules[TMM_DECODELIB].cap_flags = 0;
tmm_modules[TMM_DECODELIB].flags = TM_FLAG_DECODE_TM;
}
/** \brief initialize the "Decode" module.
*
* \param tv Pointer to the per-thread structure.
* \param initdata Pointer to initialization context.
* \param data Pointer to the initialized context.
* \return Error code.
*/
TmEcode DecodeLibThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
SCEnter();
DecodeThreadVars *dtv = NULL;
dtv = DecodeThreadVarsAlloc(tv);
if (dtv == NULL)
SCReturnInt(TM_ECODE_FAILED);
DecodeRegisterPerfCounters(dtv, tv);
*data = (void *)dtv;
SCReturnInt(TM_ECODE_OK);
}
/** \brief deinitialize the "Decode" module.
*
* \param tv Pointer to the per-thread structure.
* \param data Pointer to the context.
* \return Error code.
*/
TmEcode DecodeLibThreadDeinit(ThreadVars *tv, void *data)
{
if (data != NULL)
DecodeThreadVarsFree(tv, data);
time_set = false;
SCReturnInt(TM_ECODE_OK);
}
/** \brief main decoding function.
*
* This method receives a packet and tries to identify layer 2 to 4 layers.
*
* \param tv Pointer to the per-thread structure.
* \param p Pointer to the packet.
* \param data Pointer to the context.
* \return Error code.
*/
TmEcode DecodeLib(ThreadVars *tv, Packet *p, void *data)
{
SCEnter();
DecodeThreadVars *dtv = (DecodeThreadVars *)data;
BUG_ON(PKT_IS_PSEUDOPKT(p));
/* update counters */
DecodeUpdatePacketCounters(tv, dtv, p);
/* If suri has set vlan during reading, we increase vlan counter */
if (p->vlan_idx) {
StatsIncr(tv, dtv->counter_vlan);
}
/* call the decoder */
DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
PacketDecodeFinalize(tv, dtv, p);
SCReturnInt(TM_ECODE_OK);
}
/** \brief process a single packet.
*
* \param tv Pointer to the per-thread structure.
* \param data Pointer to the raw packet.
* \param datalink Datalink type.
* \param ts Timeval structure.
* \param len Packet length.
* \param tenant_id Tenant id of the detection engine to use.
* \param flags Packet flags (packet checksum, rule profiling...).
* \param iface Sniffing interface this packet comes from (can be NULL).
* \return Error code.
*/
int TmModuleLibHandlePacket(ThreadVars *tv, const uint8_t *data, int datalink, struct timeval ts,
uint32_t len, uint32_t tenant_id, uint32_t flags, const char *iface)
{
/* If the packet is NULL, consider it as a read timeout. */
if (data == NULL) {
TmThreadsSetFlag(tv, THV_CAPTURE_INJECT_PKT);
TmThreadsCaptureHandleTimeout(tv, NULL);
SCReturnInt(TM_ECODE_OK);
}
Packet *p = PacketGetFromQueueOrAlloc();
if (unlikely(p == NULL)) {
SCReturnInt(TM_ECODE_FAILED);
}
/* If we are processing a PCAP and it is the first packet we need to set the timestamp. */
SCTime_t timestamp = SCTIME_FROM_TIMEVAL(&ts);
if (!time_set && !TimeModeIsLive()) {
TmThreadsInitThreadsTimestamp(timestamp);
time_set = true;
}
PKT_SET_SRC(p, PKT_SRC_WIRE);
p->ts = timestamp;
p->datalink = datalink;
p->tenant_id = tenant_id;
p->flags |= flags;
/* Set the sniffing interface. */
if (iface) {
p->livedev = LiveGetDevice(iface);
}
if (PacketSetData(p, data, len) == -1) {
TmqhOutputPacketpool(tv, p);
SCReturnInt(TM_ECODE_FAILED);
}
SCLogDebug("pktlen: %" PRIu32 " (pkt %p, pkt data %p)", GET_PKT_LEN(p), p, GET_PKT_DATA(p));
if (TmThreadsSlotProcessPkt(tv, tv->tm_slots, p) != TM_ECODE_OK) {
SCReturnInt(TM_ECODE_FAILED);
}
SCReturnInt(TM_ECODE_OK);
}

@ -0,0 +1,51 @@
/* Copyright (C) 2023-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/** \file
*
* \author Angelo Mirabella <angelo.mirabella@broadcom.com>
*
* LIB packet and stream decoding support
*
*/
#ifndef SURICATA_SOURCE_LIB_H
#define SURICATA_SOURCE_LIB_H
#include "tm-threads.h"
/** \brief register a "Decode" module for suricata as a library.
*
* The "Decode" module is the first module invoked when processing a packet */
void TmModuleDecodeLibRegister(void);
/** \brief process a single packet.
*
* \param tv Pointer to the per-thread structure.
* \param data Pointer to the raw packet.
* \param datalink Datalink type.
* \param ts Timeval structure.
* \param len Packet length.
* \param tenant_id Tenant id of the detection engine to use.
* \param flags Packet flags (packet checksum, rule profiling...).
* \param iface Sniffing interface this packet comes from (can be NULL).
* \return Error code.
*/
int TmModuleLibHandlePacket(ThreadVars *tv, const uint8_t *data, int datalink, struct timeval ts,
uint32_t len, uint32_t tenant_id, uint32_t flags, const char *iface);
#endif /* SURICATA_SOURCE_LIB_H */

@ -90,6 +90,7 @@
#include "source-nfq-prototypes.h"
#include "source-nflog.h"
#include "source-ipfw.h"
#include "source-lib.h"
#include "source-pcap.h"
#include "source-pcap-file.h"
#include "source-pcap-file-helper.h"
@ -954,6 +955,9 @@ void RegisterAllModules(void)
/* Dpdk */
TmModuleReceiveDPDKRegister();
TmModuleDecodeDPDKRegister();
/* Library */
TmModuleDecodeLibRegister();
}
TmEcode SCLoadYamlConfig(void)
@ -3063,3 +3067,8 @@ void SuricataPostInit(void)
}
SCPledge();
}
void SuricataSetLibRunmode(void)
{
suricata.run_mode = RUNMODE_LIB;
}

@ -227,4 +227,7 @@ int WindowsInitService(int argc, char **argv);
const char *GetProgramVersion(void);
/* Library only methods. */
void SuricataSetLibRunmode(void);
#endif /* SURICATA_SURICATA_H */

@ -182,6 +182,7 @@ const char * TmModuleTmmIdToString(TmmId id)
CASE_CODE(TMM_DECODEPCAPFILE);
CASE_CODE(TMM_RECEIVEDPDK);
CASE_CODE(TMM_DECODEDPDK);
CASE_CODE(TMM_DECODELIB);
CASE_CODE (TMM_RECEIVEPLUGIN);
CASE_CODE (TMM_DECODEPLUGIN);
CASE_CODE (TMM_RESPONDREJECT);

@ -64,6 +64,7 @@ typedef enum {
TMM_RECEIVEWINDIVERT,
TMM_VERDICTWINDIVERT,
TMM_DECODEWINDIVERT,
TMM_DECODELIB,
TMM_FLOWMANAGER,
TMM_FLOWRECYCLER,

@ -163,7 +163,7 @@ TmEcode TmThreadsSlotVarRun(ThreadVars *tv, Packet *p, TmSlot *slot)
* is run until the flow engine kills the thread and the queue is
* empty.
*/
static int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
{
TmSlot *fw_slot = tv->tm_flowworker;
int r = TM_ECODE_OK;
@ -379,6 +379,89 @@ bool TmThreadsWaitForUnpause(ThreadVars *tv)
return true;
}
static void *TmThreadsLib(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r = TM_ECODE_OK;
TmSlot *slot = NULL;
/* Set the thread name */
SCSetThreadName(tv->name);
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
/* Drop the capabilities for this thread */
SCDropCaps(tv);
PacketPoolInit();
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p, tmqh_in=%p,"
" tmqh_out=%p",
s, tv->tmqh_in, tv->tmqh_out);
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
if (r == TM_ECODE_DONE) {
EngineDone();
TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
goto error;
} else {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
goto error;
}
}
(void)SC_ATOMIC_SET(slot->slot_data, slot_data);
}
/* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
/* setup a queue */
} else if (slot->tm_id == TMM_FLOWWORKER) {
tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
if (tv->stream_pq_local == NULL)
FatalError("failed to alloc PacketQueue");
SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
tv->stream_pq = tv->stream_pq_local;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
}
}
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
TmThreadsWaitForUnpause(tv);
return NULL;
error:
tv->stream_pq = NULL;
return NULL;
}
static void *TmThreadsSlotVar(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
@ -622,6 +705,8 @@ static TmEcode TmThreadSetSlots(ThreadVars *tv, const char *name, void *(*fn_p)(
tv->tm_func = TmThreadsManagement;
} else if (strcmp(name, "command") == 0) {
tv->tm_func = TmThreadsManagement;
} else if (strcmp(name, "lib") == 0) {
tv->tm_func = TmThreadsLib;
} else if (strcmp(name, "custom") == 0) {
if (fn_p == NULL)
goto error;
@ -1708,6 +1793,25 @@ TmEcode TmThreadSpawn(ThreadVars *tv)
return TM_ECODE_OK;
}
/**
* \brief Spawns a "fake" lib thread associated with the ThreadVars instance tv
*
* \retval TM_ECODE_OK on success and TM_ECODE_FAILED on failure
*/
TmEcode TmThreadLibSpawn(ThreadVars *tv)
{
if (tv->tm_func == NULL) {
printf("ERROR: no thread function set\n");
return TM_ECODE_FAILED;
}
tv->tm_func((void *)tv);
TmThreadWaitForFlag(tv, THV_INIT_DONE | THV_RUNNING_DONE);
return TM_ECODE_OK;
}
/**
* \brief Initializes the mutex and condition variables for this TV
*

@ -94,6 +94,8 @@ ThreadVars *TmThreadCreateMgmtThreadByName(const char *name, const char *module,
ThreadVars *TmThreadCreateCmdThreadByName(const char *name, const char *module,
int mucond);
TmEcode TmThreadSpawn(ThreadVars *);
TmEcode TmThreadLibSpawn(ThreadVars *);
int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s);
void TmThreadKillThreadsFamily(int family);
void TmThreadKillThreads(void);
void TmThreadClearThreadsFamily(int family);

Loading…
Cancel
Save