Introduce master-slave synchronization support for ThreadVars

remotes/origin/master-1.1.x
Anoop Saldanha 14 years ago committed by Victor Julien
parent 94c5ecb069
commit e567c2d002

@ -48,6 +48,22 @@ struct TmSlot_;
/** Maximum no of times a thread can be restarted */
#define THV_MAX_RESTARTS 50
/**
* \brief Feature to enable ThreadVars implement a master-slave
* synchronization feature.
*/
typedef struct ThreadVarsMSSyncPt_ {
const char *name;
SCMutex m;
SCCondT cond;
int slave_hit;
int master_go;
struct ThreadVarsMSSyncPt_ *next;
} ThreadVarsMSSyncPt;
/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
pthread_t t;
@ -91,6 +107,8 @@ typedef struct ThreadVars_ {
SCMutex *m;
SCCondT *cond;
ThreadVarsMSSyncPt *ms_sync_pts;
uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
TmModules resgitered under this thread */
struct ThreadVars_ *next;

@ -1378,6 +1378,219 @@ void TmThreadKillThreads(void) {
return;
}
/**
* \brief Let a ThreadVars register a Master-Slave(MS) Synchronization point.
*
* \param tv Pointer to the ThreadVars.
* \param sync_pt_name Name of the new synchronization point.
*
* \retval 0 On success.
* \retval -1 On failure.
*/
int TmThreadMSRegisterSyncPt(ThreadVars *tv, const char *sync_pt_name)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
ThreadVarsMSSyncPt *ms_sync_pts_prev = NULL;
while (ms_sync_pts != NULL) {
ms_sync_pts_prev = ms_sync_pts;
if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) {
if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) {
SCLogWarning(SC_ERR_TM_THREADS_ERROR, "This MS thread "
"synchronization point is already registered");
return 0;
}
}
ms_sync_pts = ms_sync_pts->next;
}
ThreadVarsMSSyncPt *sync_pt = SCMalloc(sizeof(ThreadVarsMSSyncPt));
if (sync_pt == NULL)
return -1;
memset(sync_pt, 0, sizeof(ThreadVarsMSSyncPt));
SCMutexInit(&sync_pt->m, NULL);
SCCondInit(&sync_pt->cond, NULL);
if ((sync_pt->name = SCStrdup(sync_pt_name)) == NULL) {
return -1;
}
if (ms_sync_pts_prev == NULL) {
tv->ms_sync_pts = sync_pt;
} else {
ms_sync_pts_prev->next = sync_pt;
}
return 0;
}
/**
* \brief Used by a Slave TV, to indicate that it has hit a MS
* synchronization point, specified by the name.
*
* \param tv Pointer to the slave TV.
* \param name Pointer to the name of the synchronization point.
*/
void TmThreadsMSSlaveHitSyncPt(ThreadVars *tv, const char *sync_pt_name)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
while (ms_sync_pts != NULL) {
if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) {
if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) {
break;
}
}
ms_sync_pts = ms_sync_pts->next;
}
if (ms_sync_pts == NULL) {
SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name "
"\"%s\"", tv->name, sync_pt_name);
return;
}
SCMutexLock(&ms_sync_pts->m);
{
ms_sync_pts->slave_hit = 1;
while (1) {
SCCondWait(&ms_sync_pts->cond, &ms_sync_pts->m);
if (!ms_sync_pts->master_go)
continue;
/* reset them */
ms_sync_pts->slave_hit = 0;
ms_sync_pts->master_go = 0;
}
}
SCMutexUnlock(&ms_sync_pts->m);
return;
}
/**
* \brief Used by a Mater thread, to release a slave TV if it has hit a
* synchronization point.
*
* \param tv Pointer to the slave TV to be released.
* \param name Pointer to the name of the synchronization point.
*/
void TmThreadsMSMasterReleaseSlaveAtSyncPt(ThreadVars *tv,
const char *sync_pt_name)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
while (ms_sync_pts != NULL) {
if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) {
if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) {
break;
}
}
ms_sync_pts = ms_sync_pts->next;
}
if (ms_sync_pts == NULL) {
SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name "
"\"%s\"", tv->name, sync_pt_name);
return;
}
SCMutexLock(&ms_sync_pts->m);
{
if (ms_sync_pts->slave_hit) {
ms_sync_pts->master_go = 1;
SCCondSignal(&ms_sync_pts->cond);
}
}
SCMutexUnlock(&ms_sync_pts->m);
return;
}
/**
* \brief Used by a Mater thread, to disable a slave TV's
* synchronization point.
*
* \param tv Pointer to the slave TV whose synchronization pt is to be
* disabled.
* \param name Pointer to the name of the synchronization point.
*/
void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *tv,
const char *sync_pt_name)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
while (ms_sync_pts != NULL) {
if (strlen(ms_sync_pts->name) == strlen(sync_pt_name)) {
if (strcasecmp(ms_sync_pts->name, sync_pt_name) == 0) {
break;
}
}
ms_sync_pts = ms_sync_pts->next;
}
if (ms_sync_pts == NULL) {
SCLogInfo("This TV - \"%s\", doesn't have a MSSyncPt by the name "
"\"%s\"", tv->name, sync_pt_name);
return;
}
SCMutexLock(&ms_sync_pts->m);
{
ms_sync_pts->disabled = 1;
if (ms_sync_pts->slave_hit) {
ms_sync_pts->master_go = 1;
SCCondSignal(&ms_sync_pts->cond);
}
}
SCMutexUnlock(&ms_sync_pts->m);
return;
}
void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *tv)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
while (ms_sync_pts != NULL) {
SCMutexLock(&ms_sync_pts->m);
{
if (ms_sync_pts->slave_hit) {
ms_sync_pts->master_go = 1;
SCCondSignal(&ms_sync_pts->cond);
}
}
SCMutexUnlock(&ms_sync_pts->m);
ms_sync_pts = ms_sync_pts->next;
}
return;
}
void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv)
{
ThreadVarsMSSyncPt *ms_sync_pts = tv->ms_sync_pts;
while (ms_sync_pts != NULL) {
SCMutexLock(&ms_sync_pts->m);
{
ms_sync_pts->disabled = 1;
if (ms_sync_pts->slave_hit) {
ms_sync_pts->master_go = 1;
SCCondSignal(&ms_sync_pts->cond);
}
}
SCMutexUnlock(&ms_sync_pts->m);
ms_sync_pts = ms_sync_pts->next;
}
return;
}
/**
* \brief Spawns a thread associated with the ThreadVars instance tv
*

@ -107,6 +107,13 @@ void TmThreadsUnsetFlag(ThreadVars *, uint8_t);
TmEcode TmThreadsSlotVarRun (ThreadVars *tv, Packet *p, TmSlot *slot);
int TmThreadMSRegisterSyncPt(ThreadVars *, const char *);
void TmThreadsMSSlaveHitSyncPt(ThreadVars *, const char *);
void TmThreadsMSMasterReleaseSlaveAtSyncPt(ThreadVars *, const char *);
void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *, const char *);
void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *);
void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv);
#if 0
/**

Loading…
Cancel
Save