stream: move raw reassembly into util func

pull/1315/head
Victor Julien 11 years ago
parent ff2fecf590
commit 6ca9c8eb32

@ -2951,121 +2951,40 @@ int StreamTcpReassembleAppLayer (ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
SCReturnInt(0); SCReturnInt(0);
} }
/** typedef struct ReassembleRawData_ {
* \brief Update the stream reassembly upon receiving an ACK packet. uint32_t ra_base_seq;
* \todo this function is too long, we need to break it up. It needs it BAD int partial; /* last segment was processed only partially */
*/ StreamMsg *smsg;
static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx, uint16_t smsg_offset; // TODO diff with smsg->data_len?
TcpSession *ssn, TcpStream *stream, Packet *p) } ReassembleRawData;
{
SCEnter();
SCLogDebug("start p %p", p);
if (ssn->flags & STREAMTCP_FLAG_DISABLE_RAW)
SCReturnInt(0);
if (stream->seg_list == NULL) {
SCLogDebug("no segments in the list to reassemble");
SCReturnInt(0);
}
#if 0
if (ssn->state <= TCP_ESTABLISHED &&
!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(stream)) {
SCLogDebug("only starting raw reassembly after app layer protocol "
"detection has completed.");
SCReturnInt(0);
}
#endif
/* check if we have enough data */
if (StreamTcpReassembleRawCheckLimit(ssn,stream,p) == 0) {
SCLogDebug("not yet reassembling");
SCReturnInt(0);
}
uint32_t ra_base_seq = stream->ra_raw_base_seq; static int DoRawReassemble(TcpSession *ssn, TcpStream *stream, TcpSegment *seg, Packet *p,
StreamMsg *smsg = NULL; ReassembleRawData *rd)
uint16_t smsg_offset = 0; {
uint16_t payload_offset = 0; uint16_t payload_offset = 0;
uint16_t payload_len = 0; uint16_t payload_len = 0;
TcpSegment *seg = stream->seg_list;
uint32_t next_seq = ra_base_seq + 1;
SCLogDebug("ra_base_seq %"PRIu32", last_ack %"PRIu32", next_seq %"PRIu32,
ra_base_seq, stream->last_ack, next_seq);
/* loop through the segments and fill one or more msgs */
for (; seg != NULL && SEQ_LT(seg->seq, stream->last_ack);)
{
SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32", flags %02x",
seg, seg->seq, seg->payload_len,
(uint32_t)(seg->seq + seg->payload_len), seg->flags);
if (StreamTcpReturnSegmentCheck(p->flow, ssn, stream, seg) == 1) {
SCLogDebug("removing segment");
TcpSegment *next_seg = seg->next;
StreamTcpRemoveSegmentFromStream(stream, seg);
StreamTcpSegmentReturntoPool(seg);
seg = next_seg;
continue;
} else if(seg->flags & SEGMENTTCP_FLAG_RAW_PROCESSED) {
TcpSegment *next_seg = seg->next;
seg = next_seg;
continue;
}
/* we've run into a sequence gap */ /* start clean */
if (SEQ_GT(seg->seq, next_seq)) { rd->partial = FALSE;
/* pass on pre existing smsg (if any) */
if (smsg != NULL && smsg->data_len > 0) {
/* if app layer protocol has not been detected till yet,
then check did we have sent message to app layer already
or not. If not then sent the message and set flag that first
message has been sent. No more data till proto has not
been detected */
StreamTcpStoreStreamChunk(ssn, smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq;
smsg = NULL;
}
/* see what the length of the gap is, gap length is seg->seq -
* (ra_base_seq +1) */
#ifdef DEBUG
uint32_t gap_len = seg->seq - next_seq;
SCLogDebug("expected next_seq %" PRIu32 ", got %" PRIu32 " , "
"stream->last_ack %" PRIu32 ". Seq gap %" PRIu32"",
next_seq, seg->seq, stream->last_ack, gap_len);
#endif
stream->ra_raw_base_seq = ra_base_seq;
/* We have missed the packet and end host has ack'd it, so
* IDS should advance it's ra_base_seq and should not consider this
* packet any longer, even if it is retransmitted, as end host will
* drop it anyway */
ra_base_seq = seg->seq - 1;
}
int partial = FALSE;
/* if the segment ends beyond ra_base_seq we need to consider it */ /* if the segment ends beyond ra_base_seq we need to consider it */
if (SEQ_GT((seg->seq + seg->payload_len), ra_base_seq+1)) { if (SEQ_GT((seg->seq + seg->payload_len), rd->ra_base_seq+1)) {
SCLogDebug("seg->seq %" PRIu32 ", seg->payload_len %" PRIu32 ", " SCLogDebug("seg->seq %" PRIu32 ", seg->payload_len %" PRIu32 ", "
"ra_base_seq %" PRIu32 "", seg->seq, "ra_base_seq %" PRIu32 "", seg->seq,
seg->payload_len, ra_base_seq); seg->payload_len, rd->ra_base_seq);
/* handle segments partly before ra_base_seq */ /* handle segments partly before ra_base_seq */
if (SEQ_GT(ra_base_seq, seg->seq)) { if (SEQ_GT(rd->ra_base_seq, seg->seq)) {
payload_offset = ra_base_seq - seg->seq; payload_offset = rd->ra_base_seq - seg->seq;
if (SEQ_LT(stream->last_ack, (seg->seq + seg->payload_len))) { if (SEQ_LT(stream->last_ack, (seg->seq + seg->payload_len))) {
if (SEQ_LT(stream->last_ack, ra_base_seq)) { if (SEQ_LT(stream->last_ack, rd->ra_base_seq)) {
payload_len = (stream->last_ack - seg->seq); payload_len = (stream->last_ack - seg->seq);
} else { } else {
payload_len = (stream->last_ack - seg->seq) - payload_offset; payload_len = (stream->last_ack - seg->seq) - payload_offset;
} }
partial = TRUE; rd->partial = TRUE;
} else { } else {
payload_len = seg->payload_len - payload_offset; payload_len = seg->payload_len - payload_offset;
} }
@ -3079,7 +2998,7 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
if (SEQ_LT(stream->last_ack, (seg->seq + seg->payload_len))) { if (SEQ_LT(stream->last_ack, (seg->seq + seg->payload_len))) {
payload_len = stream->last_ack - seg->seq; payload_len = stream->last_ack - seg->seq;
partial = TRUE; rd->partial = TRUE;
} else { } else {
payload_len = seg->payload_len; payload_len = seg->payload_len;
} }
@ -3090,45 +3009,45 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
if (payload_len == 0) { if (payload_len == 0) {
SCLogDebug("no payload_len, so bail out"); SCLogDebug("no payload_len, so bail out");
break; return 1; // TODO
} }
if (smsg == NULL) { if (rd->smsg == NULL) {
smsg = StreamMsgGetFromPool(); rd->smsg = StreamMsgGetFromPool();
if (smsg == NULL) { if (rd->smsg == NULL) {
SCLogDebug("stream_msg_pool is empty"); SCLogDebug("stream_msg_pool is empty");
return -1; return -1;
} }
smsg_offset = 0; rd->smsg_offset = 0;
StreamTcpSetupMsg(ssn, stream, p, smsg); StreamTcpSetupMsg(ssn, stream, p, rd->smsg);
smsg->seq = ra_base_seq + 1; rd->smsg->seq = rd->ra_base_seq + 1;
SCLogDebug("smsg->seq %u", smsg->seq); SCLogDebug("smsg->seq %u", rd->smsg->seq);
} }
/* copy the data into the smsg */ /* copy the data into the smsg */
uint16_t copy_size = sizeof (smsg->data) - smsg_offset; uint16_t copy_size = sizeof (rd->smsg->data) - rd->smsg_offset;
if (copy_size > payload_len) { if (copy_size > payload_len) {
copy_size = payload_len; copy_size = payload_len;
} }
if (SCLogDebugEnabled()) { if (SCLogDebugEnabled()) {
BUG_ON(copy_size > sizeof(smsg->data)); BUG_ON(copy_size > sizeof(rd->smsg->data));
} }
SCLogDebug("copy_size is %"PRIu16"", copy_size); SCLogDebug("copy_size is %"PRIu16"", copy_size);
memcpy(smsg->data + smsg_offset, seg->payload + payload_offset, memcpy(rd->smsg->data + rd->smsg_offset, seg->payload + payload_offset,
copy_size); copy_size);
smsg_offset += copy_size; rd->smsg_offset += copy_size;
ra_base_seq += copy_size; rd->ra_base_seq += copy_size;
SCLogDebug("ra_base_seq %"PRIu32, ra_base_seq); SCLogDebug("ra_base_seq %"PRIu32, rd->ra_base_seq);
smsg->data_len += copy_size; rd->smsg->data_len += copy_size;
/* queue the smsg if it's full */ /* queue the smsg if it's full */
if (smsg->data_len == sizeof (smsg->data)) { if (rd->smsg->data_len == sizeof (rd->smsg->data)) {
StreamTcpStoreStreamChunk(ssn, smsg, p, 0); StreamTcpStoreStreamChunk(ssn, rd->smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq; stream->ra_raw_base_seq = rd->ra_base_seq;
smsg = NULL; rd->smsg = NULL;
} }
/* if the payload len is bigger than what we copied, we handle the /* if the payload len is bigger than what we copied, we handle the
@ -3155,40 +3074,40 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
/* get a new message /* get a new message
XXX we need a setup function */ XXX we need a setup function */
smsg = StreamMsgGetFromPool(); rd->smsg = StreamMsgGetFromPool();
if (smsg == NULL) { if (rd->smsg == NULL) {
SCLogDebug("stream_msg_pool is empty"); SCLogDebug("stream_msg_pool is empty");
SCReturnInt(-1); SCReturnInt(-1);
} }
smsg_offset = 0; rd->smsg_offset = 0;
StreamTcpSetupMsg(ssn, stream,p,smsg); StreamTcpSetupMsg(ssn, stream, p, rd->smsg);
smsg->seq = ra_base_seq + 1; rd->smsg->seq = rd->ra_base_seq + 1;
copy_size = sizeof(smsg->data) - smsg_offset; copy_size = sizeof(rd->smsg->data) - rd->smsg_offset;
if (copy_size > payload_len) { if (copy_size > payload_len) {
copy_size = payload_len; copy_size = payload_len;
} }
if (SCLogDebugEnabled()) { if (SCLogDebugEnabled()) {
BUG_ON(copy_size > sizeof(smsg->data)); BUG_ON(copy_size > sizeof(rd->smsg->data));
} }
SCLogDebug("copy payload_offset %" PRIu32 ", smsg_offset " SCLogDebug("copy payload_offset %" PRIu32 ", smsg_offset "
"%" PRIu32 ", copy_size %" PRIu32 "", "%" PRIu32 ", copy_size %" PRIu32 "",
payload_offset, smsg_offset, copy_size); payload_offset, rd->smsg_offset, copy_size);
memcpy(smsg->data + smsg_offset, seg->payload + memcpy(rd->smsg->data + rd->smsg_offset, seg->payload +
payload_offset, copy_size); payload_offset, copy_size);
smsg_offset += copy_size; rd->smsg_offset += copy_size;
ra_base_seq += copy_size; rd->ra_base_seq += copy_size;
SCLogDebug("ra_base_seq %"PRIu32, ra_base_seq); SCLogDebug("ra_base_seq %"PRIu32, rd->ra_base_seq);
smsg->data_len += copy_size; rd->smsg->data_len += copy_size;
SCLogDebug("copied payload_offset %" PRIu32 ", " SCLogDebug("copied payload_offset %" PRIu32 ", "
"smsg_offset %" PRIu32 ", copy_size %" PRIu32 "", "smsg_offset %" PRIu32 ", copy_size %" PRIu32 "",
payload_offset, smsg_offset, copy_size); payload_offset, rd->smsg_offset, copy_size);
if (smsg->data_len == sizeof (smsg->data)) { if (rd->smsg->data_len == sizeof(rd->smsg->data)) {
StreamTcpStoreStreamChunk(ssn, smsg, p, 0); StreamTcpStoreStreamChunk(ssn, rd->smsg, p, 0);
stream->ra_raw_base_seq = ra_base_seq; stream->ra_raw_base_seq = rd->ra_base_seq;
smsg = NULL; rd->smsg = NULL;
} }
/* see if we have segment payload left to process */ /* see if we have segment payload left to process */
@ -3206,11 +3125,110 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
} }
} }
} }
return 1;
}
/**
* \brief Update the stream reassembly upon receiving an ACK packet.
* \todo this function is too long, we need to break it up. It needs it BAD
*/
static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream, Packet *p)
{
SCEnter();
SCLogDebug("start p %p", p);
if (ssn->flags & STREAMTCP_FLAG_DISABLE_RAW)
SCReturnInt(0);
if (stream->seg_list == NULL) {
SCLogDebug("no segments in the list to reassemble");
SCReturnInt(0);
}
#if 0
if (ssn->state <= TCP_ESTABLISHED &&
!StreamTcpIsSetStreamFlagAppProtoDetectionCompleted(stream)) {
SCLogDebug("only starting raw reassembly after app layer protocol "
"detection has completed.");
SCReturnInt(0);
}
#endif
/* check if we have enough data */
if (StreamTcpReassembleRawCheckLimit(ssn,stream,p) == 0) {
SCLogDebug("not yet reassembling");
SCReturnInt(0);
}
TcpSegment *seg = stream->seg_list;
ReassembleRawData rd;
rd.smsg = NULL;
rd.ra_base_seq = stream->ra_raw_base_seq;
rd.smsg_offset = 0;
uint32_t next_seq = rd.ra_base_seq + 1;
SCLogDebug("ra_base_seq %"PRIu32", last_ack %"PRIu32", next_seq %"PRIu32,
rd.ra_base_seq, stream->last_ack, next_seq);
/* loop through the segments and fill one or more msgs */
for (; seg != NULL && SEQ_LT(seg->seq, stream->last_ack);)
{
SCLogDebug("seg %p, SEQ %"PRIu32", LEN %"PRIu16", SUM %"PRIu32", flags %02x",
seg, seg->seq, seg->payload_len,
(uint32_t)(seg->seq + seg->payload_len), seg->flags);
if (StreamTcpReturnSegmentCheck(p->flow, ssn, stream, seg) == 1) {
SCLogDebug("removing segment");
TcpSegment *next_seg = seg->next;
StreamTcpRemoveSegmentFromStream(stream, seg);
StreamTcpSegmentReturntoPool(seg);
seg = next_seg;
continue;
} else if(seg->flags & SEGMENTTCP_FLAG_RAW_PROCESSED) {
TcpSegment *next_seg = seg->next;
seg = next_seg;
continue;
}
/* we've run into a sequence gap */
if (SEQ_GT(seg->seq, next_seq)) {
/* pass on pre existing smsg (if any) */
if (rd.smsg != NULL && rd.smsg->data_len > 0) {
/* if app layer protocol has not been detected till yet,
then check did we have sent message to app layer already
or not. If not then sent the message and set flag that first
message has been sent. No more data till proto has not
been detected */
StreamTcpStoreStreamChunk(ssn, rd.smsg, p, 0);
stream->ra_raw_base_seq = rd.ra_base_seq;
rd.smsg = NULL;
}
/* see what the length of the gap is, gap length is seg->seq -
* (ra_base_seq +1) */
#ifdef DEBUG
uint32_t gap_len = seg->seq - next_seq;
SCLogDebug("expected next_seq %" PRIu32 ", got %" PRIu32 " , "
"stream->last_ack %" PRIu32 ". Seq gap %" PRIu32"",
next_seq, seg->seq, stream->last_ack, gap_len);
#endif
stream->ra_raw_base_seq = rd.ra_base_seq;
/* We have missed the packet and end host has ack'd it, so
* IDS should advance it's ra_base_seq and should not consider this
* packet any longer, even if it is retransmitted, as end host will
* drop it anyway */
rd.ra_base_seq = seg->seq - 1;
}
if (DoRawReassemble(ssn, stream, seg, p, &rd) == 0)
break;
/* done with this segment, return it to the pool */ /* done with this segment, return it to the pool */
TcpSegment *next_seg = seg->next; TcpSegment *next_seg = seg->next;
next_seq = seg->seq + seg->payload_len; next_seq = seg->seq + seg->payload_len;
if (partial == FALSE) { if (rd.partial == FALSE) {
SCLogDebug("fully done with segment in raw reassembly (seg %p seq %"PRIu32")", SCLogDebug("fully done with segment in raw reassembly (seg %p seq %"PRIu32")",
seg, seg->seq); seg, seg->seq);
seg->flags |= SEGMENTTCP_FLAG_RAW_PROCESSED; seg->flags |= SEGMENTTCP_FLAG_RAW_PROCESSED;
@ -3222,10 +3240,10 @@ static int StreamTcpReassembleRaw (TcpReassemblyThreadCtx *ra_ctx,
} }
/* put the partly filled smsg in the queue to the l7 handler */ /* put the partly filled smsg in the queue to the l7 handler */
if (smsg != NULL) { if (rd.smsg != NULL) {
StreamTcpStoreStreamChunk(ssn, smsg, p, 0); StreamTcpStoreStreamChunk(ssn, rd.smsg, p, 0);
smsg = NULL; rd.smsg = NULL;
stream->ra_raw_base_seq = ra_base_seq; stream->ra_raw_base_seq = rd.ra_base_seq;
} }
SCReturnInt(0); SCReturnInt(0);

Loading…
Cancel
Save