From 35d7d77ddb05bc3ba6df6e21e8fe7afe6d8aec1d Mon Sep 17 00:00:00 2001 From: Jason Ish Date: Fri, 23 Aug 2024 12:49:20 -0600 Subject: [PATCH] threads: refactor TmThreadsSlotPktAcqLoop for user threads Refactor TmThreadsSlotPktAcqLoop for user provided thread by breaking out the init and finish code into their own functions. For user provided threads, Suricata should not "drive" the thread, but the setup and finish code is the same. The finish function is exported so it can be called by the user application when its receive loop or equivalent is done. Also remove obsolete comment. Ticket: #7240 --- src/runmode-lib.c | 34 +-------- src/tm-threads.c | 189 ++++++++++++++++------------------------------ src/tm-threads.h | 1 + 3 files changed, 68 insertions(+), 156 deletions(-) diff --git a/src/runmode-lib.c b/src/runmode-lib.c index b5643d99c8..2699d47e44 100644 --- a/src/runmode-lib.c +++ b/src/runmode-lib.c @@ -108,37 +108,5 @@ int RunModeSpawnWorker(void *td) /** \brief destroy a worker thread */ void RunModeDestroyWorker(void *td) { - ThreadVars *tv = (ThreadVars *)td; - TmSlot *s = tv->tm_slots; - TmEcode r; - TmSlot *slot = NULL; - - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_FLOW_LOOP); - - /* process all pseudo packets the flow timeout may throw at us */ - TmThreadTimeoutLoop(tv, s); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadExitPrintStats != NULL) { - slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); - } - - if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); - if (r != TM_ECODE_OK) { - break; - } - } - } - - tv->stream_pq = NULL; - SCLogDebug("%s ending", tv->name); - TmThreadsSetFlag(tv, THV_CLOSED); + SCTmThreadsSlotPktAcqLoopFinish((ThreadVars *)td); } diff --git a/src/tm-threads.c b/src/tm-threads.c index a1d74a80fb..19aa6eff3d 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -204,37 +204,9 @@ int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) return r; } -/* - - pcap/nfq - - pkt read - callback - process_pkt - - pkt read - process_pkt - - slot: - setup - - pkt_ack_loop(tv, slot_data) - - deinit - - process_pkt: - while(s) - run s; - queue; - - */ - -static void *TmThreadsSlotPktAcqLoop(void *td) +static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv) { - ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; SCSetThreadName(tv->name); @@ -244,21 +216,10 @@ static void *TmThreadsSlotPktAcqLoop(void *td) CaptureStatsSetup(tv); PacketPoolInit(); - /* check if we are setup properly */ - if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { - SCLogError("TmSlot or ThreadVars badly setup: s=%p," - " PktAcqLoop=%p, tmqh_in=%p," - " tmqh_out=%p", - s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; - } - - for (slot = s; slot != NULL; slot = slot->slot_next) { + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadInit != NULL) { void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); + TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); if (r != TM_ECODE_OK) { if (r == TM_ECODE_DONE) { EngineDone(); @@ -280,8 +241,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; + goto error; } /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -295,8 +255,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; + goto error; } } } @@ -304,22 +263,18 @@ static void *TmThreadsSlotPktAcqLoop(void *td) StatsSetupPrivate(tv); TmThreadsSetFlag(tv, THV_INIT_DONE); - bool run = TmThreadsWaitForUnpause(tv); - while (run) { - r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); + return true; + +error: + return false; +} + +bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv) +{ + TmSlot *s = tv->tm_slots; + bool rc = true; - if (r == TM_ECODE_FAILED) { - TmThreadsSetFlag(tv, THV_FAILED); - run = false; - } - if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { - run = false; - } - if (r == TM_ECODE_DONE) { - run = false; - } - } StatsSyncCounters(tv); TmThreadsSetFlag(tv, THV_FLOW_LOOP); @@ -332,28 +287,72 @@ static void *TmThreadsSlotPktAcqLoop(void *td) PacketPoolDestroy(); - for (slot = s; slot != NULL; slot = slot->slot_next) { + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadExitPrintStats != NULL) { slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); } if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); + TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); - goto error; + rc = false; + break; } } } tv->stream_pq = NULL; - SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); + return rc; +} + +static void *TmThreadsSlotPktAcqLoop(void *td) +{ + ThreadVars *tv = (ThreadVars *)td; + TmSlot *s = tv->tm_slots; + TmEcode r = TM_ECODE_OK; + + /* check if we are setup properly */ + if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { + SCLogError("TmSlot or ThreadVars badly setup: s=%p," + " PktAcqLoop=%p, tmqh_in=%p," + " tmqh_out=%p", + s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); + pthread_exit(NULL); + return NULL; + } + + if (!TmThreadsSlotPktAcqLoopInit(td)) { + goto error; + } + + bool run = TmThreadsWaitForUnpause(tv); + + while (run) { + r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); + + if (r == TM_ECODE_FAILED) { + TmThreadsSetFlag(tv, THV_FAILED); + run = false; + } + if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { + run = false; + } + if (r == TM_ECODE_DONE) { + run = false; + } + } + if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) { + goto error; + } + + SCLogDebug("%s ending", tv->name); pthread_exit((void *) 0); return NULL; error: - tv->stream_pq = NULL; pthread_exit(NULL); return NULL; } @@ -383,19 +382,6 @@ static void *TmThreadsLib(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; - - /* Set the thread name */ - SCSetThreadName(tv->name); - - if (tv->thread_setup_flags != 0) - TmThreadSetupOptions(tv); - - /* Drop the capabilities for this thread */ - SCDropCaps(tv); - - PacketPoolInit(); /* check if we are setup properly */ if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { @@ -406,52 +392,9 @@ static void *TmThreadsLib(void *td) return NULL; } - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadInit != NULL) { - void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); - if (r != TM_ECODE_OK) { - if (r == TM_ECODE_DONE) { - EngineDone(); - TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE); - goto error; - } else { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - goto error; - } - } - (void)SC_ATOMIC_SET(slot->slot_data, slot_data); - } - - /* if the flowworker module is the first, get the threads input queue */ - if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { - tv->stream_pq = tv->inq->pq; - tv->tm_flowworker = slot; - SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); - tv->flow_queue = FlowQueueNew(); - if (tv->flow_queue == NULL) { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - return NULL; - } - /* setup a queue */ - } else if (slot->tm_id == TMM_FLOWWORKER) { - tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue)); - if (tv->stream_pq_local == NULL) - FatalError("failed to alloc PacketQueue"); - SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); - tv->stream_pq = tv->stream_pq_local; - tv->tm_flowworker = slot; - SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); - tv->flow_queue = FlowQueueNew(); - if (tv->flow_queue == NULL) { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - return NULL; - } - } + if (!TmThreadsSlotPktAcqLoopInit(tv)) { + goto error; } - StatsSetupPrivate(tv); - - TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsWaitForUnpause(tv); diff --git a/src/tm-threads.h b/src/tm-threads.h index 13ce78d51d..d4c8e898a5 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -291,6 +291,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts); SCTime_t TmThreadsGetThreadTime(const int idx); uint16_t TmThreadsGetWorkerThreadMax(void); bool TmThreadsTimeSubsysIsReady(void); +bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv); /** \brief Wait for a thread to become unpaused. *