From 4f0cdf28a3e769613741b44a7145a8b33fdb0ca8 Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Tue, 18 Oct 2011 14:09:38 +0200 Subject: [PATCH] Introduce StreamSegmentForEach function This patch introduces a function called StreamMsgForEach which can be used to run a callback on all segments of a stream. This is currently only supported for TCP as this is the only streaming aware protocol. --- src/stream-tcp.c | 45 ++++++++++++++++++++++++++++++++++++++++++++- src/stream-tcp.h | 4 ++++ src/stream.c | 21 +++++++++++++++++++++ src/stream.h | 5 +++++ 4 files changed, 74 insertions(+), 1 deletion(-) diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 45c36436df..50aa1ad752 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -3758,7 +3758,7 @@ TmEcode StreamTcp (ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Packe // return TM_ECODE_FAILED; stt->pkts++; - return TM_ECODE_OK; + return ret; } TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) @@ -4581,6 +4581,49 @@ void StreamTcpPseudoPacketCreateStreamEndPacket(Packet *p, TcpSession *ssn, Pack SCReturn; } +/** + * \brief Run callback function on each TCP segment + * + * This function is used by StreamMsgForEach() which + * should be used directly. + * + * \return 1 in case of success + * + */ +int StreamTcpSegmentForEach(Packet *p, uint8_t flag, StreamSegmentCallback CallbackFunc, void *data) +{ + TcpSession *ssn = NULL; + TcpStream *stream = NULL; + int ret = 0; + + if (p->flow == NULL) + return 1; + + SCMutexLock(&p->flow->m); + ssn = (TcpSession *)p->flow->protoctx; + + if (ssn == NULL) + return 1; + + if (flag & FLOW_PKT_TOSERVER) { + stream = &(ssn->server); + } else { + stream = &(ssn->client); + } + TcpSegment *seg = stream->seg_list; + for (; seg != NULL && SEQ_LT(seg->seq, stream->last_ack);) { + ret = CallbackFunc(p, data, seg->payload, seg->payload_len); + if (ret != 1) { + SCLogInfo("Callback function has failed"); + SCMutexUnlock(&p->flow->m); + return ret; + } + seg = seg->next; + } + SCMutexUnlock(&p->flow->m); + return 1; +} + #ifdef UNITTESTS /** diff --git a/src/stream-tcp.h b/src/stream-tcp.h index b9e34a48e7..cbcefb2443 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -102,6 +102,10 @@ Packet *StreamTcpPseudoSetup(Packet *, uint8_t *, uint32_t); void StreamTcpSetEvent(Packet *p, uint8_t e); +int StreamTcpSegmentForEach(Packet *p, uint8_t flag, + StreamSegmentCallback CallbackFunc, + void *data); + /** ------- Inline functions: ------ */ /** diff --git a/src/stream.c b/src/stream.c index bee05e96ab..ad60eaa612 100644 --- a/src/stream.c +++ b/src/stream.c @@ -29,6 +29,7 @@ #include "stream.h" #include "util-pool.h" #include "util-debug.h" +#include "stream-tcp.h" static SCMutex stream_pool_memuse_mutex; static uint64_t stream_pool_memuse = 0; @@ -219,3 +220,23 @@ void StreamMsgReturnListToPool(void *list) { smsg = smsg_next; } } + +/** \brief Run callback for all segments + * + * \return 1 in case of success + */ +int StreamSegmentForEach(Packet *p, uint8_t flag, StreamSegmentCallback CallbackFunc, void *data) +{ + switch(p->proto) { + case IPPROTO_TCP: + return StreamTcpSegmentForEach(p, flag, CallbackFunc, data); + break; + case IPPROTO_UDP: + SCLogWarning(SC_ERR_UNKNOWN_PROTOCOL, "UDP is currently unsupported"); + break; + default: + SCLogWarning(SC_ERR_UNKNOWN_PROTOCOL, "This protocol is currently unsupported"); + break; + } + return 0; +} diff --git a/src/stream.h b/src/stream.h index eea8aa30c4..bff084e5c1 100644 --- a/src/stream.h +++ b/src/stream.h @@ -85,5 +85,10 @@ uint16_t StreamMsgQueueGetMinChunkLen(uint8_t); void StreamMsgReturnListToPool(void *); +typedef int (*StreamSegmentCallback)(Packet *, void *, uint8_t *, uint32_t); +int StreamSegmentForEach(Packet *p, uint8_t flag, + StreamSegmentCallback CallbackFunc, + void *data); + #endif /* __STREAM_H__ */