stream-tcp: implement thread pool for segments

Config option:

stream:
  reassembly:
    segment-prealloc: 2048
pull/2673/head
Victor Julien 9 years ago
parent bd821f57f2
commit 807312320f

@ -52,14 +52,14 @@ typedef struct StreamTcpSackRecord_ {
} StreamTcpSackRecord;
typedef struct TcpSegment_ {
PoolThreadReserved res;
StreamingBufferSegment sbseg;
uint16_t payload_len; /**< actual size of the payload */
uint16_t pool_size; /**< size of the memory */
/* coccinelle: TcpSegment:flags:SEGMENTTCP_FLAG */
uint8_t flags;
uint32_t seq;
struct TcpSegment_ *next;
struct TcpSegment_ *prev;
/* coccinelle: TcpSegment:flags:SEGMENTTCP_FLAG */
uint8_t flags;
} TcpSegment;
#define TCP_SEG_LEN(seg) (seg)->payload_len

@ -71,27 +71,15 @@ static uint64_t segment_pool_memuse = 0;
static uint64_t segment_pool_memcnt = 0;
#endif
/* We define several pools with prealloced segments with fixed size
* payloads. We do this to prevent having to do an SCMalloc call for every
* data segment we receive, which would be a large performance penalty.
* The cost is in memory of course. The number of pools and the properties
* of the pools are determined by the yaml. */
static int segment_pool_num = 0;
static Pool **segment_pool = NULL;
static SCMutex *segment_pool_mutex = NULL;
static uint16_t *segment_pool_pktsizes = NULL;
#ifdef DEBUG
static SCMutex segment_pool_cnt_mutex;
static uint64_t segment_pool_cnt = 0;
#endif
/* index to the right pool for all packet sizes. */
static uint16_t segment_pool_idx[65536]; /* O(1) lookups of the pool */
static PoolThread *segment_thread_pool = NULL;
/* init only, protect initializing and growing pool */
static SCMutex segment_thread_pool_mutex = SCMUTEX_INITIALIZER;
/* Memory use counter */
SC_ATOMIC_DECLARE(uint64_t, ra_memuse);
/* prototypes */
TcpSegment* StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *, uint16_t);
TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *);
void StreamTcpCreateTestPacket(uint8_t *, uint8_t, uint8_t, uint8_t);
void StreamTcpReassemblePseudoPacketCreate(TcpStream *, Packet *, PacketQueue *);
@ -232,36 +220,27 @@ void *TcpSegmentPoolAlloc()
return seg;
}
int TcpSegmentPoolInit(void *data, void *payload_len)
int TcpSegmentPoolInit(void *data, void *initdata)
{
TcpSegment *seg = (TcpSegment *) data;
uint16_t size = *((uint16_t *) payload_len);
/* do this before the can bail, so TcpSegmentPoolCleanup
* won't have uninitialized memory to consider. */
memset(seg, 0, sizeof (TcpSegment));
if (StreamTcpReassembleCheckMemcap((uint32_t)size + (uint32_t)sizeof(TcpSegment)) == 0) {
if (StreamTcpReassembleCheckMemcap((uint32_t)sizeof(TcpSegment)) == 0) {
return 0;
}
seg->pool_size = size;
TCP_SEG_LEN(seg) = seg->pool_size;
#if 0
TCP_SEG_PAYLOAD(seg) = SCMalloc(TCP_SEG_LEN(seg));
if (TCP_SEG_PAYLOAD(seg) == NULL) {
return 0;
}
#endif
#ifdef DEBUG
SCMutexLock(&segment_pool_memuse_mutex);
segment_pool_memuse += TCP_SEG_LEN(seg);
segment_pool_memuse += sizeof(TcpSegment);
segment_pool_memcnt++;
SCLogDebug("segment_pool_memcnt %"PRIu64"", segment_pool_memcnt);
SCMutexUnlock(&segment_pool_memuse_mutex);
#endif
StreamTcpReassembleIncrMemuse((uint32_t)seg->pool_size + sizeof(TcpSegment));
StreamTcpReassembleIncrMemuse((uint32_t)sizeof(TcpSegment));
return 1;
}
@ -271,20 +250,15 @@ void TcpSegmentPoolCleanup(void *ptr)
if (ptr == NULL)
return;
TcpSegment *seg = (TcpSegment *) ptr;
StreamTcpReassembleDecrMemuse((uint32_t)seg->pool_size + sizeof(TcpSegment));
StreamTcpReassembleDecrMemuse((uint32_t)sizeof(TcpSegment));
#ifdef DEBUG
SCMutexLock(&segment_pool_memuse_mutex);
segment_pool_memuse -= seg->pool_size;
segment_pool_memuse -= sizeof(TcpSegment);
segment_pool_memcnt--;
SCLogDebug("segment_pool_memcnt %"PRIu64"", segment_pool_memcnt);
SCMutexUnlock(&segment_pool_memuse_mutex);
#endif
//SCFree(TCP_SEG_PAYLOAD(seg));
return;
}
/**
@ -299,19 +273,7 @@ void StreamTcpSegmentReturntoPool(TcpSegment *seg)
seg->next = NULL;
seg->prev = NULL;
uint16_t idx = segment_pool_idx[seg->pool_size];
SCMutexLock(&segment_pool_mutex[idx]);
PoolReturn(segment_pool[idx], (void *) seg);
SCLogDebug("segment_pool[%"PRIu16"]->empty_stack_size %"PRIu32"",
idx,segment_pool[idx]->empty_stack_size);
SCMutexUnlock(&segment_pool_mutex[idx]);
#ifdef DEBUG
SCMutexLock(&segment_pool_cnt_mutex);
segment_pool_cnt--;
SCMutexUnlock(&segment_pool_cnt_mutex);
#endif
PoolThreadReturn(segment_thread_pool, seg);
}
/**
@ -378,190 +340,23 @@ int StreamTcpAppLayerIsDisabled(Flow *f)
return (ssn->flags & STREAMTCP_FLAG_APP_LAYER_DISABLED);
}
typedef struct SegmentSizes_
{
uint16_t pktsize;
uint32_t prealloc;
} SegmentSizes;
/* sort small to big */
static int SortByPktsize(const void *a, const void *b)
{
const SegmentSizes *s0 = a;
const SegmentSizes *s1 = b;
return s0->pktsize - s1->pktsize;
}
int StreamTcpReassemblyConfig(char quiet)
{
Pool **my_segment_pool = NULL;
SCMutex *my_segment_lock = NULL;
uint16_t *my_segment_pktsizes = NULL;
SegmentSizes sizes[256];
memset(&sizes, 0x00, sizeof(sizes));
int npools = 0;
ConfNode *segs = ConfGetNode("stream.reassembly.segments");
if (segs != NULL) {
ConfNode *seg;
TAILQ_FOREACH(seg, &segs->head, next) {
ConfNode *segsize = ConfNodeLookupChild(seg,"size");
if (segsize == NULL)
continue;
ConfNode *segpre = ConfNodeLookupChild(seg,"prealloc");
if (segpre == NULL)
continue;
if (npools >= 256) {
SCLogError(SC_ERR_INVALID_ARGUMENT, "too many segment packet "
"pools defined, max is 256");
return -1;
}
SCLogDebug("segsize->val %s", segsize->val);
SCLogDebug("segpre->val %s", segpre->val);
uint16_t pktsize = 0;
if (strcmp("from_mtu", segsize->val) == 0) {
int mtu = g_default_mtu ? g_default_mtu : DEFAULT_MTU;
if (mtu < MINIMUM_MTU) {
FatalErrorOnInit(SC_ERR_INVALID_ARGUMENT, "invalid mtu %d", mtu);
continue;
}
pktsize = mtu - 40;
} else {
if (ByteExtractStringUint16(&pktsize, 10, strlen(segsize->val),
segsize->val) == -1)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "segment packet size "
"of %s is invalid", segsize->val);
return -1;
}
}
uint32_t prealloc = 0;
if (ByteExtractStringUint32(&prealloc, 10, strlen(segpre->val),
segpre->val) == -1)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "segment prealloc of "
"%s is invalid", segpre->val);
return -1;
}
sizes[npools].pktsize = pktsize;
sizes[npools].prealloc = prealloc;
SCLogDebug("pktsize %u, prealloc %u", sizes[npools].pktsize,
sizes[npools].prealloc);
npools++;
}
}
SCLogDebug("npools %d", npools);
if (npools > 0) {
/* sort the array as the index code below relies on it */
qsort(&sizes, npools, sizeof(sizes[0]), SortByPktsize);
if (sizes[npools - 1].pktsize != 0xffff) {
sizes[npools].pktsize = 0xffff;
sizes[npools].prealloc = 8;
npools++;
SCLogConfig("appended a segment pool for pktsize 65536");
}
} else if (npools == 0) {
int mtu = g_default_mtu;
if (mtu < MINIMUM_MTU)
mtu = DEFAULT_MTU;
/* defaults */
sizes[0].pktsize = 4;
sizes[0].prealloc = 256;
sizes[1].pktsize = 16;
sizes[1].prealloc = 512;
sizes[2].pktsize = 112;
sizes[2].prealloc = 512;
sizes[3].pktsize = 248;
sizes[3].prealloc = 512;
sizes[4].pktsize = 512;
sizes[4].prealloc = 512;
sizes[5].pktsize = 768;
sizes[5].prealloc = 1024;
sizes[6].pktsize = mtu - 40; // min size of ipv4+tcp hdrs
sizes[6].prealloc = 1024;
sizes[7].pktsize = 0xffff;
sizes[7].prealloc = 128;
npools = 8;
}
int i = 0;
for (i = 0; i < npools; i++) {
SCLogDebug("pktsize %u, prealloc %u", sizes[i].pktsize, sizes[i].prealloc);
}
my_segment_pool = SCMalloc(npools * sizeof(Pool *));
if (my_segment_pool == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "malloc failed");
return -1;
}
my_segment_lock = SCMalloc(npools * sizeof(SCMutex));
if (my_segment_lock == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "malloc failed");
SCFree(my_segment_pool);
return -1;
}
my_segment_pktsizes = SCMalloc(npools * sizeof(uint16_t));
if (my_segment_pktsizes == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "malloc failed");
SCFree(my_segment_lock);
SCFree(my_segment_pool);
return -1;
}
uint32_t my_segment_poolsizes[npools];
for (i = 0; i < npools; i++) {
my_segment_pktsizes[i] = sizes[i].pktsize;
my_segment_poolsizes[i] = sizes[i].prealloc;
SCMutexInit(&my_segment_lock[i], NULL);
/* setup the pool */
SCMutexLock(&my_segment_lock[i]);
my_segment_pool[i] = PoolInit(0, my_segment_poolsizes[i], 0,
TcpSegmentPoolAlloc, TcpSegmentPoolInit,
(void *) &my_segment_pktsizes[i],
TcpSegmentPoolCleanup, NULL);
SCMutexUnlock(&my_segment_lock[i]);
if (my_segment_pool[i] == NULL) {
SCLogError(SC_ERR_INITIALIZATION, "couldn't set up segment pool "
"for packet size %u. Memcap too low?", my_segment_pktsizes[i]);
exit(EXIT_FAILURE);
}
SCLogDebug("my_segment_pktsizes[i] %u, my_segment_poolsizes[i] %u",
my_segment_pktsizes[i], my_segment_poolsizes[i]);
if (!quiet)
SCLogConfig("segment pool: pktsize %u, prealloc %u",
my_segment_pktsizes[i], my_segment_poolsizes[i]);
}
uint16_t idx = 0;
uint16_t u16 = 0;
while (1) {
if (idx <= my_segment_pktsizes[u16]) {
segment_pool_idx[idx] = u16;
if (my_segment_pktsizes[u16] == idx)
u16++;
uint32_t segment_prealloc = 2048;
ConfNode *seg = ConfGetNode("stream.reassembly.segment-prealloc");
if (seg) {
uint32_t prealloc = 0;
if (ByteExtractStringUint32(&prealloc, 10, strlen(seg->val), seg->val) == -1)
{
SCLogError(SC_ERR_INVALID_ARGUMENT, "segment-prealloc of "
"%s is invalid", seg->val);
return -1;
}
if (idx == 0xffff)
break;
idx++;
segment_prealloc = prealloc;
}
/* set the globals */
segment_pool = my_segment_pool;
segment_pool_mutex = my_segment_lock;
segment_pool_pktsizes = my_segment_pktsizes;
segment_pool_num = npools;
if (!quiet)
SCLogInfo("stream.reassembly \"segment-prealloc\": %u", segment_prealloc);
stream_config.prealloc_segments = segment_prealloc;
uint32_t stream_chunk_prealloc = 250;
ConfNode *chunk = ConfGetNode("stream.reassembly.chunk-prealloc");
@ -596,11 +391,10 @@ int StreamTcpReassembleInit(char quiet)
if (StreamTcpReassemblyConfig(quiet) < 0)
return -1;
#ifdef DEBUG
SCMutexInit(&segment_pool_memuse_mutex, NULL);
SCMutexInit(&segment_pool_cnt_mutex, NULL);
#endif
StatsRegisterGlobalCounter("tcp.reassembly_memuse",
StreamTcpReassembleMemuseGlobalCounter);
return 0;
@ -608,44 +402,20 @@ int StreamTcpReassembleInit(char quiet)
void StreamTcpReassembleFree(char quiet)
{
uint16_t u16 = 0;
for (u16 = 0; u16 < segment_pool_num; u16++) {
SCMutexLock(&segment_pool_mutex[u16]);
if (quiet == FALSE) {
PoolPrintSaturation(segment_pool[u16]);
SCLogDebug("segment_pool[u16]->empty_stack_size %"PRIu32", "
"segment_pool[u16]->alloc_stack_size %"PRIu32", alloced "
"%"PRIu32"", segment_pool[u16]->empty_stack_size,
segment_pool[u16]->alloc_stack_size,
segment_pool[u16]->allocated);
if (segment_pool[u16]->max_outstanding > segment_pool[u16]->allocated) {
SCLogPerf("TCP segment pool of size %u had a peak use of %u segments, "
"more than the prealloc setting of %u", segment_pool_pktsizes[u16],
segment_pool[u16]->max_outstanding, segment_pool[u16]->allocated);
}
}
PoolFree(segment_pool[u16]);
StreamMsgQueuesDeinit(quiet);
SCMutexUnlock(&segment_pool_mutex[u16]);
SCMutexDestroy(&segment_pool_mutex[u16]);
SCMutexLock(&segment_thread_pool_mutex);
if (segment_thread_pool != NULL) {
PoolThreadFree(segment_thread_pool);
segment_thread_pool = NULL;
}
SCFree(segment_pool);
SCFree(segment_pool_mutex);
SCFree(segment_pool_pktsizes);
segment_pool = NULL;
segment_pool_mutex = NULL;
segment_pool_pktsizes = NULL;
StreamMsgQueuesDeinit(quiet);
SCMutexUnlock(&segment_thread_pool_mutex);
SCMutexDestroy(&segment_thread_pool_mutex);
#ifdef DEBUG
SCLogDebug("segment_pool_cnt %"PRIu64"", segment_pool_cnt);
SCLogDebug("segment_pool_memuse %"PRIu64"", segment_pool_memuse);
SCLogDebug("segment_pool_memcnt %"PRIu64"", segment_pool_memcnt);
SCLogInfo("segment_pool_memuse %"PRIu64"", segment_pool_memuse);
SCLogInfo("segment_pool_memcnt %"PRIu64"", segment_pool_memcnt);
SCMutexDestroy(&segment_pool_memuse_mutex);
SCMutexDestroy(&segment_pool_cnt_mutex);
#endif
}
@ -660,6 +430,36 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(ThreadVars *tv)
ra_ctx->app_tctx = AppLayerGetCtxThread(tv);
SCMutexLock(&segment_thread_pool_mutex);
if (segment_thread_pool == NULL) {
segment_thread_pool = PoolThreadInit(1, /* thread */
0, /* unlimited */
stream_config.prealloc_segments,
sizeof(TcpSegment),
TcpSegmentPoolAlloc,
TcpSegmentPoolInit, NULL,
TcpSegmentPoolCleanup, NULL);
ra_ctx->segment_thread_pool_id = 0;
SCLogDebug("pool size %d, thread segment_thread_pool_id %d",
PoolThreadSize(segment_thread_pool),
ra_ctx->segment_thread_pool_id);
} else {
/* grow segment_thread_pool until we have a element for our thread id */
ra_ctx->segment_thread_pool_id = PoolThreadGrow(segment_thread_pool,
0, /* unlimited */
stream_config.prealloc_segments,
sizeof(TcpSegment),
TcpSegmentPoolAlloc,
TcpSegmentPoolInit, NULL,
TcpSegmentPoolCleanup, NULL);
SCLogDebug("pool size %d, thread segment_thread_pool_id %d",
PoolThreadSize(segment_thread_pool),
ra_ctx->segment_thread_pool_id);
}
SCMutexUnlock(&segment_thread_pool_mutex);
if (ra_ctx->segment_thread_pool_id < 0 || segment_thread_pool == NULL)
abort();
SCReturnPtr(ra_ctx, "TcpReassemblyThreadCtx");
}
@ -888,10 +688,9 @@ int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThre
size = p->payload_len;
#endif
TcpSegment *seg = StreamTcpGetSegment(tv, ra_ctx, size);
TcpSegment *seg = StreamTcpGetSegment(tv, ra_ctx);
if (seg == NULL) {
SCLogDebug("segment_pool[%"PRIu16"] is empty", segment_pool_idx[size]);
SCLogDebug("segment_pool is empty");
StreamTcpSetEvent(p, STREAM_REASSEMBLY_NO_SEGMENT);
SCReturnInt(-1);
}
@ -1708,32 +1507,15 @@ int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_
}
/**
* \brief Function to get the segment of required length from the pool.
*
* \param len Length which tells the required size of needed segment.
* \brief get a segment from the pool
*
* \retval seg Segment from the pool or NULL
*/
TcpSegment* StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, uint16_t len)
TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
{
uint16_t idx = segment_pool_idx[len];
SCLogDebug("segment_pool_idx %" PRIu32 " for payload_len %" PRIu32 "",
idx, len);
SCMutexLock(&segment_pool_mutex[idx]);
TcpSegment *seg = (TcpSegment *) PoolGet(segment_pool[idx]);
SCLogDebug("segment_pool[%u]->empty_stack_size %u, segment_pool[%u]->alloc_"
"list_size %u, alloc %u", idx, segment_pool[idx]->empty_stack_size,
idx, segment_pool[idx]->alloc_stack_size,
segment_pool[idx]->allocated);
SCMutexUnlock(&segment_pool_mutex[idx]);
TcpSegment *seg = (TcpSegment *) PoolThreadGetById(segment_thread_pool, ra_ctx->segment_thread_pool_id);
SCLogDebug("seg we return is %p", seg);
if (seg == NULL) {
SCLogDebug("segment_pool[%u]->empty_stack_size %u, "
"alloc %u", idx, segment_pool[idx]->empty_stack_size,
segment_pool[idx]->allocated);
/* Increment the counter to show that we are not able to serve the
segment request due to memcap limit */
StatsIncr(tv, ra_ctx->counter_tcp_segment_memcap);
@ -1745,12 +1527,6 @@ TcpSegment* StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
memset(&seg->sbseg, 0, sizeof(seg->sbseg));
}
#ifdef DEBUG
SCMutexLock(&segment_pool_cnt_mutex);
segment_pool_cnt++;
SCMutexUnlock(&segment_pool_cnt_mutex);
#endif
return seg;
}

@ -53,6 +53,9 @@ enum
typedef struct TcpReassemblyThreadCtx_ {
void *app_tctx;
int segment_thread_pool_id;
/** TCP segments which are not being reassembled due to memcap was reached */
uint16_t counter_tcp_segment_memcap;
/** number of streams that stop reassembly because their depth is reached */
@ -92,7 +95,7 @@ void StreamTcpSetOSPolicy(TcpStream *, Packet *);
int StreamTcpReassembleHandleSegmentHandleData(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx,
TcpSession *ssn, TcpStream *stream, Packet *p);
int StreamTcpReassembleInsertSegment(ThreadVars *, TcpReassemblyThreadCtx *, TcpStream *, TcpSegment *, Packet *, uint32_t pkt_seq, uint8_t *pkt_data, uint16_t pkt_datalen);
TcpSegment* StreamTcpGetSegment(ThreadVars *, TcpReassemblyThreadCtx *, uint16_t);
TcpSegment *StreamTcpGetSegment(ThreadVars *, TcpReassemblyThreadCtx *);
void StreamTcpReturnStreamSegments(TcpStream *);
void StreamTcpSegmentReturntoPool(TcpSegment *);

@ -112,7 +112,7 @@ int StreamTcpUTAddPayload(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpSes
int StreamTcpUTAddSegmentWithPayload(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpStream *stream, uint32_t seq, uint8_t *payload, uint16_t len)
{
TcpSegment *s = StreamTcpGetSegment(tv, ra_ctx, len);
TcpSegment *s = StreamTcpGetSegment(tv, ra_ctx);
if (s == NULL) {
return -1;
}
@ -135,7 +135,7 @@ int StreamTcpUTAddSegmentWithPayload(ThreadVars *tv, TcpReassemblyThreadCtx *ra_
int StreamTcpUTAddSegmentWithByte(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx, TcpStream *stream, uint32_t seq, uint8_t byte, uint16_t len)
{
TcpSegment *s = StreamTcpGetSegment(tv, ra_ctx, len);
TcpSegment *s = StreamTcpGetSegment(tv, ra_ctx);
if (s == NULL) {
return -1;
}

@ -48,6 +48,7 @@ typedef struct TcpStreamCnf_ {
uint8_t segment_init_flags; /**< new seg flags will be initialized to this */
uint32_t prealloc_sessions; /**< ssns to prealloc per stream thread */
uint32_t prealloc_segments; /**< segments to prealloc per stream thread */
int midstream;
int async_oneside;
uint32_t reassembly_depth; /**< Depth until when we reassemble the stream */

@ -1204,10 +1204,9 @@ flow-timeouts:
#
# chunk-prealloc: 250 # Number of preallocated stream chunks. These
# # are used during stream inspection (raw).
# segments: # Settings for reassembly segment pool.
# - size: 4 # Size of the (data)segment for a pool
# prealloc: 256 # Number of segments to prealloc and keep
# # in the pool.
#
# segment-prealloc: 2048 # number of segments preallocated per thread
#
# check-overlap-different-data: true|false
# # check if a segment contains different data
# # than what we've already seen for that
@ -1229,25 +1228,7 @@ stream:
#randomize-chunk-range: 10
#raw: yes
#chunk-prealloc: 250
#segments:
# - size: 4
# prealloc: 256
# - size: 16
# prealloc: 512
# - size: 112
# prealloc: 512
# - size: 248
# prealloc: 512
# - size: 512
# prealloc: 512
# - size: 768
# prealloc: 1024
# 'from_mtu' means that the size is mtu - 40,
# or 1460 if mtu couldn't be determined.
# - size: from_mtu
# prealloc: 1024
# - size: 65535
# prealloc: 128
#segment-prealloc: 2048
#check-overlap-different-data: true
# Host table:

Loading…
Cancel
Save