threads: refactor TmThreadsSlotPktAcqLoop for user threads

Refactor TmThreadsSlotPktAcqLoop for user provided thread by breaking
out the init and finish code into their own functions.

For user provided threads, Suricata should not "drive" the thread, but
the setup and finish code is the same.

The finish function is exported so it can be called by the user
application when its receive loop or equivalent is done.

Also remove obsolete comment.

Ticket: #7240
pull/12891/head
Jason Ish 11 months ago committed by Victor Julien
parent 04b29aa8d3
commit 35d7d77ddb

@ -108,37 +108,5 @@ int RunModeSpawnWorker(void *td)
/** \brief destroy a worker thread */
void RunModeDestroyWorker(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r;
TmSlot *slot = NULL;
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
/* process all pseudo packets the flow timeout may throw at us */
TmThreadTimeoutLoop(tv, s);
TmThreadsSetFlag(tv, THV_RUNNING_DONE);
TmThreadWaitForFlag(tv, THV_DEINIT);
PacketPoolDestroy();
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
}
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
break;
}
}
}
tv->stream_pq = NULL;
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
SCTmThreadsSlotPktAcqLoopFinish((ThreadVars *)td);
}

@ -204,37 +204,9 @@ int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s)
return r;
}
/*
pcap/nfq
pkt read
callback
process_pkt
pkt read
process_pkt
slot:
setup
pkt_ack_loop(tv, slot_data)
deinit
process_pkt:
while(s)
run s;
queue;
*/
static void *TmThreadsSlotPktAcqLoop(void *td)
static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r = TM_ECODE_OK;
TmSlot *slot = NULL;
SCSetThreadName(tv->name);
@ -244,21 +216,10 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
CaptureStatsSetup(tv);
PacketPoolInit();
/* check if we are setup properly */
if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
" PktAcqLoop=%p, tmqh_in=%p,"
" tmqh_out=%p",
s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
}
for (slot = s; slot != NULL; slot = slot->slot_next) {
for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
if (r == TM_ECODE_DONE) {
EngineDone();
@ -280,8 +241,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
goto error;
}
/* setup a queue */
} else if (slot->tm_id == TMM_FLOWWORKER) {
@ -295,8 +255,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
goto error;
}
}
}
@ -304,22 +263,18 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
bool run = TmThreadsWaitForUnpause(tv);
while (run) {
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
return true;
error:
return false;
}
bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv)
{
TmSlot *s = tv->tm_slots;
bool rc = true;
if (r == TM_ECODE_FAILED) {
TmThreadsSetFlag(tv, THV_FAILED);
run = false;
}
if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
run = false;
}
if (r == TM_ECODE_DONE) {
run = false;
}
}
StatsSyncCounters(tv);
TmThreadsSetFlag(tv, THV_FLOW_LOOP);
@ -332,28 +287,72 @@ static void *TmThreadsSlotPktAcqLoop(void *td)
PacketPoolDestroy();
for (slot = s; slot != NULL; slot = slot->slot_next) {
for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadExitPrintStats != NULL) {
slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data));
}
if (slot->SlotThreadDeinit != NULL) {
r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data));
if (r != TM_ECODE_OK) {
TmThreadsSetFlag(tv, THV_CLOSED);
goto error;
rc = false;
break;
}
}
}
tv->stream_pq = NULL;
SCLogDebug("%s ending", tv->name);
TmThreadsSetFlag(tv, THV_CLOSED);
return rc;
}
static void *TmThreadsSlotPktAcqLoop(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r = TM_ECODE_OK;
/* check if we are setup properly */
if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
SCLogError("TmSlot or ThreadVars badly setup: s=%p,"
" PktAcqLoop=%p, tmqh_in=%p,"
" tmqh_out=%p",
s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out);
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
pthread_exit(NULL);
return NULL;
}
if (!TmThreadsSlotPktAcqLoopInit(td)) {
goto error;
}
bool run = TmThreadsWaitForUnpause(tv);
while (run) {
r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s);
if (r == TM_ECODE_FAILED) {
TmThreadsSetFlag(tv, THV_FAILED);
run = false;
}
if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) {
run = false;
}
if (r == TM_ECODE_DONE) {
run = false;
}
}
if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) {
goto error;
}
SCLogDebug("%s ending", tv->name);
pthread_exit((void *) 0);
return NULL;
error:
tv->stream_pq = NULL;
pthread_exit(NULL);
return NULL;
}
@ -383,19 +382,6 @@ static void *TmThreadsLib(void *td)
{
ThreadVars *tv = (ThreadVars *)td;
TmSlot *s = tv->tm_slots;
TmEcode r = TM_ECODE_OK;
TmSlot *slot = NULL;
/* Set the thread name */
SCSetThreadName(tv->name);
if (tv->thread_setup_flags != 0)
TmThreadSetupOptions(tv);
/* Drop the capabilities for this thread */
SCDropCaps(tv);
PacketPoolInit();
/* check if we are setup properly */
if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) {
@ -406,52 +392,9 @@ static void *TmThreadsLib(void *td)
return NULL;
}
for (slot = s; slot != NULL; slot = slot->slot_next) {
if (slot->SlotThreadInit != NULL) {
void *slot_data = NULL;
r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data);
if (r != TM_ECODE_OK) {
if (r == TM_ECODE_DONE) {
EngineDone();
TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE);
goto error;
} else {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
goto error;
}
}
(void)SC_ATOMIC_SET(slot->slot_data, slot_data);
}
/* if the flowworker module is the first, get the threads input queue */
if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) {
tv->stream_pq = tv->inq->pq;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
/* setup a queue */
} else if (slot->tm_id == TMM_FLOWWORKER) {
tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue));
if (tv->stream_pq_local == NULL)
FatalError("failed to alloc PacketQueue");
SCMutexInit(&tv->stream_pq_local->mutex_q, NULL);
tv->stream_pq = tv->stream_pq_local;
tv->tm_flowworker = slot;
SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq);
tv->flow_queue = FlowQueueNew();
if (tv->flow_queue == NULL) {
TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE);
return NULL;
}
}
if (!TmThreadsSlotPktAcqLoopInit(tv)) {
goto error;
}
StatsSetupPrivate(tv);
TmThreadsSetFlag(tv, THV_INIT_DONE);
TmThreadsWaitForUnpause(tv);

@ -291,6 +291,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts);
SCTime_t TmThreadsGetThreadTime(const int idx);
uint16_t TmThreadsGetWorkerThreadMax(void);
bool TmThreadsTimeSubsysIsReady(void);
bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv);
/** \brief Wait for a thread to become unpaused.
*

Loading…
Cancel
Save