diff --git a/src/stream-tcp-private.h b/src/stream-tcp-private.h index 2933e3f700..dc60c742eb 100644 --- a/src/stream-tcp-private.h +++ b/src/stream-tcp-private.h @@ -25,6 +25,8 @@ #define __STREAM_TCP_PRIVATE_H__ #include "decode.h" +#include "util-pool.h" +#include "util-pool-thread.h" #define STREAMTCP_QUEUE_FLAG_TS 0x01 #define STREAMTCP_QUEUE_FLAG_WS 0x02 @@ -200,6 +202,7 @@ enum } typedef struct TcpSession_ { + PoolThreadReserved res; uint8_t state; uint8_t queue_len; /**< length of queue list below */ uint16_t flags; diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 1aee65c480..0c433e76ac 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -44,6 +44,7 @@ #include "tm-threads.h" #include "util-pool.h" +#include "util-pool-thread.h" #include "util-checksum.h" #include "util-unittest.h" #include "util-print.h" @@ -71,8 +72,7 @@ //#define DEBUG -#define STREAMTCP_DEFAULT_SESSIONS 262144 -#define STREAMTCP_DEFAULT_PREALLOC 32768 +#define STREAMTCP_DEFAULT_PREALLOC 2048 #define STREAMTCP_DEFAULT_MEMCAP (32 * 1024 * 1024) /* 32mb */ #define STREAMTCP_DEFAULT_REASSEMBLY_MEMCAP (64 * 1024 * 1024) /* 64mb */ #define STREAMTCP_DEFAULT_TOSERVER_CHUNK_SIZE 2560 @@ -104,8 +104,8 @@ static int StreamTcpHandleTimestamp(TcpSession * , Packet *); static int StreamTcpValidateRst(TcpSession * , Packet *); static inline int StreamTcpValidateAck(TcpSession *ssn, TcpStream *, Packet *); -static Pool *ssn_pool = NULL; -static SCMutex ssn_pool_mutex; +static PoolThread *ssn_pool = NULL; +static SCMutex ssn_pool_mutex = PTHREAD_MUTEX_INITIALIZER; /**< init only, protect initializing and growing pool */ #ifdef DEBUG static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex */ #endif @@ -210,12 +210,12 @@ void StreamTcpSessionClear(void *ssnptr) ssn->toclient_smsg_head = NULL; memset(ssn, 0, sizeof(TcpSession)); - SCMutexLock(&ssn_pool_mutex); - PoolReturn(ssn_pool, ssn); + PoolThreadReturn(ssn_pool, ssn); #ifdef DEBUG + SCMutexLock(&ssn_pool_mutex); ssn_pool_cnt--; -#endif SCMutexUnlock(&ssn_pool_mutex); +#endif SCReturn; } @@ -337,17 +337,10 @@ void StreamTcpInitConfig(char quiet) memset(&stream_config, 0, sizeof(stream_config)); - /** set config defaults */ if ((ConfGetInt("stream.max-sessions", &value)) == 1) { - stream_config.max_sessions = (uint32_t)value; - } else { - if (RunmodeIsUnittests()) - stream_config.max_sessions = 1024; - else - stream_config.max_sessions = STREAMTCP_DEFAULT_SESSIONS; - } - if (!quiet) { - SCLogInfo("stream \"max-sessions\": %"PRIu32"", stream_config.max_sessions); + SCLogWarning(SC_WARN_OPTION_OBSOLETE, "max-sessions is obsolete. " + "Number of concurrent sessions is now only limited by Flow and " + "TCP stream engine memcaps."); } if ((ConfGetInt("stream.prealloc-sessions", &value)) == 1) { @@ -359,7 +352,8 @@ void StreamTcpInitConfig(char quiet) stream_config.prealloc_sessions = STREAMTCP_DEFAULT_PREALLOC; } if (!quiet) { - SCLogInfo("stream \"prealloc-sessions\": %"PRIu32"", stream_config.prealloc_sessions); + SCLogInfo("stream \"prealloc-sessions\": %"PRIu32" (per thread)", + stream_config.prealloc_sessions); } char *temp_stream_memcap_str; @@ -566,21 +560,6 @@ void StreamTcpInitConfig(char quiet) /* init the memcap/use tracking */ SC_ATOMIC_INIT(st_memuse); - SCMutexInit(&ssn_pool_mutex, NULL); - SCMutexLock(&ssn_pool_mutex); - ssn_pool = PoolInit(stream_config.max_sessions, - stream_config.prealloc_sessions, - sizeof(TcpSession), - StreamTcpSessionPoolAlloc, - StreamTcpSessionPoolInit, NULL, - StreamTcpSessionPoolCleanup, NULL); - if (ssn_pool == NULL) { - SCLogError(SC_ERR_POOL_INIT, "ssn_pool is not initialized"); - SCMutexUnlock(&ssn_pool_mutex); - exit(EXIT_FAILURE); - } - SCMutexUnlock(&ssn_pool_mutex); - StreamTcpReassembleInit(quiet); /* set the default free function and flow state function @@ -595,7 +574,7 @@ void StreamTcpFreeConfig(char quiet) SCMutexLock(&ssn_pool_mutex); if (ssn_pool != NULL) { - PoolFree(ssn_pool); + PoolThreadFree(ssn_pool); ssn_pool = NULL; } SCMutexUnlock(&ssn_pool_mutex); @@ -611,18 +590,18 @@ void StreamTcpFreeConfig(char quiet) * * \retval TcpSession A new TCP session with field initilaized to 0/NULL. */ -TcpSession *StreamTcpNewSession (Packet *p) +TcpSession *StreamTcpNewSession (Packet *p, int id) { TcpSession *ssn = (TcpSession *)p->flow->protoctx; if (ssn == NULL) { - SCMutexLock(&ssn_pool_mutex); - p->flow->protoctx = PoolGet(ssn_pool); + p->flow->protoctx = PoolThreadGetById(ssn_pool, id); #ifdef DEBUG + SCMutexLock(&ssn_pool_mutex); if (p->flow->protoctx != NULL) ssn_pool_cnt++; -#endif SCMutexUnlock(&ssn_pool_mutex); +#endif ssn = (TcpSession *)p->flow->protoctx; if (ssn == NULL) { @@ -770,7 +749,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, return 0; if (ssn == NULL) { - ssn = StreamTcpNewSession(p); + ssn = StreamTcpNewSession(p, tv->id); if (ssn == NULL) { SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); return -1; @@ -844,7 +823,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, } else if (p->tcph->th_flags & TH_SYN) { if (ssn == NULL) { - ssn = StreamTcpNewSession(p); + ssn = StreamTcpNewSession(p, tv->id); if (ssn == NULL) { SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); return -1; @@ -897,7 +876,7 @@ static int StreamTcpPacketStateNone(ThreadVars *tv, Packet *p, return 0; if (ssn == NULL) { - ssn = StreamTcpNewSession(p); + ssn = StreamTcpNewSession(p, tv->id); if (ssn == NULL) { SCPerfCounterIncr(stt->counter_tcp_ssn_memcap, tv->sc_perf_pca); return -1; @@ -4521,6 +4500,34 @@ TmEcode StreamTcpThreadInit(ThreadVars *tv, void *initdata, void **data) SCLogDebug("StreamTcp thread specific ctx online at %p, reassembly ctx %p", stt, stt->ra_ctx); + + int r = 0; + SCMutexLock(&ssn_pool_mutex); + if (ssn_pool == NULL) + ssn_pool = PoolThreadInit(1, /* thread */ + 0, /* unlimited */ + stream_config.prealloc_sessions, + sizeof(TcpSession), + StreamTcpSessionPoolAlloc, + StreamTcpSessionPoolInit, NULL, + StreamTcpSessionPoolCleanup, NULL); + else { + /* grow ssn_pool until we have a element for our thread id */ + do { + r = PoolThreadGrow(ssn_pool, + 0, /* unlimited */ + stream_config.prealloc_sessions, + sizeof(TcpSession), + StreamTcpSessionPoolAlloc, + StreamTcpSessionPoolInit, NULL, + StreamTcpSessionPoolCleanup, NULL); + } while (r != -1 && r < tv->id); + SCLogDebug("pool size %d, thread %d", PoolThreadSize(ssn_pool), tv->id); + } + SCMutexUnlock(&ssn_pool_mutex); + if (r < 0 || ssn_pool == NULL) + SCReturnInt(TM_ECODE_FAILED); + SCReturnInt(TM_ECODE_OK); } @@ -5384,7 +5391,7 @@ static int StreamTcpTest01 (void) { StreamTcpInitConfig(TRUE); - TcpSession *ssn = StreamTcpNewSession(p); + TcpSession *ssn = StreamTcpNewSession(p, 0); if (ssn == NULL) { printf("Session can not be allocated: "); goto end; diff --git a/src/stream-tcp.h b/src/stream-tcp.h index 31b3c467b8..095461ff85 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -48,8 +48,7 @@ typedef struct TcpStreamCnf_ { uint64_t memcap; uint64_t reassembly_memcap; /**< max memory usage for stream reassembly */ - uint32_t max_sessions; - uint32_t prealloc_sessions; + uint32_t prealloc_sessions; /**< ssns to prealloc per stream thread */ int midstream; int async_oneside; uint32_t reassembly_depth; /**< Depth until when we reassemble the stream */ diff --git a/src/util-error.c b/src/util-error.c index d500f74c1b..9990474ab2 100644 --- a/src/util-error.c +++ b/src/util-error.c @@ -273,6 +273,7 @@ const char * SCErrorToString(SCError err) CASE_CODE (SC_ERR_MAGIC_LOAD); CASE_CODE (SC_ERR_CUDA_BUFFER_ERROR); CASE_CODE (SC_ERR_DNS_LOG_GENERIC); + CASE_CODE (SC_WARN_OPTION_OBSOLETE); } return "UNKNOWN_ERROR"; diff --git a/src/util-error.h b/src/util-error.h index d91adf0966..411ee3e970 100644 --- a/src/util-error.h +++ b/src/util-error.h @@ -262,6 +262,7 @@ typedef enum { SC_WARN_UNCOMMON, SC_ERR_CUDA_BUFFER_ERROR, SC_ERR_DNS_LOG_GENERIC, + SC_WARN_OPTION_OBSOLETE, } SCError; const char *SCErrorToString(SCError); diff --git a/suricata.yaml.in b/suricata.yaml.in index 1931fd4777..77891008b3 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -587,8 +587,7 @@ flow-timeouts: # # of checksum. You can control the handling of checksum # # on a per-interface basis via the 'checksum-checks' # # option -# max-sessions: 262144 # 256k concurrent sessions -# prealloc-sessions: 32768 # 32k sessions prealloc'd +# prealloc-sessions: 2k # 2k sessions prealloc'd per stream thread # midstream: false # don't allow midstream session pickups # async-oneside: false # don't enable async stream handling # inline: no # stream inline mode