Stream reassembly update and WIP code for L7 modules.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent 2c8df73d24
commit 1c2240cfeb

@ -87,10 +87,12 @@ alert-debuglog.c alert-debuglog.h \
log-httplog.c log-httplog.h \
alert-unified-log.c alert-unified-log.h \
alert-unified-alert.c alert-unified-alert.h \
stream.c stream.h \
stream-tcp.c stream-tcp.h stream-tcp-private.h \
stream-tcp-reassemble.c stream-tcp-reassemble.h \
respond-reject.c respond-reject.h \
respond-reject-libnet11.h respond-reject-libnet11.c
respond-reject-libnet11.h respond-reject-libnet11.c \
l7-app-detect.c l7-app-detect.h
# set the include path found by configure
INCLUDES= $(all_includes)

@ -774,6 +774,9 @@ int main(int argc, char **argv)
SigParsePrepare();
PatternMatchPrepare(mpm_ctx);
/* XXX we need an api for this */
L7AppDetectThreadInit();
TmModuleReceiveNFQRegister();
TmModuleVerdictNFQRegister();
TmModuleDecodeNFQRegister();
@ -863,6 +866,19 @@ int main(int argc, char **argv)
}
TmThreadAppend(&tv_flowmgr);
#include "l7-app-detect.h"
ThreadVars tv_l7appdetect;
memset(&tv_l7appdetect, 0, sizeof(ThreadVars));
printf("Creating L7 Application layer detect thread (WIP)...\n");
tv_flowmgr.name = "L7AppDetectThread";
rc = pthread_create(&tv_l7appdetect.t, &attr, L7AppDetectThread, (void *)&tv_l7appdetect);
if (rc) {
printf("ERROR; return code from pthread_create() is %d\n", rc);
exit(1);
}
TmThreadAppend(&tv_l7appdetect);
while(1) {
if (sigflags) {
printf("signal received\n");

@ -0,0 +1,132 @@
#include "eidps.h"
#include "debug.h"
#include "decode.h"
#include "threads.h"
#include "util-print.h"
#include "util-pool.h"
#include "stream-tcp-private.h"
#include "stream.h"
#define INSPECT_BYTES 64
#define PROTO_UNKNOWN 0
#define PROTO_HTTP 1
#define PROTO_FTP 2
#define PROTO_SMTP 3
#define TYPE_PROTO 0
#define TYPE_BUF 1
/* XXX type can be 1 bit, 7 bit for proto */
typedef struct _L7AppDetectDataProto {
u_int8_t type;
u_int8_t proto;
} L7AppDetectDataProto;
static Pool *l7appdetect_proto_pool = NULL;
void *L7AppDetectProtoAlloc(void *null) {
L7AppDetectDataProto *d = malloc(sizeof(L7AppDetectDataProto));
if (d == NULL) {
return NULL;
}
d->type = TYPE_PROTO;
d->proto = PROTO_UNKNOWN;
return d;
}
#define L7AppDetectProtoFree free
void L7AppDetectThreadInit(void) {
/* allocate 2 pools, 1 for proto objects, 1 for bufs. Normal stream will
* jump straigth to protos so we alloc a lot less bufs */
l7appdetect_proto_pool = PoolInit(262144, 32768, L7AppDetectProtoAlloc, NULL, L7AppDetectProtoFree);
if (l7appdetect_proto_pool == NULL) {
exit(1);
}
}
u_int8_t L7AppDetectGetProto(u_int8_t *buf, u_int16_t buflen) {
if (buflen < INSPECT_BYTES)
return PROTO_UNKNOWN;
/* XXX do actual detect */
printf("L7AppDetectGetProto: protocol detection goes here.\n");
return PROTO_HTTP;
}
void *L7AppDetectThread(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
char run = TRUE;
u_int8_t l7_data_id = 0;
/* get the stream msg queue for this thread */
StreamMsgQueue *stream_q = StreamMsgQueueGetByPort(0);
StreamMsgQueueSetMinInitChunkLen(stream_q, INSPECT_BYTES);
while(run) {
/* grab a msg, can return NULL on signals */
StreamMsg *smsg = StreamMsgGetFromQueue(stream_q);
//printf("L7AppDetectThread: smsg %p\n", smsg);
if (smsg != NULL) {
/* keep the flow locked during operation.
* XXX we may be better off adding a mutex
* to the l7data object */
mutex_lock(&smsg->flow->m);
TcpSession *ssn = smsg->flow->stream;
if (ssn != NULL) {
if (ssn->l7data == NULL) {
StreamL7DataPtrInit(ssn,1); /* XXX we can use a pool here,
or make it part of the stream setup */
}
void *l7_data_ptr = ssn->l7data[l7_data_id];
if (smsg->flags & STREAM_START) {
//printf("L7AppDetectThread: stream initializer (len %u (%u))\n", smsg->init.data_len, MSG_INIT_DATA_SIZE);
//printf("=> Init Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len);
//printf("=> Init Stream Data -- end\n");
if (l7_data_ptr == NULL) {
L7AppDetectDataProto *l7proto = (L7AppDetectDataProto *)PoolGet(l7appdetect_proto_pool);
if (l7proto != NULL) {
l7proto->type = TYPE_PROTO;
l7proto->proto = L7AppDetectGetProto(smsg->data.data, smsg->data.data_len);
ssn->l7data[l7_data_id] = (void *)l7proto;
}
}
} else {
//printf("L7AppDetectThread: stream data (len %u (%u))\n", smsg->data.data_len, MSG_DATA_SIZE);
/* if we don't have a data object here we are not getting it
* a start msg should have gotten us one */
if (l7_data_ptr != NULL) {
L7AppDetectDataProto *l7proto = (L7AppDetectDataProto *)l7_data_ptr;
printf("L7AppDetectThread: already established that the proto is %u\n", l7proto->proto);
} else {
printf("L7AppDetectThread: smsg not start, but no l7 data? Weird\n");
}
//printf("=> Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
//printf("=> Stream Data -- end\n");
}
mutex_unlock(&smsg->flow->m);
}
StreamMsgReturnToPool(smsg);
}
if (tv->flags & THV_KILL)
run = 0;
}
pthread_exit((void *) 0);
}

@ -0,0 +1,6 @@
#ifndef __L7_APP_DETECT_H__
#define __L7_APP_DETECT_H__
void *L7AppDetectThread(void *td);
#endif /* __L7_APP_DETECT_H__ */

@ -51,6 +51,7 @@ typedef struct _TcpSession {
u_int8_t state;
TcpStream server;
TcpStream client;
void **l7data;
} TcpSession;
#endif /* __STREAM_TCP_PRIVATE_H__ */

@ -31,6 +31,8 @@
#include "stream-tcp-private.h"
#include "stream.h"
void *TcpSegmentAlloc(void *payload_len) {
TcpSegment *seg = malloc(sizeof(TcpSegment));
if (seg == NULL)
@ -47,7 +49,6 @@ void *TcpSegmentAlloc(void *payload_len) {
return NULL;
}
return seg;
}
@ -74,6 +75,8 @@ static pthread_mutex_t segment_pool_mutex[segment_pool_num];
static u_int16_t segment_pool_idx[65536]; /* O(1) lookups of the pool */
int StreamTcpReassembleInit(void) {
StreamMsgQueuesInit();
u_int16_t u16 = 0;
for (u16 = 0; u16 < segment_pool_num; u16++) {
segment_pool[u16] = PoolInit(segment_pool_poolsizes[u16], segment_pool_poolsizes[u16]/2, TcpSegmentAlloc, (void *)&segment_pool_pktsizes[u16], TcpSegmentFree);
@ -247,41 +250,203 @@ int StreamTcpReassembleHandleSegmentHandleData (TcpSession *ssn, TcpStream *stre
memcpy(seg->payload, p->payload, p->payload_len);
seg->payload_len = p->payload_len;
seg->seq = TCP_GET_SEQ(p);
seg->next = NULL;
ReassembleInsertSegment(stream, seg);
return 0;
}
/* initialize the first msg */
static void StreamTcpSetupInitMsg(Packet *p, StreamMsg *smsg) {
smsg->flags |= STREAM_START;
if (p->flowflags & FLOW_PKT_TOSERVER) {
COPY_ADDRESS(&p->flow->src,&smsg->data.src_ip);
COPY_ADDRESS(&p->flow->dst,&smsg->data.dst_ip);
COPY_PORT(p->flow->sp,smsg->data.src_port);
COPY_PORT(p->flow->dp,smsg->data.dst_port);
smsg->flags |= STREAM_TOSERVER;
} else {
COPY_ADDRESS(&p->flow->dst,&smsg->data.src_ip);
COPY_ADDRESS(&p->flow->src,&smsg->data.dst_ip);
COPY_PORT(p->flow->dp,smsg->data.src_port);
COPY_PORT(p->flow->sp,smsg->data.dst_port);
smsg->flags |= STREAM_TOCLIENT;
}
}
int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *stream, Packet *p) {
if (stream->seg_list == NULL)
return 0;
printf("StreamTcpReassembleHandleSegmentUpdateACK: start\n");
StreamMsg *smsg = NULL;
char remove = FALSE;
u_int16_t smsg_offset = 0;
u_int16_t payload_offset = 0;
u_int16_t payload_len = 0;
TcpSegment *seg = stream->seg_list;
printf("STREAM START\n");
for ( ; seg != NULL && SEQ_LT(seg->seq,stream->last_ack); seg = seg->next) {
if (SEQ_GEQ(seg->seq,stream->ra_base_seq)) {
/* check if we have enough data to send to l7 */
if (p->flowflags & FLOW_PKT_TOSERVER) {
if (stream->ra_base_seq == stream->isn) {
if (StreamMsgQueueGetMinInitChunkLen(STREAM_TOSERVER) >
(stream->last_ack - stream->ra_base_seq))
return 0;
} else {
if (StreamMsgQueueGetMinChunkLen(STREAM_TOSERVER) >
(stream->last_ack - stream->ra_base_seq))
return 0;
}
} else {
if (stream->ra_base_seq == stream->isn) {
if (StreamMsgQueueGetMinInitChunkLen(STREAM_TOCLIENT) >
(stream->last_ack - stream->ra_base_seq))
return 0;
} else {
if (StreamMsgQueueGetMinChunkLen(STREAM_TOCLIENT) >
(stream->last_ack - stream->ra_base_seq))
return 0;
}
}
for ( ; seg != NULL && SEQ_LT(seg->seq,stream->last_ack); ) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: seg %p\n", seg);
/* if the segment ends beyond ra_base_seq we need to consider it */
if (SEQ_GEQ((seg->seq + seg->payload_len),stream->ra_base_seq)) {
/* get a message */
if (smsg == NULL) {
smsg = StreamMsgGetFromPool();
if (smsg == NULL) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: couldn't "
"get a stream msg from the pool\n");
return -1;
}
smsg_offset = 0;
if (stream->ra_base_seq == stream->isn) {
StreamTcpSetupInitMsg(p, smsg);
}
smsg->data.data_len = 0;
smsg->flow = p->flow;
}
/* handle segments partly before ra_base_seq */
if (SEQ_GT(stream->ra_base_seq, seg->seq)) {
u_int16_t offset = stream->ra_base_seq - seg->seq;
payload_offset = stream->ra_base_seq - seg->seq;
if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) {
PrintRawDataFp(stdout, (seg->payload + offset), (stream->last_ack - seg->seq - offset));
payload_len = ((seg->seq + seg->payload_len) - stream->last_ack) - payload_offset;
printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
"before ra_base, ends beyond last_ack, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
} else {
PrintRawDataFp(stdout, (seg->payload + offset), (seg->payload_len - offset));
remove = TRUE;
payload_len = seg->payload_len - payload_offset;
printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
"before ra_base, ends normal, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
}
/* handle segments after ra_base_seq */
} else {
payload_offset = 0;
if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) {
PrintRawDataFp(stdout,seg->payload,(stream->last_ack - seg->seq));
payload_len = stream->last_ack - seg->seq;
printf("StreamTcpReassembleHandleSegmentUpdateACK: start "
"fine, ends beyond last_ack, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
} else {
PrintRawDataFp(stdout,seg->payload,seg->payload_len);
remove = TRUE;
payload_len = seg->payload_len;
printf("StreamTcpReassembleHandleSegmentUpdateACK: normal "
"(smsg_offset %u), payload_offset %u, payload_len %u\n",
smsg_offset, payload_offset, payload_len);
}
}
u_int16_t copy_size = sizeof(smsg->data.data) - smsg_offset;
if (copy_size > payload_len) {
copy_size = payload_len;
}
printf("StreamTcpReassembleHandleSegmentUpdateACK: normal -- "
"copy_size %u (payload %u)\n", copy_size, payload_len);
memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size);
smsg_offset += copy_size;
stream->ra_base_seq += copy_size;
smsg->data.data_len += copy_size;
if (copy_size < payload_len) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: "
"copy_size %u < %u\n", copy_size, payload_len);
StreamMsgPutInQueue(smsg);
smsg = NULL;
payload_offset = copy_size + payload_offset;
printf("StreamTcpReassembleHandleSegmentUpdateACK: "
"payload_offset %u\n", payload_offset);
/* we need a while loop here as the packets theoretically can be 64k */
while (remove == FALSE) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: "
"new msg at offset %u, payload_len %u\n", payload_offset, payload_len);
/* get a new message */
smsg = StreamMsgGetFromPool();
if (smsg == NULL) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: "
"couldn't get a stream msg from the pool (while loop)\n");
return -1;
}
smsg_offset = 0;
smsg->data.data_len = 0;
smsg->flow = p->flow;
copy_size = sizeof(smsg->data.data) - smsg_offset;
if (copy_size > (payload_len - payload_offset)) {
copy_size = (payload_len - payload_offset);
}
printf("StreamTcpReassembleHandleSegmentUpdateACK: copy "
"payload_offset %u, smsg_offset %u, copy_size %u\n",
payload_offset, smsg_offset, copy_size);
memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size);
smsg_offset += copy_size;
stream->ra_base_seq += copy_size;
smsg->data.data_len += copy_size;
printf("StreamTcpReassembleHandleSegmentUpdateACK: copied "
"payload_offset %u, smsg_offset %u, copy_size %u\n",
payload_offset, smsg_offset, copy_size);
if ((copy_size + payload_offset) < payload_len) {
payload_offset += copy_size;
printf("StreamTcpReassembleHandleSegmentUpdateACK: loop not done\n");
} else {
printf("StreamTcpReassembleHandleSegmentUpdateACK: loop done\n");
payload_offset = 0;
remove = TRUE;
}
}
} else {
payload_offset = 0;
remove = TRUE;
}
}
TcpSegment *next_seg = seg->next;
/* done with this segment, return it to the pool */
if (remove == TRUE) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: removing seg %p, "
"seg->next %p\n", seg, seg->next);
stream->seg_list = seg->next;
u_int16_t idx = segment_pool_idx[seg->pool_size];
@ -292,11 +457,16 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
remove = FALSE;
}
seg = next_seg;
}
/* put the partly filled smsg in the queue to the l7 handler */
if (smsg != NULL) {
StreamMsgPutInQueue(smsg);
smsg = NULL;
}
printf("STREAM END\n");
/* The base sequence number of the reassembly is updated to last ack */
stream->ra_base_seq = stream->last_ack;
return 0;
}
@ -311,3 +481,24 @@ int StreamTcpReassembleHandleSegment (TcpSession *ssn, TcpStream *stream, Packet
return 0;
}
/* Initialize the l7data ptr in the TCP session used
* by the L7 Modules for data storage.
*
* ssn = TcpSesssion
* cnt = number of items in the array
*
* XXX use a pool?
*/
void StreamL7DataPtrInit(TcpSession *ssn, u_int8_t cnt) {
if (cnt == 0)
return;
ssn->l7data = (void **)malloc(sizeof(void *) * cnt);
if (ssn->l7data != NULL) {
u_int8_t u;
for (u = 0; u < cnt; u++) {
ssn->l7data[u] = NULL;
}
}
}

@ -26,6 +26,8 @@
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream.h"
int StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);
int StreamTcpThreadInit(ThreadVars *, void *, void **);
int StreamTcpThreadDeinit(ThreadVars *, void *);

@ -0,0 +1,167 @@
/* Copyright (c) 2009 Victor Julien <victor@inliniac.net> */
#include "eidps.h"
#include "decode.h"
#include "threads.h"
#include "stream.h"
#include "util-pool.h"
static StreamMsgQueue stream_q;
static u_int16_t toserver_min_init_chunk_len = 0;
static u_int16_t toserver_min_chunk_len = 0;
static u_int16_t toclient_min_init_chunk_len = 0;
static u_int16_t toclient_min_chunk_len = 0;
static Pool *stream_msg_pool = NULL;
static pthread_mutex_t stream_msg_pool_mutex = PTHREAD_MUTEX_INITIALIZER;
void *StreamMsgAlloc(void *null) {
StreamMsg *s = malloc(sizeof(StreamMsg));
if (s == NULL)
return NULL;
memset(s, 0, sizeof(StreamMsg));
return s;
}
void StreamMsgFree(void *ptr) {
if (ptr == NULL)
return;
StreamMsg *s = (StreamMsg *)ptr;
free(s);
return;
}
static void StreamMsgEnqueue (StreamMsgQueue *q, StreamMsg *s) {
/* more packets in queue */
if (q->top != NULL) {
s->next = q->top;
q->top->prev = s;
q->top = s;
/* only packet */
} else {
q->top = s;
q->bot = s;
}
q->len++;
#ifdef DBG_PERF
if (q->len > q->dbg_maxlen)
q->dbg_maxlen = q->len;
#endif /* DBG_PERF */
}
static StreamMsg *StreamMsgDequeue (StreamMsgQueue *q) {
/* if the queue is empty there are no packets left.
* In that case we sleep and try again. */
if (q->len == 0) {
return NULL;
}
/* pull the bottom packet from the queue */
StreamMsg *s = q->bot;
/* more packets in queue */
if (q->bot->prev != NULL) {
q->bot = q->bot->prev;
q->bot->next = NULL;
/* just the one we remove, so now empty */
} else {
q->top = NULL;
q->bot = NULL;
}
q->len--;
s->next = NULL;
s->prev = NULL;
return s;
}
/* Used by stream reassembler to get msgs */
StreamMsg *StreamMsgGetFromPool(void)
{
mutex_lock(&stream_msg_pool_mutex);
StreamMsg *s = (StreamMsg *)PoolGet(stream_msg_pool);
mutex_unlock(&stream_msg_pool_mutex);
return s;
}
/* Used by l7inspection to return msgs to pool */
void StreamMsgReturnToPool(StreamMsg *s) {
mutex_lock(&stream_msg_pool_mutex);
PoolReturn(stream_msg_pool, (void *)s);
mutex_unlock(&stream_msg_pool_mutex);
}
/* Used by l7inspection to get msgs with data */
StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *q)
{
mutex_lock(&q->mutex_q);
if (q->len == 0) {
/* if we have no stream msgs in queue, wait... */
pthread_cond_wait(&q->cond_q, &q->mutex_q);
}
if (q->len > 0) {
StreamMsg *s = StreamMsgDequeue(q);
mutex_unlock(&q->mutex_q);
return s;
} else {
/* return NULL if we have no stream msg. Should only happen on signals. */
mutex_unlock(&q->mutex_q);
return NULL;
}
}
/* Used by stream reassembler to fill the queue for l7inspect reading */
void StreamMsgPutInQueue(StreamMsg *s)
{
StreamMsgQueue *q = &stream_q;
mutex_lock(&q->mutex_q);
StreamMsgEnqueue(q, s);
printf("StreamMsgPutInQueue: q->len %u\n", q->len);
pthread_cond_signal(&q->cond_q);
mutex_unlock(&q->mutex_q);
}
void StreamMsgQueuesInit(void) {
memset(&stream_q, 0, sizeof(stream_q));
stream_msg_pool = PoolInit(5000,250,StreamMsgAlloc,NULL,StreamMsgFree);
if (stream_msg_pool == NULL)
exit(1); /* XXX */
}
StreamMsgQueue *StreamMsgQueueGetByPort(u_int16_t port) {
/* XXX implement this */
return &stream_q;
}
/* XXX hack */
void StreamMsgSignalQueueHack(void) {
pthread_cond_signal(&stream_q.cond_q);
}
void StreamMsgQueueSetMinInitChunkLen(u_int8_t dir, u_int16_t len) {
}
u_int16_t StreamMsgQueueGetMinInitChunkLen(u_int8_t dir) {
if (dir == FLOW_PKT_TOSERVER) {
return toserver_min_init_chunk_len;
} else {
return toclient_min_init_chunk_len;
}
}
u_int16_t StreamMsgQueueGetMinChunkLen(u_int8_t dir) {
if (dir == FLOW_PKT_TOSERVER) {
return toserver_min_chunk_len;
} else {
return toclient_min_chunk_len;
}
}

@ -0,0 +1,61 @@
/* API for stream handling */
#ifndef __STREAM_H__
#define __STREAM_H__
#include "flow.h"
#define STREAM_START 0x01
#define STREAM_EOF 0x02
#define STREAM_TOSERVER 0x04
#define STREAM_TOCLIENT 0x08
#define STREAM_GAP 0x10
#define MSG_DATA_SIZE 512
typedef struct _StreamMsg {
u_int32_t id; /* unique stream id */
u_int8_t flags; /* msg flags */
Flow *flow; /* parent flow */
union {
/* case STREAM_START */
struct {
Address src_ip, dst_ip;
Port src_port, dst_port;
u_int8_t data[MSG_DATA_SIZE];
u_int16_t data_len;
} data;
/* case STREAM_GAP */
struct {
u_int32_t gap_size;
} gap;
};
struct _StreamMsg *next;
struct _StreamMsg *prev;
} StreamMsg;
typedef struct _StreamMsgQueue {
StreamMsg *top;
StreamMsg *bot;
u_int16_t len;
pthread_mutex_t mutex_q;
pthread_cond_t cond_q;
#ifdef DBG_PERF
u_int16_t dbg_maxlen;
#endif /* DBG_PERF */
} StreamMsgQueue;
/* prototypes */
void StreamMsgQueuesInit(void);
StreamMsg *StreamMsgGetFromPool(void);
void StreamMsgReturnToPool(StreamMsg *);
StreamMsg *StreamMsgGetFromQueue(StreamMsgQueue *);
void StreamMsgPutInQueue(StreamMsg *);
StreamMsgQueue *StreamMsgQueueGetByPort(u_int16_t);
#endif /* __STREAM_H__ */

@ -863,6 +863,9 @@ void TmThreadKillThreads(void) {
t->flags |= THV_KILL;
printf("TmThreadKillThreads: told thread %s to stop\n", t->name);
/* XXX hack */
StreamMsgSignalQueueHack();
if (t->inq != NULL) {
int i;

Loading…
Cancel
Save