diff --git a/src/stream-tcp-list.c b/src/stream-tcp-list.c index 4302c04e22..9b85add4f6 100644 --- a/src/stream-tcp-list.c +++ b/src/stream-tcp-list.c @@ -619,6 +619,7 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream) { int use_app = 1; int use_raw = 1; + int use_log = 1; uint64_t left_edge = 0; if ((ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED) || @@ -632,6 +633,9 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream) // raw is dead use_raw = 0; } + if (!stream_config.streaming_log_api) { + use_log = 0; + } if (use_raw) { uint64_t raw_progress = STREAM_RAW_PROGRESS(stream); @@ -664,6 +668,14 @@ static inline uint64_t GetLeftEdge(TcpSession *ssn, TcpStream *stream) SCLogDebug("no app & raw: left_edge %"PRIu64" (full stream)", left_edge); } + if (use_log) { + if (use_app || use_raw) { + left_edge = MIN(left_edge, STREAM_LOG_PROGRESS(stream)); + } else { + left_edge = STREAM_LOG_PROGRESS(stream); + } + } + /* in inline mode keep at least unack'd segments so we can check for overlaps */ if (StreamTcpInlineMode() == TRUE) { uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream); @@ -785,6 +797,11 @@ void StreamTcpPruneSession(Flow *f, uint8_t flags) } else { stream->raw_progress_rel = 0; } + if (slide <= stream->log_progress_rel) { + stream->log_progress_rel -= slide; + } else { + stream->log_progress_rel = 0; + } SCLogDebug("stream base_seq %u at stream offset %"PRIu64, stream->base_seq, STREAM_BASE_OFFSET(stream)); @@ -794,9 +811,9 @@ void StreamTcpPruneSession(Flow *f, uint8_t flags) TcpSegment *seg = stream->seg_list; while (seg != NULL) { - SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32", FLAGS %02x", + SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32, seg, seg->seq, TCP_SEG_LEN(seg), - (uint32_t)(seg->seq + TCP_SEG_LEN(seg)), seg->flags); + (uint32_t)(seg->seq + TCP_SEG_LEN(seg))); if (StreamTcpReturnSegmentCheck(stream, seg) == 0) { SCLogDebug("not removing segment"); @@ -886,8 +903,8 @@ void PrintList(TcpSegment *seg) (seg->seq - next_seq)); } - SCLogDebug("seg %10"PRIu32" len %" PRIu16 ", seg %p, prev %p, next %p, flags 0x%02x", - seg->seq, TCP_SEG_LEN(seg), seg, seg->prev, seg->next, seg->flags); + SCLogDebug("seg %10"PRIu32" len %" PRIu16 ", seg %p, prev %p, next %p", + seg->seq, TCP_SEG_LEN(seg), seg, seg->prev, seg->next); if (seg->prev != NULL && SEQ_LT(seg->seq,seg->prev->seq)) { /* check for SEQ_LT cornercase where a - b is exactly 2147483648, diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index dcc7693c4c..ef6ede6c82 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -53,11 +53,9 @@ typedef struct StreamTcpSackRecord_ { typedef struct TcpSegment_ { PoolThreadReserved res; - StreamingBufferSegment sbseg; uint16_t payload_len; /**< actual size of the payload */ - /* coccinelle: TcpSegment:flags:SEGMENTTCP_FLAG */ - uint8_t flags; uint32_t seq; + StreamingBufferSegment sbseg; struct TcpSegment_ *next; struct TcpSegment_ *prev; } TcpSegment; @@ -89,6 +87,7 @@ typedef struct TcpStream_ { uint32_t app_progress_rel; /**< app-layer progress relative to STREAM_BASE_OFFSET */ uint32_t raw_progress_rel; /**< raw reassembly progress relative to STREAM_BASE_OFFSET */ + uint32_t log_progress_rel; /**< streaming logger progress relative to STREAM_BASE_OFFSET */ StreamingBuffer sb; @@ -102,6 +101,7 @@ typedef struct TcpStream_ { #define STREAM_BASE_OFFSET(stream) ((stream)->sb.stream_offset) #define STREAM_APP_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->app_progress_rel) #define STREAM_RAW_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->raw_progress_rel) +#define STREAM_LOG_PROGRESS(stream) (STREAM_BASE_OFFSET((stream)) + (stream)->log_progress_rel) /* from /usr/include/netinet/tcp.h */ enum @@ -188,11 +188,6 @@ enum /** NOTE: flags field is 12 bits */ -/* - * Per SEGMENT flags - */ -/** Log API (streaming) has processed this segment */ -#define SEGMENTTCP_FLAG_LOGAPI_PROCESSED 0x04 #define PAWS_24DAYS 2073600 /**< 24 days in seconds */ diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 4641002b17..debf6642ad 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -1493,40 +1493,23 @@ static int StreamReassembleRawInline(TcpSession *ssn, const Packet *p, * consumed until now. * * \param ssn tcp session - * \param p packet + * \param stream tcp stream * \param Callback the function pointer to the callback function * \param cb_data callback data + * \param[in] progress_in progress to work from * \param[out] progress_out absolute progress value of the data this * call handled. */ -int StreamReassembleRaw(TcpSession *ssn, const Packet *p, +static int StreamReassembleRawDo(TcpSession *ssn, TcpStream *stream, StreamReassembleRawFunc Callback, void *cb_data, - uint64_t *progress_out) + const uint64_t progress_in, + uint64_t *progress_out, bool eof) { SCEnter(); int r = 0; - /* handle inline seperately as the logic is very different */ - if (StreamTcpInlineMode() == TRUE) { - return StreamReassembleRawInline(ssn, p, Callback, cb_data, progress_out); - } - - TcpStream *stream; - if (PKT_IS_TOSERVER(p)) { - stream = &ssn->client; - } else { - stream = &ssn->server; - } - - if ((stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY|STREAMTCP_STREAM_FLAG_DISABLE_RAW)) || - StreamTcpReassembleRawCheckLimit(ssn, stream, p) == 0) - { - *progress_out = STREAM_RAW_PROGRESS(stream); - return 0; - } - StreamingBufferBlock *iter = NULL; - uint64_t progress = STREAM_RAW_PROGRESS(stream); + uint64_t progress = progress_in; uint64_t last_ack_abs = STREAM_BASE_OFFSET(stream); /* absolute right edge of ack'd data */ /* get window of data that is acked */ @@ -1555,7 +1538,7 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p, SCLogDebug("stream %p data in buffer %p of len %u and offset %u", stream, &stream->sb, mydata_len, (uint)progress); - if (p->flags & PKT_PSEUDO_STREAM_END) { + if (eof) { // inspect all remaining data, ack'd or not } else { if (last_ack_abs < progress) { @@ -1586,8 +1569,8 @@ int StreamReassembleRaw(TcpSession *ssn, const Packet *p, BUG_ON(r < 0); if (mydata_offset == progress) { - SCLogDebug("raw progress %"PRIu64" increasing with data len %u to %"PRIu64, - progress, mydata_len, STREAM_RAW_PROGRESS(stream) + mydata_len); + SCLogDebug("progress %"PRIu64" increasing with data len %u to %"PRIu64, + progress, mydata_len, progress_in + mydata_len); progress += mydata_len; SCLogDebug("raw progress now %"PRIu64, progress); @@ -1612,6 +1595,46 @@ end: return r; } +int StreamReassembleRaw(TcpSession *ssn, const Packet *p, + StreamReassembleRawFunc Callback, void *cb_data, + uint64_t *progress_out) +{ + /* handle inline seperately as the logic is very different */ + if (StreamTcpInlineMode() == TRUE) { + return StreamReassembleRawInline(ssn, p, Callback, cb_data, progress_out); + } + + TcpStream *stream; + if (PKT_IS_TOSERVER(p)) { + stream = &ssn->client; + } else { + stream = &ssn->server; + } + + if ((stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY|STREAMTCP_STREAM_FLAG_DISABLE_RAW)) || + StreamTcpReassembleRawCheckLimit(ssn, stream, p) == 0) + { + *progress_out = STREAM_RAW_PROGRESS(stream); + return 0; + } + + return StreamReassembleRawDo(ssn, stream, Callback, cb_data, + STREAM_RAW_PROGRESS(stream), progress_out, + (p->flags & PKT_PSEUDO_STREAM_END)); +} + +int StreamReassembleLog(TcpSession *ssn, TcpStream *stream, + StreamReassembleRawFunc Callback, void *cb_data, + uint64_t progress_in, + uint64_t *progress_out, bool eof) +{ + if (stream->flags & (STREAMTCP_STREAM_FLAG_NOREASSEMBLY)) + return 0; + + return StreamReassembleRawDo(ssn, stream, Callback, cb_data, + progress_in, progress_out, eof); +} + /** \internal * \brief update app layer based on received ACK * diff --git a/src/stream-tcp.h b/src/stream-tcp.h index c77020b521..74bfc5f486 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -59,6 +59,7 @@ typedef struct TcpStreamCnf_ { uint16_t reassembly_toclient_chunk_size; int bypass; + bool streaming_log_api; StreamingBufferConfig sbcnf; } TcpStreamCnf; @@ -119,11 +120,17 @@ void TcpSessionSetReassemblyDepth(TcpSession *ssn, uint32_t size); typedef int (*StreamReassembleRawFunc)(void *data, const uint8_t *input, const uint32_t input_len); +int StreamReassembleLog(TcpSession *ssn, TcpStream *stream, + StreamReassembleRawFunc Callback, void *cb_data, + uint64_t progress_in, + uint64_t *progress_out, bool eof); int StreamReassembleRaw(TcpSession *ssn, const Packet *p, StreamReassembleRawFunc Callback, void *cb_data, uint64_t *progress_out); void StreamReassembleRawUpdateProgress(TcpSession *ssn, Packet *p, uint64_t progress); + void StreamTcpDetectLogFlush(ThreadVars *tv, StreamTcpThread *stt, Flow *f, Packet *p, PacketQueue *pq); + /** ------- Inline functions: ------ */ /**