Updates & cleanups to stream & l7 stuff

remotes/origin/master-1.0.x
Victor Julien 15 years ago
parent 76581ef967
commit c957dc7775

@ -1,3 +1,5 @@
/* Copyright (c) 2009 Victor Julien */
#include "eidps.h"
#include "debug.h"
#include "decode.h"
@ -16,12 +18,9 @@
#define PROTO_FTP 2
#define PROTO_SMTP 3
#define TYPE_PROTO 0
#define TYPE_BUF 1
static u_int8_t l7_proto_id = 0;
/* XXX type can be 1 bit, 7 bit for proto */
typedef struct _L7AppDetectDataProto {
u_int8_t type;
u_int8_t proto;
} L7AppDetectDataProto;
@ -33,15 +32,14 @@ void *L7AppDetectProtoAlloc(void *null) {
return NULL;
}
d->type = TYPE_PROTO;
d->proto = PROTO_UNKNOWN;
return d;
}
#define L7AppDetectProtoFree free
void L7AppDetectThreadInit(void) {
/* allocate 2 pools, 1 for proto objects, 1 for bufs. Normal stream will
* jump straigth to protos so we alloc a lot less bufs */
l7_proto_id = StreamL7RegisterModule();
l7appdetect_proto_pool = PoolInit(262144, 32768, L7AppDetectProtoAlloc, NULL, L7AppDetectProtoFree);
if (l7appdetect_proto_pool == NULL) {
exit(1);
@ -61,17 +59,17 @@ void *L7AppDetectThread(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
char run = TRUE;
u_int8_t l7_data_id = 0;
/* get the stream msg queue for this thread */
StreamMsgQueue *stream_q = StreamMsgQueueGetByPort(0);
StreamMsgQueueSetMinInitChunkLen(stream_q, INSPECT_BYTES);
/* set the minimum size we expect */
StreamMsgQueueSetMinInitChunkLen(stream_q, FLOW_PKT_TOSERVER, INSPECT_BYTES);
StreamMsgQueueSetMinInitChunkLen(stream_q, FLOW_PKT_TOCLIENT, INSPECT_BYTES);
/* main loop */
while(run) {
/* grab a msg, can return NULL on signals */
StreamMsg *smsg = StreamMsgGetFromQueue(stream_q);
//printf("L7AppDetectThread: smsg %p\n", smsg);
if (smsg != NULL) {
/* keep the flow locked during operation.
* XXX we may be better off adding a mutex
@ -81,10 +79,11 @@ void *L7AppDetectThread(void *td)
TcpSession *ssn = smsg->flow->stream;
if (ssn != NULL) {
if (ssn->l7data == NULL) {
StreamL7DataPtrInit(ssn,1); /* XXX we can use a pool here,
or make it part of the stream setup */
/* XXX we can use a pool here,
or make it part of the stream setup */
StreamL7DataPtrInit(ssn,StreamL7GetStorageSize());
}
void *l7_data_ptr = ssn->l7data[l7_data_id];
void *l7_data_ptr = ssn->l7data[l7_proto_id];
if (smsg->flags & STREAM_START) {
//printf("L7AppDetectThread: stream initializer (len %u (%u))\n", smsg->init.data_len, MSG_INIT_DATA_SIZE);
@ -96,15 +95,19 @@ void *L7AppDetectThread(void *td)
if (l7_data_ptr == NULL) {
L7AppDetectDataProto *l7proto = (L7AppDetectDataProto *)PoolGet(l7appdetect_proto_pool);
if (l7proto != NULL) {
l7proto->type = TYPE_PROTO;
l7proto->proto = L7AppDetectGetProto(smsg->data.data, smsg->data.data_len);
ssn->l7data[l7_data_id] = (void *)l7proto;
/* store */
ssn->l7data[l7_proto_id] = (void *)l7proto;
}
}
} else {
//printf("L7AppDetectThread: stream data (len %u (%u))\n", smsg->data.data_len, MSG_DATA_SIZE);
//printf("=> Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
//printf("=> Stream Data -- end\n");
/* if we don't have a data object here we are not getting it
* a start msg should have gotten us one */
if (l7_data_ptr != NULL) {
@ -113,13 +116,12 @@ void *L7AppDetectThread(void *td)
} else {
printf("L7AppDetectThread: smsg not start, but no l7 data? Weird\n");
}
//printf("=> Stream Data -- start\n");
//PrintRawDataFp(stdout, smsg->data.data, smsg->data.data_len);
//printf("=> Stream Data -- end\n");
}
mutex_unlock(&smsg->flow->m);
}
/* return the used message to the queue */
StreamMsgReturnToPool(smsg);
}

@ -290,7 +290,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
u_int16_t payload_len = 0;
TcpSegment *seg = stream->seg_list;
/* check if we have enough data to send to l7 */
/* check if we have enough data to send to L7 */
if (p->flowflags & FLOW_PKT_TOSERVER) {
if (stream->ra_base_seq == stream->isn) {
if (StreamMsgQueueGetMinInitChunkLen(STREAM_TOSERVER) >
@ -313,6 +313,7 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
}
}
/* loop through the segments and fill one or more msgs */
for ( ; seg != NULL && SEQ_LT(seg->seq,stream->last_ack); ) {
printf("StreamTcpReassembleHandleSegmentUpdateACK: seg %p\n", seg);
@ -342,14 +343,14 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) {
payload_len = ((seg->seq + seg->payload_len) - stream->last_ack) - payload_offset;
printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
"before ra_base, ends beyond last_ack, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
//printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
// "before ra_base, ends beyond last_ack, payload_offset %u, "
// "payload_len %u\n", payload_offset, payload_len);
} else {
payload_len = seg->payload_len - payload_offset;
printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
"before ra_base, ends normal, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
//printf("StreamTcpReassembleHandleSegmentUpdateACK: starts "
// "before ra_base, ends normal, payload_offset %u, "
// "payload_len %u\n", payload_offset, payload_len);
}
/* handle segments after ra_base_seq */
} else {
@ -357,14 +358,14 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
if (SEQ_LT(stream->last_ack,(seg->seq + seg->payload_len))) {
payload_len = stream->last_ack - seg->seq;
printf("StreamTcpReassembleHandleSegmentUpdateACK: start "
"fine, ends beyond last_ack, payload_offset %u, "
"payload_len %u\n", payload_offset, payload_len);
//printf("StreamTcpReassembleHandleSegmentUpdateACK: start "
// "fine, ends beyond last_ack, payload_offset %u, "
// "payload_len %u\n", payload_offset, payload_len);
} else {
payload_len = seg->payload_len;
printf("StreamTcpReassembleHandleSegmentUpdateACK: normal "
"(smsg_offset %u), payload_offset %u, payload_len %u\n",
smsg_offset, payload_offset, payload_len);
//printf("StreamTcpReassembleHandleSegmentUpdateACK: normal "
// "(smsg_offset %u), payload_offset %u, payload_len %u\n",
// smsg_offset, payload_offset, payload_len);
}
}
@ -372,8 +373,8 @@ int StreamTcpReassembleHandleSegmentUpdateACK (TcpSession *ssn, TcpStream *strea
if (copy_size > payload_len) {
copy_size = payload_len;
}
printf("StreamTcpReassembleHandleSegmentUpdateACK: normal -- "
"copy_size %u (payload %u)\n", copy_size, payload_len);
printf("StreamTcpReassembleHandleSegmentUpdateACK: copy_size %u "
"(payload %u)\n", copy_size, payload_len);
memcpy(smsg->data.data + smsg_offset, seg->payload + payload_offset, copy_size);

@ -9,6 +9,7 @@
#include "util-pool.h"
static StreamMsgQueue stream_q;
/* per queue setting */
static u_int16_t toserver_min_init_chunk_len = 0;
static u_int16_t toserver_min_chunk_len = 0;
static u_int16_t toclient_min_init_chunk_len = 0;
@ -144,8 +145,12 @@ void StreamMsgSignalQueueHack(void) {
pthread_cond_signal(&stream_q.cond_q);
}
void StreamMsgQueueSetMinInitChunkLen(u_int8_t dir, u_int16_t len) {
void StreamMsgQueueSetMinInitChunkLen(StreamMsgQueue *q, u_int8_t dir, u_int16_t len) {
if (dir == FLOW_PKT_TOSERVER) {
toserver_min_init_chunk_len = len;
} else {
toclient_min_init_chunk_len = len;
}
}
u_int16_t StreamMsgQueueGetMinInitChunkLen(u_int8_t dir) {
@ -154,7 +159,6 @@ u_int16_t StreamMsgQueueGetMinInitChunkLen(u_int8_t dir) {
} else {
return toclient_min_init_chunk_len;
}
}
u_int16_t StreamMsgQueueGetMinChunkLen(u_int8_t dir) {
@ -165,3 +169,16 @@ u_int16_t StreamMsgQueueGetMinChunkLen(u_int8_t dir) {
}
}
/* StreamL7RegisterModule
*/
static u_int8_t l7_module_id = 0;
u_int8_t StreamL7RegisterModule(void) {
u_int8_t id = l7_module_id;
l7_module_id++;
return id;
}
u_int8_t StreamL7GetStorageSize(void) {
return l7_module_id;
}

@ -57,5 +57,12 @@ void StreamMsgPutInQueue(StreamMsg *);
StreamMsgQueue *StreamMsgQueueGetByPort(u_int16_t);
void StreamMsgQueueSetMinInitChunkLen(StreamMsgQueue *q, u_int8_t, u_int16_t);
u_int16_t StreamMsgQueueGetMinInitChunkLen(u_int8_t);
u_int16_t StreamMsgQueueGetMinChunkLen(u_int8_t);
u_int8_t StreamL7RegisterModule(void);
u_int8_t StreamL7GetStorageSize(void);
#endif /* __STREAM_H__ */

Loading…
Cancel
Save