eve: fix mishandling of big messages

When the string representation of a JSON message grew bigger than
64k, the JSON record would just be truncated. This lead to errors
in the parser(s) of the JSON stream.

This patch changes the buffer logic to grow the buffer on demand.
pull/1856/head
Victor Julien 10 years ago
parent 6c30f7bdbb
commit 8bb1cf08ef

@ -118,7 +118,7 @@ int LogStatsLogger(ThreadVars *tv, void *thread_data, const StatsTable *st)
/* since we can have many threads, the buffer might not be big enough.
* Expand if necessary. */
if (MEMBUFFER_OFFSET(aft->buffer) + len > MEMBUFFER_SIZE(aft->buffer)) {
if (MEMBUFFER_OFFSET(aft->buffer) + len >= MEMBUFFER_SIZE(aft->buffer)) {
MemBufferExpand(&aft->buffer, OUTPUT_BUFFER_SIZE);
}
@ -144,7 +144,7 @@ int LogStatsLogger(ThreadVars *tv, void *thread_data, const StatsTable *st)
/* since we can have many threads, the buffer might not be big enough.
* Expand if necessary. */
if (MEMBUFFER_OFFSET(aft->buffer) + len > MEMBUFFER_SIZE(aft->buffer)) {
if (MEMBUFFER_OFFSET(aft->buffer) + len >= MEMBUFFER_SIZE(aft->buffer)) {
MemBufferExpand(&aft->buffer, OUTPUT_BUFFER_SIZE);
}

@ -362,7 +362,7 @@ static int AlertJson(ThreadVars *tv, JsonAlertLogThread *aft, const Packet *p)
}
}
OutputJSONBuffer(js, aft->file_ctx, aft->json_buffer);
OutputJSONBuffer(js, aft->file_ctx, &aft->json_buffer);
json_object_del(js, "alert");
}
json_object_clear(js);
@ -373,7 +373,6 @@ static int AlertJson(ThreadVars *tv, JsonAlertLogThread *aft, const Packet *p)
static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const Packet *p)
{
MemBuffer *buffer = (MemBuffer *)aft->json_buffer;
int i;
char timebuf[64];
json_t *js;
@ -384,7 +383,7 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
CreateIsoTimeString(&p->ts, timebuf, sizeof(timebuf));
for (i = 0; i < p->alerts.cnt; i++) {
MemBufferReset(buffer);
MemBufferReset(aft->json_buffer);
const PacketAlert *pa = &p->alerts.alerts[i];
if (unlikely(pa->s == NULL)) {
@ -436,7 +435,7 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
/* alert */
json_object_set_new(js, "alert", ajs);
OutputJSONBuffer(js, aft->file_ctx, buffer);
OutputJSONBuffer(js, aft->file_ctx, &aft->json_buffer);
json_object_clear(js);
json_decref(js);
}

@ -73,8 +73,6 @@ typedef struct LogDnsLogThread_ {
static void LogQuery(LogDnsLogThread *aft, json_t *js, DNSTransaction *tx,
uint64_t tx_id, DNSQueryEntry *entry)
{
MemBuffer *buffer = (MemBuffer *)aft->buffer;
SCLogDebug("got a DNS request and now logging !!");
json_t *djs = json_object();
@ -83,7 +81,7 @@ static void LogQuery(LogDnsLogThread *aft, json_t *js, DNSTransaction *tx,
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
/* type */
json_object_set_new(djs, "type", json_string("query"));
@ -109,13 +107,12 @@ static void LogQuery(LogDnsLogThread *aft, json_t *js, DNSTransaction *tx,
/* dns */
json_object_set_new(js, "dns", djs);
OutputJSONBuffer(js, aft->dnslog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, aft->dnslog_ctx->file_ctx, &aft->buffer);
json_object_del(js, "dns");
}
static void OutputAnswer(LogDnsLogThread *aft, json_t *djs, DNSTransaction *tx, DNSAnswerEntry *entry)
{
MemBuffer *buffer = (MemBuffer *)aft->buffer;
json_t *js = json_object();
if (js == NULL)
return;
@ -179,9 +176,9 @@ static void OutputAnswer(LogDnsLogThread *aft, json_t *djs, DNSTransaction *tx,
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
json_object_set_new(djs, "dns", js);
OutputJSONBuffer(djs, aft->dnslog_ctx->file_ctx, buffer);
OutputJSONBuffer(djs, aft->dnslog_ctx->file_ctx, &aft->buffer);
json_object_del(djs, "dns");
return;
@ -189,7 +186,6 @@ static void OutputAnswer(LogDnsLogThread *aft, json_t *djs, DNSTransaction *tx,
static void OutputFailure(LogDnsLogThread *aft, json_t *djs, DNSTransaction *tx, DNSQueryEntry *entry)
{
MemBuffer *buffer = (MemBuffer *)aft->buffer;
json_t *js = json_object();
if (js == NULL)
return;
@ -214,9 +210,9 @@ static void OutputFailure(LogDnsLogThread *aft, json_t *djs, DNSTransaction *tx,
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
json_object_set_new(djs, "dns", js);
OutputJSONBuffer(djs, aft->dnslog_ctx->file_ctx, buffer);
OutputJSONBuffer(djs, aft->dnslog_ctx->file_ctx, &aft->buffer);
json_object_del(djs, "dns");
return;

@ -85,7 +85,6 @@ typedef struct JsonDropLogThread_ {
static int DropLogJSON (JsonDropLogThread *aft, const Packet *p)
{
uint16_t proto = 0;
MemBuffer *buffer = (MemBuffer *)aft->buffer;
json_t *js = CreateJSONHeader((Packet *)p, 0, "drop");//TODO const
if (unlikely(js == NULL))
return TM_ECODE_OK;
@ -97,7 +96,7 @@ static int DropLogJSON (JsonDropLogThread *aft, const Packet *p)
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
if (PKT_IS_IPV4(p)) {
json_object_set_new(djs, "len", json_integer(IPV4_GET_IPLEN(p)));
@ -168,7 +167,7 @@ static int DropLogJSON (JsonDropLogThread *aft, const Packet *p)
}
}
OutputJSONBuffer(js, aft->drop_ctx->file_ctx, buffer);
OutputJSONBuffer(js, aft->drop_ctx->file_ctx, &aft->buffer);
json_object_del(js, "drop");
json_object_clear(js);
json_decref(js);

@ -83,14 +83,13 @@ typedef struct JsonFileLogThread_ {
*/
static void FileWriteJsonRecord(JsonFileLogThread *aft, const Packet *p, const File *ff)
{
MemBuffer *buffer = (MemBuffer *)aft->buffer;
json_t *js = CreateJSONHeader((Packet *)p, 0, "fileinfo"); //TODO const
json_t *hjs = NULL;
if (unlikely(js == NULL))
return;
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
switch (p->flow->alproto) {
case ALPROTO_HTTP:
@ -158,7 +157,7 @@ static void FileWriteJsonRecord(JsonFileLogThread *aft, const Packet *p, const F
/* originally just 'file', but due to bug 1127 naming it fileinfo */
json_object_set_new(js, "fileinfo", fjs);
OutputJSONBuffer(js, aft->filelog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, aft->filelog_ctx->file_ctx, &aft->buffer);
json_object_del(js, "fileinfo");
switch (p->flow->alproto) {

@ -313,10 +313,9 @@ static int JsonFlowLogger(ThreadVars *tv, void *thread_data, Flow *f)
{
SCEnter();
JsonFlowLogThread *jhl = (JsonFlowLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)jhl->buffer;
/* reset */
MemBufferReset(buffer);
MemBufferReset(jhl->buffer);
json_t *js = CreateJSONHeaderFromFlow(f, "flow"); //TODO const
if (unlikely(js == NULL))
@ -324,7 +323,7 @@ static int JsonFlowLogger(ThreadVars *tv, void *thread_data, Flow *f)
JsonFlowLogJSON(jhl, js, f);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, &jhl->buffer);
json_object_del(js, "http");
json_object_clear(js);

@ -371,7 +371,6 @@ static int JsonHttpLogger(ThreadVars *tv, void *thread_data, const Packet *p, Fl
htp_tx_t *tx = txptr;
JsonHttpLogThread *jhl = (JsonHttpLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)jhl->buffer;
json_t *js = CreateJSONHeaderWithTxId((Packet *)p, 1, "http", tx_id); //TODO const
if (unlikely(js == NULL))
@ -380,11 +379,11 @@ static int JsonHttpLogger(ThreadVars *tv, void *thread_data, const Packet *p, Fl
SCLogDebug("got a HTTP request and now logging !!");
/* reset */
MemBufferReset(buffer);
MemBufferReset(jhl->buffer);
JsonHttpLogJSON(jhl, js, tx, tx_id);
OutputJSONBuffer(js, jhl->httplog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, jhl->httplog_ctx->file_ctx, &jhl->buffer);
json_object_del(js, "http");
json_object_clear(js);

@ -289,26 +289,25 @@ static int JsonNetFlowLogger(ThreadVars *tv, void *thread_data, Flow *f)
{
SCEnter();
JsonNetFlowLogThread *jhl = (JsonNetFlowLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)jhl->buffer;
/* reset */
MemBufferReset(buffer);
MemBufferReset(jhl->buffer);
json_t *js = CreateJSONHeaderFromFlow(f, "netflow", 0); //TODO const
if (unlikely(js == NULL))
return TM_ECODE_OK;
JsonNetFlowLogJSONToServer(jhl, js, f);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, &jhl->buffer);
json_object_del(js, "netflow");
json_object_clear(js);
json_decref(js);
/* reset */
MemBufferReset(buffer);
MemBufferReset(jhl->buffer);
js = CreateJSONHeaderFromFlow(f, "netflow", 1); //TODO const
if (unlikely(js == NULL))
return TM_ECODE_OK;
JsonNetFlowLogJSONToClient(jhl, js, f);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, jhl->flowlog_ctx->file_ctx, &jhl->buffer);
json_object_del(js, "netflow");
json_object_clear(js);
json_decref(js);

@ -87,7 +87,6 @@ static int JsonSmtpLogger(ThreadVars *tv, void *thread_data, const Packet *p, Fl
{
SCEnter();
JsonEmailLogThread *jhl = (JsonEmailLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)jhl->buffer;
json_t *sjs;
json_t *js = CreateJSONHeaderWithTxId((Packet *)p, 1, "smtp", tx_id);
@ -95,7 +94,7 @@ static int JsonSmtpLogger(ThreadVars *tv, void *thread_data, const Packet *p, Fl
return TM_ECODE_OK;
/* reset */
MemBufferReset(buffer);
MemBufferReset(jhl->buffer);
sjs = JsonSmtpDataLogger(f, state, tx, tx_id);
if (sjs) {
@ -103,7 +102,7 @@ static int JsonSmtpLogger(ThreadVars *tv, void *thread_data, const Packet *p, Fl
}
if (JsonEmailLogJson(jhl, js, p, f, state, tx, tx_id) == TM_ECODE_OK) {
OutputJSONBuffer(js, jhl->emaillog_ctx->file_ctx, buffer);
OutputJSONBuffer(js, jhl->emaillog_ctx->file_ctx, &jhl->buffer);
}
json_object_del(js, "email");
if (sjs) {

@ -93,7 +93,6 @@ void JsonSshLogJSON(json_t *tjs, SshState *ssh_state)
static int JsonSshLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonSshLogThread *aft = (JsonSshLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)aft->buffer;
OutputSshCtx *ssh_ctx = aft->sshlog_ctx;
if (unlikely(p->flow == NULL)) {
@ -125,13 +124,13 @@ static int JsonSshLogger(ThreadVars *tv, void *thread_data, const Packet *p)
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
JsonSshLogJSON(tjs, ssh_state);
json_object_set_new(js, "ssh", tjs);
OutputJSONBuffer(js, ssh_ctx->file_ctx, buffer);
OutputJSONBuffer(js, ssh_ctx->file_ctx, &aft->buffer);
json_object_clear(js);
json_decref(js);

@ -95,7 +95,6 @@ static int JsonStatsLogger(ThreadVars *tv, void *thread_data, const StatsTable *
{
SCEnter();
JsonStatsLogThread *aft = (JsonStatsLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)aft->buffer;
const char delta_suffix[] = "_delta";
struct timeval tval;
@ -185,8 +184,8 @@ static int JsonStatsLogger(ThreadVars *tv, void *thread_data, const StatsTable *
json_object_set_new(js, "stats", js_stats);
OutputJSONBuffer(js, aft->statslog_ctx->file_ctx, buffer);
MemBufferReset(buffer);
OutputJSONBuffer(js, aft->statslog_ctx->file_ctx, &aft->buffer);
MemBufferReset(aft->buffer);
json_object_clear(js_stats);
json_object_del(js, "stats");

@ -57,7 +57,6 @@ static int JsonTemplateLogger(ThreadVars *tv, void *thread_data,
{
TemplateTransaction *templatetx = tx;
LogTemplateLogThread *thread = thread_data;
MemBuffer *buffer = thread->buffer;
json_t *js, *templatejs;
SCLogNotice("Logging template transaction %"PRIu64".", templatetx->tx_id);
@ -91,8 +90,8 @@ static int JsonTemplateLogger(ThreadVars *tv, void *thread_data,
json_object_set_new(js, "template", templatejs);
MemBufferReset(buffer);
OutputJSONBuffer(js, thread->templatelog_ctx->file_ctx, buffer);
MemBufferReset(thread->buffer);
OutputJSONBuffer(js, thread->templatelog_ctx->file_ctx, &thread->buffer);
json_decref(js);
return TM_ECODE_OK;

@ -129,7 +129,6 @@ void JsonTlsLogJSONExtended(json_t *tjs, SSLState * state)
static int JsonTlsLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonTlsLogThread *aft = (JsonTlsLogThread *)thread_data;
MemBuffer *buffer = (MemBuffer *)aft->buffer;
OutputTlsCtx *tls_ctx = aft->tlslog_ctx;
if (unlikely(p->flow == NULL)) {
@ -161,7 +160,7 @@ static int JsonTlsLogger(ThreadVars *tv, void *thread_data, const Packet *p)
}
/* reset */
MemBufferReset(buffer);
MemBufferReset(aft->buffer);
JsonTlsLogJSONBasic(tjs, ssl_state);
@ -171,7 +170,7 @@ static int JsonTlsLogger(ThreadVars *tv, void *thread_data, const Packet *p)
json_object_set_new(js, "tls", tjs);
OutputJSONBuffer(js, tls_ctx->file_ctx, buffer);
OutputJSONBuffer(js, tls_ctx->file_ctx, &aft->buffer);
json_object_clear(js);
json_decref(js);

@ -117,7 +117,7 @@ void OutputJsonRegisterTests (void)
#define DEFAULT_ALERT_SYSLOG_LEVEL LOG_INFO
#define MODULE_NAME "OutputJSON"
#define OUTPUT_BUFFER_SIZE 65535
#define OUTPUT_BUFFER_SIZE 65536
TmEcode OutputJson (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode OutputJsonThreadInit(ThreadVars *, void *, void **);
@ -335,31 +335,38 @@ json_t *CreateJSONHeaderWithTxId(Packet *p, int direction_sensitive, char *event
return js;
}
/* helper struct for the callback */
typedef struct MemBufferWrapper_ {
MemBuffer **buffer;
} MemBufferWrapper;
static int MemBufferCallback(const char *str, size_t size, void *data)
{
MemBuffer *memb = data;
#if 0 // can't expand, need a MemBuffer **
/* since we can have many threads, the buffer might not be big enough.
* * Expand if necessary. */
if (MEMBUFFER_OFFSET(memb) + size > MEMBUFFER_SIZE(memb)) {
MemBufferExpand(&memb, OUTPUT_BUFFER_SIZE);
MemBufferWrapper *wrapper = data;
MemBuffer **memb = wrapper->buffer;
if (MEMBUFFER_OFFSET(*memb) + size >= MEMBUFFER_SIZE(*memb)) {
MemBufferExpand(memb, OUTPUT_BUFFER_SIZE);
}
#endif
MemBufferWriteRaw(memb, str, size);
MemBufferWriteRaw((*memb), str, size);
return 0;
}
int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer *buffer)
int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer)
{
if (file_ctx->sensor_name) {
json_object_set_new(js, "host",
json_string(file_ctx->sensor_name));
}
if (file_ctx->prefix)
MemBufferWriteRaw(buffer, file_ctx->prefix, file_ctx->prefix_len);
if (file_ctx->prefix) {
MemBufferWriteRaw((*buffer), file_ctx->prefix, file_ctx->prefix_len);
}
MemBufferWrapper wrapper = { .buffer = buffer };
int r = json_dump_callback(js, MemBufferCallback, buffer,
int r = json_dump_callback(js, MemBufferCallback, &wrapper,
JSON_PRESERVE_ORDER|JSON_COMPACT|JSON_ENSURE_ASCII|
#ifdef JSON_ESCAPE_SLASH
JSON_ESCAPE_SLASH
@ -370,7 +377,7 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer *buffer)
if (r != 0)
return TM_ECODE_OK;
LogFileWrite(file_ctx, buffer);
LogFileWrite(file_ctx, *buffer);
return 0;
}

@ -38,7 +38,7 @@ void JsonTcpFlags(uint8_t flags, json_t *js);
json_t *CreateJSONHeader(Packet *p, int direction_sensative, char *event_type);
json_t *CreateJSONHeaderWithTxId(Packet *p, int direction_sensitive, char *event_type, uint32_t tx_id);
TmEcode OutputJSON(json_t *js, void *data, uint64_t *count);
int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer *buffer);
int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer);
OutputCtx *OutputJsonInitCtx(ConfNode *);
enum JsonFormat { COMPACT, INDENT };

@ -135,7 +135,7 @@ void MemBufferFree(MemBuffer *buffer);
\
if (((raw_buffer_len) >= (dst)->size - (dst)->offset)) { \
SCLogDebug("Truncating data write since it exceeded buffer limit of " \
"- %"PRIu32"\n", (dst)->size); \
"- %"PRIu32, (dst)->size); \
write_len = ((dst)->size - (dst)->offset) - 1; \
} else { \
write_len = (raw_buffer_len); \

Loading…
Cancel
Save