diff --git a/src/runmode-lib.c b/src/runmode-lib.c index b5643d99c8..2699d47e44 100644 --- a/src/runmode-lib.c +++ b/src/runmode-lib.c @@ -108,37 +108,5 @@ int RunModeSpawnWorker(void *td) /** \brief destroy a worker thread */ void RunModeDestroyWorker(void *td) { - ThreadVars *tv = (ThreadVars *)td; - TmSlot *s = tv->tm_slots; - TmEcode r; - TmSlot *slot = NULL; - - StatsSyncCounters(tv); - - TmThreadsSetFlag(tv, THV_FLOW_LOOP); - - /* process all pseudo packets the flow timeout may throw at us */ - TmThreadTimeoutLoop(tv, s); - - TmThreadsSetFlag(tv, THV_RUNNING_DONE); - TmThreadWaitForFlag(tv, THV_DEINIT); - - PacketPoolDestroy(); - - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadExitPrintStats != NULL) { - slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); - } - - if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); - if (r != TM_ECODE_OK) { - break; - } - } - } - - tv->stream_pq = NULL; - SCLogDebug("%s ending", tv->name); - TmThreadsSetFlag(tv, THV_CLOSED); + SCTmThreadsSlotPktAcqLoopFinish((ThreadVars *)td); } diff --git a/src/tm-threads.c b/src/tm-threads.c index a1d74a80fb..19aa6eff3d 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -204,37 +204,9 @@ int TmThreadTimeoutLoop(ThreadVars *tv, TmSlot *s) return r; } -/* - - pcap/nfq - - pkt read - callback - process_pkt - - pkt read - process_pkt - - slot: - setup - - pkt_ack_loop(tv, slot_data) - - deinit - - process_pkt: - while(s) - run s; - queue; - - */ - -static void *TmThreadsSlotPktAcqLoop(void *td) +static bool TmThreadsSlotPktAcqLoopInit(ThreadVars *tv) { - ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; SCSetThreadName(tv->name); @@ -244,21 +216,10 @@ static void *TmThreadsSlotPktAcqLoop(void *td) CaptureStatsSetup(tv); PacketPoolInit(); - /* check if we are setup properly */ - if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { - SCLogError("TmSlot or ThreadVars badly setup: s=%p," - " PktAcqLoop=%p, tmqh_in=%p," - " tmqh_out=%p", - s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; - } - - for (slot = s; slot != NULL; slot = slot->slot_next) { + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadInit != NULL) { void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); + TmEcode r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); if (r != TM_ECODE_OK) { if (r == TM_ECODE_DONE) { EngineDone(); @@ -280,8 +241,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; + goto error; } /* setup a queue */ } else if (slot->tm_id == TMM_FLOWWORKER) { @@ -295,8 +255,7 @@ static void *TmThreadsSlotPktAcqLoop(void *td) tv->flow_queue = FlowQueueNew(); if (tv->flow_queue == NULL) { TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - pthread_exit(NULL); - return NULL; + goto error; } } } @@ -304,22 +263,18 @@ static void *TmThreadsSlotPktAcqLoop(void *td) StatsSetupPrivate(tv); TmThreadsSetFlag(tv, THV_INIT_DONE); - bool run = TmThreadsWaitForUnpause(tv); - while (run) { - r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); + return true; + +error: + return false; +} + +bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv) +{ + TmSlot *s = tv->tm_slots; + bool rc = true; - if (r == TM_ECODE_FAILED) { - TmThreadsSetFlag(tv, THV_FAILED); - run = false; - } - if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { - run = false; - } - if (r == TM_ECODE_DONE) { - run = false; - } - } StatsSyncCounters(tv); TmThreadsSetFlag(tv, THV_FLOW_LOOP); @@ -332,28 +287,72 @@ static void *TmThreadsSlotPktAcqLoop(void *td) PacketPoolDestroy(); - for (slot = s; slot != NULL; slot = slot->slot_next) { + for (TmSlot *slot = s; slot != NULL; slot = slot->slot_next) { if (slot->SlotThreadExitPrintStats != NULL) { slot->SlotThreadExitPrintStats(tv, SC_ATOMIC_GET(slot->slot_data)); } if (slot->SlotThreadDeinit != NULL) { - r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); + TmEcode r = slot->SlotThreadDeinit(tv, SC_ATOMIC_GET(slot->slot_data)); if (r != TM_ECODE_OK) { TmThreadsSetFlag(tv, THV_CLOSED); - goto error; + rc = false; + break; } } } tv->stream_pq = NULL; - SCLogDebug("%s ending", tv->name); TmThreadsSetFlag(tv, THV_CLOSED); + return rc; +} + +static void *TmThreadsSlotPktAcqLoop(void *td) +{ + ThreadVars *tv = (ThreadVars *)td; + TmSlot *s = tv->tm_slots; + TmEcode r = TM_ECODE_OK; + + /* check if we are setup properly */ + if (s == NULL || s->PktAcqLoop == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { + SCLogError("TmSlot or ThreadVars badly setup: s=%p," + " PktAcqLoop=%p, tmqh_in=%p," + " tmqh_out=%p", + s, s ? s->PktAcqLoop : NULL, tv->tmqh_in, tv->tmqh_out); + TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); + pthread_exit(NULL); + return NULL; + } + + if (!TmThreadsSlotPktAcqLoopInit(td)) { + goto error; + } + + bool run = TmThreadsWaitForUnpause(tv); + + while (run) { + r = s->PktAcqLoop(tv, SC_ATOMIC_GET(s->slot_data), s); + + if (r == TM_ECODE_FAILED) { + TmThreadsSetFlag(tv, THV_FAILED); + run = false; + } + if (TmThreadsCheckFlag(tv, THV_KILL_PKTACQ) || suricata_ctl_flags) { + run = false; + } + if (r == TM_ECODE_DONE) { + run = false; + } + } + if (!SCTmThreadsSlotPktAcqLoopFinish(tv)) { + goto error; + } + + SCLogDebug("%s ending", tv->name); pthread_exit((void *) 0); return NULL; error: - tv->stream_pq = NULL; pthread_exit(NULL); return NULL; } @@ -383,19 +382,6 @@ static void *TmThreadsLib(void *td) { ThreadVars *tv = (ThreadVars *)td; TmSlot *s = tv->tm_slots; - TmEcode r = TM_ECODE_OK; - TmSlot *slot = NULL; - - /* Set the thread name */ - SCSetThreadName(tv->name); - - if (tv->thread_setup_flags != 0) - TmThreadSetupOptions(tv); - - /* Drop the capabilities for this thread */ - SCDropCaps(tv); - - PacketPoolInit(); /* check if we are setup properly */ if (s == NULL || tv->tmqh_in == NULL || tv->tmqh_out == NULL) { @@ -406,52 +392,9 @@ static void *TmThreadsLib(void *td) return NULL; } - for (slot = s; slot != NULL; slot = slot->slot_next) { - if (slot->SlotThreadInit != NULL) { - void *slot_data = NULL; - r = slot->SlotThreadInit(tv, slot->slot_initdata, &slot_data); - if (r != TM_ECODE_OK) { - if (r == TM_ECODE_DONE) { - EngineDone(); - TmThreadsSetFlag(tv, THV_CLOSED | THV_INIT_DONE | THV_RUNNING_DONE); - goto error; - } else { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - goto error; - } - } - (void)SC_ATOMIC_SET(slot->slot_data, slot_data); - } - - /* if the flowworker module is the first, get the threads input queue */ - if (slot == (TmSlot *)tv->tm_slots && (slot->tm_id == TMM_FLOWWORKER)) { - tv->stream_pq = tv->inq->pq; - tv->tm_flowworker = slot; - SCLogDebug("pre-stream packetqueue %p (inq)", tv->stream_pq); - tv->flow_queue = FlowQueueNew(); - if (tv->flow_queue == NULL) { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - return NULL; - } - /* setup a queue */ - } else if (slot->tm_id == TMM_FLOWWORKER) { - tv->stream_pq_local = SCCalloc(1, sizeof(PacketQueue)); - if (tv->stream_pq_local == NULL) - FatalError("failed to alloc PacketQueue"); - SCMutexInit(&tv->stream_pq_local->mutex_q, NULL); - tv->stream_pq = tv->stream_pq_local; - tv->tm_flowworker = slot; - SCLogDebug("pre-stream packetqueue %p (local)", tv->stream_pq); - tv->flow_queue = FlowQueueNew(); - if (tv->flow_queue == NULL) { - TmThreadsSetFlag(tv, THV_CLOSED | THV_RUNNING_DONE); - return NULL; - } - } + if (!TmThreadsSlotPktAcqLoopInit(tv)) { + goto error; } - StatsSetupPrivate(tv); - - TmThreadsSetFlag(tv, THV_INIT_DONE); TmThreadsWaitForUnpause(tv); diff --git a/src/tm-threads.h b/src/tm-threads.h index 13ce78d51d..d4c8e898a5 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -291,6 +291,7 @@ void TmThreadsGetMinimalTimestamp(struct timeval *ts); SCTime_t TmThreadsGetThreadTime(const int idx); uint16_t TmThreadsGetWorkerThreadMax(void); bool TmThreadsTimeSubsysIsReady(void); +bool SCTmThreadsSlotPktAcqLoopFinish(ThreadVars *tv); /** \brief Wait for a thread to become unpaused. *