From 1c2240cfeb9b8eacb786d195b43200829a01e943 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Tue, 21 Jul 2009 16:28:08 +0200 Subject: [PATCH] Stream reassembly update and WIP code for L7 modules. --- src/Makefile.am | 4 +- src/eidps.c | 16 +++ src/l7-app-detect.c | 132 ++++++++++++++++++++++ src/l7-app-detect.h | 6 + src/stream-tcp-private.h | 1 + src/stream-tcp-reassemble.c | 219 +++++++++++++++++++++++++++++++++--- src/stream-tcp.c | 2 + src/stream.c | 167 +++++++++++++++++++++++++++ src/stream.h | 61 ++++++++++ src/tm-threads.c | 3 + 10 files changed, 596 insertions(+), 15 deletions(-) create mode 100644 src/l7-app-detect.c create mode 100644 src/l7-app-detect.h create mode 100644 src/stream.c create mode 100644 src/stream.h diff --git a/src/Makefile.am b/src/Makefile.am index d6a5564425..2567f6f8a3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -87,10 +87,12 @@ alert-debuglog.c alert-debuglog.h \ log-httplog.c log-httplog.h \ alert-unified-log.c alert-unified-log.h \ alert-unified-alert.c alert-unified-alert.h \ +stream.c stream.h \ stream-tcp.c stream-tcp.h stream-tcp-private.h \ stream-tcp-reassemble.c stream-tcp-reassemble.h \ respond-reject.c respond-reject.h \ -respond-reject-libnet11.h respond-reject-libnet11.c +respond-reject-libnet11.h respond-reject-libnet11.c \ +l7-app-detect.c l7-app-detect.h # set the include path found by configure INCLUDES= $(all_includes) diff --git a/src/eidps.c b/src/eidps.c index 4e13a0d342..3b35cebcb7 100644 --- a/src/eidps.c +++ b/src/eidps.c @@ -774,6 +774,9 @@ int main(int argc, char **argv) SigParsePrepare(); PatternMatchPrepare(mpm_ctx); + /* XXX we need an api for this */ + L7AppDetectThreadInit(); + TmModuleReceiveNFQRegister(); TmModuleVerdictNFQRegister(); TmModuleDecodeNFQRegister(); @@ -863,6 +866,19 @@ int main(int argc, char **argv) } TmThreadAppend(&tv_flowmgr); +#include "l7-app-detect.h" + ThreadVars tv_l7appdetect; + memset(&tv_l7appdetect, 0, sizeof(ThreadVars)); + printf("Creating L7 Application layer detect thread (WIP)...\n"); + tv_flowmgr.name = "L7AppDetectThread"; + + rc = pthread_create(&tv_l7appdetect.t, &attr, L7AppDetectThread, (void *)&tv_l7appdetect); + if (rc) { + printf("ERROR; return code from pthread_create() is %d\n", rc); + exit(1); + } + TmThreadAppend(&tv_l7appdetect); + while(1) { if (sigflags) { printf("signal received\n"); diff --git a/src/l7-app-detect.c b/src/l7-app-detect.c new file mode 100644 index 0000000000..09f67d8136 --- /dev/null +++ b/src/l7-app-detect.c @@ -0,0 +1,132 @@ +#include "eidps.h" +#include "debug.h" +#include "decode.h" +#include "threads.h" + +#include "util-print.h" +#include "util-pool.h" + +#include "stream-tcp-private.h" +#include "stream.h" + +#define INSPECT_BYTES 64 + +#define PROTO_UNKNOWN 0 +#define PROTO_HTTP 1 +#define PROTO_FTP 2 +#define PROTO_SMTP 3 + +#define TYPE_PROTO 0 +#define TYPE_BUF 1 + +/* XXX type can be 1 bit, 7 bit for proto */ +typedef struct _L7AppDetectDataProto { + u_int8_t type; + u_int8_t proto; +} L7AppDetectDataProto; + +static Pool *l7appdetect_proto_pool = NULL; + +void *L7AppDetectProtoAlloc(void *null) { + L7AppDetectDataProto *d = malloc(sizeof(L7AppDetectDataProto)); + if (d == NULL) { + return NULL; + } + + d->type = TYPE_PROTO; + d->proto = PROTO_UNKNOWN; + return d; +} +#define L7AppDetectProtoFree free + +void L7AppDetectThreadInit(void) { + /* allocate 2 pools, 1 for proto objects, 1 for bufs. Normal stream will + * jump straigth to protos so we alloc a lot less bufs */ + l7appdetect_proto_pool = PoolInit(262144, 32768, L7AppDetectProtoAlloc, NULL, L7AppDetectProtoFree); + if (l7appdetect_proto_pool == NULL) { + exit(1); + } +} + +u_int8_t L7AppDetectGetProto(u_int8_t *buf, u_int16_t buflen) { + if (buflen < INSPECT_BYTES) + return PROTO_UNKNOWN; + + /* XXX do actual detect */ + printf("L7AppDetectGetProto: protocol detection goes here.\n"); + return PROTO_HTTP; +} + +void *L7AppDetectThread(void *td) +{ + ThreadVars *tv = (ThreadVars *)td; + char run = TRUE; + u_int8_t l7_data_id = 0; + + /* get the stream msg queue for this thread */ + StreamMsgQueue *stream_q = StreamMsgQueueGetByPort(0); + StreamMsgQueueSetMinInitChunkLen(stream_q, INSPECT_BYTES); + + while(run) { + /* grab a msg, can return NULL on signals */ + StreamMsg *smsg = StreamMsgGetFromQueue(stream_q); + //printf("L7AppDetectThread: smsg %p\n", smsg); + + if (smsg != NULL) { + /* keep the flow locked during operation. + * XXX we may be better off adding a mutex + * to the l7data object */ + mutex_lock(&smsg->flow->m); + + TcpSession *ssn = smsg->flow->stream; + if (ssn != NULL) { + if (ssn->l7data == NULL) { + StreamL7DataPtrInit(ssn,1); /* XXX we can use a pool here, + or make it part of the stream setup */ + } + void *l7_data_ptr = ssn->l7data[l7_data_id]; + + if (smsg->flags & STREAM_START) { + //printf("L7AppDetectThread: stream initializer (len %u (%u))\n", smsg->init.data_len, MSG_INIT_DATA_SIZE); + + //printf("=> Init Stream Data -- start\n"); + //PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len); + //printf("=> Init Stream Data -- end\n"); + + if (l7_data_ptr == NULL) { + L7AppDetectDataProto *l7proto = (L7AppDetectDataProto *)PoolGet(l7appdetect_proto_pool); + if (l7proto != NULL) { + l7proto->type = TYPE_PROTO; + l7proto->proto = L7AppDetectGetProto(smsg->data.data, smsg->data.data_len); + + ssn->l7data[l7_data_id] = (void *)l7proto; + } + } + } else { + //printf("L7AppDetectThread: stream data (len %u (%u))\n", smsg->data.data_len, MSG_DATA_SIZE); + + /* if we don't have a data object here we are not getting it + * a start msg should have gotten us one */ + if (l7_data_ptr != NULL) { + L7AppDetectDataProto *l7proto = (L7AppDetectDataProto *)l7_data_ptr; + printf("L7AppDetectThread: already established that the proto is %u\n", l7proto->proto); + } else { + printf("L7AppDetectThread: smsg not start, but no l7 data? Weird\n"); + } + //printf("=> Stream Data -- start\n"); + //PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len); + //printf("=> Stream Data -- end\n"); + } + + mutex_unlock(&smsg->flow->m); + } + StreamMsgReturnToPool(smsg); + } + + if (tv->flags & THV_KILL) + run = 0; + } + + pthread_exit((void *) 0); +} + diff --git a/src/l7-app-detect.h b/src/l7-app-detect.h new file mode 100644 index 0000000000..102cd88a14 --- /dev/null +++ b/src/l7-app-detect.h @@ -0,0 +1,6 @@ +#ifndef __L7_APP_DETECT_H__ +#define __L7_APP_DETECT_H__ + +void *L7AppDetectThread(void *td); + +#endif /* __L7_APP_DETECT_H__ */ diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index 0effe9b1b8..40e674ce17 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -51,6 +51,7 @@ typedef struct _TcpSession { u_int8_t state; TcpStream server; TcpStream client; + void **l7data; } TcpSession; #endif /* __STREAM_TCP_PRIVATE_H__ */ diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 555444c0d6..fe444c7c96 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -31,6 +31,8 @@ #include "stream-tcp-private.h" +#include "stream.h" + void *TcpSegmentAlloc(void *payload_len) { TcpSegment *seg = malloc(sizeof(TcpSegment)); if (seg == NULL) @@ -47,7 +49,6 @@ void *TcpSegmentAlloc(void *payload_len) { return NULL; } - return seg; } @@ -74,6 +75,8 @@ static pthread_mutex_t segment_pool_mutex[segment_pool_num]; static u_int16_t segment_pool_idx[65536]; /* O(1) lookups of the pool */ int StreamTcpReassembleInit(void) { + StreamMsgQueuesInit(); + u_int16_t u16 = 0; for (u16 = 0; u16 < segment_pool_num; u16++) { segment_pool[u16] = PoolInit(segment_pool_poolsizes[u16], segment_pool_poolsizes[u16]/2, TcpSegmentAlloc, (void *)&segment_pool_pktsizes[u16], TcpSegmentFree); @@ -247,41 +250,203 @@ int StreamTcpReassembleHandleSegmentHandleData (TcpSession *ssn, TcpStream *stre memcpy(seg->payload, p->payload, p->payload_len); seg->payload_len = p->payload_len; seg->seq = TCP_GET_SEQ(p); + seg->next = NULL; ReassembleInsertSegment(stream, seg); return 0; } +/* initialize the first msg */ +static void StreamTcpSetupInitMsg(Packet *p, StreamMsg *smsg) { + smsg->flags |= STREAM_START; + + if (p->flowflags & FLOW_PKT_TOSERVER) { + COPY_ADDRESS(&p->flow->src,&smsg->data.src_ip); + COPY_ADDRESS(&p->flow->dst,&smsg->data.dst_ip); + COPY_PORT(p->flow->sp,smsg->data.src_port); + COPY_PORT(p->flow->dp,smsg->data.dst_port); + + smsg->flags |= STREAM_TOSERVER; + } else { + COPY_ADDRESS(&p->flow->dst,&smsg->data.src_ip); + COPY_ADDRESS(&p->flow->src,&smsg->data.dst_ip); + COPY_PORT(p->flow->dp,smsg->data.src_port); + COPY_PORT(p->flow->sp,smsg->data.dst_port); + + smsg->flags |= STREAM_TOCLIENT; + } +} + int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *stream, Packet *p) { if (stream->seg_list == NULL) return 0; + printf("StreamTcpReassembleHandleSegmentUpdateACK: start\n"); + + StreamMsg *smsg = NULL; char remove = FALSE; + u_int16_t smsg_offset = 0; + u_int16_t payload_offset = 0; + u_int16_t payload_len = 0; TcpSegment *seg = stream->seg_list; - printf("STREAM START\n"); - for ( ; seg != NULL && SEQ_LT(seg->seq,stream->last_ack); seg = seg->next) { - if (SEQ_GEQ(seg->seq,stream->ra_base_seq)) { + + /* check if we have enough data to send to l7 */ + if (p->flowflags & FLOW_PKT_TOSERVER) { + if (stream->ra_base_seq == stream->isn) { + if (StreamMsgQueueGetMinInitChunkLen(STREAM_TOSERVER) > + (stream->last_ack - stream->ra_base_seq)) + return 0; + } else { + if (StreamMsgQueueGetMinChunkLen(STREAM_TOSERVER) > + (stream->last_ack - stream->ra_base_seq)) + return 0; + } + } else { + if (stream->ra_base_seq == stream->isn) { + if (StreamMsgQueueGetMinInitChunkLen(STREAM_TOCLIENT) > + (stream->last_ack - stream->ra_base_seq)) + return 0; + } else { + if (StreamMsgQueueGetMinChunkLen(STREAM_TOCLIENT) > + (stream->last_ack - stream->ra_base_seq)) + return 0; + } + } + + for ( ; seg != NULL && SEQ_LT(seg->seq,stream->last_ack); ) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: seg %p\n", seg); + + /* if the segment ends beyond ra_base_seq we need to consider it */ + if (SEQ_GEQ((seg->seq + seg->payload_len),stream->ra_base_seq)) { + /* get a message */ + if (smsg == NULL) { + smsg = StreamMsgGetFromPool(); + if (smsg == NULL) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: couldn't " + "get a stream msg from the pool\n"); + return -1; + } + + smsg_offset = 0; + + if (stream->ra_base_seq == stream->isn) { + StreamTcpSetupInitMsg(p, smsg); + } + smsg->data.data_len = 0; + smsg->flow = p->flow; + } + + /* handle segments partly before ra_base_seq */ if (SEQ_GT(stream->ra_base_seq, seg->seq)) { - u_int16_t offset = stream->ra_base_seq - seg->seq; + payload_offset = stream->ra_base_seq - seg->seq; if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) { - PrintRawDataFp(stdout, (seg->payload + offset), (stream->last_ack - seg->seq - offset)); + payload_len = ((seg->seq + seg->payload_len) - stream->last_ack) - payload_offset; + printf("StreamTcpReassembleHandleSegmentUpdateACK: starts " + "before ra_base, ends beyond last_ack, payload_offset %u, " + "payload_len %u\n", payload_offset, payload_len); } else { - PrintRawDataFp(stdout, (seg->payload + offset), (seg->payload_len - offset)); - remove = TRUE; + payload_len = seg->payload_len - payload_offset; + printf("StreamTcpReassembleHandleSegmentUpdateACK: starts " + "before ra_base, ends normal, payload_offset %u, " + "payload_len %u\n", payload_offset, payload_len); } + /* handle segments after ra_base_seq */ } else { + payload_offset = 0; + if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) { - PrintRawDataFp(stdout,seg->payload,(stream->last_ack - seg->seq)); + payload_len = stream->last_ack - seg->seq; + printf("StreamTcpReassembleHandleSegmentUpdateACK: start " + "fine, ends beyond last_ack, payload_offset %u, " + "payload_len %u\n", payload_offset, payload_len); } else { - PrintRawDataFp(stdout,seg->payload,seg->payload_len); - remove = TRUE; + payload_len = seg->payload_len; + printf("StreamTcpReassembleHandleSegmentUpdateACK: normal " + "(smsg_offset %u), payload_offset %u, payload_len %u\n", + smsg_offset, payload_offset, payload_len); } } + + u_int16_t copy_size = sizeof(smsg->data.data) - smsg_offset; + if (copy_size > payload_len) { + copy_size = payload_len; + } + printf("StreamTcpReassembleHandleSegmentUpdateACK: normal -- " + "copy_size %u (payload %u)\n", copy_size, payload_len); + + memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size); + + smsg_offset += copy_size; + stream->ra_base_seq += copy_size; + smsg->data.data_len += copy_size; + + if (copy_size < payload_len) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: " + "copy_size %u < %u\n", copy_size, payload_len); + + StreamMsgPutInQueue(smsg); + smsg = NULL; + payload_offset = copy_size + payload_offset; + printf("StreamTcpReassembleHandleSegmentUpdateACK: " + "payload_offset %u\n", payload_offset); + + /* we need a while loop here as the packets theoretically can be 64k */ + + while (remove == FALSE) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: " + "new msg at offset %u, payload_len %u\n", payload_offset, payload_len); + + /* get a new message */ + smsg = StreamMsgGetFromPool(); + if (smsg == NULL) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: " + "couldn't get a stream msg from the pool (while loop)\n"); + return -1; + } + smsg_offset = 0; + smsg->data.data_len = 0; + smsg->flow = p->flow; + + copy_size = sizeof(smsg->data.data) - smsg_offset; + if (copy_size > (payload_len - payload_offset)) { + copy_size = (payload_len - payload_offset); + } + + printf("StreamTcpReassembleHandleSegmentUpdateACK: copy " + "payload_offset %u, smsg_offset %u, copy_size %u\n", + payload_offset, smsg_offset, copy_size); + + memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size); + smsg_offset += copy_size; + stream->ra_base_seq += copy_size; + smsg->data.data_len += copy_size; + + printf("StreamTcpReassembleHandleSegmentUpdateACK: copied " + "payload_offset %u, smsg_offset %u, copy_size %u\n", + payload_offset, smsg_offset, copy_size); + + if ((copy_size + payload_offset) < payload_len) { + payload_offset += copy_size; + printf("StreamTcpReassembleHandleSegmentUpdateACK: loop not done\n"); + } else { + printf("StreamTcpReassembleHandleSegmentUpdateACK: loop done\n"); + payload_offset = 0; + remove = TRUE; + } + } + } else { + payload_offset = 0; + remove = TRUE; + } } + TcpSegment *next_seg = seg->next; + /* done with this segment, return it to the pool */ if (remove == TRUE) { + printf("StreamTcpReassembleHandleSegmentUpdateACK: removing seg %p, " + "seg->next %p\n", seg, seg->next); stream->seg_list = seg->next; u_int16_t idx = segment_pool_idx[seg->pool_size]; @@ -292,11 +457,16 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea remove = FALSE; } + + seg = next_seg; + } + + /* put the partly filled smsg in the queue to the l7 handler */ + if (smsg != NULL) { + StreamMsgPutInQueue(smsg); + smsg = NULL; } - printf("STREAM END\n"); - /* The base sequence number of the reassembly is updated to last ack */ - stream->ra_base_seq = stream->last_ack; return 0; } @@ -311,3 +481,24 @@ int StreamTcpReassembleHandleSegment (TcpSession *ssn, TcpStream *stream, Packet return 0; } +/* Initialize the l7data ptr in the TCP session used + * by the L7 Modules for data storage. + * + * ssn = TcpSesssion + * cnt = number of items in the array + * + * XXX use a pool? + */ +void StreamL7DataPtrInit(TcpSession *ssn, u_int8_t cnt) { + if (cnt == 0) + return; + + ssn->l7data = (void **)malloc(sizeof(void *) * cnt); + if (ssn->l7data != NULL) { + u_int8_t u; + for (u = 0; u < cnt; u++) { + ssn->l7data[u] = NULL; + } + } +} + diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 60ff9e2074..77b4b9ce7b 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -26,6 +26,8 @@ #include "stream-tcp-private.h" #include "stream-tcp-reassemble.h" +#include "stream.h" + int StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *); int StreamTcpThreadInit(ThreadVars *, void *, void **); int StreamTcpThreadDeinit(ThreadVars *, void *); diff --git a/src/stream.c b/src/stream.c new file mode 100644 index 0000000000..8925f9a32c --- /dev/null +++ b/src/stream.c @@ -0,0 +1,167 @@ +/* Copyright (c) 2009 Victor Julien */ + +#include "eidps.h" +#include "decode.h" +#include "threads.h" + +#include "stream.h" + +#include "util-pool.h" + +static StreamMsgQueue stream_q; +static u_int16_t toserver_min_init_chunk_len = 0; +static u_int16_t toserver_min_chunk_len = 0; +static u_int16_t toclient_min_init_chunk_len = 0; +static u_int16_t toclient_min_chunk_len = 0; + +static Pool *stream_msg_pool = NULL; +static pthread_mutex_t stream_msg_pool_mutex = PTHREAD_MUTEX_INITIALIZER; + +void *StreamMsgAlloc(void *null) { + StreamMsg *s = malloc(sizeof(StreamMsg)); + if (s == NULL) + return NULL; + + memset(s, 0, sizeof(StreamMsg)); + return s; +} + +void StreamMsgFree(void *ptr) { + if (ptr == NULL) + return; + + StreamMsg *s = (StreamMsg *)ptr; + free(s); + return; +} + +static void StreamMsgEnqueue (StreamMsgQueue *q, StreamMsg *s) { + /* more packets in queue */ + if (q->top != NULL) { + s->next = q->top; + q->top->prev = s; + q->top = s; + /* only packet */ + } else { + q->top = s; + q->bot = s; + } + q->len++; +#ifdef DBG_PERF + if (q->len > q->dbg_maxlen) + q->dbg_maxlen = q->len; +#endif /* DBG_PERF */ +} + +static StreamMsg *StreamMsgDequeue (StreamMsgQueue *q) { + /* if the queue is empty there are no packets left. + * In that case we sleep and try again. */ + if (q->len == 0) { + return NULL; + } + + /* pull the bottom packet from the queue */ + StreamMsg *s = q->bot; + + /* more packets in queue */ + if (q->bot->prev != NULL) { + q->bot = q->bot->prev; + q->bot->next = NULL; + /* just the one we remove, so now empty */ + } else { + q->top = NULL; + q->bot = NULL; + } + q->len--; + + s->next = NULL; + s->prev = NULL; + return s; +} + +/* Used by stream reassembler to get msgs */ +StreamMsg *StreamMsgGetFromPool(void) +{ + mutex_lock(&stream_msg_pool_mutex); + StreamMsg *s = (StreamMsg *)PoolGet(stream_msg_pool); + mutex_unlock(&stream_msg_pool_mutex); + return s; +} + +/* Used by l7inspection to return msgs to pool */ +void StreamMsgReturnToPool(StreamMsg *s) { + mutex_lock(&stream_msg_pool_mutex); + PoolReturn(stream_msg_pool, (void *)s); + mutex_unlock(&stream_msg_pool_mutex); +} + +/* Used by l7inspection to get msgs with data */ +StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *q) +{ + mutex_lock(&q->mutex_q); + if (q->len == 0) { + /* if we have no stream msgs in queue, wait... */ + pthread_cond_wait(&q->cond_q, &q->mutex_q); + } + if (q->len > 0) { + StreamMsg *s = StreamMsgDequeue(q); + mutex_unlock(&q->mutex_q); + return s; + } else { + /* return NULL if we have no stream msg. Should only happen on signals. */ + mutex_unlock(&q->mutex_q); + return NULL; + } +} + +/* Used by stream reassembler to fill the queue for l7inspect reading */ +void StreamMsgPutInQueue(StreamMsg *s) +{ + StreamMsgQueue *q = &stream_q; + + mutex_lock(&q->mutex_q); + StreamMsgEnqueue(q, s); + printf("StreamMsgPutInQueue: q->len %u\n", q->len); + pthread_cond_signal(&q->cond_q); + mutex_unlock(&q->mutex_q); +} + +void StreamMsgQueuesInit(void) { + memset(&stream_q, 0, sizeof(stream_q)); + + stream_msg_pool = PoolInit(5000,250,StreamMsgAlloc,NULL,StreamMsgFree); + if (stream_msg_pool == NULL) + exit(1); /* XXX */ +} + +StreamMsgQueue *StreamMsgQueueGetByPort(u_int16_t port) { + /* XXX implement this */ + return &stream_q; +} + +/* XXX hack */ +void StreamMsgSignalQueueHack(void) { + pthread_cond_signal(&stream_q.cond_q); +} + +void StreamMsgQueueSetMinInitChunkLen(u_int8_t dir, u_int16_t len) { + +} + +u_int16_t StreamMsgQueueGetMinInitChunkLen(u_int8_t dir) { + if (dir == FLOW_PKT_TOSERVER) { + return toserver_min_init_chunk_len; + } else { + return toclient_min_init_chunk_len; + } + +} + +u_int16_t StreamMsgQueueGetMinChunkLen(u_int8_t dir) { + if (dir == FLOW_PKT_TOSERVER) { + return toserver_min_chunk_len; + } else { + return toclient_min_chunk_len; + } +} + diff --git a/src/stream.h b/src/stream.h new file mode 100644 index 0000000000..5423f95c81 --- /dev/null +++ b/src/stream.h @@ -0,0 +1,61 @@ +/* API for stream handling */ + +#ifndef __STREAM_H__ +#define __STREAM_H__ + +#include "flow.h" + +#define STREAM_START 0x01 +#define STREAM_EOF 0x02 +#define STREAM_TOSERVER 0x04 +#define STREAM_TOCLIENT 0x08 +#define STREAM_GAP 0x10 + +#define MSG_DATA_SIZE 512 + +typedef struct _StreamMsg { + u_int32_t id; /* unique stream id */ + u_int8_t flags; /* msg flags */ + Flow *flow; /* parent flow */ + + union { + /* case STREAM_START */ + struct { + Address src_ip, dst_ip; + Port src_port, dst_port; + u_int8_t data[MSG_DATA_SIZE]; + u_int16_t data_len; + } data; + /* case STREAM_GAP */ + struct { + u_int32_t gap_size; + } gap; + }; + + struct _StreamMsg *next; + struct _StreamMsg *prev; +} StreamMsg; + +typedef struct _StreamMsgQueue { + StreamMsg *top; + StreamMsg *bot; + u_int16_t len; + pthread_mutex_t mutex_q; + pthread_cond_t cond_q; +#ifdef DBG_PERF + u_int16_t dbg_maxlen; +#endif /* DBG_PERF */ +} StreamMsgQueue; + +/* prototypes */ +void StreamMsgQueuesInit(void); + +StreamMsg *StreamMsgGetFromPool(void); +void StreamMsgReturnToPool(StreamMsg *); +StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *); +void StreamMsgPutInQueue(StreamMsg *); + +StreamMsgQueue *StreamMsgQueueGetByPort(u_int16_t); + +#endif /* __STREAM_H__ */ + diff --git a/src/tm-threads.c b/src/tm-threads.c index cc91ffb285..a2833757c3 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -863,6 +863,9 @@ void TmThreadKillThreads(void) { t->flags |= THV_KILL; printf("TmThreadKillThreads: told thread %s to stop\n", t->name); + /* XXX hack */ + StreamMsgSignalQueueHack(); + if (t->inq != NULL) { int i;