stream: reduce pool locking overhead

Add thread local cache to avoid locking overhead for ssns and segments.

A thread will return segments/ssns to a local cache first, and if that
is full, to a return queue from which the actual return to the pool is
done in a batch, to amortize locking overhead.

Adds segment and session pool/cache counters to show how effective
the cache is.
pull/7957/head
Victor Julien 3 years ago
parent 536d66e344
commit 235f369ab9

@ -477,6 +477,7 @@ noinst_HEADERS = \
source-windivert-prototypes.h \
stream.h \
stream-tcp.h \
stream-tcp-cache.h \
stream-tcp-inline.h \
stream-tcp-list.h \
stream-tcp-private.h \
@ -1076,6 +1077,7 @@ libsuricata_c_a_SOURCES = \
source-windivert.c \
stream.c \
stream-tcp.c \
stream-tcp-cache.c \
stream-tcp-inline.c \
stream-tcp-list.c \
stream-tcp-reassemble.c \

@ -715,6 +715,7 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void *
static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
StreamTcpThreadCacheCleanup();
PacketPoolDestroy();
SCFree(data);
return TM_ECODE_OK;
@ -1022,6 +1023,8 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void
static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
StreamTcpThreadCacheCleanup();
FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
if (ftd->output_thread_data != NULL)
OutputFlowLogThreadDeinit(t, ftd->output_thread_data);

@ -0,0 +1,196 @@
/* Copyright (C) 2007-2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*/
#include "suricata-common.h"
#include "stream-tcp-private.h"
#include "stream-tcp-cache.h"
/** Per-thread cache of TCP segments and sessions, used to reduce lock
 *  contention on the global pools. Objects are first parked in a local
 *  LIFO cache (lock free); when that is full they go to a return queue
 *  that is flushed to the pool in one locked batch. */
typedef struct TcpPoolCache {
    bool cache_enabled; /**< cache should only be enabled for worker threads */

    /* local reuse cache; may hold segments from any pool id */
    TcpSegment *segs_cache[64];
    uint32_t segs_cache_idx;   /**< number of valid entries in segs_cache */
    uint32_t segs_returns_idx; /**< number of valid entries in segs_returns */
    /* batched returns to the pool; all entries share a single pool id */
    TcpSegment *segs_returns[64];

    /* same scheme for sessions */
    TcpSession *ssns_cache[64];
    uint32_t ssns_cache_idx;   /**< number of valid entries in ssns_cache */
    uint32_t ssns_returns_idx; /**< number of valid entries in ssns_returns */
    TcpSession *ssns_returns[64];
} TcpPoolCache;

/* one cache per thread; no locking needed for the cache itself */
static thread_local TcpPoolCache tcp_pool_cache;

/* pools defined in stream-tcp.c / stream-tcp-reassemble.c */
extern PoolThread *ssn_pool;
extern PoolThread *segment_thread_pool;
/** \brief enable segment cache. Should only be done for worker threads */
/** \brief enable segment cache. Should only be done for worker threads */
void StreamTcpThreadCacheEnable(void)
{
    /* only flips the thread-local flag; non-worker threads keep the cache
     * disabled so their returns go straight into the batched return queue */
    tcp_pool_cache.cache_enabled = true;
}
/** \brief return a segment to the thread cache, or in a batch to the pool
 *
 *  Fast path: if the cache is enabled (worker threads only) and not full,
 *  stash the segment locally without taking any lock. Otherwise append it
 *  to the return queue; the queue is flushed to the pool under a single
 *  lock when it fills up, or when a segment with a different pool id
 *  arrives (the queue may only hold one pool id at a time).
 *
 *  \param seg segment to return; ownership is transferred
 */
void StreamTcpThreadCacheReturnSegment(TcpSegment *seg)
{
    SCEnter();
#ifdef UNITTESTS
    if (RunmodeIsUnittests()) {
        PoolThreadReturn(segment_thread_pool, seg);
        SCReturn;
    }
#endif

    /* cache can have segs from any pool id */
    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.segs_cache_idx < 64) {
        tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx++] = seg;
    } else {
        /* segs_returns should only have a single pool id. If ours is different,
         * flush it. */
        bool flush = false;
        if (tcp_pool_cache.segs_returns_idx &&
                tcp_pool_cache.segs_returns[0]->pool_id != seg->pool_id) {
            flush = true;
        }
        if (tcp_pool_cache.segs_returns_idx == 64) {
            flush = true;
        }
        if (flush) {
            /* both flush conditions imply segs_returns_idx > 0, so [0] is valid */
            PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
            PoolThreadLock(segment_thread_pool, pool_id);
            for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
                TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
                PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
            }
            PoolThreadUnlock(segment_thread_pool, pool_id);
            tcp_pool_cache.segs_returns_idx = 0;
        }
        tcp_pool_cache.segs_returns[tcp_pool_cache.segs_returns_idx++] = seg;
    }
    /* balance SCEnter(); consistent with StreamTcpThreadCacheReturnSession */
    SCReturn;
}
/** \brief return a session to the thread cache, or in a batch to the pool
 *
 *  Fast path: if the cache is enabled (worker threads only) and not full,
 *  stash the session locally without taking any lock. Otherwise append it
 *  to the return queue; the queue is flushed to the pool under a single
 *  lock when it fills up, or when a session with a different pool id
 *  arrives (the queue may only hold one pool id at a time).
 *
 *  \param ssn session to return; ownership is transferred
 */
void StreamTcpThreadCacheReturnSession(TcpSession *ssn)
{
    SCEnter();
#ifdef UNITTESTS
    if (RunmodeIsUnittests()) {
        PoolThreadReturn(ssn_pool, ssn);
        SCReturn;
    }
#endif

    /* cache can have ssns from any pool id */
    if (tcp_pool_cache.cache_enabled && tcp_pool_cache.ssns_cache_idx < 64) {
        tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx++] = ssn;
    } else {
        /* ssns_returns should only have a single pool id. If ours is different,
         * flush it. */
        bool flush = false;
        if (tcp_pool_cache.ssns_returns_idx &&
                tcp_pool_cache.ssns_returns[0]->pool_id != ssn->pool_id) {
            flush = true;
        }
        if (tcp_pool_cache.ssns_returns_idx == 64) {
            flush = true;
        }
        if (flush) {
            /* both flush conditions imply ssns_returns_idx > 0, so [0] is valid */
            PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
            PoolThreadLock(ssn_pool, pool_id);
            for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
                TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
                PoolThreadReturnRaw(ssn_pool, pool_id, ret_ssn);
            }
            PoolThreadUnlock(ssn_pool, pool_id);
            tcp_pool_cache.ssns_returns_idx = 0;
        }
        tcp_pool_cache.ssns_returns[tcp_pool_cache.ssns_returns_idx++] = ssn;
    }
    SCReturn;
}
void StreamTcpThreadCacheCleanup(void)
{
SCEnter();
/* segments */
SCLogDebug("tcp_pool_cache.segs_cache_idx %u", tcp_pool_cache.segs_cache_idx);
for (uint32_t i = 0; i < tcp_pool_cache.segs_cache_idx; i++) {
PoolThreadReturn(segment_thread_pool, tcp_pool_cache.segs_cache[i]);
}
tcp_pool_cache.segs_cache_idx = 0;
SCLogDebug("tcp_pool_cache.segs_returns_idx %u", tcp_pool_cache.segs_returns_idx);
if (tcp_pool_cache.segs_returns_idx) {
PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id;
PoolThreadLock(segment_thread_pool, pool_id);
for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) {
TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i];
PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg);
}
PoolThreadUnlock(segment_thread_pool, pool_id);
tcp_pool_cache.segs_returns_idx = 0;
}
/* sessions */
SCLogDebug("tcp_pool_cache.ssns_cache_idx %u", tcp_pool_cache.ssns_cache_idx);
for (uint32_t i = 0; i < tcp_pool_cache.ssns_cache_idx; i++) {
PoolThreadReturn(segment_thread_pool, tcp_pool_cache.ssns_cache[i]);
}
tcp_pool_cache.ssns_cache_idx = 0;
SCLogDebug("tcp_pool_cache.ssns_returns_idx %u", tcp_pool_cache.ssns_returns_idx);
if (tcp_pool_cache.ssns_returns_idx) {
PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id;
PoolThreadLock(segment_thread_pool, pool_id);
for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) {
TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i];
PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_ssn);
}
PoolThreadUnlock(segment_thread_pool, pool_id);
tcp_pool_cache.ssns_returns_idx = 0;
}
SCReturn;
}
TcpSegment *StreamTcpThreadCacheGetSegment(void)
{
if (tcp_pool_cache.segs_cache_idx) {
TcpSegment *seg = tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx - 1];
tcp_pool_cache.segs_cache_idx--;
memset(&seg->sbseg, 0, sizeof(seg->sbseg));
return seg;
}
return NULL;
}
TcpSession *StreamTcpThreadCacheGetSession(void)
{
if (tcp_pool_cache.ssns_cache_idx) {
TcpSession *ssn = tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx - 1];
tcp_pool_cache.ssns_cache_idx--;
return ssn;
}
return NULL;
}

@ -0,0 +1,39 @@
/* Copyright (C) 2007-2022 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
* Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301, USA.
*/
/**
* \file
*
* \author Victor Julien <victor@inliniac.net>
*/
#ifndef __STREAM_TCP_CACHE_H__
#define __STREAM_TCP_CACHE_H__

#include "suricata.h"
#include "flow.h"
#include "stream-tcp-private.h"

/** \brief enable the per-thread segment/session cache; workers only */
void StreamTcpThreadCacheEnable(void);
/** \brief return a segment to the thread cache / batched pool return */
void StreamTcpThreadCacheReturnSegment(TcpSegment *seg);
/** \brief return a session to the thread cache / batched pool return */
void StreamTcpThreadCacheReturnSession(TcpSession *ssn);
/** \brief flush all cached/queued objects back to the pools (thread deinit) */
void StreamTcpThreadCacheCleanup(void);
/** \brief pop a cached segment, or NULL if the cache is empty */
TcpSegment *StreamTcpThreadCacheGetSegment(void);
/** \brief pop a cached session, or NULL if the cache is empty */
TcpSession *StreamTcpThreadCacheGetSession(void);

#endif /* __STREAM_TCP_CACHE_H__ */

@ -70,7 +70,7 @@ typedef struct TcpSegmentPcapHdrStorage_ {
} TcpSegmentPcapHdrStorage;
typedef struct TcpSegment {
PoolThreadReserved res;
PoolThreadId pool_id;
uint16_t payload_len; /**< actual size of the payload */
uint32_t seq;
RB_ENTRY(TcpSegment) __attribute__((__packed__)) rb;
@ -269,7 +269,7 @@ enum TcpState {
}
typedef struct TcpSession_ {
PoolThreadReserved res;
PoolThreadId pool_id;
uint8_t state:4; /**< tcp state from state enum */
uint8_t pstate:4; /**< previous state */
uint8_t queue_len; /**< length of queue list below */

@ -48,6 +48,7 @@
#include "stream-tcp.h"
#include "stream-tcp-private.h"
#include "stream-tcp-cache.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp-inline.h"
#include "stream-tcp-list.h"
@ -76,7 +77,7 @@ static uint64_t segment_pool_memcnt = 0;
thread_local uint64_t t_pcapcnt = UINT64_MAX;
static PoolThread *segment_thread_pool = NULL;
PoolThread *segment_thread_pool = NULL;
/* init only, protect initializing and growing pool */
static SCMutex segment_thread_pool_mutex = SCMUTEX_INITIALIZER;
@ -374,7 +375,7 @@ void StreamTcpSegmentReturntoPool(TcpSegment *seg)
seg->pcap_hdr_storage->pktlen = 0;
}
PoolThreadReturn(segment_thread_pool, seg);
StreamTcpThreadCacheReturnSegment(seg);
}
/**
@ -565,6 +566,8 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(ThreadVars *tv)
void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx)
{
SCEnter();
StreamTcpThreadCacheCleanup();
if (ra_ctx) {
AppLayerDestroyCtxThread(ra_ctx->app_tctx);
SCFree(ra_ctx);
@ -2036,7 +2039,14 @@ int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_
*/
TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
{
TcpSegment *seg = (TcpSegment *)PoolThreadGetById(
TcpSegment *seg = StreamTcpThreadCacheGetSegment();
if (seg) {
StatsIncr(tv, ra_ctx->counter_tcp_segment_from_cache);
memset(&seg->sbseg, 0, sizeof(seg->sbseg));
return seg;
}
seg = (TcpSegment *)PoolThreadGetById(
segment_thread_pool, (uint16_t)ra_ctx->segment_thread_pool_id);
SCLogDebug("seg we return is %p", seg);
if (seg == NULL) {
@ -2045,6 +2055,7 @@ TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx)
StatsIncr(tv, ra_ctx->counter_tcp_segment_memcap);
} else {
memset(&seg->sbseg, 0, sizeof(seg->sbseg));
StatsIncr(tv, ra_ctx->counter_tcp_segment_from_pool);
}
return seg;

@ -63,6 +63,10 @@ typedef struct TcpReassemblyThreadCtx_ {
/** TCP segments which are not being reassembled due to memcap was reached */
uint16_t counter_tcp_segment_memcap;
uint16_t counter_tcp_segment_from_cache;
uint16_t counter_tcp_segment_from_pool;
/** number of streams that stop reassembly because their depth is reached */
uint16_t counter_tcp_stream_depth;
/** count number of streams with a unrecoverable stream gap (missing pkts) */

@ -51,9 +51,10 @@
#include "util-device.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"
#include "stream-tcp-cache.h"
#include "stream-tcp-inline.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp-sack.h"
#include "stream-tcp-util.h"
#include "stream.h"
@ -104,7 +105,7 @@ static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p,
extern thread_local uint64_t t_pcapcnt;
extern int g_detect_disabled;
static PoolThread *ssn_pool = NULL;
PoolThread *ssn_pool = NULL;
static SCMutex ssn_pool_mutex = SCMUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */
#ifdef DEBUG
static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */
@ -250,11 +251,11 @@ void StreamTcpSessionClear(void *ssnptr)
StreamTcpSessionCleanup(ssn);
/* HACK: don't loose track of thread id */
PoolThreadReserved a = ssn->res;
PoolThreadId pool_id = ssn->pool_id;
memset(ssn, 0, sizeof(TcpSession));
ssn->res = a;
ssn->pool_id = pool_id;
PoolThreadReturn(ssn_pool, ssn);
StreamTcpThreadCacheReturnSession(ssn);
#ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
ssn_pool_cnt--;
@ -688,13 +689,26 @@ void StreamTcpFreeConfig(bool quiet)
*
* \retval ssn new TCP session.
*/
static TcpSession *StreamTcpNewSession (Packet *p, int id)
static TcpSession *StreamTcpNewSession(ThreadVars *tv, StreamTcpThread *stt, Packet *p, int id)
{
TcpSession *ssn = (TcpSession *)p->flow->protoctx;
if (ssn == NULL) {
DEBUG_VALIDATE_BUG_ON(id < 0 || id > UINT16_MAX);
p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
p->flow->protoctx = StreamTcpThreadCacheGetSession();
if (p->flow->protoctx != NULL) {
#ifdef UNITTESTS
if (tv)
#endif
StatsIncr(tv, stt->counter_tcp_ssn_from_cache);
} else {
p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id);
if (p->flow->protoctx != NULL)
#ifdef UNITTESTS
if (tv)
#endif
StatsIncr(tv, stt->counter_tcp_ssn_from_pool);
}
#ifdef DEBUG
SCMutexLock(&ssn_pool_mutex);
if (p->flow->protoctx != NULL)
@ -940,7 +954,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
SCLogDebug("midstream picked up");
if (ssn == NULL) {
ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
@ -1033,7 +1047,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
} else if (p->tcph->th_flags & TH_SYN) {
if (ssn == NULL) {
ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
@ -1113,7 +1127,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p,
SCLogDebug("midstream picked up");
if (ssn == NULL) {
ssn = StreamTcpNewSession(p, stt->ssn_pool_id);
ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id);
if (ssn == NULL) {
StatsIncr(tv, stt->counter_tcp_ssn_memcap);
return -1;
@ -5394,12 +5408,15 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
SCReturnInt(TM_ECODE_FAILED);
memset(stt, 0, sizeof(StreamTcpThread));
stt->ssn_pool_id = -1;
StreamTcpThreadCacheEnable();
*data = (void *)stt;
stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv);
stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv);
stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv);
stt->counter_tcp_ssn_from_cache = StatsRegisterCounter("tcp.ssn_from_cache", tv);
stt->counter_tcp_ssn_from_pool = StatsRegisterCounter("tcp.ssn_from_pool", tv);
stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv);
stt->counter_tcp_pseudo_failed = StatsRegisterCounter("tcp.pseudo_failed", tv);
stt->counter_tcp_invalid_checksum = StatsRegisterCounter("tcp.invalid_checksum", tv);
@ -5416,6 +5433,9 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data)
SCReturnInt(TM_ECODE_FAILED);
stt->ra_ctx->counter_tcp_segment_memcap = StatsRegisterCounter("tcp.segment_memcap_drop", tv);
stt->ra_ctx->counter_tcp_segment_from_cache =
StatsRegisterCounter("tcp.segment_from_cache", tv);
stt->ra_ctx->counter_tcp_segment_from_pool = StatsRegisterCounter("tcp.segment_from_pool", tv);
stt->ra_ctx->counter_tcp_stream_depth = StatsRegisterCounter("tcp.stream_depth_reached", tv);
stt->ra_ctx->counter_tcp_reass_gap = StatsRegisterCounter("tcp.reassembly_gap", tv);
stt->ra_ctx->counter_tcp_reass_overlap = StatsRegisterCounter("tcp.overlap", tv);

@ -84,6 +84,8 @@ typedef struct StreamTcpThread_ {
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;
uint16_t counter_tcp_ssn_from_cache;
uint16_t counter_tcp_ssn_from_pool;
/** pseudo packets processed */
uint16_t counter_tcp_pseudo;
/** pseudo packets failed to setup */
@ -211,5 +213,8 @@ void StreamTcpUpdateAppLayerProgress(TcpSession *ssn, char direction,
uint64_t StreamTcpGetAcked(const TcpStream *stream);
uint64_t StreamTcpGetUsable(const TcpStream *stream, const bool eof);
void StreamTcpThreadCacheEnable(void);
void StreamTcpThreadCacheCleanup(void);
#endif /* __STREAM_TCP_H__ */

@ -47,7 +47,7 @@ static int StreamTcpTest01(void)
FLOW_INITIALIZE(&f);
p->flow = &f;
StreamTcpUTInit(&stt.ra_ctx);
TcpSession *ssn = StreamTcpNewSession(p, 0);
TcpSession *ssn = StreamTcpNewSession(NULL, &stt, p, 0);
FAIL_IF_NULL(ssn);
f.protoctx = ssn;
FAIL_IF_NOT_NULL(f.alparser);

@ -177,7 +177,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id)
data = PoolGet(e->pool);
SCMutexUnlock(&e->lock);
if (data) {
PoolThreadReserved *did = data;
PoolThreadId *did = data;
*did = id;
}
@ -186,7 +186,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id)
void PoolThreadReturn(PoolThread *pt, void *data)
{
PoolThreadReserved *id = data;
PoolThreadId *id = data;
if (pt == NULL || *id >= pt->size)
return;
@ -199,9 +199,30 @@ void PoolThreadReturn(PoolThread *pt, void *data)
SCMutexUnlock(&e->lock);
}
/** \brief lock the pool slot for `id`, for use with PoolThreadReturnRaw
 *
 *  Allows a caller to return a batch of objects under a single lock
 *  acquisition. Must be paired with PoolThreadUnlock.
 */
void PoolThreadLock(PoolThread *pt, PoolThreadId id)
{
    BUG_ON(pt == NULL || id >= pt->size);
    PoolThreadElement *e = &pt->array[id];
    SCMutexLock(&e->lock);
}
/** \brief return one object to pool slot `id` without locking
 *
 *  Caller must hold the slot's lock via PoolThreadLock(pt, id); use for
 *  batched returns to amortize locking overhead.
 */
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data)
{
    BUG_ON(pt == NULL || id >= pt->size);
    PoolThreadElement *e = &pt->array[id];
    PoolReturn(e->pool, data);
}
/** \brief unlock the pool slot for `id`; pairs with PoolThreadLock */
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id)
{
    BUG_ON(pt == NULL || id >= pt->size);
    PoolThreadElement *e = &pt->array[id];
    SCMutexUnlock(&e->lock);
}
#ifdef UNITTESTS
/* unittest-only payload: first member must be the PoolThreadId slot the
 * pool code writes into (mirrors TcpSegment/TcpSession layout) */
struct PoolThreadTestData {
    PoolThreadId res;
    int abc;
};

@ -57,7 +57,7 @@ typedef struct PoolThread_ {
/** per data item reserved data containing the
* thread pool id */
typedef uint16_t PoolThreadReserved;
typedef uint16_t PoolThreadId;
void PoolThreadRegisterTests(void);
@ -91,6 +91,10 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id);
* \param data memory block to return, with PoolThreadReserved as it's first member */
void PoolThreadReturn(PoolThread *pt, void *data);
void PoolThreadLock(PoolThread *pt, PoolThreadId id);
void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data);
void PoolThreadUnlock(PoolThread *pt, PoolThreadId id);
/** \brief get size of PoolThread (number of 'threads', so array elements)
* \param pt thread pool
* \retval size or -1 on error */

Loading…
Cancel
Save