threading: hide 'trans_q' from queue handlers

pull/4531/head
Victor Julien 6 years ago
parent 45e5e19e6e
commit 550cfdd98d

@ -499,7 +499,7 @@ static void *StatsWakeupThread(void *arg)
tv->perf_public_ctx.perf_flag = 1; tv->perf_public_ctx.perf_flag = 1;
if (tv->inq != NULL) { if (tv->inq != NULL) {
PacketQueue *q = &trans_q[tv->inq->id]; PacketQueue *q = tv->inq->pq;
SCCondSignal(&q->cond_q); SCCondSignal(&q->cond_q);
} }

@ -1731,7 +1731,7 @@ static void InjectPackets(ThreadVars **detect_tvs,
if (p != NULL) { if (p != NULL) {
p->flags |= PKT_PSEUDO_STREAM_END; p->flags |= PKT_PSEUDO_STREAM_END;
PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH); PKT_SET_SRC(p, PKT_SRC_DETECT_RELOAD_FLUSH);
PacketQueue *q = &trans_q[detect_tvs[i]->inq->id]; PacketQueue *q = detect_tvs[i]->inq->pq;
SCMutexLock(&q->mutex_q); SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p); PacketEnqueue(q, p);
SCCondSignal(&q->cond_q); SCCondSignal(&q->cond_q);

@ -46,6 +46,8 @@ Tmq *TmqCreateQueue(const char *name)
q->id = tmq_id++; q->id = tmq_id++;
q->is_packet_pool = (strcmp(q->name, "packetpool") == 0); q->is_packet_pool = (strcmp(q->name, "packetpool") == 0);
q->pq = &trans_q[q->id];
SCLogDebug("created queue \'%s\', %p", name, q); SCLogDebug("created queue \'%s\', %p", name, q);
return q; return q;
@ -67,9 +69,9 @@ void TmqDebugList(void)
{ {
for (int i = 0; i < tmq_id; i++) { for (int i = 0; i < tmq_id; i++) {
/* get a lock accessing the len */ /* get a lock accessing the len */
SCMutexLock(&trans_q[tmqs[i].id].mutex_q); SCMutexLock(&tmqs[i].pq->mutex_q);
printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len); printf("TmqDebugList: id %" PRIu32 ", name \'%s\', len %" PRIu32 "\n", tmqs[i].id, tmqs[i].name, tmqs[i].pq->len);
SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q); SCMutexUnlock(&tmqs[i].pq->mutex_q);
} }
} }
@ -93,7 +95,7 @@ void TmValidateQueueState(void)
bool err = false; bool err = false;
for (int i = 0; i < tmq_id; i++) { for (int i = 0; i < tmq_id; i++) {
SCMutexLock(&trans_q[tmqs[i].id].mutex_q); SCMutexLock(&tmqs[i].pq->mutex_q);
if (tmqs[i].reader_cnt == 0) { if (tmqs[i].reader_cnt == 0) {
SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a reader (id %d, max %u)", tmqs[i].name, i, tmq_id); SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a reader (id %d, max %u)", tmqs[i].name, i, tmq_id);
err = true; err = true;
@ -101,7 +103,7 @@ void TmValidateQueueState(void)
SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a writer (id %d, max %u)", tmqs[i].name, i, tmq_id); SCLogError(SC_ERR_THREAD_QUEUE, "queue \"%s\" doesn't have a writer (id %d, max %u)", tmqs[i].name, i, tmq_id);
err = true; err = true;
} }
SCMutexUnlock(&trans_q[tmqs[i].id].mutex_q); SCMutexUnlock(&tmqs[i].pq->mutex_q);
if (err == true) if (err == true)
goto error; goto error;

@ -24,12 +24,15 @@
#ifndef __TM_QUEUES_H__ #ifndef __TM_QUEUES_H__
#define __TM_QUEUES_H__ #define __TM_QUEUES_H__
#include "packet-queue.h"
typedef struct Tmq_ { typedef struct Tmq_ {
char *name; char *name;
bool is_packet_pool; bool is_packet_pool;
uint16_t id; uint16_t id;
uint16_t reader_cnt; uint16_t reader_cnt;
uint16_t writer_cnt; uint16_t writer_cnt;
PacketQueue *pq;
} Tmq; } Tmq;
Tmq* TmqCreateQueue(const char *name); Tmq* TmqCreateQueue(const char *name);

@ -273,7 +273,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
/* if the flowworker module is the first, get the threads input queue */ /* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id]; tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot; tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */ /* setup a queue */
@ -398,7 +398,7 @@ static void *TmThreadsSlotPktAcqLoopAFL(void *td)
/* if the flowworker module is the first, get the threads input queue */ /* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id]; tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot; tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */ /* setup a queue */
@ -510,7 +510,7 @@ static void *TmThreadsSlotVar(void *td)
/* if the flowworker module is the first, get the threads input queue */ /* if the flowworker module is the first, get the threads input queue */
if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) { if (s == (TmSlot *)tv->tm_slots && (s->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = &trans_q[tv->inq->id]; tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = s; tv->tm_flowworker = s;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
/* setup a queue */ /* setup a queue */
@ -1313,7 +1313,7 @@ static bool ThreadStillHasPackets(ThreadVars *tv)
/* we wait till we dry out all the inq packets, before we /* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled * kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/ * packet acquire by now using TmThreadDisableReceiveThreads()*/
PacketQueue *q = &trans_q[tv->inq->id]; PacketQueue *q = tv->inq->pq;
SCMutexLock(&q->mutex_q); SCMutexLock(&q->mutex_q);
uint32_t len = q->len; uint32_t len = q->len;
SCMutexUnlock(&q->mutex_q); SCMutexUnlock(&q->mutex_q);
@ -1367,7 +1367,7 @@ static int TmThreadKillThread(ThreadVars *tv)
} }
if (tv->inq != NULL) { if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
SCCondSignal(&trans_q[tv->inq->id].cond_q); SCCondSignal(&tv->inq->pq->cond_q);
} }
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
} }
@ -1503,7 +1503,7 @@ again:
if (tv->inq != NULL) { if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
SCCondSignal(&trans_q[tv->inq->id].cond_q); SCCondSignal(&tv->inq->pq->cond_q);
} }
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
} }
@ -1579,7 +1579,7 @@ again:
* THV_KILL flag. */ * THV_KILL flag. */
if (tv->inq != NULL) { if (tv->inq != NULL) {
for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) { for (int i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) {
SCCondSignal(&trans_q[tv->inq->id].cond_q); SCCondSignal(&tv->inq->pq->cond_q);
} }
SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id); SCLogDebug("signalled tv->inq->id %" PRIu32 "", tv->inq->id);
} }
@ -2075,7 +2075,7 @@ void TmThreadDumpThreads(void)
const uint32_t flags = SC_ATOMIC_GET(tv->flags); const uint32_t flags = SC_ATOMIC_GET(tv->flags);
SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p", SCLogNotice("tv %p: type %u name %s tmm_flags %02X flags %X stream_pq %p",
tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq); tv, tv->type, tv->name, tv->tmm_flags, flags, tv->stream_pq);
if (tv->inq && tv->stream_pq == &trans_q[tv->inq->id]) { if (tv->inq && tv->stream_pq == tv->inq->pq) {
SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id); SCLogNotice("tv %p: stream_pq at tv->inq %u", tv, tv->inq->id);
} else if (tv->stream_pq_local != NULL) { } else if (tv->stream_pq_local != NULL) {
for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) { for (Packet *xp = tv->stream_pq_local->top; xp != NULL; xp = xp->next) {
@ -2287,7 +2287,7 @@ int TmThreadsInjectPacketsById(Packet **packets, const int id)
/* wake up listening thread(s) if necessary */ /* wake up listening thread(s) if necessary */
if (tv->inq != NULL) { if (tv->inq != NULL) {
SCCondSignal(&trans_q[tv->inq->id].cond_q); SCCondSignal(&tv->inq->pq->cond_q);
} }
return 1; return 1;
} }

@ -94,7 +94,7 @@ void TmqhFlowPrintAutofpHandler(void)
/* same as 'simple' */ /* same as 'simple' */
Packet *TmqhInputFlow(ThreadVars *tv) Packet *TmqhInputFlow(ThreadVars *tv)
{ {
PacketQueue *q = &trans_q[tv->inq->id]; PacketQueue *q = tv->inq->pq;
StatsSyncCountersIfSignalled(tv); StatsSyncCountersIfSignalled(tv);
@ -126,8 +126,6 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
} }
tmq->writer_cnt++; tmq->writer_cnt++;
uint16_t id = tmq->id;
if (ctx->queues == NULL) { if (ctx->queues == NULL) {
ctx->size = 1; ctx->size = 1;
ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode)); ctx->queues = SCMalloc(ctx->size * sizeof(TmqhFlowMode));
@ -147,7 +145,7 @@ static int StoreQueueId(TmqhFlowCtx *ctx, char *name)
memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode)); memset(ctx->queues + (ctx->size - 1), 0, sizeof(TmqhFlowMode));
} }
ctx->queues[ctx->size - 1].q = &trans_q[id]; ctx->queues[ctx->size - 1].q = tmq->pq;
return 0; return 0;
} }
@ -284,138 +282,98 @@ void TmqhOutputFlowIPPair(ThreadVars *tv, Packet *p)
static int TmqhOutputFlowSetupCtxTest01(void) static int TmqhOutputFlowSetupCtxTest01(void)
{ {
int retval = 0;
Tmq *tmq = NULL;
TmqhFlowCtx *fctx = NULL;
TmqResetQueues(); TmqResetQueues();
tmq = TmqCreateQueue("queue1"); Tmq *tmq1 = TmqCreateQueue("queue1");
if (tmq == NULL) FAIL_IF_NULL(tmq1);
goto end; Tmq *tmq2 = TmqCreateQueue("queue2");
tmq = TmqCreateQueue("queue2"); FAIL_IF_NULL(tmq2);
if (tmq == NULL) Tmq *tmq3 = TmqCreateQueue("another");
goto end; FAIL_IF_NULL(tmq3);
tmq = TmqCreateQueue("another"); Tmq *tmq4 = TmqCreateQueue("yetanother");
if (tmq == NULL) FAIL_IF_NULL(tmq4);
goto end;
tmq = TmqCreateQueue("yetanother");
if (tmq == NULL)
goto end;
const char *str = "queue1,queue2,another,yetanother"; const char *str = "queue1,queue2,another,yetanother";
void *ctx = TmqhOutputFlowSetupCtx(str); void *ctx = TmqhOutputFlowSetupCtx(str);
FAIL_IF_NULL(ctx);
if (ctx == NULL) TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
goto end;
fctx = (TmqhFlowCtx *)ctx;
if (fctx->size != 4) FAIL_IF_NOT(fctx->size == 4);
goto end;
if (fctx->queues == NULL) FAIL_IF_NULL(fctx->queues);
goto end;
if (fctx->queues[0].q != &trans_q[0]) FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
goto end; FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
if (fctx->queues[1].q != &trans_q[1]) FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
goto end; FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
if (fctx->queues[2].q != &trans_q[2])
goto end;
if (fctx->queues[3].q != &trans_q[3])
goto end;
retval = 1; TmqhOutputFlowFreeCtx(fctx);
end:
if (fctx != NULL)
TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues(); TmqResetQueues();
return retval; PASS;
} }
static int TmqhOutputFlowSetupCtxTest02(void) static int TmqhOutputFlowSetupCtxTest02(void)
{ {
int retval = 0;
Tmq *tmq = NULL;
TmqhFlowCtx *fctx = NULL;
TmqResetQueues(); TmqResetQueues();
tmq = TmqCreateQueue("queue1"); Tmq *tmq1 = TmqCreateQueue("queue1");
if (tmq == NULL) FAIL_IF_NULL(tmq1);
goto end; Tmq *tmq2 = TmqCreateQueue("queue2");
tmq = TmqCreateQueue("queue2"); FAIL_IF_NULL(tmq2);
if (tmq == NULL) Tmq *tmq3 = TmqCreateQueue("another");
goto end; FAIL_IF_NULL(tmq3);
tmq = TmqCreateQueue("another"); Tmq *tmq4 = TmqCreateQueue("yetanother");
if (tmq == NULL) FAIL_IF_NULL(tmq4);
goto end;
tmq = TmqCreateQueue("yetanother");
if (tmq == NULL)
goto end;
const char *str = "queue1"; const char *str = "queue1";
void *ctx = TmqhOutputFlowSetupCtx(str); void *ctx = TmqhOutputFlowSetupCtx(str);
FAIL_IF_NULL(ctx);
if (ctx == NULL) TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
goto end;
fctx = (TmqhFlowCtx *)ctx;
if (fctx->size != 1)
goto end;
if (fctx->queues == NULL) FAIL_IF_NOT(fctx->size == 1);
goto end;
if (fctx->queues[0].q != &trans_q[0]) FAIL_IF_NULL(fctx->queues);
goto end;
retval = 1; FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
end: TmqhOutputFlowFreeCtx(fctx);
if (fctx != NULL)
TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues(); TmqResetQueues();
return retval;
PASS;
} }
static int TmqhOutputFlowSetupCtxTest03(void) static int TmqhOutputFlowSetupCtxTest03(void)
{ {
int retval = 0;
TmqhFlowCtx *fctx = NULL;
TmqResetQueues(); TmqResetQueues();
const char *str = "queue1,queue2,another,yetanother"; const char *str = "queue1,queue2,another,yetanother";
void *ctx = TmqhOutputFlowSetupCtx(str); void *ctx = TmqhOutputFlowSetupCtx(str);
FAIL_IF_NULL(ctx);
if (ctx == NULL) TmqhFlowCtx *fctx = (TmqhFlowCtx *)ctx;
goto end;
fctx = (TmqhFlowCtx *)ctx; FAIL_IF_NOT(fctx->size == 4);
if (fctx->size != 4) FAIL_IF_NULL(fctx->queues);
goto end;
if (fctx->queues == NULL) Tmq *tmq1 = TmqGetQueueByName("queue1");
goto end; FAIL_IF_NULL(tmq1);
Tmq *tmq2 = TmqGetQueueByName("queue2");
FAIL_IF_NULL(tmq2);
Tmq *tmq3 = TmqGetQueueByName("another");
FAIL_IF_NULL(tmq3);
Tmq *tmq4 = TmqGetQueueByName("yetanother");
FAIL_IF_NULL(tmq4);
if (fctx->queues[0].q != &trans_q[0]) FAIL_IF_NOT(fctx->queues[0].q == tmq1->pq);
goto end; FAIL_IF_NOT(fctx->queues[1].q == tmq2->pq);
if (fctx->queues[1].q != &trans_q[1]) FAIL_IF_NOT(fctx->queues[2].q == tmq3->pq);
goto end; FAIL_IF_NOT(fctx->queues[3].q == tmq4->pq);
if (fctx->queues[2].q != &trans_q[2])
goto end;
if (fctx->queues[3].q != &trans_q[3])
goto end;
retval = 1; TmqhOutputFlowFreeCtx(fctx);
end:
if (fctx != NULL)
TmqhOutputFlowFreeCtx(fctx);
TmqResetQueues(); TmqResetQueues();
return retval; PASS;
} }
#endif /* UNITTESTS */ #endif /* UNITTESTS */

@ -46,7 +46,7 @@ void TmqhSimpleRegister (void)
Packet *TmqhInputSimple(ThreadVars *t) Packet *TmqhInputSimple(ThreadVars *t)
{ {
PacketQueue *q = &trans_q[t->inq->id]; PacketQueue *q = t->inq->pq;
StatsSyncCountersIfSignalled(t); StatsSyncCountersIfSignalled(t);
@ -77,14 +77,14 @@ void TmqhInputSimpleShutdownHandler(ThreadVars *tv)
} }
for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++) for (i = 0; i < (tv->inq->reader_cnt + tv->inq->writer_cnt); i++)
SCCondSignal(&trans_q[tv->inq->id].cond_q); SCCondSignal(&tv->inq->pq->cond_q);
} }
void TmqhOutputSimple(ThreadVars *t, Packet *p) void TmqhOutputSimple(ThreadVars *t, Packet *p)
{ {
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false"); SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true":"false");
PacketQueue *q = &trans_q[t->outq->id]; PacketQueue *q = t->outq->pq;
SCMutexLock(&q->mutex_q); SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p); PacketEnqueue(q, p);

Loading…
Cancel
Save