Implement stream memcap enforcements using atomics instead of spinlocked counters.

remotes/origin/master
Victor Julien 13 years ago
parent d72b82fae0
commit fddaca6e8b

@ -93,10 +93,8 @@ static uint64_t segment_pool_cnt = 0;
/* index to the right pool for all packet sizes. */
static uint16_t segment_pool_idx[65536]; /* O(1) lookups of the pool */
/* Memory use counters */
static SCSpinlock stream_reassembly_memuse_spinlock;
static uint64_t stream_reassembly_memuse;
static uint64_t stream_reassembly_memuse_max;
/* Memory use counter */
SC_ATOMIC_DECLARE(uint64_t, ra_memuse);
/* prototypes */
static int HandleSegmentStartsBeforeListSegment(ThreadVars *, TcpReassemblyThreadCtx *,
@ -118,14 +116,8 @@ void StreamTcpReassemblePseudoPacketCreate(TcpStream *, Packet *, PacketQueue *)
* \param size Size of the TCP segment and its payload length memory allocated
*/
void StreamTcpReassembleIncrMemuse(uint64_t size) {
SCSpinLock(&stream_reassembly_memuse_spinlock);
stream_reassembly_memuse += size;
if (stream_reassembly_memuse > stream_reassembly_memuse_max)
stream_reassembly_memuse_max = stream_reassembly_memuse;
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
SC_ATOMIC_ADD(ra_memuse, size);
return;
}
/**
@ -135,25 +127,15 @@ void StreamTcpReassembleIncrMemuse(uint64_t size) {
* \param size Size of the TCP segment and its payload length memory allocated
*/
void StreamTcpReassembleDecrMemuse(uint64_t size) {
SCSpinLock(&stream_reassembly_memuse_spinlock);
if (size <= stream_reassembly_memuse) {
stream_reassembly_memuse -= size;
} else {
BUG_ON(size > stream_reassembly_memuse);
stream_reassembly_memuse = 0;
}
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
SC_ATOMIC_SUB(ra_memuse, size);
return;
}
void StreamTcpReassembleMemuseCounter(ThreadVars *tv, TcpReassemblyThreadCtx *rtv) {
if (tv != NULL && rtv != NULL) {
SCSpinLock(&stream_reassembly_memuse_spinlock);
SCPerfCounterSetUI64(rtv->counter_tcp_reass_memuse, tv->sc_perf_pca,
stream_reassembly_memuse);
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
}
uint64_t smemuse = SC_ATOMIC_GET(ra_memuse);
if (tv != NULL && rtv != NULL)
SCPerfCounterSetUI64(rtv->counter_tcp_reass_memuse, tv->sc_perf_pca, smemuse);
return;
}
/**
@ -165,15 +147,9 @@ void StreamTcpReassembleMemuseCounter(ThreadVars *tv, TcpReassemblyThreadCtx *rt
* \retval 0 if not in bounds
*/
int StreamTcpReassembleCheckMemcap(uint32_t size) {
SCEnter();
int ret = 0;
SCSpinLock(&stream_reassembly_memuse_spinlock);
if (stream_config.reassembly_memcap == 0 || size + stream_reassembly_memuse <= stream_config.reassembly_memcap)
ret = 1;
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
SCReturnInt(ret);
if (stream_config.reassembly_memcap == 0 || size + SC_ATOMIC_GET(ra_memuse) <= stream_config.reassembly_memcap)
return 1;
return 0;
}
/** \brief alloc a tcp segment pool entry */
@ -287,12 +263,8 @@ int StreamTcpReassembleInit(char quiet)
{
StreamMsgQueuesInit();
/* init the memcap and it's lock */
SCSpinInit(&stream_reassembly_memuse_spinlock, PTHREAD_PROCESS_PRIVATE);
SCSpinLock(&stream_reassembly_memuse_spinlock);
stream_reassembly_memuse = 0;
stream_reassembly_memuse_max = 0;
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
/* init the memcap/use tracker */
SC_ATOMIC_INIT(ra_memuse);
#ifdef DEBUG
SCMutexInit(&segment_pool_memuse_mutex, NULL);
@ -360,16 +332,6 @@ void StreamTcpReassembleFree(char quiet)
StreamMsgQueuesDeinit(quiet);
if (!quiet) {
SCSpinLock(&stream_reassembly_memuse_spinlock);
SCLogInfo("Max memuse of the stream reassembly engine %"PRIu64" (in use"
" %"PRIu64")", stream_reassembly_memuse_max,
stream_reassembly_memuse);
SCSpinUnlock(&stream_reassembly_memuse_spinlock);
}
SCSpinDestroy(&stream_reassembly_memuse_spinlock);
#ifdef DEBUG
SCLogDebug("segment_pool_cnt %"PRIu64"", segment_pool_cnt);
SCLogDebug("segment_pool_memuse %"PRIu64"", segment_pool_memuse);
@ -7069,16 +7031,16 @@ static int StreamTcpReassembleTest44(void)
{
uint8_t ret = 0;
StreamTcpInitConfig(TRUE);
uint32_t memuse = stream_reassembly_memuse;
uint32_t memuse = SC_ATOMIC_GET(ra_memuse);
StreamTcpReassembleIncrMemuse(500);
if (stream_reassembly_memuse != (memuse+500)) {
if (SC_ATOMIC_GET(ra_memuse) != (memuse+500)) {
printf("failed in incrementing the memory");
goto end;
}
StreamTcpReassembleDecrMemuse(500);
if (stream_reassembly_memuse != memuse) {
if (SC_ATOMIC_GET(ra_memuse) != memuse) {
printf("failed in decrementing the memory");
goto end;
}
@ -7095,7 +7057,7 @@ static int StreamTcpReassembleTest44(void)
StreamTcpFreeConfig(TRUE);
if (stream_reassembly_memuse != 0) {
if (SC_ATOMIC_GET(ra_memuse) != 0) {
printf("failed in clearing the memory");
goto end;
}

@ -111,9 +111,7 @@ static uint64_t ssn_pool_cnt = 0; /** counts ssns, protected by ssn_pool_mutex *
extern uint8_t engine_mode;
static SCSpinlock stream_memuse_spinlock;
static uint64_t stream_memuse = 0;
static uint64_t stream_memuse_max = 0;
SC_ATOMIC_DECLARE(uint64_t, st_memuse);
/* stream engine running in "inline" mode. */
int stream_inline = 0;
@ -130,26 +128,19 @@ void TmModuleStreamTcpRegister (void)
}
void StreamTcpIncrMemuse(uint64_t size) {
SCSpinLock(&stream_memuse_spinlock);
stream_memuse += (uint64_t)size;
if (stream_memuse > stream_memuse_max)
stream_memuse_max = stream_memuse;
SCSpinUnlock(&stream_memuse_spinlock);
SC_ATOMIC_ADD(st_memuse, size);
return;
}
void StreamTcpDecrMemuse(uint64_t size) {
SCSpinLock(&stream_memuse_spinlock);
if ((uint64_t)size <= stream_memuse)
stream_memuse -= (uint64_t)size;
else
stream_memuse = 0;
SCSpinUnlock(&stream_memuse_spinlock);
SC_ATOMIC_SUB(st_memuse, size);
return;
}
void StreamTcpMemuseCounter(ThreadVars *tv, StreamTcpThread *stt) {
SCSpinLock(&stream_memuse_spinlock);
SCPerfCounterSetUI64(stt->counter_tcp_memuse, tv->sc_perf_pca, stream_memuse);
SCSpinUnlock(&stream_memuse_spinlock);
uint64_t memusecopy = SC_ATOMIC_GET(st_memuse);
SCPerfCounterSetUI64(stt->counter_tcp_memuse, tv->sc_perf_pca, memusecopy);
return;
}
/**
@ -159,15 +150,9 @@ void StreamTcpMemuseCounter(ThreadVars *tv, StreamTcpThread *stt) {
* \retval 0 if not in bounds
*/
int StreamTcpCheckMemcap(uint64_t size) {
SCEnter();
int ret = 0;
SCSpinLock(&stream_memuse_spinlock);
if (stream_config.memcap == 0 || (size + stream_memuse) <= stream_config.memcap)
ret = 1;
SCSpinUnlock(&stream_memuse_spinlock);
SCReturnInt(ret);
if (stream_config.memcap == 0 || size + SC_ATOMIC_GET(st_memuse) <= stream_config.memcap)
return 1;
return 0;
}
/**
@ -495,12 +480,8 @@ void StreamTcpInitConfig(char quiet)
stream_config.reassembly_toclient_chunk_size);
}
/* init the memcap and it's lock */
SCSpinInit(&stream_memuse_spinlock, PTHREAD_PROCESS_PRIVATE);
SCSpinLock(&stream_memuse_spinlock);
stream_memuse = 0;
stream_memuse_max = 0;
SCSpinUnlock(&stream_memuse_spinlock);
/* init the memcap/use tracking */
SC_ATOMIC_INIT(st_memuse);
SCMutexInit(&ssn_pool_mutex, NULL);
SCMutexLock(&ssn_pool_mutex);
@ -536,14 +517,6 @@ void StreamTcpFreeConfig(char quiet)
SCMutexDestroy(&ssn_pool_mutex);
SCLogDebug("ssn_pool_cnt %"PRIu64"", ssn_pool_cnt);
if (!quiet) {
SCSpinLock(&stream_memuse_spinlock);
SCLogInfo("Max memuse of stream engine %"PRIu64" (in use %"PRIu64")",
stream_memuse_max, stream_memuse);
SCSpinUnlock(&stream_memuse_spinlock);
}
SCSpinDestroy(&stream_memuse_spinlock);
}
/** \brief The function is used to to fetch a TCP session from the
@ -7233,10 +7206,10 @@ static int StreamTcpTest23(void)
end:
StreamTcpReturnStreamSegments(&ssn.client);
StreamTcpFreeConfig(TRUE);
if (stream_memuse == 0) {
if (SC_ATOMIC_GET(st_memuse) == 0) {
result &= 1;
} else {
printf("stream_memuse %"PRIu64"\n", stream_memuse);
printf("smemuse.stream_memuse %"PRIu64"\n", SC_ATOMIC_GET(st_memuse));
}
SCFree(p);
return result;
@ -7321,10 +7294,10 @@ static int StreamTcpTest24(void)
end:
StreamTcpReturnStreamSegments(&ssn.client);
StreamTcpFreeConfig(TRUE);
if (stream_memuse == 0) {
if (SC_ATOMIC_GET(st_memuse) == 0) {
result &= 1;
} else {
printf("stream_memuse %"PRIu64"\n", stream_memuse);
printf("smemuse.stream_memuse %"PRIu64"\n", SC_ATOMIC_GET(st_memuse));
}
SCFree(p);
return result;
@ -7621,16 +7594,16 @@ static int StreamTcpTest28(void)
{
uint8_t ret = 0;
StreamTcpInitConfig(TRUE);
uint32_t memuse = stream_memuse;
uint32_t memuse = SC_ATOMIC_GET(st_memuse);
StreamTcpIncrMemuse(500);
if (stream_memuse != (memuse+500)) {
if (SC_ATOMIC_GET(st_memuse) != (memuse+500)) {
printf("failed in incrementing the memory");
goto end;
}
StreamTcpDecrMemuse(500);
if (stream_memuse != memuse) {
if (SC_ATOMIC_GET(st_memuse) != memuse) {
printf("failed in decrementing the memory");
goto end;
}
@ -7647,7 +7620,7 @@ static int StreamTcpTest28(void)
StreamTcpFreeConfig(TRUE);
if (stream_memuse != 0) {
if (SC_ATOMIC_GET(st_memuse) != 0) {
printf("failed in clearing the memory");
goto end;
}

Loading…
Cancel
Save