stream: introduce optional 'log' progress tracker

For logging streaming TCP data so far the individual segments where
used. However since the last big stream changes, the segments are
no longer the proper place for this. Segments can now have overlaps
etc.

This patch introduces a new tracker. Next to the existing 'app' and
'raw' trackers, the new tracker is 'log'. When the TCP logging is
used, a flag in the config is set and the log tracker is used to
determine how much of the stream window can be moved.
pull/2737/head
Victor Julien 8 years ago
parent 5b1d8c7e94
commit f4c4ef12c0

@ -619,6 +619,7 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream)
{ {
int use_app = 1; int use_app = 1;
int use_raw = 1; int use_raw = 1;
int use_log = 1;
uint64_t left_edge = 0; uint64_t left_edge = 0;
if ((ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED) || if ((ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED) ||
@ -632,6 +633,9 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream)
// raw is dead // raw is dead
use_raw = 0; use_raw = 0;
} }
if (!stream_config.streaming_log_api) {
use_log = 0;
}
if (use_raw) { if (use_raw) {
uint64_t raw_progress = STREAM_RAW_PROGRESS(stream); uint64_t raw_progress = STREAM_RAW_PROGRESS(stream);
@ -664,6 +668,14 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream)
SCLogDebug("no app & raw: left_edge %"PRIu64" (full stream)", left_edge); SCLogDebug("no app & raw: left_edge %"PRIu64" (full stream)", left_edge);
} }
if (use_log) {
if (use_app || use_raw) {
left_edge = MIN(left_edge, STREAM_LOG_PROGRESS(stream));
} else {
left_edge = STREAM_LOG_PROGRESS(stream);
}
}
/* in inline mode keep at least unack'd segments so we can check for overlaps */ /* in inline mode keep at least unack'd segments so we can check for overlaps */
if (StreamTcpInlineMode() == TRUE) { if (StreamTcpInlineMode() == TRUE) {
uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream); uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream);
@ -785,6 +797,11 @@ void StreamTcpPruneSession(Flow *f, uint8_t flags)
} else { } else {
stream->raw_progress_rel = 0; stream->raw_progress_rel = 0;
} }
if (slide <= stream->log_progress_rel) {
stream->log_progress_rel -= slide;
} else {
stream->log_progress_rel = 0;
}
SCLogDebug("stream base_seq %u at stream offset %"PRIu64, SCLogDebug("stream base_seq %u at stream offset %"PRIu64,
stream->base_seq, STREAM_BASE_OFFSET(stream)); stream->base_seq, STREAM_BASE_OFFSET(stream));
@ -794,9 +811,9 @@ void StreamTcpPruneSession(Flow *f, uint8_t flags)
TcpSegment *seg = stream->seg_list; TcpSegment *seg = stream->seg_list;
while (seg != NULL) while (seg != NULL)
{ {
SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32", FLAGS %02x", SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32,
seg, seg->seq, TCP_SEG_LEN(seg), seg, seg->seq, TCP_SEG_LEN(seg),
(uint32_t)(seg->seq + TCP_SEG_LEN(seg)), seg->flags); (uint32_t)(seg->seq + TCP_SEG_LEN(seg)));
if (StreamTcpReturnSegmentCheck(stream, seg) == 0) { if (StreamTcpReturnSegmentCheck(stream, seg) == 0) {
SCLogDebug("not removing segment"); SCLogDebug("not removing segment");
@ -886,8 +903,8 @@ void PrintList(TcpSegment *seg)
(seg->seq - next_seq)); (seg->seq - next_seq));
} }
SCLogDebug("seg %10"PRIu32" len %" PRIu16 ", seg %p, prev %p, next %p, flags 0x%02x", SCLogDebug("seg %10"PRIu32" len %" PRIu16 ", seg %p, prev %p, next %p",
seg->seq, TCP_SEG_LEN(seg), seg, seg->prev, seg->next, seg->flags); seg->seq, TCP_SEG_LEN(seg), seg, seg->prev, seg->next);
if (seg->prev != NULL && SEQ_LT(seg->seq,seg->prev->seq)) { if (seg->prev != NULL && SEQ_LT(seg->seq,seg->prev->seq)) {
/* check for SEQ_LT cornercase where a - b is exactly 2147483648, /* check for SEQ_LT cornercase where a - b is exactly 2147483648,

@ -53,11 +53,9 @@ typedef struct StreamTcpSackRecord_ {
typedef struct TcpSegment_ { typedef struct TcpSegment_ {
PoolThreadReserved res; PoolThreadReserved res;
StreamingBufferSegment sbseg;
uint16_t payload_len; /**< actual size of the payload */ uint16_t payload_len; /**< actual size of the payload */
/* coccinelle: TcpSegment:flags:SEGMENTTCP_FLAG */
uint8_t flags;
uint32_t seq; uint32_t seq;
StreamingBufferSegment sbseg;
struct TcpSegment_ *next; struct TcpSegment_ *next;
struct TcpSegment_ *prev; struct TcpSegment_ *prev;
} TcpSegment; } TcpSegment;
@ -89,6 +87,7 @@ typedef struct TcpStream_ {
uint32_t app_progress_rel; /**< app-layer progress relative to STREAM_BASE_OFFSET */ uint32_t app_progress_rel; /**< app-layer progress relative to STREAM_BASE_OFFSET */
uint32_t raw_progress_rel; /**< raw reassembly progress relative to STREAM_BASE_OFFSET */ uint32_t raw_progress_rel; /**< raw reassembly progress relative to STREAM_BASE_OFFSET */
uint32_t log_progress_rel; /**< streaming logger progress relative to STREAM_BASE_OFFSET */
StreamingBuffer sb; StreamingBuffer sb;
@ -102,6 +101,7 @@ typedef struct TcpStream_ {
#define STREAM_BASE_OFFSET(stream) ((stream)->sb.stream_offset) #define STREAM_BASE_OFFSET(stream) ((stream)->sb.stream_offset)
#define STREAM_APP_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->app_progress_rel) #define STREAM_APP_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->app_progress_rel)
#define STREAM_RAW_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->raw_progress_rel) #define STREAM_RAW_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->raw_progress_rel)
#define STREAM_LOG_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->log_progress_rel)
/* from /usr/include/netinet/tcp.h */ /* from /usr/include/netinet/tcp.h */
enum enum
@ -188,11 +188,6 @@ enum
/** NOTE: flags field is 12 bits */ /** NOTE: flags field is 12 bits */
/*
* Per SEGMENT flags
*/
/** Log API (streaming) has processed this segment */
#define SEGMENTTCP_FLAG_LOGAPI_PROCESSED 0x04
#define PAWS_24DAYS 2073600 /**< 24 days in seconds */ #define PAWS_24DAYS 2073600 /**< 24 days in seconds */

@ -1493,40 +1493,23 @@ static int StreamReassembleRawInline(TcpSession *ssn, const Packet *p,
* consumed until now. * consumed until now.
* *
* \param ssn tcp session * \param ssn tcp session
* \param p packet * \param stream tcp stream
* \param Callback the function pointer to the callback function * \param Callback the function pointer to the callback function
* \param cb_data callback data * \param cb_data callback data
* \param[in] progress_in progress to work from
* \param[out] progress_out absolute progress value of the data this * \param[out] progress_out absolute progress value of the data this
* call handled. * call handled.
*/ */
int StreamReassembleRaw(TcpSession *ssn, const Packet *p, static int StreamReassembleRawDo(TcpSession *ssn, TcpStream *stream,
StreamReassembleRawFunc Callback, void *cb_data, StreamReassembleRawFunc Callback, void *cb_data,
uint64_t *progress_out) const uint64_t progress_in,
uint64_t *progress_out, bool eof)
{ {
SCEnter(); SCEnter();
int r = 0; int r = 0;
/* handle inline seperately as the logic is very different */
if (StreamTcpInlineMode() == TRUE) {
return StreamReassembleRawInline(ssn, p, Callback, cb_data, progress_out);
}
TcpStream *stream;
if (PKT_IS_TOSERVER(p)) {
stream = &ssn->client;
} else {
stream = &ssn->server;
}
if ((stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY|STREAMTCP_STREAM_FLAG_DISABLE_RAW)) ||
StreamTcpReassembleRawCheckLimit(ssn, stream, p) == 0)
{
*progress_out = STREAM_RAW_PROGRESS(stream);
return 0;
}
StreamingBufferBlock *iter = NULL; StreamingBufferBlock *iter = NULL;
uint64_t progress = STREAM_RAW_PROGRESS(stream); uint64_t progress = progress_in;
uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream); /* absolute right edge of ack'd data */ uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream); /* absolute right edge of ack'd data */
/* get window of data that is acked */ /* get window of data that is acked */
@ -1555,7 +1538,7 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
SCLogDebug("stream %p data in buffer %p of len %u and offset %u", SCLogDebug("stream %p data in buffer %p of len %u and offset %u",
stream, &stream->sb, mydata_len, (uint)progress); stream, &stream->sb, mydata_len, (uint)progress);
if (p->flags & PKT_PSEUDO_STREAM_END) { if (eof) {
// inspect all remaining data, ack'd or not // inspect all remaining data, ack'd or not
} else { } else {
if (last_ack_abs < progress) { if (last_ack_abs < progress) {
@ -1586,8 +1569,8 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
BUG_ON(r < 0); BUG_ON(r < 0);
if (mydata_offset == progress) { if (mydata_offset == progress) {
SCLogDebug("raw progress %"PRIu64" increasing with data len %u to %"PRIu64, SCLogDebug("progress %"PRIu64" increasing with data len %u to %"PRIu64,
progress, mydata_len, STREAM_RAW_PROGRESS(stream) + mydata_len); progress, mydata_len, progress_in + mydata_len);
progress += mydata_len; progress += mydata_len;
SCLogDebug("raw progress now %"PRIu64, progress); SCLogDebug("raw progress now %"PRIu64, progress);
@ -1612,6 +1595,46 @@ end:
return r; return r;
} }
int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
StreamReassembleRawFunc Callback, void *cb_data,
uint64_t *progress_out)
{
/* handle inline seperately as the logic is very different */
if (StreamTcpInlineMode() == TRUE) {
return StreamReassembleRawInline(ssn, p, Callback, cb_data, progress_out);
}
TcpStream *stream;
if (PKT_IS_TOSERVER(p)) {
stream = &ssn->client;
} else {
stream = &ssn->server;
}
if ((stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY|STREAMTCP_STREAM_FLAG_DISABLE_RAW)) ||
StreamTcpReassembleRawCheckLimit(ssn, stream, p) == 0)
{
*progress_out = STREAM_RAW_PROGRESS(stream);
return 0;
}
return StreamReassembleRawDo(ssn, stream, Callback, cb_data,
STREAM_RAW_PROGRESS(stream), progress_out,
(p->flags & PKT_PSEUDO_STREAM_END));
}
int StreamReassembleLog(TcpSession *ssn, TcpStream *stream,
StreamReassembleRawFunc Callback, void *cb_data,
uint64_t progress_in,
uint64_t *progress_out, bool eof)
{
if (stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY))
return 0;
return StreamReassembleRawDo(ssn, stream, Callback, cb_data,
progress_in, progress_out, eof);
}
/** \internal /** \internal
* \brief update app layer based on received ACK * \brief update app layer based on received ACK
* *

@ -59,6 +59,7 @@ typedef struct TcpStreamCnf_ {
uint16_t reassembly_toclient_chunk_size; uint16_t reassembly_toclient_chunk_size;
int bypass; int bypass;
bool streaming_log_api;
StreamingBufferConfig sbcnf; StreamingBufferConfig sbcnf;
} TcpStreamCnf; } TcpStreamCnf;
@ -119,11 +120,17 @@ void TcpSessionSetReassemblyDepth(TcpSession *ssn, uint32_t size);
typedef int (*StreamReassembleRawFunc)(void *data, const uint8_t *input, const uint32_t input_len); typedef int (*StreamReassembleRawFunc)(void *data, const uint8_t *input, const uint32_t input_len);
int StreamReassembleLog(TcpSession *ssn, TcpStream *stream,
StreamReassembleRawFunc Callback, void *cb_data,
uint64_t progress_in,
uint64_t *progress_out, bool eof);
int StreamReassembleRaw(TcpSession *ssn, const Packet *p, int StreamReassembleRaw(TcpSession *ssn, const Packet *p,
StreamReassembleRawFunc Callback, void *cb_data, uint64_t *progress_out); StreamReassembleRawFunc Callback, void *cb_data, uint64_t *progress_out);
void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress); void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress);
void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq); void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq);
/** ------- Inline functions: ------ */ /** ------- Inline functions: ------ */
/** /**

Loading…
Cancel
Save