streaming: pass tx_id to logger

This way we can distinguish between various tx' in the logger.
pull/1112/head
Victor Julien 12 years ago
parent cd78705e3a
commit 6493554663

@ -58,7 +58,7 @@ TmEcode LogTcpDataLogThreadDeinit(ThreadVars *, void *);
void LogTcpDataLogExitPrintStats(ThreadVars *, void *); void LogTcpDataLogExitPrintStats(ThreadVars *, void *);
static void LogTcpDataLogDeInitCtx(OutputCtx *); static void LogTcpDataLogDeInitCtx(OutputCtx *);
int LogTcpDataLogger(ThreadVars *tv, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags); int LogTcpDataLogger(ThreadVars *tv, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags);
void TmModuleLogTcpDataLogRegister (void) { void TmModuleLogTcpDataLogRegister (void) {
tmm_modules[TMM_LOGTCPDATALOG].name = MODULE_NAME; tmm_modules[TMM_LOGTCPDATALOG].name = MODULE_NAME;
@ -89,7 +89,8 @@ typedef struct LogTcpDataLogThread_ {
MemBuffer *buffer; MemBuffer *buffer;
} LogTcpDataLogThread; } LogTcpDataLogThread;
static int LogTcpDataLoggerDir(ThreadVars *tv, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags) static int LogTcpDataLoggerDir(ThreadVars *tv, void *thread_data, const Flow *f,
const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
{ {
SCEnter(); SCEnter();
LogTcpDataLogThread *aft = thread_data; LogTcpDataLogThread *aft = thread_data;
@ -110,10 +111,15 @@ static int LogTcpDataLoggerDir(ThreadVars *tv, void *thread_data, const Flow *f,
} }
char name[PATH_MAX]; char name[PATH_MAX];
snprintf(name, sizeof(name), "%s/%s/%s_%u-%s_%u-%s.data",
char tx[64] = "";
if (flags & OUTPUT_STREAMING_FLAG_TRANSACTION)
snprintf(tx, sizeof(tx), "%"PRIu64, tx_id);
snprintf(name, sizeof(name), "%s/%s/%s_%u-%s_%u-%s-%s.data",
td->log_dir, td->log_dir,
td->type == STREAMING_HTTP_BODIES ? "http" : "tcp", td->type == STREAMING_HTTP_BODIES ? "http" : "tcp",
srcip, f->sp, dstip, f->dp, srcip, f->sp, dstip, f->dp, tx,
flags & OUTPUT_STREAMING_FLAG_TOSERVER ? "ts" : "tc"); flags & OUTPUT_STREAMING_FLAG_TOSERVER ? "ts" : "tc");
FILE *fp = fopen(name, mode); FILE *fp = fopen(name, mode);
@ -127,7 +133,8 @@ static int LogTcpDataLoggerDir(ThreadVars *tv, void *thread_data, const Flow *f,
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} }
static int LogTcpDataLoggerFile(ThreadVars *tv, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags) static int LogTcpDataLoggerFile(ThreadVars *tv, void *thread_data, const Flow *f,
const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
{ {
SCEnter(); SCEnter();
LogTcpDataLogThread *aft = thread_data; LogTcpDataLogThread *aft = thread_data;
@ -165,16 +172,17 @@ static int LogTcpDataLoggerFile(ThreadVars *tv, void *thread_data, const Flow *f
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} }
int LogTcpDataLogger(ThreadVars *tv, void *thread_data, const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags) int LogTcpDataLogger(ThreadVars *tv, void *thread_data, const Flow *f,
const uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
{ {
SCEnter(); SCEnter();
LogTcpDataLogThread *aft = thread_data; LogTcpDataLogThread *aft = thread_data;
LogTcpDataFileCtx *td = aft->tcpdatalog_ctx; LogTcpDataFileCtx *td = aft->tcpdatalog_ctx;
if (td->dir == 1) if (td->dir == 1)
LogTcpDataLoggerDir(tv, thread_data, f, data, data_len, flags); LogTcpDataLoggerDir(tv, thread_data, f, data, data_len, tx_id, flags);
if (td->file == 1) if (td->file == 1)
LogTcpDataLoggerFile(tv, thread_data, f, data, data_len, flags); LogTcpDataLoggerFile(tv, thread_data, f, data, data_len, tx_id, flags);
SCReturnInt(TM_ECODE_OK); SCReturnInt(TM_ECODE_OK);
} }

@ -98,7 +98,7 @@ typedef struct StreamerCallbackData_ {
enum OutputStreamingType type; enum OutputStreamingType type;
} StreamerCallbackData; } StreamerCallbackData;
int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t flags) int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint64_t tx_id, uint8_t flags)
{ {
StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata; StreamerCallbackData *streamer_cbdata = (StreamerCallbackData *)cbdata;
BUG_ON(streamer_cbdata == NULL); BUG_ON(streamer_cbdata == NULL);
@ -117,7 +117,7 @@ int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t fl
if (logger->type == streamer_cbdata->type) { if (logger->type == streamer_cbdata->type) {
SCLogDebug("logger %p", logger); SCLogDebug("logger %p", logger);
PACKET_PROFILING_TMM_START(p, logger->module_id); PACKET_PROFILING_TMM_START(p, logger->module_id);
logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, flags); logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, tx_id, flags);
PACKET_PROFILING_TMM_END(p, logger->module_id); PACKET_PROFILING_TMM_END(p, logger->module_id);
} }
@ -143,10 +143,14 @@ int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t fl
int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags) int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
{ {
SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags); SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
int logged = 0;
HtpState *s = f->alstate; HtpState *s = f->alstate;
if (s != NULL && s->conn != NULL) { if (s != NULL && s->conn != NULL) {
int tx_progress_done_value_ts =
AppLayerParserGetStateProgressCompletionStatus(IPPROTO_TCP, ALPROTO_HTTP, 0);
int tx_progress_done_value_tc =
AppLayerParserGetStateProgressCompletionStatus(IPPROTO_TCP, ALPROTO_HTTP, 1);
// for each tx // for each tx
uint64_t tx_id = 0; uint64_t tx_id = 0;
uint64_t total_txs = AppLayerParserGetTxCnt(f->proto, f->alproto, f->alstate); uint64_t total_txs = AppLayerParserGetTxCnt(f->proto, f->alproto, f->alstate);
@ -154,6 +158,17 @@ int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx
htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id); htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id);
if (tx != NULL) { if (tx != NULL) {
int tx_done = 0;
int tx_logged = 0;
int tx_progress_ts = AppLayerParserGetStateProgress(IPPROTO_TCP, ALPROTO_HTTP, tx, 0);
if (tx_progress_ts >= tx_progress_done_value_ts) {
int tx_progress_tc = AppLayerParserGetStateProgress(IPPROTO_TCP, ALPROTO_HTTP, tx, 1);
if (tx_progress_tc >= tx_progress_done_value_tc) {
tx_done = 1;
}
}
SCLogDebug("tx %p", tx); SCLogDebug("tx %p", tx);
HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx); HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx);
if (htud != NULL) { if (htud != NULL) {
@ -166,15 +181,15 @@ int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
if (body == NULL) { if (body == NULL) {
SCLogDebug("no body"); SCLogDebug("no body");
continue; goto next;
} }
if (body->first == NULL) { if (body->first == NULL) {
SCLogDebug("no body chunks"); SCLogDebug("no body chunks");
continue; goto next;
} }
if (body->last->logged == 1) { if (body->last->logged == 1) {
SCLogDebug("all logged already"); SCLogDebug("all logged already");
continue; goto next;
} }
// for each chunk // for each chunk
@ -185,31 +200,35 @@ int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
continue; continue;
} }
uint8_t flags = iflags; uint8_t flags = iflags | OUTPUT_STREAMING_FLAG_TRANSACTION;
if (chunk->stream_offset == 0) if (chunk->stream_offset == 0)
flags |= OUTPUT_STREAMING_FLAG_OPEN; flags |= OUTPUT_STREAMING_FLAG_OPEN;
/* if we need to close and we're at the last segment in the list /* if we need to close and we're at the last segment in the list
* we add the 'close' flag so the logger can close up. */ * we add the 'close' flag so the logger can close up. */
if (close && chunk->next == NULL) if ((tx_done || close) && chunk->next == NULL) {
flags |= OUTPUT_STREAMING_FLAG_CLOSE; flags |= OUTPUT_STREAMING_FLAG_CLOSE;
}
// invoke Streamer // invoke Streamer
Streamer(cbdata, f, chunk->data, (uint32_t)chunk->len, flags); Streamer(cbdata, f, chunk->data, (uint32_t)chunk->len, tx_id, flags);
//PrintRawDataFp(stdout, chunk->data, chunk->len); //PrintRawDataFp(stdout, chunk->data, chunk->len);
chunk->logged = 1; chunk->logged = 1;
logged = 1; tx_logged = 1;
}
next:
/* if we need to close we need to invoke the Streamer for sure. If we
* logged no chunks, we call the Streamer with NULL data so it can
* close up. */
if (tx_logged == 0 && (close||tx_done)) {
Streamer(cbdata, f, NULL, 0, tx_id,
OUTPUT_STREAMING_FLAG_CLOSE|OUTPUT_STREAMING_FLAG_TRANSACTION);
} }
} }
} }
} }
} }
/* if we need to close we need to invoke the Streamer for sure. If we
* logged no chunks, we call the Streamer with NULL data so it can
* close up. */
if (logged == 0 && close) {
Streamer(cbdata, f, NULL, 0, OUTPUT_STREAMING_FLAG_CLOSE);
}
return 0; return 0;
} }
@ -245,7 +264,7 @@ int StreamIterator(Flow *f, TcpStream *stream, int close, void *cbdata, uint8_t
if (close && seg->next == NULL) if (close && seg->next == NULL)
flags |= OUTPUT_STREAMING_FLAG_CLOSE; flags |= OUTPUT_STREAMING_FLAG_CLOSE;
Streamer(cbdata, f, seg->payload, (uint32_t)seg->payload_len, flags); Streamer(cbdata, f, seg->payload, (uint32_t)seg->payload_len, 0, flags);
seg->flags |= SEGMENTTCP_FLAG_LOGAPI_PROCESSED; seg->flags |= SEGMENTTCP_FLAG_LOGAPI_PROCESSED;
@ -259,7 +278,7 @@ int StreamIterator(Flow *f, TcpStream *stream, int close, void *cbdata, uint8_t
* logged no segments, we call the Streamer with NULL data so it can * logged no segments, we call the Streamer with NULL data so it can
* close up. */ * close up. */
if (logged == 0 && close) { if (logged == 0 && close) {
Streamer(cbdata, f, NULL, 0, OUTPUT_STREAMING_FLAG_CLOSE); Streamer(cbdata, f, NULL, 0, 0, OUTPUT_STREAMING_FLAG_CLOSE);
} }
return 0; return 0;

@ -29,10 +29,11 @@
#include "decode.h" #include "decode.h"
#include "util-file.h" #include "util-file.h"
#define OUTPUT_STREAMING_FLAG_OPEN 0x01 #define OUTPUT_STREAMING_FLAG_OPEN 0x01
#define OUTPUT_STREAMING_FLAG_CLOSE 0x02 #define OUTPUT_STREAMING_FLAG_CLOSE 0x02
#define OUTPUT_STREAMING_FLAG_TOSERVER 0x04 #define OUTPUT_STREAMING_FLAG_TOSERVER 0x04
#define OUTPUT_STREAMING_FLAG_TOCLIENT 0x08 #define OUTPUT_STREAMING_FLAG_TOCLIENT 0x08
#define OUTPUT_STREAMING_FLAG_TRANSACTION 0x10
enum OutputStreamingType { enum OutputStreamingType {
STREAMING_TCP_DATA, STREAMING_TCP_DATA,
@ -41,7 +42,8 @@ enum OutputStreamingType {
/** filedata logger function pointer type */ /** filedata logger function pointer type */
typedef int (*StreamingLogger)(ThreadVars *, void *thread_data, typedef int (*StreamingLogger)(ThreadVars *, void *thread_data,
const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags); const Flow *f, const uint8_t *data, uint32_t data_len,
uint64_t tx_id, uint8_t flags);
int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, OutputCtx *, int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, OutputCtx *,
enum OutputStreamingType); enum OutputStreamingType);

Loading…
Cancel
Save