source-pcap-file: Directory mode may miss files (bug #2394)

https://redmine.openinfosecfoundation.org/issues/2394

Certain parameters of delay and poll interval could cause newly added
files in a directory to be missed. Cleaned up how time is handled for
files in a directory and fix which time is used for future directory
traversals. Add a mutex to make sure processing time is not optimized
away.
pull/3263/head
Danny Browning 8 years ago committed by Victor Julien
parent cd98d7ddcc
commit 4b897c9060

@ -203,12 +203,13 @@ class SuricataSC:
tenant = None tenant = None
if len(parts) > 3: if len(parts) > 3:
tenant = parts[3] tenant = parts[3]
if cmd != "pcap-file-continuous": if cmd != "pcap-file":
raise SuricataCommandException("Invalid command '%s'" % (command)) raise SuricataCommandException("Invalid command '%s'" % (command))
else: else:
arguments = {} arguments = {}
arguments["filename"] = filename arguments["filename"] = filename
arguments["output-dir"] = output arguments["output-dir"] = output
arguments["continuous"] = True
if tenant != None: if tenant != None:
arguments["tenant"] = int(tenant) arguments["tenant"] = int(tenant)
elif "iface-stat" in command: elif "iface-stat" in command:

@ -34,6 +34,7 @@
#include "flow-timeout.h" #include "flow-timeout.h"
#include "stream-tcp.h" #include "stream-tcp.h"
#include "stream-tcp-reassemble.h" #include "stream-tcp-reassemble.h"
#include "source-pcap-file-directory-helper.h"
#include "host.h" #include "host.h"
#include "defrag.h" #include "defrag.h"
#include "defrag-hash.h" #include "defrag-hash.h"
@ -132,6 +133,7 @@ static int unix_manager_pcap_task_running = 0;
static int unix_manager_pcap_task_failed = 0; static int unix_manager_pcap_task_failed = 0;
static int unix_manager_pcap_task_interrupted = 0; static int unix_manager_pcap_task_interrupted = 0;
static struct timespec unix_manager_pcap_last_processed; static struct timespec unix_manager_pcap_last_processed;
static SCCtrlMutex unix_manager_pcap_last_processed_mutex;
/** /**
* \brief return list of files in the queue * \brief return list of files in the queue
@ -186,7 +188,7 @@ static TmEcode UnixSocketPcapCurrent(json_t *cmd, json_t* answer, void *data)
{ {
PcapCommand *this = (PcapCommand *) data; PcapCommand *this = (PcapCommand *) data;
if (this->current_file && this->current_file->filename) { if (this->current_file != NULL && this->current_file->filename != NULL) {
json_object_set_new(answer, "message", json_object_set_new(answer, "message",
json_string(this->current_file->filename)); json_string(this->current_file->filename));
} else { } else {
@ -197,8 +199,11 @@ static TmEcode UnixSocketPcapCurrent(json_t *cmd, json_t* answer, void *data)
static TmEcode UnixSocketPcapLastProcessed(json_t *cmd, json_t *answer, void *data) static TmEcode UnixSocketPcapLastProcessed(json_t *cmd, json_t *answer, void *data)
{ {
uint64_t epoch_millis = unix_manager_pcap_last_processed.tv_sec * 1000l + json_int_t epoch_millis;
unix_manager_pcap_last_processed.tv_nsec / 100000l; SCCtrlMutexLock(&unix_manager_pcap_last_processed_mutex);
epoch_millis = SCTimespecAsEpochMillis(&unix_manager_pcap_last_processed);
SCCtrlMutexUnlock(&unix_manager_pcap_last_processed_mutex);
json_object_set_new(answer, "message", json_object_set_new(answer, "message",
json_integer(epoch_millis)); json_integer(epoch_millis));
@ -422,7 +427,7 @@ static TmEcode UnixSocketAddPcapFile(json_t *cmd, json_t* answer, void *data)
} }
/** /**
* \brief Command to add a file to treatment list * \brief Command to add a file to treatment list, forcing continuous mode
* *
* \param cmd the content of command Arguments as a json_t object * \param cmd the content of command Arguments as a json_t object
* \param answer the json_t object that has to be used to answer * \param answer the json_t object that has to be used to answer
@ -467,6 +472,7 @@ static TmEcode UnixSocketPcapFilesCheck(void *data)
} }
this->current_file = NULL; this->current_file = NULL;
} }
if (TAILQ_EMPTY(&this->files)) { if (TAILQ_EMPTY(&this->files)) {
// nothing to do // nothing to do
return TM_ECODE_OK; return TM_ECODE_OK;
@ -573,8 +579,10 @@ void RunModeUnixSocketRegister(void)
TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed) TmEcode UnixSocketPcapFile(TmEcode tm, struct timespec *last_processed)
{ {
#ifdef BUILD_UNIX_SOCKET #ifdef BUILD_UNIX_SOCKET
SCCtrlMutexLock(&unix_manager_pcap_last_processed_mutex);
unix_manager_pcap_last_processed.tv_sec = last_processed->tv_sec; unix_manager_pcap_last_processed.tv_sec = last_processed->tv_sec;
unix_manager_pcap_last_processed.tv_nsec = last_processed->tv_nsec; unix_manager_pcap_last_processed.tv_nsec = last_processed->tv_nsec;
SCCtrlMutexUnlock(&unix_manager_pcap_last_processed_mutex);
switch (tm) { switch (tm) {
case TM_ECODE_DONE: case TM_ECODE_DONE:
SCLogInfo("Marking current task as done"); SCLogInfo("Marking current task as done");
@ -1399,6 +1407,10 @@ static int RunModeUnixSocketMaster(void)
pcapcmd->running = 0; pcapcmd->running = 0;
pcapcmd->current_file = NULL; pcapcmd->current_file = NULL;
memset(&unix_manager_pcap_last_processed, 0, sizeof(struct timespec));
SCCtrlMutexInit(&unix_manager_pcap_last_processed_mutex, NULL);
UnixManagerRegisterCommand("pcap-file", UnixSocketAddPcapFile, pcapcmd, UNIX_CMD_TAKE_ARGS); UnixManagerRegisterCommand("pcap-file", UnixSocketAddPcapFile, pcapcmd, UNIX_CMD_TAKE_ARGS);
UnixManagerRegisterCommand("pcap-file-continuous", UnixSocketAddPcapFileContinuous, pcapcmd, UNIX_CMD_TAKE_ARGS); UnixManagerRegisterCommand("pcap-file-continuous", UnixSocketAddPcapFileContinuous, pcapcmd, UNIX_CMD_TAKE_ARGS);
UnixManagerRegisterCommand("pcap-file-number", UnixSocketPcapFilesNumber, pcapcmd, 0); UnixManagerRegisterCommand("pcap-file-number", UnixSocketPcapFilesNumber, pcapcmd, 0);

@ -35,13 +35,10 @@ static TmEcode PcapDirectoryFailure(PcapFileDirectoryVars *ptv);
static TmEcode PcapDirectoryDone(PcapFileDirectoryVars *ptv); static TmEcode PcapDirectoryDone(PcapFileDirectoryVars *ptv);
static int PcapDirectoryGetModifiedTime(char const * file, struct timespec * out); static int PcapDirectoryGetModifiedTime(char const * file, struct timespec * out);
static TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv, static TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv,
PendingFile *file_to_add, PendingFile *file_to_add);
struct timespec *file_to_add_modified_time);
static TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *ptv, static TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *ptv,
struct timespec * newer_than,
struct timespec * older_than); struct timespec * older_than);
static TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv, static TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
struct timespec *newer_than,
struct timespec *older_than); struct timespec *older_than);
void GetTime(struct timespec *tm) void GetTime(struct timespec *tm)
@ -49,7 +46,7 @@ void GetTime(struct timespec *tm)
struct timeval now; struct timeval now;
if(gettimeofday(&now, NULL) == 0) { if(gettimeofday(&now, NULL) == 0) {
tm->tv_sec = now.tv_sec; tm->tv_sec = now.tv_sec;
tm->tv_nsec = now.tv_usec * 1000; tm->tv_nsec = now.tv_usec * 1000L;
} }
} }
@ -248,22 +245,20 @@ int PcapDirectoryGetModifiedTime(char const *file, struct timespec *out)
#endif #endif
#ifdef OS_DARWIN #ifdef OS_DARWIN
*out = buf.st_mtimespec; out->tv_sec = buf.st_mtimespec.tv_sec;
out->tv_nsec = buf.st_mtimespec.tv_nsec;
#elif OS_WIN32 #elif OS_WIN32
struct timespec ts; out->tv_sec = buf.st_mtime;
memset(&ts, 0, sizeof(ts));
ts.tv_sec = buf.st_mtime;
*out = ts;
#else #else
*out = buf.st_mtim; out->tv_sec = buf.st_mtim.tv_sec;
out->tv_nsec = buf.st_mtim.tv_nsec;
#endif #endif
return ret; return ret;
} }
TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv, TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv,
PendingFile *file_to_add, PendingFile *file_to_add
struct timespec *file_to_add_modified_time
) { ) {
PendingFile *file_to_compare = NULL; PendingFile *file_to_compare = NULL;
PendingFile *next_file_to_compare = NULL; PendingFile *next_file_to_compare = NULL;
@ -290,14 +285,7 @@ TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv,
} else { } else {
file_to_compare = TAILQ_FIRST(&pv->directory_content); file_to_compare = TAILQ_FIRST(&pv->directory_content);
while(file_to_compare != NULL) { while(file_to_compare != NULL) {
struct timespec modified_time; if (CompareTimes(&file_to_add->modified_time, &file_to_compare->modified_time) < 0) {
memset(&modified_time, 0, sizeof(struct timespec));
if (PcapDirectoryGetModifiedTime(file_to_compare->filename,
&modified_time) == TM_ECODE_FAILED) {
SCReturnInt(TM_ECODE_FAILED);
}
if (CompareTimes(file_to_add_modified_time, &modified_time) < 0) {
TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next); TAILQ_INSERT_BEFORE(file_to_compare, file_to_add, next);
file_to_compare = NULL; file_to_compare = NULL;
} else { } else {
@ -315,7 +303,6 @@ TmEcode PcapDirectoryInsertFile(PcapFileDirectoryVars *pv,
} }
TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *pv, TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *pv,
struct timespec *newer_than,
struct timespec *older_than struct timespec *older_than
) { ) {
if (unlikely(pv == NULL)) { if (unlikely(pv == NULL)) {
@ -355,11 +342,14 @@ TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *pv,
memset(&temp_time, 0, sizeof(struct timespec)); memset(&temp_time, 0, sizeof(struct timespec));
if (PcapDirectoryGetModifiedTime(pathbuff, &temp_time) == 0) { if (PcapDirectoryGetModifiedTime(pathbuff, &temp_time) == 0) {
SCLogDebug("File %s time (%lu > %lu < %lu)", pathbuff, SCLogDebug("%" PRIuMAX " < %" PRIuMAX "(%s) < %" PRIuMAX ")",
newer_than->tv_sec, temp_time.tv_sec, older_than->tv_sec); (uintmax_t)SCTimespecAsEpochMillis(&pv->shared->last_processed),
(uintmax_t)SCTimespecAsEpochMillis(&temp_time),
pathbuff,
(uintmax_t)SCTimespecAsEpochMillis(older_than));
// Skip files outside of our time range // Skip files outside of our time range
if (CompareTimes(&temp_time, newer_than) < 0) { if (CompareTimes(&temp_time, &pv->shared->last_processed) <= 0) {
SCLogDebug("Skipping old file %s", pathbuff); SCLogDebug("Skipping old file %s", pathbuff);
continue; continue;
} }
@ -387,9 +377,13 @@ TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *pv,
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
SCLogDebug("Found \"%s\"", file_to_add->filename); memset(&file_to_add->modified_time, 0, sizeof(struct timespec));
CopyTime(&temp_time, &file_to_add->modified_time);
if (PcapDirectoryInsertFile(pv, file_to_add, &temp_time) == TM_ECODE_FAILED) { SCLogInfo("Found \"%s\" at %" PRIuMAX, file_to_add->filename,
(uintmax_t)SCTimespecAsEpochMillis(&file_to_add->modified_time));
if (PcapDirectoryInsertFile(pv, file_to_add) == TM_ECODE_FAILED) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "Failed to add file"); SCLogError(SC_ERR_INVALID_ARGUMENT, "Failed to add file");
CleanupPendingFile(file_to_add); CleanupPendingFile(file_to_add);
@ -403,10 +397,9 @@ TmEcode PcapDirectoryPopulateBuffer(PcapFileDirectoryVars *pv,
TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv, TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
struct timespec *newer_than,
struct timespec *older_than) struct timespec *older_than)
{ {
if (PcapDirectoryPopulateBuffer(pv, newer_than, older_than) == TM_ECODE_FAILED) { if (PcapDirectoryPopulateBuffer(pv, older_than) == TM_ECODE_FAILED) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "Failed to populate directory buffer"); SCLogError(SC_ERR_INVALID_ARGUMENT, "Failed to populate directory buffer");
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
} }
@ -414,7 +407,7 @@ TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
TmEcode status = TM_ECODE_OK; TmEcode status = TM_ECODE_OK;
if (TAILQ_EMPTY(&pv->directory_content)) { if (TAILQ_EMPTY(&pv->directory_content)) {
SCLogInfo("Directory %s has no files to process", pv->filename); SCLogDebug("Directory %s has no files to process", pv->filename);
GetTime(older_than); GetTime(older_than);
older_than->tv_sec = older_than->tv_sec - pv->delay; older_than->tv_sec = older_than->tv_sec - pv->delay;
rewinddir(pv->directory); rewinddir(pv->directory);
@ -422,6 +415,9 @@ TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
} else { } else {
PendingFile *current_file = NULL; PendingFile *current_file = NULL;
struct timespec last_time_seen;
memset(&last_time_seen, 0, sizeof(struct timespec));
while (status == TM_ECODE_OK && !TAILQ_EMPTY(&pv->directory_content)) { while (status == TM_ECODE_OK && !TAILQ_EMPTY(&pv->directory_content)) {
current_file = TAILQ_FIRST(&pv->directory_content); current_file = TAILQ_FIRST(&pv->directory_content);
TAILQ_REMOVE(&pv->directory_content, current_file, next); TAILQ_REMOVE(&pv->directory_content, current_file, next);
@ -467,16 +463,13 @@ TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
SCReturnInt(status); SCReturnInt(status);
} }
struct timespec temp_time; SCLogInfo("Processed file %s, processed up to %" PRIuMAX,
memset(&temp_time, 0, sizeof(struct timespec)); current_file->filename,
(uintmax_t)SCTimespecAsEpochMillis(&current_file->modified_time));
if (PcapDirectoryGetModifiedTime(current_file->filename, if(CompareTimes(&current_file->modified_time, &last_time_seen) > 0) {
&temp_time) != 0) { CopyTime(&current_file->modified_time, &last_time_seen);
CopyTime(&temp_time, newer_than);
} }
SCLogDebug("Processed file %s, processed up to %ld",
current_file->filename, temp_time.tv_sec);
CopyTime(&temp_time, &pv->shared->last_processed);
CleanupPendingFile(current_file); CleanupPendingFile(current_file);
pv->current_file = NULL; pv->current_file = NULL;
@ -486,7 +479,12 @@ TmEcode PcapDirectoryDispatchForTimeRange(PcapFileDirectoryVars *pv,
} }
} }
*newer_than = *older_than; if(CompareTimes(&last_time_seen, &pv->shared->last_processed) > 0) {
SCLogInfo("Updating processed to %" PRIuMAX,
(uintmax_t)SCTimespecAsEpochMillis(&last_time_seen));
CopyTime(&last_time_seen, &pv->shared->last_processed);
status = PcapRunStatus(pv);
}
} }
GetTime(older_than); GetTime(older_than);
older_than->tv_sec = older_than->tv_sec - pv->delay; older_than->tv_sec = older_than->tv_sec - pv->delay;
@ -500,8 +498,6 @@ TmEcode PcapDirectoryDispatch(PcapFileDirectoryVars *ptv)
DIR *directory_check = NULL; DIR *directory_check = NULL;
struct timespec newer_than;
memset(&newer_than, 0, sizeof(struct timespec));
struct timespec older_than; struct timespec older_than;
memset(&older_than, 0, sizeof(struct timespec)); memset(&older_than, 0, sizeof(struct timespec));
older_than.tv_sec = LONG_MAX; older_than.tv_sec = LONG_MAX;
@ -516,8 +512,9 @@ TmEcode PcapDirectoryDispatch(PcapFileDirectoryVars *ptv)
while (status == TM_ECODE_OK) { while (status == TM_ECODE_OK) {
//loop while directory is ok //loop while directory is ok
SCLogInfo("Processing pcaps directory %s, files must be newer than %" PRIuMAX " and older than %" PRIuMAX, SCLogInfo("Processing pcaps directory %s, files must be newer than %" PRIuMAX " and older than %" PRIuMAX,
ptv->filename, (uintmax_t)newer_than.tv_sec, (uintmax_t)older_than.tv_sec); ptv->filename, (uintmax_t)SCTimespecAsEpochMillis(&ptv->shared->last_processed),
status = PcapDirectoryDispatchForTimeRange(ptv, &newer_than, &older_than); (uintmax_t)SCTimespecAsEpochMillis(&older_than));
status = PcapDirectoryDispatchForTimeRange(ptv, &older_than);
if (ptv->should_loop && status == TM_ECODE_OK) { if (ptv->should_loop && status == TM_ECODE_OK) {
sleep(poll_seconds); sleep(poll_seconds);
//update our status based on suricata control flags or unix command socket //update our status based on suricata control flags or unix command socket

@ -31,6 +31,7 @@
typedef struct PendingFile_ typedef struct PendingFile_
{ {
char *filename; char *filename;
struct timespec modified_time;
TAILQ_ENTRY(PendingFile_) next; TAILQ_ENTRY(PendingFile_) next;
} PendingFile; } PendingFile;
/** /**

@ -193,6 +193,7 @@ TmEcode ReceivePcapFileThreadInit(ThreadVars *tv, const void *initdata, void **d
if (unlikely(ptv == NULL)) if (unlikely(ptv == NULL))
SCReturnInt(TM_ECODE_FAILED); SCReturnInt(TM_ECODE_FAILED);
memset(ptv, 0, sizeof(PcapFileThreadVars)); memset(ptv, 0, sizeof(PcapFileThreadVars));
memset(&ptv->shared.last_processed, 0, sizeof(struct timespec));
intmax_t tenant = 0; intmax_t tenant = 0;
if (ConfGetInt("pcap-file.tenant-id", &tenant) == 1) { if (ConfGetInt("pcap-file.tenant-id", &tenant) == 1) {

@ -2264,6 +2264,7 @@ void PostRunDeinit(const int runmode, struct timeval *start_time)
HostCleanup(); HostCleanup();
StreamTcpFreeConfig(STREAM_VERBOSE); StreamTcpFreeConfig(STREAM_VERBOSE);
DefragDestroy(); DefragDestroy();
TmqResetQueues(); TmqResetQueues();
#ifdef PROFILING #ifdef PROFILING
if (profiling_rules_enabled) if (profiling_rules_enabled)

@ -63,5 +63,6 @@ int SCTimeToStringPattern (time_t epoch, const char *pattern, char *str,
uint64_t SCParseTimeSizeString (const char *str); uint64_t SCParseTimeSizeString (const char *str);
uint64_t SCGetSecondsUntil (const char *str, time_t epoch); uint64_t SCGetSecondsUntil (const char *str, time_t epoch);
uint64_t SCTimespecAsEpochMillis(const struct timespec *ts); uint64_t SCTimespecAsEpochMillis(const struct timespec *ts);
#endif /* __UTIL_TIME_H__ */ #endif /* __UTIL_TIME_H__ */

Loading…
Cancel
Save