From 235f369ab98923a75fc655b14d53d0def5eed971 Mon Sep 17 00:00:00 2001 From: Victor Julien Date: Fri, 23 Sep 2022 22:54:52 +0200 Subject: [PATCH] stream: reduce pool locking overhead Add thread local cache to avoid locking overhead for ssns and segments. A thread will return segments/ssns to a local cache first, and if that is full, to a return queue where the actual return to the pool returns a batch, to amortize locking overhead. Adds segment and session pool/cache counters to see where how effective the cache is. --- src/Makefile.am | 2 + src/flow-manager.c | 3 + src/stream-tcp-cache.c | 196 ++++++++++++++++++++++++++++++++++++ src/stream-tcp-cache.h | 39 +++++++ src/stream-tcp-private.h | 4 +- src/stream-tcp-reassemble.c | 17 +++- src/stream-tcp-reassemble.h | 4 + src/stream-tcp.c | 40 ++++++-- src/stream-tcp.h | 5 + src/tests/stream-tcp.c | 2 +- src/util-pool-thread.c | 27 ++++- src/util-pool-thread.h | 6 +- 12 files changed, 325 insertions(+), 20 deletions(-) create mode 100644 src/stream-tcp-cache.c create mode 100644 src/stream-tcp-cache.h diff --git a/src/Makefile.am b/src/Makefile.am index f348e89f22..799a8bd972 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -477,6 +477,7 @@ noinst_HEADERS = \ source-windivert-prototypes.h \ stream.h \ stream-tcp.h \ + stream-tcp-cache.h \ stream-tcp-inline.h \ stream-tcp-list.h \ stream-tcp-private.h \ @@ -1076,6 +1077,7 @@ libsuricata_c_a_SOURCES = \ source-windivert.c \ stream.c \ stream-tcp.c \ + stream-tcp-cache.c \ stream-tcp-inline.c \ stream-tcp-list.c \ stream-tcp-reassemble.c \ diff --git a/src/flow-manager.c b/src/flow-manager.c index 23ab9e801a..b26542706b 100644 --- a/src/flow-manager.c +++ b/src/flow-manager.c @@ -715,6 +715,7 @@ static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void * static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) { + StreamTcpThreadCacheCleanup(); PacketPoolDestroy(); SCFree(data); return TM_ECODE_OK; @@ -1022,6 +1023,8 @@ static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) { + StreamTcpThreadCacheCleanup(); + FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data; if (ftd->output_thread_data != NULL) OutputFlowLogThreadDeinit(t, ftd->output_thread_data); diff --git a/src/stream-tcp-cache.c b/src/stream-tcp-cache.c new file mode 100644 index 0000000000..b74d033758 --- /dev/null +++ b/src/stream-tcp-cache.c @@ -0,0 +1,196 @@ +/* Copyright (C) 2007-2022 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien + */ + +#include "suricata-common.h" +#include "stream-tcp-private.h" +#include "stream-tcp-cache.h" + +typedef struct TcpPoolCache { + bool cache_enabled; /**< cache should only be enabled for worker threads */ + TcpSegment *segs_cache[64]; + uint32_t segs_cache_idx; + uint32_t segs_returns_idx; + TcpSegment *segs_returns[64]; + + TcpSession *ssns_cache[64]; + uint32_t ssns_cache_idx; + uint32_t ssns_returns_idx; + TcpSession *ssns_returns[64]; +} TcpPoolCache; + +static thread_local TcpPoolCache tcp_pool_cache; +extern PoolThread *ssn_pool; +extern PoolThread *segment_thread_pool; + +/** \brief enable segment cache. Should only be done for worker threads */ +void StreamTcpThreadCacheEnable(void) +{ + tcp_pool_cache.cache_enabled = true; +} + +void StreamTcpThreadCacheReturnSegment(TcpSegment *seg) +{ + SCEnter(); +#ifdef UNITTESTS + if (RunmodeIsUnittests()) { + PoolThreadReturn(segment_thread_pool, seg); + SCReturn; + } +#endif + + /* cache can have segs from any pool id */ + if (tcp_pool_cache.cache_enabled && tcp_pool_cache.segs_cache_idx < 64) { + tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx++] = seg; + } else { + /* segs_returns should only have a single pool id. If ours is different, + * flush it. */ + bool flush = false; + if (tcp_pool_cache.segs_returns_idx && + tcp_pool_cache.segs_returns[0]->pool_id != seg->pool_id) { + flush = true; + } + if (tcp_pool_cache.segs_returns_idx == 64) { + flush = true; + } + + if (flush) { + PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id; + PoolThreadLock(segment_thread_pool, pool_id); + for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) { + TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i]; + PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg); + } + PoolThreadUnlock(segment_thread_pool, pool_id); + tcp_pool_cache.segs_returns_idx = 0; + } + + tcp_pool_cache.segs_returns[tcp_pool_cache.segs_returns_idx++] = seg; + } +} + +void StreamTcpThreadCacheReturnSession(TcpSession *ssn) +{ + SCEnter(); +#ifdef UNITTESTS + if (RunmodeIsUnittests()) { + PoolThreadReturn(ssn_pool, ssn); + SCReturn; + } +#endif + + /* cache can have ssns from any pool id */ + if (tcp_pool_cache.cache_enabled && tcp_pool_cache.ssns_cache_idx < 64) { + tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx++] = ssn; + } else { + /* ssns_returns should only have a single pool id. If ours is different, + * flush it. */ + bool flush = false; + if (tcp_pool_cache.ssns_returns_idx && + tcp_pool_cache.ssns_returns[0]->pool_id != ssn->pool_id) { + flush = true; + } + if (tcp_pool_cache.ssns_returns_idx == 64) { + flush = true; + } + + if (flush) { + PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id; + PoolThreadLock(ssn_pool, pool_id); + for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) { + TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i]; + PoolThreadReturnRaw(ssn_pool, pool_id, ret_ssn); + } + PoolThreadUnlock(ssn_pool, pool_id); + tcp_pool_cache.ssns_returns_idx = 0; + } + + tcp_pool_cache.ssns_returns[tcp_pool_cache.ssns_returns_idx++] = ssn; + } + SCReturn; +} + +void StreamTcpThreadCacheCleanup(void) +{ + SCEnter(); + + /* segments */ + SCLogDebug("tcp_pool_cache.segs_cache_idx %u", tcp_pool_cache.segs_cache_idx); + for (uint32_t i = 0; i < tcp_pool_cache.segs_cache_idx; i++) { + PoolThreadReturn(segment_thread_pool, tcp_pool_cache.segs_cache[i]); + } + tcp_pool_cache.segs_cache_idx = 0; + + SCLogDebug("tcp_pool_cache.segs_returns_idx %u", tcp_pool_cache.segs_returns_idx); + if (tcp_pool_cache.segs_returns_idx) { + PoolThreadId pool_id = tcp_pool_cache.segs_returns[0]->pool_id; + PoolThreadLock(segment_thread_pool, pool_id); + for (uint32_t i = 0; i < tcp_pool_cache.segs_returns_idx; i++) { + TcpSegment *ret_seg = tcp_pool_cache.segs_returns[i]; + PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_seg); + } + PoolThreadUnlock(segment_thread_pool, pool_id); + tcp_pool_cache.segs_returns_idx = 0; + } + + /* sessions */ + SCLogDebug("tcp_pool_cache.ssns_cache_idx %u", tcp_pool_cache.ssns_cache_idx); + for (uint32_t i = 0; i < tcp_pool_cache.ssns_cache_idx; i++) { + PoolThreadReturn(segment_thread_pool, tcp_pool_cache.ssns_cache[i]); + } + tcp_pool_cache.ssns_cache_idx = 0; + + SCLogDebug("tcp_pool_cache.ssns_returns_idx %u", tcp_pool_cache.ssns_returns_idx); + if (tcp_pool_cache.ssns_returns_idx) { + PoolThreadId pool_id = tcp_pool_cache.ssns_returns[0]->pool_id; + PoolThreadLock(segment_thread_pool, pool_id); + for (uint32_t i = 0; i < tcp_pool_cache.ssns_returns_idx; i++) { + TcpSession *ret_ssn = tcp_pool_cache.ssns_returns[i]; + PoolThreadReturnRaw(segment_thread_pool, pool_id, ret_ssn); + } + PoolThreadUnlock(segment_thread_pool, pool_id); + tcp_pool_cache.ssns_returns_idx = 0; + } + + SCReturn; +} + +TcpSegment *StreamTcpThreadCacheGetSegment(void) +{ + if (tcp_pool_cache.segs_cache_idx) { + TcpSegment *seg = tcp_pool_cache.segs_cache[tcp_pool_cache.segs_cache_idx - 1]; + tcp_pool_cache.segs_cache_idx--; + memset(&seg->sbseg, 0, sizeof(seg->sbseg)); + return seg; + } + return NULL; +} + +TcpSession *StreamTcpThreadCacheGetSession(void) +{ + if (tcp_pool_cache.ssns_cache_idx) { + TcpSession *ssn = tcp_pool_cache.ssns_cache[tcp_pool_cache.ssns_cache_idx - 1]; + tcp_pool_cache.ssns_cache_idx--; + return ssn; + } + return NULL; +} diff --git a/src/stream-tcp-cache.h b/src/stream-tcp-cache.h new file mode 100644 index 0000000000..1a61532a15 --- /dev/null +++ b/src/stream-tcp-cache.h @@ -0,0 +1,39 @@ +/* Copyright (C) 2007-2022 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Victor Julien + */ + +#ifndef __STREAM_TCP_CACHE_H__ +#define __STREAM_TCP_CACHE_H__ + +#include "suricata.h" +#include "flow.h" +#include "stream-tcp-private.h" + +void StreamTcpThreadCacheEnable(void); +void StreamTcpThreadCacheReturnSegment(TcpSegment *seg); +void StreamTcpThreadCacheReturnSession(TcpSession *ssn); +void StreamTcpThreadCacheCleanup(void); + +TcpSegment *StreamTcpThreadCacheGetSegment(void); +TcpSession *StreamTcpThreadCacheGetSession(void); + +#endif /* __STREAM_TCP_CACHE_H__ */ diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index cf7f882eeb..e817533d7a 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -70,7 +70,7 @@ typedef struct TcpSegmentPcapHdrStorage_ { } TcpSegmentPcapHdrStorage; typedef struct TcpSegment { - PoolThreadReserved res; + PoolThreadId pool_id; uint16_t payload_len; /**< actual size of the payload */ uint32_t seq; RB_ENTRY(TcpSegment) __attribute__((__packed__)) rb; @@ -269,7 +269,7 @@ enum TcpState { } typedef struct TcpSession_ { - PoolThreadReserved res; + PoolThreadId pool_id; uint8_t state:4; /**< tcp state from state enum */ uint8_t pstate:4; /**< previous state */ uint8_t queue_len; /**< length of queue list below */ diff --git a/src/stream-tcp-reassemble.c b/src/stream-tcp-reassemble.c index 2a26ae6568..5f4032cc30 100644 --- a/src/stream-tcp-reassemble.c +++ b/src/stream-tcp-reassemble.c @@ -48,6 +48,7 @@ #include "stream-tcp.h" #include "stream-tcp-private.h" +#include "stream-tcp-cache.h" #include "stream-tcp-reassemble.h" #include "stream-tcp-inline.h" #include "stream-tcp-list.h" @@ -76,7 +77,7 @@ static uint64_t segment_pool_memcnt = 0; thread_local uint64_t t_pcapcnt = UINT64_MAX; -static PoolThread *segment_thread_pool = NULL; +PoolThread *segment_thread_pool = NULL; /* init only, protect initializing and growing pool */ static SCMutex segment_thread_pool_mutex = SCMUTEX_INITIALIZER; @@ -374,7 +375,7 @@ void StreamTcpSegmentReturntoPool(TcpSegment *seg) seg->pcap_hdr_storage->pktlen = 0; } - PoolThreadReturn(segment_thread_pool, seg); + StreamTcpThreadCacheReturnSegment(seg); } /** @@ -565,6 +566,8 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(ThreadVars *tv) void StreamTcpReassembleFreeThreadCtx(TcpReassemblyThreadCtx *ra_ctx) { SCEnter(); + StreamTcpThreadCacheCleanup(); + if (ra_ctx) { AppLayerDestroyCtxThread(ra_ctx->app_tctx); SCFree(ra_ctx); @@ -2036,7 +2039,14 @@ int StreamTcpReassembleHandleSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ */ TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx) { - TcpSegment *seg = (TcpSegment *)PoolThreadGetById( + TcpSegment *seg = StreamTcpThreadCacheGetSegment(); + if (seg) { + StatsIncr(tv, ra_ctx->counter_tcp_segment_from_cache); + memset(&seg->sbseg, 0, sizeof(seg->sbseg)); + return seg; + } + + seg = (TcpSegment *)PoolThreadGetById( segment_thread_pool, (uint16_t)ra_ctx->segment_thread_pool_id); SCLogDebug("seg we return is %p", seg); if (seg == NULL) { @@ -2045,6 +2055,7 @@ TcpSegment *StreamTcpGetSegment(ThreadVars *tv, TcpReassemblyThreadCtx *ra_ctx) StatsIncr(tv, ra_ctx->counter_tcp_segment_memcap); } else { memset(&seg->sbseg, 0, sizeof(seg->sbseg)); + StatsIncr(tv, ra_ctx->counter_tcp_segment_from_pool); } return seg; diff --git a/src/stream-tcp-reassemble.h b/src/stream-tcp-reassemble.h index 11bdd63851..4135db0399 100644 --- a/src/stream-tcp-reassemble.h +++ b/src/stream-tcp-reassemble.h @@ -63,6 +63,10 @@ typedef struct TcpReassemblyThreadCtx_ { /** TCP segments which are not being reassembled due to memcap was reached */ uint16_t counter_tcp_segment_memcap; + + uint16_t counter_tcp_segment_from_cache; + uint16_t counter_tcp_segment_from_pool; + /** number of streams that stop reassembly because their depth is reached */ uint16_t counter_tcp_stream_depth; /** count number of streams with a unrecoverable stream gap (missing pkts) */ diff --git a/src/stream-tcp.c b/src/stream-tcp.c index e487d6796a..1430e2f9e4 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -51,9 +51,10 @@ #include "util-device.h" #include "stream-tcp-private.h" -#include "stream-tcp-reassemble.h" #include "stream-tcp.h" +#include "stream-tcp-cache.h" #include "stream-tcp-inline.h" +#include "stream-tcp-reassemble.h" #include "stream-tcp-sack.h" #include "stream-tcp-util.h" #include "stream.h" @@ -104,7 +105,7 @@ static int StreamTcpStateDispatch(ThreadVars *tv, Packet *p, extern thread_local uint64_t t_pcapcnt; extern int g_detect_disabled; -static PoolThread *ssn_pool = NULL; +PoolThread *ssn_pool = NULL; static SCMutex ssn_pool_mutex = SCMUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */ #ifdef DEBUG static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */ @@ -250,11 +251,11 @@ void StreamTcpSessionClear(void *ssnptr) StreamTcpSessionCleanup(ssn); /* HACK: don't loose track of thread id */ - PoolThreadReserved a = ssn->res; + PoolThreadId pool_id = ssn->pool_id; memset(ssn, 0, sizeof(TcpSession)); - ssn->res = a; + ssn->pool_id = pool_id; - PoolThreadReturn(ssn_pool, ssn); + StreamTcpThreadCacheReturnSession(ssn); #ifdef DEBUG SCMutexLock(&ssn_pool_mutex); ssn_pool_cnt--; @@ -688,13 +689,26 @@ void StreamTcpFreeConfig(bool quiet) * * \retval ssn new TCP session. */ -static TcpSession *StreamTcpNewSession (Packet *p, int id) +static TcpSession *StreamTcpNewSession(ThreadVars *tv, StreamTcpThread *stt, Packet *p, int id) { TcpSession *ssn = (TcpSession *)p->flow->protoctx; if (ssn == NULL) { DEBUG_VALIDATE_BUG_ON(id < 0 || id > UINT16_MAX); - p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id); + p->flow->protoctx = StreamTcpThreadCacheGetSession(); + if (p->flow->protoctx != NULL) { +#ifdef UNITTESTS + if (tv) +#endif + StatsIncr(tv, stt->counter_tcp_ssn_from_cache); + } else { + p->flow->protoctx = PoolThreadGetById(ssn_pool, (uint16_t)id); + if (p->flow->protoctx != NULL) +#ifdef UNITTESTS + if (tv) +#endif + StatsIncr(tv, stt->counter_tcp_ssn_from_pool); + } #ifdef DEBUG SCMutexLock(&ssn_pool_mutex); if (p->flow->protoctx != NULL) @@ -940,7 +954,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, SCLogDebug("midstream picked up"); if (ssn == NULL) { - ssn = StreamTcpNewSession(p, stt->ssn_pool_id); + ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id); if (ssn == NULL) { StatsIncr(tv, stt->counter_tcp_ssn_memcap); return -1; @@ -1033,7 +1047,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, } else if (p->tcph->th_flags & TH_SYN) { if (ssn == NULL) { - ssn = StreamTcpNewSession(p, stt->ssn_pool_id); + ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id); if (ssn == NULL) { StatsIncr(tv, stt->counter_tcp_ssn_memcap); return -1; @@ -1113,7 +1127,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, SCLogDebug("midstream picked up"); if (ssn == NULL) { - ssn = StreamTcpNewSession(p, stt->ssn_pool_id); + ssn = StreamTcpNewSession(tv, stt, p, stt->ssn_pool_id); if (ssn == NULL) { StatsIncr(tv, stt->counter_tcp_ssn_memcap); return -1; @@ -5394,12 +5408,15 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) SCReturnInt(TM_ECODE_FAILED); memset(stt, 0, sizeof(StreamTcpThread)); stt->ssn_pool_id = -1; + StreamTcpThreadCacheEnable(); *data = (void *)stt; stt->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", tv); stt->counter_tcp_sessions = StatsRegisterCounter("tcp.sessions", tv); stt->counter_tcp_ssn_memcap = StatsRegisterCounter("tcp.ssn_memcap_drop", tv); + stt->counter_tcp_ssn_from_cache = StatsRegisterCounter("tcp.ssn_from_cache", tv); + stt->counter_tcp_ssn_from_pool = StatsRegisterCounter("tcp.ssn_from_pool", tv); stt->counter_tcp_pseudo = StatsRegisterCounter("tcp.pseudo", tv); stt->counter_tcp_pseudo_failed = StatsRegisterCounter("tcp.pseudo_failed", tv); stt->counter_tcp_invalid_checksum = StatsRegisterCounter("tcp.invalid_checksum", tv); @@ -5416,6 +5433,9 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) SCReturnInt(TM_ECODE_FAILED); stt->ra_ctx->counter_tcp_segment_memcap = StatsRegisterCounter("tcp.segment_memcap_drop", tv); + stt->ra_ctx->counter_tcp_segment_from_cache = + StatsRegisterCounter("tcp.segment_from_cache", tv); + stt->ra_ctx->counter_tcp_segment_from_pool = StatsRegisterCounter("tcp.segment_from_pool", tv); stt->ra_ctx->counter_tcp_stream_depth = StatsRegisterCounter("tcp.stream_depth_reached", tv); stt->ra_ctx->counter_tcp_reass_gap = StatsRegisterCounter("tcp.reassembly_gap", tv); stt->ra_ctx->counter_tcp_reass_overlap = StatsRegisterCounter("tcp.overlap", tv); diff --git a/src/stream-tcp.h b/src/stream-tcp.h index a2d857ee1f..3a4b0e9e57 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -84,6 +84,8 @@ typedef struct StreamTcpThread_ { uint16_t counter_tcp_sessions; /** sessions not picked up because memcap was reached */ uint16_t counter_tcp_ssn_memcap; + uint16_t counter_tcp_ssn_from_cache; + uint16_t counter_tcp_ssn_from_pool; /** pseudo packets processed */ uint16_t counter_tcp_pseudo; /** pseudo packets failed to setup */ @@ -211,5 +213,8 @@ void StreamTcpUpdateAppLayerProgress(TcpSession *ssn, char direction, uint64_t StreamTcpGetAcked(const TcpStream *stream); uint64_t StreamTcpGetUsable(const TcpStream *stream, const bool eof); +void StreamTcpThreadCacheEnable(void); +void StreamTcpThreadCacheCleanup(void); + #endif /* __STREAM_TCP_H__ */ diff --git a/src/tests/stream-tcp.c b/src/tests/stream-tcp.c index c6eec9b902..f3c0ff0f88 100644 --- a/src/tests/stream-tcp.c +++ b/src/tests/stream-tcp.c @@ -47,7 +47,7 @@ static int StreamTcpTest01(void) FLOW_INITIALIZE(&f); p->flow = &f; StreamTcpUTInit(&stt.ra_ctx); - TcpSession *ssn = StreamTcpNewSession(p, 0); + TcpSession *ssn = StreamTcpNewSession(NULL, &stt, p, 0); FAIL_IF_NULL(ssn); f.protoctx = ssn; FAIL_IF_NOT_NULL(f.alparser); diff --git a/src/util-pool-thread.c b/src/util-pool-thread.c index 8ed982bb23..d80d29570b 100644 --- a/src/util-pool-thread.c +++ b/src/util-pool-thread.c @@ -177,7 +177,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id) data = PoolGet(e->pool); SCMutexUnlock(&e->lock); if (data) { - PoolThreadReserved *did = data; + PoolThreadId *did = data; *did = id; } @@ -186,7 +186,7 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id) void PoolThreadReturn(PoolThread *pt, void *data) { - PoolThreadReserved *id = data; + PoolThreadId *id = data; if (pt == NULL || *id >= pt->size) return; @@ -199,9 +199,30 @@ void PoolThreadReturn(PoolThread *pt, void *data) SCMutexUnlock(&e->lock); } +void PoolThreadLock(PoolThread *pt, PoolThreadId id) +{ + BUG_ON(pt == NULL || id >= pt->size); + PoolThreadElement *e = &pt->array[id]; + SCMutexLock(&e->lock); +} + +void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data) +{ + BUG_ON(pt == NULL || id >= pt->size); + PoolThreadElement *e = &pt->array[id]; + PoolReturn(e->pool, data); +} + +void PoolThreadUnlock(PoolThread *pt, PoolThreadId id) +{ + BUG_ON(pt == NULL || id >= pt->size); + PoolThreadElement *e = &pt->array[id]; + SCMutexUnlock(&e->lock); +} + #ifdef UNITTESTS struct PoolThreadTestData { - PoolThreadReserved res; + PoolThreadId res; int abc; }; diff --git a/src/util-pool-thread.h b/src/util-pool-thread.h index 47e343b2c4..44b76e75e2 100644 --- a/src/util-pool-thread.h +++ b/src/util-pool-thread.h @@ -57,7 +57,7 @@ typedef struct PoolThread_ { /** per data item reserved data containing the * thread pool id */ -typedef uint16_t PoolThreadReserved; +typedef uint16_t PoolThreadId; void PoolThreadRegisterTests(void); @@ -91,6 +91,10 @@ void *PoolThreadGetById(PoolThread *pt, uint16_t id); * \param data memory block to return, with PoolThreadReserved as it's first member */ void PoolThreadReturn(PoolThread *pt, void *data); +void PoolThreadLock(PoolThread *pt, PoolThreadId id); +void PoolThreadReturnRaw(PoolThread *pt, PoolThreadId id, void *data); +void PoolThreadUnlock(PoolThread *pt, PoolThreadId id); + /** \brief get size of PoolThread (number of 'threads', so array elements) * \param pt thread pool * \retval size or -1 on error */