stream: remove per thread queue for stream msgs

StreamMsgs would be stored in a per thread queue before being
attached to the tcp ssn. This is unnecessary, so this patch
removes this queue and puts the smsgs into the ssn directly.

Large patch as it affects a lot of tests.
pull/759/head
Victor Julien 12 years ago
parent b159c1714c
commit 261881fce2

@ -352,80 +352,17 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
}
/**
* \brief Attach a stream message to the TCP session for inspection
* in the detection engine.
* \brief Handle a app layer UDP message
*
* If the protocol is yet unknown, the proto detection code is run first.
*
* \param dp_ctx Thread app layer detect context
* \param smsg Stream message
* \param f unlocked flow
* \param p UDP packet
*
* \retval 0 ok
* \retval -1 error
*/
int AppLayerHandleTCPMsg(StreamMsg *smsg, TcpSession *ssn)
{
SCEnter();
StreamMsg *cur;
#ifdef PRINT
printf("=> Stream Data (raw reassembly) -- start %s%s\n",
smsg->flags & STREAM_TOCLIENT ? "toclient" : "",
smsg->flags & STREAM_TOSERVER ? "toserver" : "");
PrintRawDataFp(stdout, smsg->data, smsg->data_len);
printf("=> Stream Data -- end\n");
#endif
SCLogDebug("smsg %p", smsg);
if (ssn != NULL) {
SCLogDebug("storing smsg %p in the tcp session", smsg);
/* store the smsg in the tcp stream */
if (smsg->flags & STREAM_TOSERVER) {
SCLogDebug("storing smsg in the to_server");
/* put the smsg in the stream list */
if (ssn->toserver_smsg_head == NULL) {
ssn->toserver_smsg_head = smsg;
ssn->toserver_smsg_tail = smsg;
smsg->next = NULL;
smsg->prev = NULL;
} else {
cur = ssn->toserver_smsg_tail;
cur->next = smsg;
smsg->prev = cur;
smsg->next = NULL;
ssn->toserver_smsg_tail = smsg;
}
} else {
SCLogDebug("storing smsg in the to_client");
/* put the smsg in the stream list */
if (ssn->toclient_smsg_head == NULL) {
ssn->toclient_smsg_head = smsg;
ssn->toclient_smsg_tail = smsg;
smsg->next = NULL;
smsg->prev = NULL;
} else {
cur = ssn->toclient_smsg_tail;
cur->next = smsg;
smsg->prev = cur;
smsg->next = NULL;
ssn->toclient_smsg_tail = smsg;
}
}
} else { /* no ssn ptr */
/* if there is no ssn ptr we won't
* be inspecting this msg in detect
* so return it to the pool. */
/* return the used message to the queue */
StreamMsgReturnToPool(smsg);
}
SCReturnInt(0);
}
int AppLayerHandleUdp(AppLayerThreadCtx *tctx, Packet *p, Flow *f)
{
SCEnter();

@ -45,18 +45,6 @@ int AppLayerHandleTCPData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
uint8_t *data, uint32_t data_len,
uint8_t flags);
/**
* \brief Attach a stream message to the TCP session for inspection
* in the detection engine.
*
* \param app_layer_tctx Pointer to the app layer thread context.
* \param smsg Stream message.
*
* \retval 0 On success.
* \retval -1 On failure.
*/
int AppLayerHandleTCPMsg(StreamMsg *smsg, TcpSession *ssn);
/**
* \brief Handles an udp chunk.
*/

@ -563,11 +563,6 @@ static inline void FlowForceReassemblyForHash(void)
stt->ra_ctx, ssn, &ssn->server,
reassemble_p, NULL);
FlowDeReference(&reassemble_p->flow);
if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) {
SCLogDebug("shutdown flow timeout "
"StreamTcpReassembleProcessAppLayer() erroring "
"over something");
}
}
/* oh oh! We have some unattended toclient segments */
if (server_ok == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_REASSEMBLY) {
@ -581,11 +576,6 @@ static inline void FlowForceReassemblyForHash(void)
stt->ra_ctx, ssn, &ssn->client,
reassemble_p, NULL);
FlowDeReference(&reassemble_p->flow);
if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) {
SCLogDebug("shutdown flow timeout "
"StreamTcpReassembleProcessAppLayer() erroring "
"over something");
}
}
FLOWLOCK_UNLOCK(f);

@ -363,7 +363,6 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void)
return NULL;
memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx));
ra_ctx->stream_q = StreamMsgQueueGetNew();
ra_ctx->app_tctx = AppLayerGetCtxThread();
@ -373,16 +372,6 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void)
void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx)
{
SCEnter();
if (ra_ctx->stream_q != NULL) {
StreamMsg *smsg;
while ((smsg = StreamMsgGetFromQueue(ra_ctx->stream_q)) != NULL) {
StreamMsgReturnToPool(smsg);
}
StreamMsgQueueFree(ra_ctx->stream_q);
}
ra_ctx->stream_q = NULL;
AppLayerDestroyCtxThread(ra_ctx->app_tctx);
SCFree(ra_ctx);
SCReturn;
@ -1612,6 +1601,55 @@ static uint32_t StreamTcpReassembleCheckDepth(TcpStream *stream,
SCReturnUInt(0);
}
static void StreamTcpStoreStreamChunk(TcpSession *ssn, StreamMsg *smsg, const Packet *p, int streaminline) {
uint8_t direction = 0;
if ((!streaminline && (p->flowflags & FLOW_PKT_TOSERVER)) ||
( streaminline && (p->flowflags & FLOW_PKT_TOCLIENT)))
{
direction = STREAM_TOCLIENT;
SCLogDebug("stream chunk is to_client");
} else {
direction = STREAM_TOSERVER;
SCLogDebug("stream chunk is to_server");
}
/* store the smsg in the tcp stream */
if (direction == STREAM_TOSERVER) {
SCLogDebug("storing smsg in the to_server");
/* put the smsg in the stream list */
if (ssn->toserver_smsg_head == NULL) {
ssn->toserver_smsg_head = smsg;
ssn->toserver_smsg_tail = smsg;
smsg->next = NULL;
smsg->prev = NULL;
} else {
StreamMsg *cur = ssn->toserver_smsg_tail;
cur->next = smsg;
smsg->prev = cur;
smsg->next = NULL;
ssn->toserver_smsg_tail = smsg;
}
} else {
SCLogDebug("storing smsg in the to_client");
/* put the smsg in the stream list */
if (ssn->toclient_smsg_head == NULL) {
ssn->toclient_smsg_head = smsg;
ssn->toclient_smsg_tail = smsg;
smsg->next = NULL;
smsg->prev = NULL;
} else {
StreamMsg *cur = ssn->toclient_smsg_tail;
cur->next = smsg;
smsg->prev = cur;
smsg->next = NULL;
ssn->toclient_smsg_tail = smsg;
}
}
}
/**
* \brief Insert a packets TCP data into the stream reassembly engine.
*
@ -1739,17 +1777,6 @@ static void StreamTcpSetupMsg(TcpSession *ssn, TcpStream *stream, Packet *p,
SCLogDebug("setting STREAM_EOF");
smsg->flags |= STREAM_EOF;
}
if ((!StreamTcpInlineMode() && (p->flowflags & FLOW_PKT_TOSERVER)) ||
( StreamTcpInlineMode() && (p->flowflags & FLOW_PKT_TOCLIENT)))
{
smsg->flags |= STREAM_TOCLIENT;
SCLogDebug("stream mesage is to_client");
} else {
smsg->flags |= STREAM_TOSERVER;
SCLogDebug("stream mesage is to_server");
}
smsg->data_len = 0;
SCLogDebug("smsg %p", smsg);
@ -2292,7 +2319,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
if (SEQ_GT(seg->seq, next_seq)) {
/* pass on pre existing smsg (if any) */
if (smsg != NULL && smsg->data_len > 0) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 1);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -2376,7 +2403,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
/* queue the smsg if it's full */
if (smsg->data_len == sizeof (smsg->data)) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 1);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -2437,7 +2464,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
"smsg_offset %" PRIu32 ", copy_size %" PRIu32 "",
payload_offset, smsg_offset, copy_size);
if (smsg->data_len == sizeof (smsg->data)) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 1);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -2476,7 +2503,7 @@ static int StreamTcpReassembleInlineRaw (TcpReassemblyThreadCtx *ra_ctx,
/* put the partly filled smsg in the queue */
if (smsg != NULL) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 1);
smsg = NULL;
stream->ra_raw_base_seq = ra_base_seq;
}
@ -3051,7 +3078,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
SCReturnInt(-1);
}
StreamTcpSetupMsg(ssn, stream, p, smsg);
StreamMsgPutInQueue(ra_ctx->stream_q,smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
} else {
SCLogDebug("no segments in the list to reassemble");
@ -3150,7 +3177,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
or not. If not then sent the message and set flag that first
message has been sent. No more data till proto has not
been detected */
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -3258,7 +3285,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
/* queue the smsg if it's full */
if (smsg->data_len == sizeof (smsg->data)) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -3318,7 +3345,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
"smsg_offset %" PRIu32 ", copy_size %" PRIu32 "",
payload_offset, smsg_offset, copy_size);
if (smsg->data_len == sizeof (smsg->data)) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
@ -3348,7 +3375,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
/* put the partly filled smsg in the queue to the l7 handler */
if (smsg != NULL) {
StreamMsgPutInQueue(ra_ctx->stream_q, smsg);
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
smsg = NULL;
stream->ra_raw_base_seq = ra_base_seq;
}
@ -3379,50 +3406,6 @@ int StreamTcpReassembleHandleSegmentUpdateACK (ThreadVars *tv,
SCReturnInt(r);
}
/** \brief Handle the queue'd smsgs containing reassembled app layer data when
* we're running the app layer handling as part of the stream threads.
*
* \param ra_ctx Reassembly thread ctx, contains the queue with stream msgs
*
* \todo Currently we process all msgs even if we encounter an error in one
* of them. We do this to make sure the thread ctx's queue is emptied.
* Maybe we should just clear & return the msgs in case of error.
*
* \retval 0 ok
* \retval -1 error
*/
int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *ra_ctx, TcpSession *ssn)
{
SCEnter();
int r = 0;
if (ra_ctx != NULL && ra_ctx->stream_q && ra_ctx->stream_q->len > 0) {
StreamMsg *smsg = NULL;
do {
smsg = StreamMsgGetFromQueue(ra_ctx->stream_q);
if (smsg != NULL) {
SCLogDebug("smsg %p, next %p, prev %p, q->len %u, "
"smsg->datalen %u, direction %s%s",
smsg, smsg->next, smsg->prev,
ra_ctx->stream_q->len, smsg->data_len,
smsg->flags & STREAM_TOSERVER ? "toserver":"",
smsg->flags & STREAM_TOCLIENT ? "toclient":"");
//PrintRawDataFp(stderr, smsg->data, smsg->data_len);
/* Handle the stream msg. No need to use locking, flow is
* already locked at this point. Don't break out of the
* loop if we encounter an error. */
if (AppLayerHandleTCPMsg(smsg, ssn) != 0)
r = -1;
}
} while (ra_ctx->stream_q->len > 0);
}
SCReturnInt(r);
}
int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream,
Packet *p, PacketQueue *pq)
@ -3694,6 +3677,18 @@ void StreamTcpReassembleTriggerRawReassembly(TcpSession *ssn) {
#ifdef UNITTESTS
/** unit tests and it's support functions below */
static uint32_t UtSsnSmsgCnt(TcpSession *ssn, uint8_t direction) {
uint32_t cnt = 0;
StreamMsg *smsg = (direction == STREAM_TOSERVER) ?
ssn->toserver_smsg_head :
ssn->toclient_smsg_head;
while (smsg) {
cnt++;
smsg = smsg->next;
}
return cnt;
}
/** \brief The Function tests the reassembly engine working for different
* OSes supported. It includes all the OS cases and send
* crafted packets to test the reassembly.
@ -3987,7 +3982,7 @@ int StreamTcpCheckStreamContents(uint8_t *stream_policy, uint16_t sp_size, TcpSt
*
* \retval On success the function returns 1, on failure 0.
*/
static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) {
static int StreamTcpCheckChunks (TcpSession *ssn, uint8_t *stream_contents) {
SCEnter();
StreamMsg *msg;
@ -3995,17 +3990,17 @@ static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) {
uint8_t j;
uint8_t cnt = 0;
if (q == NULL) {
printf("q == NULL, ");
if (ssn == NULL) {
printf("ssn == NULL, ");
SCReturnInt(0);
}
if (q->len == 0) {
printf("q->len == 0, ");
if (ssn->toserver_smsg_head == NULL) {
printf("ssn->toserver_smsg_head == NULL, ");
SCReturnInt(0);
}
msg = StreamMsgGetFromQueue(q);
msg = ssn->toserver_smsg_head;
while(msg != NULL) {
cnt++;
j = 0;
@ -4019,11 +4014,7 @@ static int StreamTcpCheckQueue (uint8_t *stream_contents, StreamMsgQueue *q) {
SCReturnInt(0);
}
}
if (q->len > 0) {
msg = StreamMsgGetFromQueue(q);
} else {
SCReturnInt(1);
}
msg = msg->next;
}
SCReturnInt(1);
}
@ -5245,7 +5236,6 @@ static int StreamTcpReassembleTest28 (void) {
TcpSession ssn;
memset(&ssn, 0, sizeof (TcpSession));
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
StreamMsgQueue *q = ra_ctx->stream_q;
StreamTcpInitConfig(TRUE);
StreamMsgQueueSetMinChunkLen(FLOW_PKT_TOSERVER, 4096);
@ -5276,12 +5266,6 @@ static int StreamTcpReassembleTest28 (void) {
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (3): ");
goto end;
}
flowflags = FLOW_PKT_TOSERVER;
StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/
seq = 12;
@ -5300,7 +5284,7 @@ static int StreamTcpReassembleTest28 (void) {
goto end;
}
if (StreamTcpCheckQueue(check_contents, q) == 0) {
if (StreamTcpCheckChunks(&ssn, check_contents) == 0) {
printf("failed in stream matching (6): ");
goto end;
}
@ -5330,7 +5314,6 @@ static int StreamTcpReassembleTest29 (void) {
uint8_t flowflags;
uint8_t check_contents[5] = {0x41, 0x41, 0x42, 0x42, 0x42};
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
StreamMsgQueue *q = ra_ctx->stream_q;
TcpSession ssn;
memset(&ssn, 0, sizeof (TcpSession));
@ -5360,12 +5343,6 @@ static int StreamTcpReassembleTest29 (void) {
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs\n");
goto end;
}
flowflags = FLOW_PKT_TOSERVER;
StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/
seq = 15;
@ -5384,7 +5361,7 @@ static int StreamTcpReassembleTest29 (void) {
goto end;
}
if (StreamTcpCheckQueue(check_contents, q) == 0) {
if (StreamTcpCheckChunks(&ssn, check_contents) == 0) {
printf("failed in stream matching: ");
goto end;
}
@ -5417,7 +5394,6 @@ static int StreamTcpReassembleTest30 (void) {
memset(&ssn, 0, sizeof (TcpSession));
TcpReassemblyThreadCtx *ra_ctx = StreamTcpReassembleInitThreadCtx();
StreamMsgQueue *q = ra_ctx->stream_q;
flowflags = FLOW_PKT_TOSERVER;
th_flag = TH_ACK|TH_PUSH;
@ -5445,12 +5421,6 @@ static int StreamTcpReassembleTest30 (void) {
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs\n");
goto end;
}
flowflags = FLOW_PKT_TOSERVER;
StreamTcpCreateTestPacket(payload, 0x42, 3, 4); /*BBB*/
seq = 12;
@ -5469,12 +5439,6 @@ static int StreamTcpReassembleTest30 (void) {
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs\n");
goto end;
}
th_flag = TH_FIN|TH_ACK;
seq = 18;
ack = 20;
@ -5494,7 +5458,7 @@ static int StreamTcpReassembleTest30 (void) {
goto end;
}
if (StreamTcpCheckQueue(check_contents, q) == 0) {
if (StreamTcpCheckChunks(&ssn, check_contents) == 0) {
printf("failed in stream matching: ");
goto end;
}
@ -6141,7 +6105,7 @@ static int StreamTcpReassembleTest38 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len > 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) > 0) {
printf("there shouldn't be any stream smsgs in the queue (2): ");
goto end;
}
@ -6159,7 +6123,7 @@ static int StreamTcpReassembleTest38 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len != 1) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("there should one stream smsg in the queue (6): ");
goto end;
}
@ -6229,7 +6193,8 @@ static int StreamTcpReassembleTest39 (void) {
FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) ||
ssn->client.seg_list != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != 0) {
printf("failure 1\n");
goto end;
@ -6255,7 +6220,8 @@ static int StreamTcpReassembleTest39 (void) {
FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) ||
ssn->client.seg_list != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != 0) {
printf("failure 2\n");
goto end;
@ -6282,7 +6248,8 @@ static int StreamTcpReassembleTest39 (void) {
FLOW_IS_PM_DONE(&f, STREAM_TOCLIENT) || FLOW_IS_PP_DONE(&f, STREAM_TOCLIENT) ||
ssn->client.seg_list != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != 0) {
printf("failure 3\n");
goto end;
@ -6311,7 +6278,8 @@ static int StreamTcpReassembleTest39 (void) {
ssn->client.seg_list == NULL ||
ssn->client.seg_list->next != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != STREAM_TOSERVER) {
printf("failure 4\n");
goto end;
@ -6340,7 +6308,8 @@ static int StreamTcpReassembleTest39 (void) {
ssn->client.seg_list == NULL ||
ssn->client.seg_list->next != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != STREAM_TOSERVER) {
printf("failure 5\n");
goto end;
@ -6381,7 +6350,8 @@ static int StreamTcpReassembleTest39 (void) {
ssn->client.seg_list->next == NULL ||
ssn->client.seg_list->next->next != NULL ||
ssn->server.seg_list != NULL ||
stt->ra_ctx->stream_q->len != 0 ||
ssn->toserver_smsg_head != NULL ||
ssn->toclient_smsg_head != NULL ||
ssn->data_first_seen_dir != STREAM_TOSERVER) {
printf("failure 6\n");
goto end;
@ -6843,7 +6813,7 @@ static int StreamTcpReassembleTest40 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len > 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) {
printf("there shouldn't be any stream smsgs in the queue, as we didn't"
" processed any smsg from toserver side till yet (2): ");
goto end;
@ -6862,12 +6832,6 @@ static int StreamTcpReassembleTest40 (void) {
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (4): ");
goto end;
}
p->flowflags = FLOW_PKT_TOSERVER;
p->payload = httpbuf3;
p->payload_len = httplen3;
@ -6902,19 +6866,6 @@ static int StreamTcpReassembleTest40 (void) {
goto end;
}
/* Check if we have stream smsgs in queue */
#if 0
if (ra_ctx->stream_q->len == 0) {
printf("there should be a stream smsgs in the queue, as we have detected"
" the app layer protocol and one smsg from toserver side has "
"been sent (8): ");
goto end;
/* Process stream smsgs we may have in queue */
} else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (9): ");
goto end;
}
#endif
p->flowflags = FLOW_PKT_TOSERVER;
p->payload = httpbuf4;
p->payload_len = httplen4;
@ -6941,19 +6892,6 @@ static int StreamTcpReassembleTest40 (void) {
goto end;
}
/* Check if we have stream smsgs in queue */
#if 0
if (ra_ctx->stream_q->len == 0) {
printf("there should be a stream smsgs in the queue, as we have detected"
" the app layer protocol and one smsg from toserver side has "
"been sent (12): ");
goto end;
/* Process stream smsgs we may have in queue */
} else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (13): ");
goto end;
}
#endif
p->flowflags = FLOW_PKT_TOSERVER;
p->payload = httpbuf5;
p->payload_len = httplen5;
@ -6981,17 +6919,11 @@ static int StreamTcpReassembleTest40 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len == 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) == 0) {
printf("there should be a stream smsgs in the queue, as we have detected"
" the app layer protocol and one smsg from toserver side has "
"been sent (16): ");
goto end;
/* Process stream smsgs we may have in queue */
}
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (17): ");
goto end;
}
if (f->alproto != ALPROTO_HTTP) {
@ -7085,7 +7017,7 @@ static int StreamTcpReassembleTest43 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len > 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) > 0) {
printf("there shouldn't be any stream smsgs in the queue (2): ");
goto end;
}
@ -7103,7 +7035,7 @@ static int StreamTcpReassembleTest43 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len > 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) {
printf("there shouldn't be any stream smsgs in the queue, as we didn't"
" processed any smsg from toserver side till yet (4): ");
goto end;
@ -7120,17 +7052,6 @@ static int StreamTcpReassembleTest43 (void) {
printf("failed in segments reassembly, while processing toserver packet (5): ");
goto end;
}
#if 0
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len == 0) {
printf("there should be a stream smsgs in the queue (6): ");
goto end;
/* Process stream smsgs we may have in queue */
} else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (7): ");
goto end;
}
#endif
if (!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(&ssn.client)) {
printf("app layer detected flag isn't set, it should be (8): ");
goto end;
@ -7156,7 +7077,7 @@ static int StreamTcpReassembleTest43 (void) {
}
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len > 0) {
if (UtSsnSmsgCnt(&ssn, STREAM_TOCLIENT) > 0) {
printf("there shouldn't be any stream smsgs in the queue, as we didn't"
" detected the app layer protocol till yet (10): ");
goto end;
@ -7173,18 +7094,6 @@ static int StreamTcpReassembleTest43 (void) {
printf("failed in segments reassembly, while processing toserver packet (11): ");
goto end;
}
#if 0
/* Check if we have stream smsgs in queue */
if (ra_ctx->stream_q->len == 0) {
printf("there should be a stream smsgs in the queue, as reassembling has"
" been unpaused now (12): ");
goto end;
/* Process stream smsgs we may have in queue */
} else if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs (13): ");
goto end;
}
#endif
/* the flag should be set, as the smsg scanned size has crossed the max.
signature size for app proto detection */
if (!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(&ssn.client)) {
@ -7566,12 +7475,6 @@ static int StreamTcpReassembleTest47 (void) {
"packet\n");
goto end;
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(ra_ctx, &ssn) < 0) {
printf("failed in processing stream smsgs\n");
goto end;
}
}
if (f->alproto != ALPROTO_HTTP) {
@ -7635,12 +7538,12 @@ static int StreamTcpReassembleInlineTest01(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -7713,12 +7616,12 @@ static int StreamTcpReassembleInlineTest02(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -7744,12 +7647,12 @@ static int StreamTcpReassembleInlineTest02(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected a single stream message: ");
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 20) {
printf("expected data length to be 20, got %u: ", smsg->data_len);
goto end;
@ -7826,12 +7729,12 @@ static int StreamTcpReassembleInlineTest03(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message 1: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -7859,12 +7762,12 @@ static int StreamTcpReassembleInlineTest03(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected two stream messages: ");
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -7941,12 +7844,12 @@ static int StreamTcpReassembleInlineTest04(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -7974,12 +7877,12 @@ static int StreamTcpReassembleInlineTest04(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected a single stream message: ");
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 16) {
printf("expected data length to be 16, got %u: ", smsg->data_len);
goto end;
@ -8053,12 +7956,12 @@ static int StreamTcpReassembleInlineTest05(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top->next;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 10) {
printf("expected data length to be 10, got %u: ", smsg->data_len);
goto end;
@ -8072,7 +7975,7 @@ static int StreamTcpReassembleInlineTest05(void) {
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 5) {
printf("expected data length to be 5, got %u: ", smsg->data_len);
goto end;
@ -8147,12 +8050,12 @@ static int StreamTcpReassembleInlineTest06(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected two stream messages: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top->next;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 10) {
printf("expected data length to be 10, got %u: ", smsg->data_len);
goto end;
@ -8166,7 +8069,7 @@ static int StreamTcpReassembleInlineTest06(void) {
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 5) {
printf("expected data length to be 5, got %u: ", smsg->data_len);
goto end;
@ -8194,12 +8097,12 @@ static int StreamTcpReassembleInlineTest06(void) {
goto end;
}
if (ra_ctx->stream_q->len != 3) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) {
printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER));
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next->next;
if (smsg->data_len != 20) {
printf("expected data length to be 20, got %u: ", smsg->data_len);
goto end;
@ -8278,12 +8181,12 @@ static int StreamTcpReassembleInlineTest07(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER));
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top->next;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 6) {
printf("expected data length to be 6, got %u: ", smsg->data_len);
goto end;
@ -8297,7 +8200,7 @@ static int StreamTcpReassembleInlineTest07(void) {
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 5) {
printf("expected data length to be 5, got %u: ", smsg->data_len);
goto end;
@ -8325,12 +8228,12 @@ static int StreamTcpReassembleInlineTest07(void) {
goto end;
}
if (ra_ctx->stream_q->len != 3) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) {
printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER));
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next->next;
if (smsg->data_len != 16) {
printf("expected data length to be 16, got %u: ", smsg->data_len);
goto end;
@ -8409,12 +8312,12 @@ static int StreamTcpReassembleInlineTest08(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -8447,12 +8350,12 @@ static int StreamTcpReassembleInlineTest08(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected a single stream message, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER));
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 15) {
printf("expected data length to be 15, got %u: ", smsg->data_len);
goto end;
@ -8542,12 +8445,12 @@ static int StreamTcpReassembleInlineTest09(void) {
goto end;
}
if (ra_ctx->stream_q->len != 2) {
printf("expected 2 stream message2, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 2) {
printf("expected 2 stream message2, got %u: ", UtSsnSmsgCnt(&ssn, STREAM_TOSERVER));
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->bot;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 10) {
printf("expected data length to be 10, got %u (bot): ", smsg->data_len);
goto end;
@ -8561,7 +8464,7 @@ static int StreamTcpReassembleInlineTest09(void) {
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next;
if (smsg->data_len != 5) {
printf("expected data length to be 5, got %u (top): ", smsg->data_len);
goto end;
@ -8595,12 +8498,12 @@ static int StreamTcpReassembleInlineTest09(void) {
goto end;
}
if (ra_ctx->stream_q->len != 3) {
printf("expected 3 stream messages, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 3) {
printf("expected 3 stream messages: ");
goto end;
}
smsg = ra_ctx->stream_q->top;
smsg = ssn.toserver_smsg_head->next->next;
if (smsg->data_len != 20) {
printf("expected data length to be 20, got %u: ", smsg->data_len);
goto end;
@ -8774,12 +8677,12 @@ static int StreamTcpReassembleInsertTest01(void) {
goto end;
}
if (ra_ctx->stream_q->len != 1) {
printf("expected a single stream message, got %u: ", ra_ctx->stream_q->len);
if (UtSsnSmsgCnt(&ssn, STREAM_TOSERVER) != 1) {
printf("expected a single stream message: ");
goto end;
}
StreamMsg *smsg = ra_ctx->stream_q->top;
StreamMsg *smsg = ssn.toserver_smsg_head;
if (smsg->data_len != 20) {
printf("expected data length to be 20, got %u: ", smsg->data_len);
goto end;

@ -52,7 +52,6 @@ enum
};
typedef struct TcpReassemblyThreadCtx_ {
StreamMsgQueue *stream_q;
void *app_tctx;
/** TCP segments which are not being reassembled due to memcap was reached */
uint16_t counter_tcp_segment_memcap;
@ -84,7 +83,6 @@ int StreamTcpReassembleInlineAppLayer(ThreadVars *tv,
TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream,
Packet *p);
int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *, TcpSession *);
void StreamTcpCreateTestPacket(uint8_t *, uint8_t, uint8_t, uint8_t);

@ -4369,11 +4369,6 @@ int StreamTcpPacket (ThreadVars *tv, Packet *p, StreamTcpThread *stt,
SCLogDebug("processing pseudo packet / stream end done");
}
/* Process stream smsgs we may have in queue */
if (StreamTcpReassembleProcessAppLayer(stt->ra_ctx, ssn) < 0) {
goto error;
}
/* recalc the csum on the packet if it was modified */
if (p->flags & PKT_STREAM_MODIFIED) {
ReCalculateChecksum(p);
@ -5501,10 +5496,8 @@ static int StreamTcpTest02 (void) {
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
StreamMsgQueue stream_q;
PacketQueue pq;
memset(&pq,0,sizeof(PacketQueue));
memset(&stream_q, 0, sizeof(StreamMsgQueue));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset(p, 0, SIZE_OF_PACKET);
memset (&f, 0, sizeof(Flow));
@ -5517,7 +5510,6 @@ static int StreamTcpTest02 (void) {
p->tcph = &tcph;
p->flowflags = FLOW_PKT_TOSERVER;
int ret = 0;
ra_ctx.stream_q = &stream_q;
stt.ra_ctx = &ra_ctx;
StreamTcpInitConfig(TRUE);
@ -9312,10 +9304,8 @@ static int StreamTcpTest38 (void) {
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
StreamMsgQueue stream_q;
PacketQueue pq;
memset(&stream_q, 0, sizeof(StreamMsgQueue));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
@ -9333,7 +9323,6 @@ static int StreamTcpTest38 (void) {
tcph.th_flags = TH_SYN;
p->tcph = &tcph;
p->flowflags = FLOW_PKT_TOSERVER;
ra_ctx.stream_q = &stream_q;
stt.ra_ctx = &ra_ctx;
StreamTcpInitConfig(TRUE);
@ -9430,10 +9419,8 @@ static int StreamTcpTest39 (void) {
uint8_t payload[4];
TCPHdr tcph;
TcpReassemblyThreadCtx ra_ctx;
StreamMsgQueue stream_q;
PacketQueue pq;
memset(&stream_q, 0, sizeof(StreamMsgQueue));
memset(&ra_ctx, 0, sizeof(TcpReassemblyThreadCtx));
memset (&f, 0, sizeof(Flow));
memset(&tv, 0, sizeof (ThreadVars));
@ -9452,7 +9439,6 @@ static int StreamTcpTest39 (void) {
p->tcph = &tcph;
p->flowflags = FLOW_PKT_TOSERVER;
int ret = 0;
ra_ctx.stream_q = &stream_q;
stt.ra_ctx = &ra_ctx;
StreamTcpInitConfig(TRUE);

Loading…
Cancel
Save