log: Support multi-threaded eve output.

pull/5257/head
Jeff Lucovsky 5 years ago committed by Victor Julien
parent 15b4554ab3
commit aa20770277

@ -1,5 +1,5 @@
/* vi: set et ts=4: */
/* Copyright (C) 2007-2014 Open Information Security Foundation
/* Copyright (C) 2007-2020 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
@ -41,6 +41,9 @@
#include "util-log-redis.h"
#endif /* HAVE_LIBHIREDIS */
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
#ifdef BUILD_WITH_UNIXSOCKET
/** \brief connect to the indicated local stream socket, logging any errors
* \param path filesystem path to connect to
@ -175,6 +178,40 @@ tryagain:
}
#endif /* BUILD_WITH_UNIXSOCKET */
/**
* \brief Write buffer to log file.
* \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
* characters successfully written).
*/
static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *log_ctx)
{
int ret = 0;
BUG_ON(log_ctx->is_sock);
/* Check for rotation. */
if (log_ctx->rotation_flag) {
log_ctx->rotation_flag = 0;
SCConfLogReopen(log_ctx);
}
if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
time_t now = time(NULL);
if (now >= log_ctx->rotate_time) {
SCConfLogReopen(log_ctx);
log_ctx->rotate_time = now + log_ctx->rotate_interval;
}
}
if (log_ctx->fp) {
clearerr(log_ctx->fp);
ret = SCFwriteUnlocked(buffer, buffer_len, 1, log_ctx->fp);
fflush(log_ctx->fp);
}
return ret;
}
/**
* \brief Write buffer to log file.
* \retval 0 on failure; otherwise, the return value of fwrite (number of
@ -239,12 +276,64 @@ static char *SCLogFilenameFromPattern(const char *pattern)
return filename;
}
static void SCLogFileClose(LogFileCtx *log_ctx)
static void SCLogFileCloseNoLock(LogFileCtx *log_ctx)
{
if (log_ctx->fp)
fclose(log_ctx->fp);
}
static void SCLogFileClose(LogFileCtx *log_ctx)
{
SCMutexLock(&log_ctx->fp_mutex);
SCLogFileCloseNoLock(log_ctx);
SCMutexUnlock(&log_ctx->fp_mutex);
}
static bool
SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
{
parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
if (!parent_ctx->threads) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container");
return false;
}
parent_ctx->threads->append = SCStrdup(append);
if (!parent_ctx->threads->append) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
goto error_exit;
}
parent_ctx->threads->slot_count = slot_count;
parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
if (!parent_ctx->threads->lf_slots) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread slots");
goto error_exit;
}
SCLogDebug("Allocated %d file context pointers for threaded array",
parent_ctx->threads->slot_count);
int slot = 1;
for (; slot < parent_ctx->threads->slot_count; slot++) {
if (!LogFileNewThreadedCtx(parent_ctx, log_path, append, slot)) {
/* TODO: clear allocated entries [1, slot) */
goto error_exit;
}
}
SCMutexInit(&parent_ctx->threads->mutex, NULL);
return true;
error_exit:
if (parent_ctx->threads->lf_slots) {
SCFree(parent_ctx->threads->lf_slots);
}
if (parent_ctx->threads->append) {
SCFree(parent_ctx->threads->append);
}
SCFree(parent_ctx->threads);
parent_ctx->threads = NULL;
return false;
}
/** \brief open the indicated file, logging any errors
* \param path filesystem path to open
* \param append_setting open file with O_APPEND: "yes" or "no"
@ -430,10 +519,16 @@ SCConfLogOpenGeneric(ConfNode *conf,
#endif
} else if (strcasecmp(filetype, DEFAULT_LOG_FILETYPE) == 0 ||
strcasecmp(filetype, "file") == 0) {
log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode);
if (log_ctx->fp == NULL)
return -1; // Error already logged by Open...Fp routine
log_ctx->is_regular = 1;
if (!log_ctx->threaded) {
log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode);
if (log_ctx->fp == NULL)
return -1; // Error already logged by Open...Fp routine
} else {
if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) {
return -1;
}
}
if (rotate) {
OutputRegisterFileRotationFlag(&log_ctx->rotation_flag);
}
@ -514,24 +609,125 @@ int SCConfLogReopen(LogFileCtx *log_ctx)
LogFileCtx *LogFileNewCtx(void)
{
LogFileCtx* lf_ctx;
lf_ctx = (LogFileCtx*)SCMalloc(sizeof(LogFileCtx));
lf_ctx = (LogFileCtx*)SCCalloc(1, sizeof(LogFileCtx));
if (lf_ctx == NULL)
return NULL;
memset(lf_ctx, 0, sizeof(LogFileCtx));
SCMutexInit(&lf_ctx->fp_mutex,NULL);
// Default Write and Close functions
lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
return lf_ctx;
}
/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
* \param parent_ctx
* \param thread_id
* \retval LogFileCtx * pointer if successful; NULL otherwise
*/
LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id)
{
/* threaded output disabled */
if (!parent_ctx->threaded)
return parent_ctx;
SCLogDebug("Adding reference %d to file ctx %p", thread_id, parent_ctx);
SCMutexLock(&parent_ctx->threads->mutex);
/* are there enough context slots already */
if (thread_id < parent_ctx->threads->slot_count) {
/* has it been opened yet? */
if (!parent_ctx->threads->lf_slots[thread_id]) {
SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
}
SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
SCMutexUnlock(&parent_ctx->threads->mutex);
return parent_ctx->threads->lf_slots[thread_id];
}
/* ensure there's a slot for the caller */
int new_size = MAX(parent_ctx->threads->slot_count << 1, thread_id + 1);
SCLogDebug("Increasing slot count; current %d, trying %d",
parent_ctx->threads->slot_count, new_size);
LogFileCtx **new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
if (new_array == NULL) {
/* Try one more time */
SCLogDebug("Unable to increase file context array size to %d; trying %d",
new_size, thread_id + 1);
new_size = thread_id + 1;
new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
}
if (new_array == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
SCMutexUnlock(&parent_ctx->threads->mutex);
return NULL;
}
parent_ctx->threads->lf_slots = new_array;
/* initialize newly added slots */
for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
parent_ctx->threads->lf_slots[i] = NULL;
}
parent_ctx->threads->slot_count = new_size;
LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
SCMutexUnlock(&parent_ctx->threads->mutex);
return parent_ctx->threads->lf_slots[thread_id];
}
/** \brief LogFileNewThreadedCtx() Create file context for threaded output
* \param parent_ctx
* \param log_path
* \param append
* \param thread_id
*/
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int thread_id)
{
LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
if (!thread) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread file context slot %d", thread_id);
return false;
}
*thread = *parent_ctx;
char fname[NAME_MAX];
snprintf(fname, sizeof(fname), "%s.%d", log_path, thread_id);
SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
if (thread->fp == NULL) {
goto error;
}
thread->filename = SCStrdup(fname);
if (!thread->filename) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id);
goto error;
}
thread->threaded = false;
thread->parent = parent_ctx;
thread->id = thread_id;
thread->Write = SCLogFileWriteNoLock;
thread->Close = SCLogFileCloseNoLock;
parent_ctx->threads->lf_slots[thread_id] = thread;
return true;
error:
if (thread->fp) {
thread->Close(thread);
}
if (thread) {
SCFree(thread);
}
parent_ctx->threads->lf_slots[thread_id] = NULL;
return false;
}
/** \brief LogFileFreeCtx() Destroy a LogFileCtx (Close the file and free memory)
* \param motcx pointer to the OutputCtx
* \retval int 1 if succesful, 0 if error
* \param lf_ctx pointer to the OutputCtx
* \retval int 1 if successful, 0 if error
* */
int LogFileFreeCtx(LogFileCtx *lf_ctx)
{
@ -539,14 +735,29 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
SCReturnInt(0);
}
if (lf_ctx->fp != NULL) {
SCMutexLock(&lf_ctx->fp_mutex);
lf_ctx->Close(lf_ctx);
SCMutexUnlock(&lf_ctx->fp_mutex);
if (lf_ctx->threaded) {
SCMutexDestroy(&lf_ctx->threads->mutex);
for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
if (lf_ctx->threads->lf_slots[i]) {
SCFree(lf_ctx->threads->lf_slots[i]->filename);
SCFree(lf_ctx->threads->lf_slots[i]);
}
}
SCFree(lf_ctx->threads->lf_slots);
SCFree(lf_ctx->threads->append);
SCFree(lf_ctx->threads);
} else {
if (lf_ctx->fp != NULL) {
lf_ctx->Close(lf_ctx);
}
if (lf_ctx->parent) {
SCMutexLock(&lf_ctx->parent->threads->mutex);
lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
SCMutexUnlock(&lf_ctx->parent->threads->mutex);
}
SCMutexDestroy(&lf_ctx->fp_mutex);
}
SCMutexDestroy(&lf_ctx->fp_mutex);
if (lf_ctx->prefix != NULL) {
SCFree(lf_ctx->prefix);
lf_ctx->prefix_len = 0;

@ -1,4 +1,4 @@
/* Copyright (C) 2007-2014 Open Information Security Foundation
/* Copyright (C) 2007-2020 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
@ -46,12 +46,20 @@ typedef struct SyslogSetup_ {
int alert_syslog_level;
} SyslogSetup;
struct LogFileCtx_;
typedef struct LogThreadedFileCtx_ {
int slot_count;
SCMutex mutex;
struct LogFileCtx_ **lf_slots;
char *append;
} LogThreadedFileCtx;
/** Global structure for Output Context */
typedef struct LogFileCtx_ {
union {
FILE *fp;
PcieFile *pcie_fp;
LogThreadedFileCtx *threads;
#ifdef HAVE_LIBHIREDIS
void *redis;
#endif
@ -71,6 +79,11 @@ typedef struct LogFileCtx_ {
* record cannot be written to the file in one call */
SCMutex fp_mutex;
/** When threaded, track of the parent and thread id */
bool threaded;
struct LogFileCtx_ *parent;
int id;
/** the type of file */
enum LogFileType type;
@ -144,6 +157,7 @@ LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
int SCConfLogReopen(LogFileCtx *);

Loading…
Cancel
Save