main/flush: Support periodic flush logs

Issue: 3449
pull/12679/head
Jeff Lucovsky 1 year ago committed by Victor Julien
parent 36111450ac
commit 49d4686144

@ -344,6 +344,7 @@ noinst_HEADERS = \
ippair-storage.h \
ippair-timeout.h \
log-cf-common.h \
log-flush.h \
log-httplog.h \
log-pcap.h \
log-stats.h \
@ -916,6 +917,7 @@ libsuricata_c_a_SOURCES = \
ippair-storage.c \
ippair-timeout.c \
log-cf-common.c \
log-flush.c \
log-httplog.c \
log-pcap.c \
log-stats.c \

@ -2323,15 +2323,50 @@ int DetectEngineInspectPktBufferGeneric(
}
/** \internal
* \brief inject a pseudo packet into each detect thread that doesn't use the
* new det_ctx yet
* \brief inject a pseudo packet into each detect thread
* if the thread should flush its output logs.
*/
static void InjectPackets(ThreadVars **detect_tvs,
DetectEngineThreadCtx **new_det_ctx,
int no_of_detect_tvs)
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread isn't using the new ctx yet,
* this speeds up the process */
/* inject a fake packet if the detect thread that needs it. This function
* is called when a heartbeat log-flush request has been made
* and it should process a pseudo packet and flush its output logs
* to speed the process. */
#if DEBUG
int count = 0;
#endif
for (int i = 0; i < no_of_detect_tvs; i++) {
if (detect_tvs[i]) { // && detect_tvs[i]->inq != NULL) {
Packet *p = PacketGetFromAlloc();
if (p != NULL) {
SCLogDebug("Injecting pkt for tv %s[i=%d] %d", detect_tvs[i]->name, i, count++);
p->flags |= PKT_PSEUDO_STREAM_END;
p->flags |= PKT_PSEUDO_LOG_FLUSH;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
PacketQueue *q = detect_tvs[i]->stream_pq;
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}
}
}
SCLogDebug("leaving: thread notification count = %d", count);
}
/** \internal
* \brief inject a pseudo packet into each detect thread
* -that doesn't use the new det_ctx yet
* -*or*, if the thread should flush its output logs.
*/
static void InjectPackets(
ThreadVars **detect_tvs, DetectEngineThreadCtx **new_det_ctx, int no_of_detect_tvs)
{
/* inject a fake packet if the detect thread that needs it. This function
* is called if
* - A thread isn't using a DE ctx and should
* - Or, it should process a pseudo packet and flush its output logs.
* to speed the process. */
for (int i = 0; i < no_of_detect_tvs; i++) {
if (SC_ATOMIC_GET(new_det_ctx[i]->so_far_used_by_detect) != 1) {
if (detect_tvs[i]->inq != NULL) {

@ -212,4 +212,6 @@ void DetectEngineStateResetTxs(Flow *f);
void DeStateRegisterTests(void);
/* packet injection */
void InjectPacketsForFlush(ThreadVars **detect_tvs, int no_of_detect_tvs);
#endif /* SURICATA_DETECT_ENGINE_H */

@ -73,6 +73,8 @@ typedef struct FlowWorkerThreadData_ {
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
SC_ATOMIC_DECLARE(bool, flush_ack);
void *output_thread; /* Output thread data. */
void *output_thread_flow; /* Output thread data. */
@ -554,6 +556,15 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
SCLogDebug("packet %"PRIu64, p->pcap_cnt);
if ((PKT_IS_FLUSHPKT(p))) {
SCLogDebug("thread %s flushing", tv->printable_name);
OutputLoggerFlush(tv, p, fw->output_thread);
/* Ack if a flush was requested */
bool notset = false;
SC_ATOMIC_CAS(&fw->flush_ack, notset, true);
return TM_ECODE_OK;
}
/* handle Flow */
if (p->flags & PKT_WANTS_FLOW) {
FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
@ -723,6 +734,23 @@ void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
return SC_ATOMIC_GET(fw->detect_thread);
}
void *FlowWorkerGetThreadData(void *flow_worker)
{
return (FlowWorkerThreadData *)flow_worker;
}
bool FlowWorkerGetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
return SC_ATOMIC_GET(fw->flush_ack) == true;
}
void FlowWorkerSetFlushAck(void *flow_worker)
{
FlowWorkerThreadData *fw = flow_worker;
SC_ATOMIC_SET(fw->flush_ack, false);
}
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
{
switch (fwi) {

@ -32,6 +32,9 @@ const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi);
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx);
void *FlowWorkerGetDetectCtxPtr(void *flow_worker);
void *FlowWorkerGetThreadData(void *flow_worker);
bool FlowWorkerGetFlushAck(void *flow_worker);
void FlowWorkerSetFlushAck(void *flow_worker);
void TmModuleFlowWorkerRegister (void);

@ -0,0 +1,199 @@
/* Copyright (C) 2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Jeff Lucovsky <jlucovsky@oisf.net>
*/
#include "suricata-common.h"
#include "suricata.h"
#include "detect.h"
#include "detect-engine.h"
#include "flow-worker.h"
#include "log-flush.h"
#include "tm-threads.h"
#include "conf.h"
#include "conf-yaml-loader.h"
#include "util-privs.h"
/**
* \brief Trigger detect threads to flush their output logs
*
* This function is intended to be called at regular intervals to force
* buffered log data to be persisted
*/
static void WorkerFlushLogs(void)
{
SCEnter();
/* count detect threads in use */
uint32_t no_of_detect_tvs = TmThreadCountThreadsByTmmFlags(TM_FLAG_DETECT_TM);
/* can be zero in unix socket mode */
if (no_of_detect_tvs == 0) {
return;
}
/* prepare swap structures */
void *fw_threads[no_of_detect_tvs];
ThreadVars *detect_tvs[no_of_detect_tvs];
memset(fw_threads, 0x00, (no_of_detect_tvs * sizeof(void *)));
memset(detect_tvs, 0x00, (no_of_detect_tvs * sizeof(ThreadVars *)));
/* start by initiating the log flushes */
uint32_t i = 0;
SCMutexLock(&tv_root_lock);
/* get reference to tv's and setup fw_threads array */
for (ThreadVars *tv = tv_root[TVT_PPT]; tv != NULL; tv = tv->next) {
if ((tv->tmm_flags & TM_FLAG_DETECT_TM) == 0) {
continue;
}
for (TmSlot *s = tv->tm_slots; s != NULL; s = s->slot_next) {
TmModule *tm = TmModuleGetById(s->tm_id);
if (!(tm->flags & TM_FLAG_DETECT_TM)) {
continue;
}
if (suricata_ctl_flags != 0) {
SCMutexUnlock(&tv_root_lock);
goto error;
}
fw_threads[i] = FlowWorkerGetThreadData(SC_ATOMIC_GET(s->slot_data));
if (fw_threads[i]) {
FlowWorkerSetFlushAck(fw_threads[i]);
SCLogDebug("Setting flush-ack for thread %s[i=%d]", tv->printable_name, i);
detect_tvs[i] = tv;
}
i++;
break;
}
}
BUG_ON(i != no_of_detect_tvs);
SCMutexUnlock(&tv_root_lock);
SCLogDebug("Creating flush pseudo packets for %d threads", no_of_detect_tvs);
InjectPacketsForFlush(detect_tvs, no_of_detect_tvs);
uint32_t threads_done = 0;
retry:
for (i = 0; i < no_of_detect_tvs; i++) {
if (suricata_ctl_flags != 0) {
threads_done = no_of_detect_tvs;
break;
}
usleep(1000);
if (fw_threads[i] && FlowWorkerGetFlushAck(fw_threads[i])) {
SCLogDebug("thread slot %d has ack'd flush request", i);
threads_done++;
} else if (detect_tvs[i]) {
SCLogDebug("thread slot %d not yet ack'd flush request", i);
TmThreadsCaptureBreakLoop(detect_tvs[i]);
}
}
if (threads_done < no_of_detect_tvs) {
threads_done = 0;
SleepMsec(250);
goto retry;
}
error:
return;
}
static int OutputFlushInterval(void)
{
intmax_t output_flush_interval = 0;
if (ConfGetInt("heartbeat.output-flush-interval", &output_flush_interval) == 0) {
output_flush_interval = 0;
}
if (output_flush_interval < 0 || output_flush_interval > 60) {
SCLogConfig("flush_interval must be 0 or less than 60; using 0");
output_flush_interval = 0;
}
return (int)output_flush_interval;
}
static void *LogFlusherWakeupThread(void *arg)
{
int output_flush_interval = OutputFlushInterval();
/* This was checked by the logic creating this thread */
BUG_ON(output_flush_interval == 0);
SCLogConfig("Using output-flush-interval of %d seconds", output_flush_interval);
/*
* Calculate the number of sleep intervals based on the output flush interval. This is necessary
* because this thread pauses a fixed amount of time to react to shutdown situations more
* quickly.
*/
const int log_flush_sleep_time = 500; /* milliseconds */
const int flush_wait_count = (1000 * output_flush_interval) / log_flush_sleep_time;
ThreadVars *tv_local = (ThreadVars *)arg;
SCSetThreadName(tv_local->name);
if (tv_local->thread_setup_flags != 0)
TmThreadSetupOptions(tv_local);
/* Set the threads capability */
tv_local->cap_flags = 0;
SCDropCaps(tv_local);
TmThreadsSetFlag(tv_local, THV_INIT_DONE | THV_RUNNING);
int wait_count = 0;
uint64_t worker_flush_count = 0;
bool run = TmThreadsWaitForUnpause(tv_local);
while (run) {
usleep(log_flush_sleep_time * 1000);
if (++wait_count == flush_wait_count) {
worker_flush_count++;
WorkerFlushLogs();
wait_count = 0;
}
if (TmThreadsCheckFlag(tv_local, THV_KILL)) {
break;
}
}
TmThreadsSetFlag(tv_local, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv_local, THV_DEINIT);
TmThreadsSetFlag(tv_local, THV_CLOSED);
SCLogInfo("%s: initiated %" PRIu64 " flushes", tv_local->name, worker_flush_count);
return NULL;
}
void LogFlushThreads(void)
{
if (0 == OutputFlushInterval()) {
SCLogConfig("log flusher thread not used with heartbeat.output-flush-interval of 0");
return;
}
ThreadVars *tv_log_flush =
TmThreadCreateMgmtThread(thread_name_heartbeat, LogFlusherWakeupThread, 1);
if (!tv_log_flush || (TmThreadSpawn(tv_log_flush) != 0)) {
FatalError("Unable to create and start log flush thread");
}
}

@ -0,0 +1,26 @@
/* Copyright (C) 2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Jeff Lucovsky <jlucovsky@oisf.net>
*/
#ifndef SURICATA_LOG_FLUSH_H__
#define SURICATA_LOG_FLUSH_H__
void LogFlushThreads(void);
#endif /* SURICATA_LOG_FLUSH_H__ */

@ -708,6 +708,21 @@ void OutputNotifyFileRotation(void) {
}
}
TmEcode OutputLoggerFlush(ThreadVars *tv, Packet *p, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
RootLogger *logger = TAILQ_FIRST(&active_loggers);
LoggerThreadStoreNode *thread_store_node = TAILQ_FIRST(thread_store);
while (logger && thread_store_node) {
if (logger->FlushFunc)
logger->FlushFunc(tv, p, thread_store_node->thread_data);
logger = TAILQ_NEXT(logger, entries);
thread_store_node = TAILQ_NEXT(thread_store_node, entries);
}
return TM_ECODE_OK;
}
TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;

@ -164,6 +164,7 @@ void OutputRegisterRootLogger(ThreadInitFunc ThreadInit, ThreadDeinitFunc Thread
void TmModuleLoggerRegister(void);
TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *);
TmEcode OutputLoggerFlush(ThreadVars *, Packet *, void *);
TmEcode OutputLoggerThreadInit(ThreadVars *, const void *, void **);
TmEcode OutputLoggerThreadDeinit(ThreadVars *, void *);
void OutputLoggerExitPrintStats(ThreadVars *, void *);

@ -28,6 +28,7 @@
#include "util-debug.h"
#include "util-affinity.h"
#include "conf.h"
#include "log-flush.h"
#include "runmodes.h"
#include "runmode-af-packet.h"
#include "runmode-af-xdp.h"
@ -72,6 +73,7 @@ const char *thread_name_unix_socket = "US";
const char *thread_name_detect_loader = "DL";
const char *thread_name_counter_stats = "CS";
const char *thread_name_counter_wakeup = "CW";
const char *thread_name_heartbeat = "HB";
/**
* \brief Holds description for a runmode.
@ -436,6 +438,7 @@ void RunModeDispatch(int runmode, const char *custom_mode, const char *capture_p
BypassedFlowManagerThreadSpawn();
}
StatsSpawnThreads();
LogFlushThreads();
TmThreadsSealThreads();
}
}

@ -73,6 +73,7 @@ extern const char *thread_name_unix_socket;
extern const char *thread_name_detect_loader;
extern const char *thread_name_counter_stats;
extern const char *thread_name_counter_wakeup;
extern const char *thread_name_heartbeat;
char *RunmodeGetActive(void);
const char *RunModeGetMainMode(void);

@ -3021,7 +3021,7 @@ void SuricataPostInit(void)
#if defined(HAVE_SYS_RESOURCE_H)
#ifdef linux
if (geteuid() == 0) {
SCLogWarning("setrlimit has no effet when running as root.");
SCLogWarning("setrlimit has no effect when running as root.");
}
#endif
struct rlimit r = { 0, 0 };

@ -152,6 +152,7 @@ typedef struct SCInstance_ {
int offline;
int verbose;
int checksum_validation;
int output_flush_interval;
struct timeval start_time;

@ -572,19 +572,20 @@ outputs:
scripts:
# - script1.lua
# Logging configuration. This is not about logging IDS alerts/events, but
# output about what Suricata is doing, like startup messages, errors, etc.
logging:
# The flush-interval governs how often Suricata will instruct the detection
# threads to flush their EVE output. Specify the value in seconds [1-60]
heartbeat:
# The output-flush-interval value governs how often Suricata will instruct the
# detection threads to flush their EVE output. Specify the value in seconds [1-60]
# and Suricata will initiate EVE log output flushes at that interval. A value
# of 0 means no EVE log output flushes are initiated. When the EVE output
# buffer-size value is non-zero, some EVE output that was written may remain
# buffered. The flush-interval governs how much buffered data exists.
# buffered. The output-flush-interval governs how much buffered data exists.
#
# The default value is: 0 (never instruct detection threads to flush output)
#flush-interval: 0
#output-flush-interval: 0
# Logging configuration. This is not about logging IDS alerts/events, but
# output about what Suricata is doing, like startup messages, errors, etc.
logging:
# The default log level: can be overridden in an output section.
# Note that debug level logging will only be emitted if Suricata was
# compiled with the --enable-debug configure option.

Loading…
Cancel
Save