streaming logger: support Http Body logging

Add an argument to the registration to indicate which iterator
needs to be used: Stream or HttpBody

Add HttpBody Iterator, calling the logger(s) for each Http body chunk.
pull/1109/head
Victor Julien 12 years ago
parent ab6fac884d
commit bac6c3ab02

@ -92,6 +92,7 @@ int HtpBodyAppendChunk(HtpTxUserData *htud, HtpBody *body, uint8_t *data, uint32
bd->len = len;
bd->stream_offset = 0;
bd->next = NULL;
bd->logged = 0;
bd->data = HTPMalloc(len);
if (bd->data == NULL) {
@ -110,6 +111,7 @@ int HtpBodyAppendChunk(HtpTxUserData *htud, HtpBody *body, uint8_t *data, uint32
bd->len = len;
bd->stream_offset = body->content_len_so_far;
bd->next = NULL;
bd->logged = 0;
bd->data = HTPMalloc(len);
if (bd->data == NULL) {

@ -161,6 +161,7 @@ struct HtpBodyChunk_ {
struct HtpBodyChunk_ *next; /**< Pointer to the next chunk */
uint64_t stream_offset;
uint32_t len; /**< Length of the chunk */
int logged;
} __attribute__((__packed__));
typedef struct HtpBodyChunk_ HtpBodyChunk;

@ -20,7 +20,7 @@
*
* \author Victor Julien <victor@inliniac.net>
*
* Logger for stremaing data
* Logger for streaming data
*/
#include "suricata-common.h"
@ -28,6 +28,8 @@
#include "output-streaming.h"
#include "app-layer.h"
#include "app-layer-parser.h"
#include "app-layer-htp.h"
#include "util-print.h"
#include "conf.h"
#include "util-profiling.h"
@ -40,6 +42,7 @@ typedef struct OutputLoggerThreadStore_ {
* data for the packet loggers. */
typedef struct OutputLoggerThreadData_ {
OutputLoggerThreadStore *store;
uint32_t loggers;
} OutputLoggerThreadData;
/* logger instance, a module + a output ctx,
@ -51,11 +54,13 @@ typedef struct OutputStreamingLogger_ {
struct OutputStreamingLogger_ *next;
const char *name;
TmmId module_id;
enum OutputStreamingType type;
} OutputStreamingLogger;
static OutputStreamingLogger *list = NULL;
int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, OutputCtx *output_ctx)
int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc,
OutputCtx *output_ctx, enum OutputStreamingType type )
{
int module_id = TmModuleGetIdByName(name);
if (module_id < 0)
@ -70,6 +75,7 @@ int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, Out
op->output_ctx = output_ctx;
op->name = name;
op->module_id = (TmmId) module_id;
op->type = type;
if (list == NULL)
list = op;
@ -89,6 +95,7 @@ typedef struct StreamerCallbackData_ {
OutputLoggerThreadStore *store;
ThreadVars *tv;
Packet *p;
enum OutputStreamingType type;
} StreamerCallbackData;
int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t flags)
@ -107,10 +114,12 @@ int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t fl
while (logger && store) {
BUG_ON(logger->LogFunc == NULL);
SCLogDebug("logger %p", logger);
PACKET_PROFILING_TMM_START(p, logger->module_id);
logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, flags);
PACKET_PROFILING_TMM_END(p, logger->module_id);
if (logger->type == streamer_cbdata->type) {
SCLogDebug("logger %p", logger);
PACKET_PROFILING_TMM_START(p, logger->module_id);
logger->LogFunc(tv, store->thread_data, (const Flow *)f, data, data_len, flags);
PACKET_PROFILING_TMM_END(p, logger->module_id);
}
logger = logger->next;
store = store->next;
@ -122,8 +131,92 @@ int Streamer(void *cbdata, Flow *f, uint8_t *data, uint32_t data_len, uint8_t fl
return 0;
}
/** \brief Http Body Iterator for logging
*
* Global logic:
*
* - For each tx
* - For each body chunk
* - Invoke Streamer
*/
int HttpBodyIterator(Flow *f, int close, void *cbdata, uint8_t iflags)
{
SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
int logged = 0;
HtpState *s = f->alstate;
if (s != NULL && s->conn != NULL) {
// for each tx
uint64_t tx_id = 0;
uint64_t total_txs = AppLayerParserGetTxCnt(f->proto, f->alproto, f->alstate);
SCLogDebug("s->conn %p", s->conn);
for (tx_id = 0; tx_id < total_txs; tx_id++) { // TODO optimization store log tx
htp_tx_t *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, tx_id);
if (tx != NULL) {
SCLogDebug("tx %p", tx);
HtpTxUserData *htud = (HtpTxUserData *) htp_tx_get_user_data(tx);
if (htud != NULL) {
SCLogDebug("htud %p", htud);
HtpBody *body = NULL;
if (iflags & OUTPUT_STREAMING_FLAG_TOCLIENT)
body = &htud->request_body;
else if (iflags & OUTPUT_STREAMING_FLAG_TOSERVER)
body = &htud->response_body;
if (body == NULL) {
SCLogDebug("no body");
continue;
}
if (body->first == NULL) {
SCLogDebug("no body chunks");
continue;
}
if (body->last->logged == 1) {
SCLogDebug("all logged already");
continue;
}
// for each chunk
HtpBodyChunk *chunk = body->first;
for ( ; chunk != NULL; chunk = chunk->next) {
if (chunk->logged) {
SCLogDebug("logged %d", chunk->logged);
continue;
}
uint8_t flags = iflags;
if (chunk->stream_offset == 0)
flags |= OUTPUT_STREAMING_FLAG_OPEN;
/* if we need to close and we're at the last segment in the list
* we add the 'close' flag so the logger can close up. */
if (close && chunk->next == NULL)
flags |= OUTPUT_STREAMING_FLAG_CLOSE;
// invoke Streamer
Streamer(cbdata, f, chunk->data, (uint32_t)chunk->len, flags);
//PrintRawDataFp(stdout, chunk->data, chunk->len);
chunk->logged = 1;
logged = 1;
}
}
}
}
}
/* if we need to close we need to invoke the Streamer for sure. If we
* logged no chunks, we call the Streamer with NULL data so it can
* close up. */
if (logged == 0 && close) {
Streamer(cbdata, f, NULL, 0, OUTPUT_STREAMING_FLAG_CLOSE);
}
return 0;
}
int StreamIterator(Flow *f, TcpStream *stream, int close, void *cbdata, uint8_t iflags)
{
SCLogDebug("called with %p, %d, %p, %02x", f, close, cbdata, iflags);
int logged = 0;
/* optimization: don't iterate list if we've logged all,
@ -181,7 +274,7 @@ static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data,
OutputStreamingLogger *logger = list;
OutputLoggerThreadStore *store = op_thread_data->store;
StreamerCallbackData streamer_cbdata = { logger, store, tv, p };
StreamerCallbackData streamer_cbdata = { logger, store, tv, p , 0};
BUG_ON(logger == NULL && store != NULL);
BUG_ON(logger != NULL && store == NULL);
@ -202,16 +295,33 @@ static TmEcode OutputStreamingLog(ThreadVars *tv, Packet *p, void *thread_data,
FLOWLOCK_WRLOCK(f);
TcpSession *ssn = f->protoctx;
if (ssn) {
int close = (ssn->state >= TCP_CLOSED);
close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
SCLogDebug("close ? %s", close ? "yes" : "no");
if (op_thread_data->loggers & (1<<STREAMING_TCP_DATA)) {
TcpSession *ssn = f->protoctx;
if (ssn) {
int close = (ssn->state >= TCP_CLOSED);
close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
SCLogDebug("close ? %s", close ? "yes" : "no");
TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
TcpStream *stream = flags & OUTPUT_STREAMING_FLAG_TOSERVER ? &ssn->client : &ssn->server;
StreamIterator(p->flow, stream, close, (void *)&streamer_cbdata, flags);
streamer_cbdata.type = STREAMING_TCP_DATA;
StreamIterator(p->flow, stream, close, (void *)&streamer_cbdata, flags);
}
}
if (op_thread_data->loggers & (1<<STREAMING_HTTP_BODIES)) {
if (f->alproto == ALPROTO_HTTP && f->alstate != NULL) {
int close = 0;
TcpSession *ssn = f->protoctx;
if (ssn) {
close = (ssn->state >= TCP_CLOSED);
close |= ((p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0);
}
SCLogDebug("close ? %s", close ? "yes" : "no");
streamer_cbdata.type = STREAMING_HTTP_BODIES;
HttpBodyIterator(f, close, (void *)&streamer_cbdata, flags);
}
}
FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}
@ -257,10 +367,12 @@ static TmEcode OutputStreamingLogThreadInit(ThreadVars *tv, void *initdata, void
tmp->next = ts;
}
SCLogDebug("%s is now set up", logger->name);
SCLogInfo("%s is now set up", logger->name);
}
}
td->loggers |= (1<<logger->type);
logger = logger->next;
}

@ -34,11 +34,17 @@
#define OUTPUT_STREAMING_FLAG_TOSERVER 0x04
#define OUTPUT_STREAMING_FLAG_TOCLIENT 0x08
enum OutputStreamingType {
STREAMING_TCP_DATA,
STREAMING_HTTP_BODIES,
};
/** filedata logger function pointer type */
typedef int (*StreamingLogger)(ThreadVars *, void *thread_data,
const Flow *f, const uint8_t *data, uint32_t data_len, uint8_t flags);
int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, OutputCtx *);
int OutputRegisterStreamingLogger(const char *name, StreamingLogger LogFunc, OutputCtx *,
enum OutputStreamingType);
void TmModuleStreamingLoggerRegister (void);

@ -434,7 +434,8 @@ error:
*/
void
OutputRegisterStreamingModule(const char *name, const char *conf_name,
OutputCtx *(*InitFunc)(ConfNode *), StreamingLogger StreamingLogFunc)
OutputCtx *(*InitFunc)(ConfNode *), StreamingLogger StreamingLogFunc,
enum OutputStreamingType stream_type)
{
if (unlikely(StreamingLogFunc == NULL)) {
goto error;
@ -449,6 +450,7 @@ OutputRegisterStreamingModule(const char *name, const char *conf_name,
module->conf_name = conf_name;
module->InitFunc = InitFunc;
module->StreamingLogFunc = StreamingLogFunc;
module->stream_type = stream_type;
TAILQ_INSERT_TAIL(&output_modules, module, entries);
SCLogDebug("Streaming logger \"%s\" registered.", name);
@ -469,7 +471,7 @@ error:
void
OutputRegisterStreamingSubModule(const char *parent_name, const char *name,
const char *conf_name, OutputCtx *(*InitFunc)(ConfNode *, OutputCtx *),
StreamingLogger StreamingLogFunc)
StreamingLogger StreamingLogFunc, enum OutputStreamingType stream_type)
{
if (unlikely(StreamingLogFunc == NULL)) {
goto error;
@ -485,6 +487,7 @@ OutputRegisterStreamingSubModule(const char *parent_name, const char *name,
module->parent_name = parent_name;
module->InitSubFunc = InitFunc;
module->StreamingLogFunc = StreamingLogFunc;
module->stream_type = stream_type;
TAILQ_INSERT_TAIL(&output_modules, module, entries);
SCLogDebug("Streaming logger \"%s\" registered.", name);

@ -52,6 +52,7 @@ typedef struct OutputModule_ {
FlowLogger FlowLogFunc;
StreamingLogger StreamingLogFunc;
AppProto alproto;
enum OutputStreamingType stream_type;
TAILQ_ENTRY(OutputModule_) entries;
} OutputModule;
@ -91,10 +92,11 @@ void OutputRegisterFlowSubModule(const char *parent_name, const char *name,
FlowLogger FlowLogFunc);
void OutputRegisterStreamingModule(const char *name, const char *conf_name,
OutputCtx *(*InitFunc)(ConfNode *), StreamingLogger StreamingLogFunc);
OutputCtx *(*InitFunc)(ConfNode *), StreamingLogger StreamingLogFunc,
enum OutputStreamingType stream_type);
void OutputRegisterStreamingSubModule(const char *parent_name, const char *name,
const char *conf_name, OutputCtx *(*InitFunc)(ConfNode *, OutputCtx *),
StreamingLogger StreamingLogFunc);
StreamingLogger StreamingLogFunc, enum OutputStreamingType stream_type);
OutputModule *OutputGetModuleByConfName(const char *name);
void OutputDeregisterAll(void);

@ -588,7 +588,8 @@ static void SetupOutput(const char *name, OutputModule *module, OutputCtx *outpu
}
} else if (module->StreamingLogFunc) {
SCLogDebug("%s is a streaming logger", module->name);
OutputRegisterStreamingLogger(module->name, module->StreamingLogFunc, output_ctx);
OutputRegisterStreamingLogger(module->name, module->StreamingLogFunc,
output_ctx, module->stream_type);
/* need one instance of the streaming logger module */
if (streaming_logger_module == NULL) {

Loading…
Cancel
Save