From e567c2d0028cc46ed071587f61b7f8a286a0002c Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Fri, 9 Sep 2011 11:20:33 +0530 Subject: [PATCH] Introduce master-slave synchronization support for ThreadVars --- src/threadvars.h | 18 ++++ src/tm-threads.c | 213 +++++++++++++++++++++++++++++++++++++++++++++++ src/tm-threads.h | 7 ++ 3 files changed, 238 insertions(+) diff --git a/src/threadvars.h b/src/threadvars.h index c96ad9ed48..6bdd6bfe94 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -48,6 +48,22 @@ struct TmSlot_; /** Maximum no of times a thread can be restarted */ #define THV_MAX_RESTARTS 50 +/** + * \brief Feature to enable ThreadVars implement a master-slave + * synchronization feature. + */ +typedef struct ThreadVarsMSSyncPt_ { + const char *name; + + SCMutex m; + SCCondT cond; + + int slave_hit; + int master_go; + + struct ThreadVarsMSSyncPt_ *next; +} ThreadVarsMSSyncPt; + /** \brief Per thread variable structure */ typedef struct ThreadVars_ { pthread_t t; @@ -91,6 +107,8 @@ typedef struct ThreadVars_ { SCMutex *m; SCCondT *cond; + ThreadVarsMSSyncPt *ms_sync_pts; + uint8_t cap_flags; /**< Flags to indicate the capabilities of all the TmModules resgitered under this thread */ struct ThreadVars_ *next; diff --git a/src/tm-threads.c b/src/tm-threads.c index 41502b64bc..43676ebbc0 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -1378,6 +1378,219 @@ void TmThreadKillThreads(void) { return; } +/** + * \brief Let a ThreadVars register a Master-Slave(MS) Synchronization point. + * + * \param tv Pointer to the ThreadVars. + * \param sync_pt_name Name of the new synchronization point. + * + * \retval 0 On success. + * \retval -1 On failure. + */ +int TmThreadMSRegisterSyncPt(ThreadVars *tv, const char *sync_pt_name) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + ThreadVarsMSSyncPt *ms_sync_pts_prev = NULL; + + while (ms_sync_pts != NULL) { + ms_sync_pts_prev = ms_sync_pts; + if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) { + if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) { + SCLogWarning(SC_ERR_TM_THREADS_ERROR, "This MS thread " + "synchronization point is already registered"); + return 0; + } + } + + ms_sync_pts = ms_sync_pts->next; + } + + ThreadVarsMSSyncPt *sync_pt = SCMalloc(sizeof(ThreadVarsMSSyncPt)); + if (sync_pt == NULL) + return -1; + memset(sync_pt, 0, sizeof(ThreadVarsMSSyncPt)); + SCMutexInit(&sync_pt->m, NULL); + SCCondInit(&sync_pt->cond, NULL); + if ((sync_pt->name = SCStrdup(sync_pt_name)) == NULL) { + return -1; + } + + if (ms_sync_pts_prev == NULL) { + tv->ms_sync_pts = sync_pt; + } else { + ms_sync_pts_prev->next = sync_pt; + } + + return 0; +} + +/** + * \brief Used by a Slave TV, to indicate that it has hit a MS + * synchronization point, specified by the name. + * + * \param tv Pointer to the slave TV. + * \param name Pointer to the name of the synchronization point. + */ +void TmThreadsMSSlaveHitSyncPt(ThreadVars *tv, const char *sync_pt_name) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + + while (ms_sync_pts != NULL) { + if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) { + if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) { + break; + } + } + ms_sync_pts = ms_sync_pts->next; + } + + if (ms_sync_pts == NULL) { + SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name " + "\"%s\"", tv->name, sync_pt_name); + return; + } + + SCMutexLock(&ms_sync_pts->m); + { + ms_sync_pts->slave_hit = 1; + while (1) { + SCCondWait(&ms_sync_pts->cond, &ms_sync_pts->m); + + if (!ms_sync_pts->master_go) + continue; + + /* reset them */ + ms_sync_pts->slave_hit = 0; + ms_sync_pts->master_go = 0; + } + } + SCMutexUnlock(&ms_sync_pts->m); + + return; +} + +/** + * \brief Used by a Mater thread, to release a slave TV if it has hit a + * synchronization point. + * + * \param tv Pointer to the slave TV to be released. + * \param name Pointer to the name of the synchronization point. + */ +void TmThreadsMSMasterReleaseSlaveAtSyncPt(ThreadVars *tv, + const char *sync_pt_name) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + + while (ms_sync_pts != NULL) { + if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) { + if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) { + break; + } + } + ms_sync_pts = ms_sync_pts->next; + } + + if (ms_sync_pts == NULL) { + SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name " + "\"%s\"", tv->name, sync_pt_name); + return; + } + + SCMutexLock(&ms_sync_pts->m); + { + if (ms_sync_pts->slave_hit) { + ms_sync_pts->master_go = 1; + SCCondSignal(&ms_sync_pts->cond); + } + } + SCMutexUnlock(&ms_sync_pts->m); + + return; +} + +/** + * \brief Used by a Mater thread, to disable a slave TV's + * synchronization point. + * + * \param tv Pointer to the slave TV whose synchronization pt is to be + * disabled. + * \param name Pointer to the name of the synchronization point. + */ +void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *tv, + const char *sync_pt_name) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + + while (ms_sync_pts != NULL) { + if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) { + if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) { + break; + } + } + ms_sync_pts = ms_sync_pts->next; + } + + if (ms_sync_pts == NULL) { + SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name " + "\"%s\"", tv->name, sync_pt_name); + return; + } + + SCMutexLock(&ms_sync_pts->m); + { + ms_sync_pts->disabled = 1; + if (ms_sync_pts->slave_hit) { + ms_sync_pts->master_go = 1; + SCCondSignal(&ms_sync_pts->cond); + } + } + SCMutexUnlock(&ms_sync_pts->m); + + return; +} + +void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *tv) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + + while (ms_sync_pts != NULL) { + SCMutexLock(&ms_sync_pts->m); + { + if (ms_sync_pts->slave_hit) { + ms_sync_pts->master_go = 1; + SCCondSignal(&ms_sync_pts->cond); + } + } + SCMutexUnlock(&ms_sync_pts->m); + + ms_sync_pts = ms_sync_pts->next; + } + + return; +} + +void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv) +{ + ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts; + + while (ms_sync_pts != NULL) { + SCMutexLock(&ms_sync_pts->m); + { + ms_sync_pts->disabled = 1; + + if (ms_sync_pts->slave_hit) { + ms_sync_pts->master_go = 1; + SCCondSignal(&ms_sync_pts->cond); + } + } + SCMutexUnlock(&ms_sync_pts->m); + + ms_sync_pts = ms_sync_pts->next; + } + + return; +} + /** * \brief Spawns a thread associated with the ThreadVars instance tv * diff --git a/src/tm-threads.h b/src/tm-threads.h index 6812381e83..acb46142bf 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -107,6 +107,13 @@ void TmThreadsUnsetFlag(ThreadVars *, uint8_t); TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot); +int TmThreadMSRegisterSyncPt(ThreadVars *, const char *); +void TmThreadsMSSlaveHitSyncPt(ThreadVars *, const char *); +void TmThreadsMSMasterReleaseSlaveAtSyncPt(ThreadVars *, const char *); +void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *, const char *); +void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *); +void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv); + #if 0 /**