From a5fb240a4a3e12c0bebe36dc7f2d2cfa75709c67 Mon Sep 17 00:00:00 2001 From: Anoop Saldanha Date: Mon, 3 Aug 2009 13:01:48 +0530 Subject: [PATCH] Changes added for the Performance Counter API --- src/Makefile.am | 3 +- src/counters.c | 928 ++++++++++++++++++++++++++++++++++++++++++ src/counters.h | 209 ++++++++++ src/decode-ethernet.c | 2 + src/decode-ipv4.c | 4 + src/eidps.c | 5 + src/source-pcap.c | 44 +- src/source-pcap.h | 13 + src/threadvars.h | 4 + src/tm-threads.c | 2 +- src/tm-threads.h | 2 + src/tmqh-simple.c | 4 + 12 files changed, 1217 insertions(+), 3 deletions(-) create mode 100644 src/counters.c create mode 100644 src/counters.h diff --git a/src/Makefile.am b/src/Makefile.am index 8adbfaff2d..b069823974 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -92,7 +92,8 @@ stream-tcp.c stream-tcp.h stream-tcp-private.h \ stream-tcp-reassemble.c stream-tcp-reassemble.h \ respond-reject.c respond-reject.h \ respond-reject-libnet11.h respond-reject-libnet11.c \ -l7-app-detect.c l7-app-detect.h +l7-app-detect.c l7-app-detect.h \ +counters.c counter.h # set the include path found by configure INCLUDES= $(all_includes) diff --git a/src/counters.c b/src/counters.c new file mode 100644 index 0000000000..93f86ac15c --- /dev/null +++ b/src/counters.c @@ -0,0 +1,928 @@ +#include "stdio.h" +#include "stdlib.h" +#include "string.h" +#include "pthread.h" +#include "counters.h" +#include "eidps.h" +#include "threadvars.h" +#include "tm-modules.h" +#include "tm-threads.h" +#include "util-unittest.h" +#include +#include "time.h" + +static PerfThreadContext *perf_tc = NULL; +static PerfOPIfaceContext *perf_op_ctx = NULL; + +/** + * Initializes the perf counter api. Things are hard coded currently. + * More work to be done when we implement multiple interfaces + */ +void PerfInitCounterApi() +{ + PerfInitOPCtx(); + + return; +} + +/** + * Initializes the output interface context + */ +void PerfInitOPCtx() +{ + if ( (perf_op_ctx = malloc(sizeof(PerfOPIfaceContext))) == NULL) { + printf("error allocating memory\n"); + exit(0); + } + memset(perf_op_ctx, 0, sizeof(PerfOPIfaceContext)); + + perf_op_ctx->iface = IFACE_FILE; + + if ( (perf_op_ctx->file = strdup("/root/log.txt")) == NULL) { + printf("error allocating memory\n"); + exit(0); + } + + if ( (perf_op_ctx->fp = fopen(perf_op_ctx->file, "w+")) == NULL) { + printf("fopen error opening file /root/log.txt\n"); + exit(0); + } + + // club the counter from multiple instances of the tm before o/p + perf_op_ctx->club_tm = 1; + + // init the lock used by PerfClubTMInst + pthread_mutex_init(&perf_op_ctx->pctmi_lock, NULL); + + return; +} + +/** + * Spawns the wakeup, and the management thread + */ +void PerfSpawnThreads() +{ + pthread_attr_t attr; + + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + + if ( (perf_tc = malloc(sizeof(PerfThreadContext))) == NULL) { + printf("Error allocating memory\n"); + exit(0); + } + memset(perf_tc, 0, sizeof(PerfThreadContext)); + + perf_tc->flags = PT_RUN; + + if (pthread_create(&perf_tc->wakeup_t, &attr, PerfWakeupThread, NULL)) { + printf("Error creating PerfWakeupFunc thread\n"); + exit(0); + } + + if (pthread_create(&perf_tc->mgmt_t, &attr, PerfMgmtThread, NULL)) { + printf("Error creating PerfWakeupFunc thread\n"); + exit(0); + } + + return; +} + +/** + * Kills the wakeup and the management threads + */ +void PerfDestroyThreads() +{ + perf_tc->flags |= PT_KILL; + + pthread_join(perf_tc->wakeup_t, NULL); + pthread_join(perf_tc->mgmt_t, NULL); + + return; +} + + +/** + * The management thread. This thread is responsible for writing the performance + * stats information. + * + * @param arg is NULL always + */ +void * PerfMgmtThread(void *arg) +{ + u_int8_t run = 1; + + if (perf_op_ctx == NULL) { + printf("error: PerfInitCounterApi() has to be called first\n"); + return NULL; + } + + while (run) { + sleep(MGMTT_TTS); + + PerfOutputCounters(); + + if (perf_tc->flags & PT_KILL) + run = 0; + } + + return NULL; +} + +/** + * Wake up thread. This thread wakes up every TTS(time to sleep) seconds and + * sets the flag for every ThreadVars' PerfContext + * + * @param arg is NULL always + */ +void * PerfWakeupThread(void *arg) +{ + u_int8_t run = 1; + ThreadVars *tv; + PacketQueue *q; + + while (run) { + sleep(WUT_TTS); + + tv = tv_root; + + while (tv) { + if (!tv->inq || !tv->pctx.head) { + tv = tv->next; + continue; + } + + q = &trans_q[tv->inq->id]; + + // assuming the assignment of an int to be atomic, and even if it's + // not, it should be okay + tv->pctx.perf_flag = 1; + + pthread_cond_signal(&q->cond_q); + + tv = tv->next; + } + + if (perf_tc->flags & PT_KILL) + run = 0; + } + + return NULL; +} + +/** + * Registers a counter + * + * @param cname holds the counter name + * @param tm_name holds the tm_name + * @param tid holds the tid running this module + * @param type holds the datatype of this counter variable + * @param head holds the PerfCounter + * + * @returns the counter id + */ +u_int32_t PerfRegisterCounter(char *cname, char *tm_name, pthread_t tid, int type, + char *desc, PerfContext *pctx) +{ + PerfCounter **head = &pctx->head; + PerfCounter *temp, *prev; + PerfCounter *pc; + + if (cname == NULL || tm_name == NULL || pctx == NULL) { + printf("counter name, tm name null or PerfContext NULL\n"); + return 0; + } + + // (TYPE_MAX - 1) because we still haven't implemented TYPE_STR + if ((type >= (TYPE_MAX - 1)) || (type < 0)) { + printf("Error:Counters of this type can't be registered\n"); + return(0); + } + + temp = prev = *head; + while (temp != NULL) { + prev = temp; + + if (!strcmp(cname, temp->name->cname) && + !strcmp(tm_name, temp->name->tm_name)) + break; + + temp = temp->next; + } + + // We already have a counter registered by this name + if (temp != NULL) + return(temp->id); + + if ( (pc = malloc(sizeof(PerfCounter))) == NULL) { + printf("error allocating memory\n"); + exit(0); + } + memset(pc, 0, sizeof(PerfCounter)); + + if (prev == NULL) { + *head = pc; + } + else + prev->next = pc; + + if( (pc->name = malloc(sizeof(PerfCounterName))) == NULL) { + printf("error allocating memory. aborting\n"); + free(pc); + exit(0); + } + memset(pc->name, 0, sizeof(PerfCounterName)); + + if ( (pc->value = malloc(sizeof(PerfCounterValue))) == NULL) { + printf("error allocating memory. aborting\n"); + free(pc->name); + free(pc); + exit(0); + } + memset(pc->value, 0, sizeof(PerfCounterValue)); + + pc->name->cname = strdup(cname); + pc->name->tm_name = strdup(tm_name); + pc->name->tid = tid; + + pc->value->type = type; + switch(pc->value->type) { + case TYPE_UINT64: + pc->value->size = sizeof(u_int64_t); + break; + case TYPE_DOUBLE: + pc->value->size = sizeof(double); + break; + } + if ( (pc->value->cvalue = malloc(pc->value->size)) == NULL) { + printf("error allocating memory\n"); + exit(0); + } + memset(pc->value->cvalue, 0, pc->value->size); + + // assign a unique id to this PerfCounter. The id is local to this tv. + // please note that the ids start from 1 and not 0 + pc->id = ++(pctx->curr_id); + + if (desc != NULL) + pc->desc = strdup(desc); + + return(pc->id); +} + +void PerfAddToClubbedTMTable(char *tm_name, PerfContext *pctx) +{ + PerfClubTMInst *pctmi, *prev, *temp; + PerfContext **hpctx; + int i = 0; + + pthread_mutex_lock(&perf_op_ctx->pctmi_lock); + + pctmi = perf_op_ctx->pctmi; + prev = pctmi; + + while (pctmi != NULL) { + prev = pctmi; + if (strcmp(tm_name, pctmi->tm_name)) { + pctmi = pctmi->next; + continue; + } + break; + } + + if (pctmi == NULL) { + temp = malloc(sizeof(PerfClubTMInst)); + memset(temp, 0, sizeof(PerfClubTMInst)); + + temp->size++; + temp->head = realloc(temp->head, temp->size * sizeof(PerfContext **)); + temp->head[0] = pctx; + temp->tm_name = strdup(tm_name); + + if (prev == NULL) + perf_op_ctx->pctmi = temp; + else + prev->next = temp; + + pthread_mutex_unlock(&perf_op_ctx->pctmi_lock); + return; + } + + hpctx = pctmi->head; + for (i = 0; i < pctmi->size; i++) { + if (hpctx[i] != pctx) + continue; + + pthread_mutex_unlock(&perf_op_ctx->pctmi_lock); + return; + } + + pctmi->head = realloc(pctmi->head, (pctmi->size + 1) * sizeof(PerfContext **)); + hpctx = pctmi->head; + + hpctx[pctmi->size] = pctx; + for (i = pctmi->size - 1; i >= 0; i--) { + if (pctx->curr_id <= hpctx[i]->curr_id) { + hpctx[i + 1] = hpctx[i]; + hpctx[i] = pctx; + continue; + } + break; + } + pctmi->size++; + + pthread_mutex_unlock(&perf_op_ctx->pctmi_lock); + + return; +} + + +/** + * Returns a counter array for counters in this id range(s_id - e_id) + * + * @param s_id is the start id of the counter + * @param e_id is the end id of the counter + * @param pctx is a pointer to the tv's PerfContext + * + * @returns a counter-array in this(s_id-e_id) range for this tm instance + */ +PerfCounterArray * PerfGetCounterArrayRange(u_int32_t s_id, u_int32_t e_id, + PerfContext *pctx) +{ + PerfCounterArray *pca; + u_int8_t i; + + if (pctx == NULL) { + printf("pctx is NULL\n"); + return NULL; + } + + if (s_id < 1 || e_id < 1 || s_id > e_id) { + printf("error with the counter ids\n"); + return NULL; + } + + if (e_id > pctx->curr_id) { + printf("end id is greater than the max id for this tv\n"); + return NULL; + } + + if (pctx == NULL) { + printf("perfcontext is NULL\n"); + return NULL; + } + + if ( (pca = malloc(sizeof(PerfCounterArray))) == NULL) { + printf("Error allocating memory\n"); + exit(0); + } + memset(pca, 0, sizeof(PerfCounterArray)); + + if ( (pca->head = malloc(sizeof(PCAElem) * (e_id - s_id + 2))) == NULL) { + printf("Error allocating memory\n"); + exit(0); + } + memset(pca->head, 0, sizeof(PCAElem) * (e_id - s_id + 2)); + + i = 1; + while (s_id <= e_id) { + pca->head[i].id = s_id++; + i++; + } + pca->size = i - 1; + + return pca; +} + +/** + * Returns a counter array for all counters registered for this tm instance + * + * @param pctx is a pointer to the tv's PerfContext + * + * returns a counter-array for all the counters of this tm instance + */ +PerfCounterArray * PerfGetAllCountersArray(PerfContext *pctx) +{ + return((pctx)?PerfGetCounterArrayRange(1, pctx->curr_id, pctx):NULL); +} + + +/** + * Updates an individual counter + * + * @param cname holds the counter name + * @param tm_name holds the tm name + * @param id holds the counter id for this tm + * @param value holds a pointer to the local counter from the client thread + * @param pctx holds the PerfContext associated with this instance of the tm + */ +int PerfUpdateCounter(char *cname, char *tm_name, u_int32_t id, void *value, + PerfContext *pctx) +{ + PerfCounter *pc = pctx->head; + + if (pctx == NULL) { + printf("pctx null inside PerfUpdateCounter\n"); + return 0; + } + + if ((cname == NULL || tm_name == NULL) && (id > pctx->curr_id || id < 1)) { + printf("id supplied doesn't exist. Please supply cname and " + "tm_name instead\n"); + return 0; + } + + while(pc != NULL) { + if (pc->id != id) { + pc = pc->next; + continue; + } + + memcpy(pc->value->cvalue, value, pc->value->size); + pc->updated++; + + break; + } + + if (pc == NULL) { + printf("this counter isn't registered in this tm\n"); + return 0; + } + + return 1; +} + +/** + * Syncs the counter array with the global counter variables + * + * @param pca holds a pointer to the PerfCounterArray + * @param pctx holds a pointer the the tv's PerfContext + * @param reset_lc indicates whether the local counter has to be reset or not + */ +int PerfUpdateCounterArray(PerfCounterArray *pca, PerfContext *pctx, int reset_lc) +{ + u_int32_t i; + PerfCounter *pc = pctx->head; + PCAElem *pcae = pca->head; + + if (pca == NULL || pctx == NULL) { + printf("pca or pctx is NULL inside PerfUpdateCounterArray\n"); + return -1; + } + + pthread_mutex_lock(&pctx->m); + for (i = 1; i <= pca->size; i++) { + while (pc != NULL) { + if (pc->id != pcae[i].id) { + pc = pc->next; + continue; + } + + memcpy(pc->value->cvalue, &(pcae[i].cnt), pc->value->size); + + pc->updated++; + + if (reset_lc) + pcae[i].cnt = 0; + + pc = pc->next; + break; + } + } + pthread_mutex_unlock(&pctx->m); + + pctx->perf_flag = 0; + + return 1; +} + +/** + * The output interface dispatcher for the counter api + */ +void PerfOutputCounters() +{ + switch (perf_op_ctx->iface) { + case IFACE_FILE: + PerfOutputCounterFileIface(); + break; + case IFACE_CONSOLE: + // yet to be implemented + break; + case IFACE_NETWORK: + // yet to be implemented + break; + case IFACE_SYSLOG: + // yet to be implemented + break; + } + + return; +} + +/** + * The file output interface for the counter api + */ +int PerfOutputCounterFileIface() +{ + ThreadVars *tv = tv_root; + PerfClubTMInst *pctmi; + PerfCounter *pc; + PerfCounter **pc_heads; + + u_int64_t *ui64_cvalue; + u_int64_t result; + + struct timeval tval; + struct tm *tms; + + int i; + int flag; + + if (perf_op_ctx->fp == NULL) { + printf("perf_op_ctx->fp is NULL"); + return 0; + } + + memset(&tval, 0, sizeof(struct timeval)); + + gettimeofday(&tval, NULL); + tms = (struct tm *)localtime(&tval.tv_sec); + + fprintf(perf_op_ctx->fp, "-------------------------------------------------" + "------------------\n"); + fprintf(perf_op_ctx->fp, "%d/%d/%d -- %d:%d:%d\n", tms->tm_mday, + tms->tm_mon, tms->tm_year, tms->tm_hour, tms->tm_min, tms->tm_sec); + fprintf(perf_op_ctx->fp, "-------------------------------------------------" + "------------------\n"); + fprintf(perf_op_ctx->fp, "%-25s | %-25s | %-s\n", "Counter", "TM Name", + "Value"); + fprintf(perf_op_ctx->fp, "-------------------------------------------------" + "------------------\n"); + + if (perf_op_ctx->club_tm == 0) { + while (tv) { + pthread_mutex_lock(&tv->pctx.m); + pc = tv->pctx.head; + + while (pc) { + ui64_cvalue = (u_int64_t *)pc->value->cvalue; + fprintf(perf_op_ctx->fp, "%-25s | %-25s | %-u\n", pc->name->cname, + pc->name->tm_name, *ui64_cvalue); + //printf("**** %-10d %-10d %-10s %-10u\n", pc->name->tid, pc->id, + // pc->name->cname, *ui64_cvalue); + pc = pc->next; + } + + pthread_mutex_unlock(&tv->pctx.m); + tv = tv->next; + } + + fflush(perf_op_ctx->fp); + + return 1; + } + + pctmi = perf_op_ctx->pctmi; + while (pctmi) { + if ( (pc_heads = malloc(pctmi->size * sizeof(PerfCounter **))) == NULL) { + printf("error allocating memory\n"); + exit(0); + } + memset(pc_heads, 0, pctmi->size * sizeof(PerfCounter **)); + + for (i = 0; i < pctmi->size; i++) { + pc_heads[i] = pctmi->head[i]->head; + + pthread_mutex_lock(&pctmi->head[i]->m); + + while(strcmp(pctmi->tm_name, pc_heads[i]->name->tm_name)) + pc_heads[i] = pc_heads[i]->next; + } + + flag = 1; + while(flag) { + result = 0; + pc = pc_heads[0]; + for (i = 0; i < pctmi->size; i++) { + ui64_cvalue = pc_heads[i]->value->cvalue; + result += *ui64_cvalue; + + pc_heads[i] = pc_heads[i]->next; + + if (pc_heads[i] == NULL || + strcmp(pctmi->tm_name, pc_heads[0]->name->tm_name)) + flag = 0; + } + fprintf(perf_op_ctx->fp, "%-25s | %-25s | %-u\n", + pc->name->cname, pctmi->tm_name, result); + //printf("%-25s | %-25s | %-u\n", pc->name->cname, + // pctmi->tm_name, result); + + } + for (i = 0; i < pctmi->size; i++) + pthread_mutex_unlock(&pctmi->head[i]->m); + + pctmi = pctmi->next; + free(pc_heads); + } + + return 1; +} + +/** + * Kills the perf threads and releases other resources. + */ +void PerfReleaseResources() +{ + PerfDestroyThreads(); + + PerfReleaseOPCtx(); + + return; +} + +void PerfReleaseOPCtx() +{ + if (perf_op_ctx) { + if (perf_op_ctx->fp) + fclose(perf_op_ctx->fp); + + if (perf_op_ctx->file) + free(perf_op_ctx->file); + + if (perf_op_ctx->pctmi) { + if (perf_op_ctx->pctmi->tm_name) + free(perf_op_ctx->pctmi->tm_name); + if (perf_op_ctx->pctmi->head) + free(perf_op_ctx->pctmi->head); + free(perf_op_ctx->pctmi); + } + + free(perf_op_ctx); + } + + return; +} + +void PerfReleasePerfCounterS(PerfCounter *head) +{ + PerfCounter *pc; + + while (head) { + pc = head; + head = head->next; + PerfReleaseCounter(pc); + } + + return; +} + +void PerfReleaseCounter(PerfCounter *pc) +{ + if (pc) { + if (pc->name) { + if (pc->name->cname) free(pc->name->cname); + if (pc->name->tm_name) free(pc->name->tm_name); + free(pc->name); + } + if (pc->value) { + if (pc->value->cvalue) free(pc->value->cvalue); + free(pc->value); + } + if (pc->desc) free(pc->desc); + free(pc); + } + + return; +} + +void PerfReleasePCA(PerfCounterArray *pca) +{ + if (pca) { + if (pca->head) + free(pca->head); + free(pca); + } + + return; +} + +void PerfRegisterTests() +{ + UtRegisterTest("PerfTestCounterReg01", PerfTestCounterReg01, 0); + UtRegisterTest("PerfTestCounterReg02", PerfTestCounterReg02, 0); + UtRegisterTest("PerfTestCounterReg03", PerfTestCounterReg03, 1); + UtRegisterTest("PerfTestCounterReg04", PerfTestCounterReg04, 1); + UtRegisterTest("PerfTestGetCntArray05", PerfTestGetCntArray05, 1); + UtRegisterTest("PerfTestGetCntArray06", PerfTestGetCntArray06, 1); + UtRegisterTest("PerfTestCntArraySize07", PerfTestCntArraySize07, 2); + UtRegisterTest("PerfTestUpdateCounter08", PerfTestUpdateCounter08, 101); + UtRegisterTest("PerfTestUpdateCounter09", PerfTestUpdateCounter09, 1); + UtRegisterTest("PerfTestUpdateGlobalCounter10", PerfTestUpdateGlobalCounter10, 1); + + return; +} + + +//------------------------------------Unit_Tests------------------------------------ + + +int PerfTestCounterReg01() +{ + PerfContext pctx; + + memset(&pctx, 0, sizeof(PerfContext)); + + return PerfRegisterCounter("t1", "c1", 100, 5, NULL, &pctx); +} + +int PerfTestCounterReg02() +{ + PerfContext pctx; + + memset(&pctx, 0, sizeof(PerfContext)); + + return PerfRegisterCounter(NULL, NULL, 100, TYPE_UINT64, NULL, &pctx); +} + +int PerfTestCounterReg03() +{ + PerfContext pctx; + int result; + + memset(&pctx, 0, sizeof(PerfContext)); + + result = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &pctx); + + PerfReleasePerfCounterS(pctx.head); + + return result; +} + +int PerfTestCounterReg04() +{ + PerfContext pctx; + int result; + + memset(&pctx, 0, sizeof(PerfContext)); + + PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &pctx); + PerfRegisterCounter("t2", "c2", 100, TYPE_UINT64, NULL, &pctx); + PerfRegisterCounter("t3", "c3", 100, TYPE_UINT64, NULL, &pctx); + + result = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &pctx); + + PerfReleasePerfCounterS(pctx.head); + + return result; +} + +int PerfTestGetCntArray05() +{ + ThreadVars tv; + int id; + + memset(&tv, 0, sizeof(ThreadVars)); + + id = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + + tv.pca = PerfGetAllCountersArray(NULL); + + return (!tv.pca)?1:0; +} + +int PerfTestGetCntArray06() +{ + ThreadVars tv; + int id; + int result; + + memset(&tv, 0, sizeof(ThreadVars)); + + id = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + + tv.pca = PerfGetAllCountersArray(&tv.pctx); + + result = (tv.pca)?1:0; + + PerfReleasePerfCounterS(tv.pctx.head); + PerfReleasePCA(tv.pca); + + return result; +} + +int PerfTestCntArraySize07() +{ + ThreadVars tv; + PerfCounterArray *pca; + int result; + + memset(&tv, 0, sizeof(ThreadVars)); + + pca = (PerfCounterArray *)&tv.pca; + + PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + PerfRegisterCounter("t2", "c2", 100, TYPE_UINT64, NULL, &tv.pctx); + + pca = PerfGetAllCountersArray(&tv.pctx); + + PerfCounterIncr(1, pca); + PerfCounterIncr(2, pca); + + result = pca->size; + + PerfReleasePerfCounterS(tv.pctx.head); + PerfReleasePCA(pca); + + return result; +} + +int PerfTestUpdateCounter08() +{ + ThreadVars tv; + PerfCounterArray *pca; + int id; + int result; + + memset(&tv, 0, sizeof(ThreadVars)); + + id = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + + pca = PerfGetAllCountersArray(&tv.pctx); + + PerfCounterIncr(id, pca); + PerfCounterAdd(id, pca, 100); + + result = pca->head[id].cnt; + + PerfReleasePerfCounterS(tv.pctx.head); + PerfReleasePCA(pca); + + return result; +} + +int PerfTestUpdateCounter09() +{ + ThreadVars tv; + PerfCounterArray *pca; + int id1, id2; + int result; + + memset(&tv, 0, sizeof(ThreadVars)); + + id1 = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + PerfRegisterCounter("t2", "c2", 100, TYPE_UINT64, NULL, &tv.pctx); + PerfRegisterCounter("t3", "c3", 100, TYPE_UINT64, NULL, &tv.pctx); + PerfRegisterCounter("t4", "c4", 100, TYPE_UINT64, NULL, &tv.pctx); + id2 = PerfRegisterCounter("t5", "c5", 100, TYPE_UINT64, NULL, &tv.pctx); + + pca = PerfGetAllCountersArray(&tv.pctx); + + PerfCounterIncr(id2, pca); + PerfCounterAdd(id2, pca, 100); + + result = (pca->head[id1].cnt == 0) && (pca->head[id2].cnt == 101); + + PerfReleasePerfCounterS(tv.pctx.head); + PerfReleasePCA(pca); + + return result; +} + +int PerfTestUpdateGlobalCounter10() +{ + ThreadVars tv; + PerfCounterArray *pca; + + int result = 1; + int id1, id2, id3; + u_int64_t *p; + u_int64_t m; + + memset(&tv, 0, sizeof(ThreadVars)); + + id1 = PerfRegisterCounter("t1", "c1", 100, TYPE_UINT64, NULL, &tv.pctx); + id2 = PerfRegisterCounter("t2", "c2", 100, TYPE_UINT64, NULL, &tv.pctx); + id3 = PerfRegisterCounter("t3", "c3", 100, TYPE_UINT64, NULL, &tv.pctx); + pca = PerfGetAllCountersArray(&tv.pctx); + + PerfCounterIncr(id1, pca); + PerfCounterAdd(id2, pca, 100); + PerfCounterIncr(id3, pca); + PerfCounterAdd(id3, pca, 100); + + PerfUpdateCounterArray(pca, &tv.pctx, 0); + + printf("%d\n", result); + p = (u_int64_t *)tv.pctx.head->value->cvalue; + m = *p; + result = (m == 1); + + p = (u_int64_t *)tv.pctx.head->next->value->cvalue; + result &= (*p == 100); + + p = (u_int64_t *)tv.pctx.head->next->next->value->cvalue; + result &= (*p == 101); + + return result; +} diff --git a/src/counters.h b/src/counters.h new file mode 100644 index 0000000000..4d5e8c25f2 --- /dev/null +++ b/src/counters.h @@ -0,0 +1,209 @@ +/* Anoop Saldanha */ + +#ifndef __COUNTERS_H__ +#define __COUNTERS_H__ + + +// Time interval for syncing the local counters with the global ones +#define WUT_TTS 3 +// Time interval at which the mgmt thread o/p the stats +#define MGMTT_TTS 10 + +#define PT_RUN 0x01 +#define PT_KILL 0x02 + +/* These 2 macros can only be used when all the registered counters for the tm, + * are in the counter array */ +#define PerfCounterIncr(id, pca) do { \ + if (!pca) { \ + printf("counterarray is NULL\n"); \ + break; \ + } \ + if ((id < 1) || (id > pca->size)) { \ + printf("counter doesn't exist\n"); \ + break; \ + } \ + pca->head[id].cnt++; \ + } while(0) + +#define PerfCounterAdd(id, pca, x) do { \ + if (!pca) { \ + printf("counterarray is NULL\n"); \ + break; \ + } \ + if ((id < 1) || (id > pca->size)) { \ + printf("counter doesn't exist\n"); \ + break; \ + } \ + pca->head[id].cnt += x; \ + } while(0) + +enum { + TYPE_UINT64, + TYPE_DOUBLE, + TYPE_STR, + TYPE_MAX, +}; + +enum { + IFACE_FILE, + IFACE_CONSOLE, + IFACE_NETWORK, + IFACE_SYSLOG, +}; + +/* Holds the thread context for the counter api */ +typedef struct _PerfThreadContext { + pthread_t wakeup_t; + pthread_t mgmt_t; + + /* state of the 2 threads, determined by PT_RUN AND PT_KILL */ + u_int32_t flags; +} PerfThreadContext; + +typedef struct _PerfCounterName { + char *cname; + char *tm_name; + int tid; +} PerfCounterName; + +typedef struct _PerfCounterValue { + void *cvalue; + u_int32_t size; + u_int32_t type; +} PerfCounterValue; + +/* Container to hold the counter variable */ +typedef struct _PerfCounter { + PerfCounterName *name; + PerfCounterValue *value; + + /* local id for this counter in this tm*/ + pthread_t id; + + char *desc; + + /* no of times the local counter has been synced with this counter */ + u_int64_t updated; + + /* the next perfcounter for this tv's tm instance */ + struct _PerfCounter *next; +} PerfCounter; + +/* Holds the Perf Context for a ThreadVars instance */ +typedef struct _PerfContext { + PerfCounter *head; + + /* flag set by the wakeup thread, to inform the client threads to sync */ + u_int32_t perf_flag; + u_int32_t curr_id; + + /* mutex to prevent simultaneous access during update_counter/output_stat */ + pthread_mutex_t m; +} PerfContext; + +/* PerfCounterArray(PCA) Node*/ +typedef struct _PCAElem { + u_int32_t id; + u_int32_t cnt; +} PCAElem; + +/* The PerfCounterArray */ +typedef struct _PerfCounterArray { + /* points to the array holding PCAElems */ + PCAElem *head; + + /* no of PCAElems in head */ + u_int32_t size; +} PerfCounterArray; + +/* Holds multiple instances of the same TM together, used when the stats + * have to be clubbed based on TM, before being sent out*/ +typedef struct _PerfClubTMInst { + char *tm_name; + + PerfContext **head; + u_int32_t size; + + struct _PerfClubTMInst *next; +} PerfClubTMInst; + +/* Holds the output interface context for the counter api */ +typedef struct _PerfOPIfaceContext { + u_int32_t iface; + char *file; + + /* more interfaces to be supported later. For now just a file */ + FILE *fp; + + u_int32_t club_tm; + + PerfClubTMInst *pctmi; + pthread_mutex_t pctmi_lock; +} PerfOPIfaceContext; + +void PerfInitCounterApi(void); + +void PerfInitOPCtx(void); + +void PerfSpawnThreads(void); + +void PerfDestroyThreads(void); + +void * PerfMgmtThread(void *); + +void * PerfWakeupThread(void *); + +u_int32_t PerfRegisterCounter(char *, char *, pthread_t, int, char *, + PerfContext *); + +void PerfAddToClubbedTMTable(char *, PerfContext *); + +PerfCounterArray * PerfGetCounterArrayRange(u_int32_t, u_int32_t, + PerfContext *); + +PerfCounterArray * PerfGetAllCountersArray(PerfContext *); + + +int PerfUpdateCounter(char *, char *, u_int32_t, void *, + PerfContext *); + +int PerfUpdateCounterArray(PerfCounterArray *, PerfContext *, int); + +void PerfOutputCounters(void); + +int PerfOutputCounterFileIface(void); + +void PerfReleaseResources(void); + +void PerfReleaseOPCtx(void); + +void PerfReleasePerfCounterS(PerfCounter *); + +void PerfReleaseCounter(PerfCounter *); + +void PerfReleasePCA(PerfCounterArray *); + +void PerfRegisterTests(void); + +int PerfTestCounterReg01(void); + +int PerfTestCounterReg02(void); + +int PerfTestCounterReg03(void); + +int PerfTestCounterReg04(void); + +int PerfTestGetCntArray05(void); + +int PerfTestGetCntArray06(void); + +int PerfTestCntArraySize07(void); + +int PerfTestUpdateCounter08(void); + +int PerfTestUpdateCounter09(void); + +int PerfTestUpdateGlobalCounter10(void); + +#endif /* __COUNTERS_H__ */ diff --git a/src/decode-ethernet.c b/src/decode-ethernet.c index 8d0bd22c57..b1f3d10e87 100644 --- a/src/decode-ethernet.c +++ b/src/decode-ethernet.c @@ -21,9 +21,11 @@ void DecodeEthernet(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len, Pack if (ntohs(ethh->eth_type) == ETHERNET_TYPE_IP) { //printf("DecodeEthernet ip4\n"); + PerfCounterIncr(DECODER_IPV4, t->pca); DecodeIPV4(t, p, pkt + ETHERNET_HEADER_LEN, len - ETHERNET_HEADER_LEN, pq); } else if(ntohs(ethh->eth_type) == ETHERNET_TYPE_IPV6) { //printf("DecodeEthernet ip6\n"); + PerfCounterIncr(DECODER_IPV6, t->pca); DecodeIPV6(t, p, pkt + ETHERNET_HEADER_LEN, len - ETHERNET_HEADER_LEN); } diff --git a/src/decode-ipv4.c b/src/decode-ipv4.c index 812ad16a2d..2bb296618b 100644 --- a/src/decode-ipv4.c +++ b/src/decode-ipv4.c @@ -91,18 +91,22 @@ void DecodeIPV4(ThreadVars *t, Packet *p, u_int8_t *pkt, u_int16_t len, PacketQu } break; case IPPROTO_TCP: + PerfCounterIncr(DECODER_TCP, t->pca); return(DecodeTCP(t, p, pkt + IPV4_GET_HLEN(p), IPV4_GET_IPLEN(p) - IPV4_GET_HLEN(p))); break; case IPPROTO_UDP: //printf("DecodeIPV4: next layer is UDP\n"); + PerfCounterIncr(DECODER_UDP, t->pca); return(DecodeUDP(t, p, pkt + IPV4_GET_HLEN(p), IPV4_GET_IPLEN(p) - IPV4_GET_HLEN(p))); break; case IPPROTO_ICMP: //printf("DecodeIPV4: next layer is ICMP\n"); + PerfCounterIncr(DECODER_ICMPV4, t->pca); return(DecodeICMPV4(t, p, pkt + IPV4_GET_HLEN(p), IPV4_GET_IPLEN(p) - IPV4_GET_HLEN(p))); break; case IPPROTO_IPV6: { + PerfCounterIncr(DECODER_ICMPV6, t->pca); if (pq != NULL) { //printf("DecodeIPV4: next layer is IPV6\n"); //printf("DecodeIPV4: we are p %p\n", p); diff --git a/src/eidps.c b/src/eidps.c index 23518ddf08..71eacf52b8 100644 --- a/src/eidps.c +++ b/src/eidps.c @@ -841,6 +841,7 @@ int main(int argc, char **argv) CIDRInit(); SigParsePrepare(); PatternMatchPrepare(mpm_ctx); + PerfInitCounterApi(); /* XXX we need an api for this */ L7AppDetectThreadInit(); @@ -882,6 +883,7 @@ int main(int argc, char **argv) MpmRegisterTests(); FlowBitRegisterTests(); SigRegisterTests(); + PerfRegisterTests(); DecodePPPRegisterTests(); if (argc > 1&& (strcmp(argv[1],"runtests") == 0)) { UtRunTests(); @@ -957,6 +959,8 @@ int main(int argc, char **argv) } TmThreadAppend(&tv_l7appdetect); + PerfSpawnThreads(); + while(1) { if (sigflags) { printf("signal received\n"); @@ -993,6 +997,7 @@ int main(int argc, char **argv) printf("time elapsed %lus\n", end_time.tv_sec - start_time.tv_sec); + PerfReleaseResources(); TmThreadKillThreads(); #if 0 #ifdef DBG_PERF diff --git a/src/source-pcap.c b/src/source-pcap.c index d1d93f9dd9..9dfeffc298 100644 --- a/src/source-pcap.c +++ b/src/source-pcap.c @@ -42,6 +42,7 @@ int ReceivePcapThreadInit(ThreadVars *, void *, void **); void ReceivePcapThreadExitStats(ThreadVars *, void *); int ReceivePcapThreadDeinit(ThreadVars *, void *); +int DecodePcapThreadInit(ThreadVars *, void *, void **); int DecodePcap(ThreadVars *, Packet *, void *, PacketQueue *); /** @@ -63,7 +64,7 @@ void TmModuleReceivePcapRegister (void) { */ void TmModuleDecodePcapRegister (void) { tmm_modules[TMM_DECODEPCAP].name = "DecodePcap"; - tmm_modules[TMM_DECODEPCAP].Init = NULL; + tmm_modules[TMM_DECODEPCAP].Init = DecodePcapThreadInit; tmm_modules[TMM_DECODEPCAP].Func = DecodePcap; tmm_modules[TMM_DECODEPCAP].ExitPrintStats = NULL; tmm_modules[TMM_DECODEPCAP].Deinit = NULL; @@ -284,15 +285,21 @@ int ReceivePcapThreadDeinit(ThreadVars *tv, void *data) { */ int DecodePcap(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) { + PerfCounterIncr(DECODER_PKTS, t->pca); + PerfCounterAdd(DECODER_BYTES, t->pca, p->pktlen); + /* call the decoder */ switch(p->pcap_v.datalink) { case LINKTYPE_LINUX_SLL: + PerfCounterIncr(DECODER_SLL, t->pca); DecodeSll(t,p,p->pkt,p->pktlen,pq); break; case LINKTYPE_ETHERNET: + PerfCounterIncr(DECODER_ETH, t->pca); DecodeEthernet(t,p,p->pkt,p->pktlen,pq); break; case LINKTYPE_PPP: + PerfCounterIncr(DECODER_PPP, t->pca); DecodePPP(t,p,p->pkt,p->pktlen,pq); break; default: @@ -303,5 +310,40 @@ int DecodePcap(ThreadVars *t, Packet *p, void *data, PacketQueue *pq) { return 0; } +int DecodePcapThreadInit(ThreadVars *tv, void *initdata, void **data) +{ + pthread_t self_tid = pthread_self(); + + PerfRegisterCounter("decoder.pkts", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.bytes", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.ipv4", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.ipv6", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.ethernet", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.sll", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.tcp", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.udp", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.icmpv4", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.icmpv6", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + PerfRegisterCounter("decoder.ppp", "DecodePcap", self_tid, TYPE_UINT64, + "NULL", &tv->pctx); + + tv->pca = PerfGetAllCountersArray(&tv->pctx); + + PerfAddToClubbedTMTable("DecodePcap", &tv->pctx); + + return 0; +} + + /* eof */ diff --git a/src/source-pcap.h b/src/source-pcap.h index 3059d18eb5..3b3b8d8a68 100644 --- a/src/source-pcap.h +++ b/src/source-pcap.h @@ -11,6 +11,19 @@ void TmModuleDecodePcapRegister (void); #define LIBPCAP_COPYWAIT 500 #define LIBPCAP_PROMISC 1 +// The counter ids. In case you can't recollect the ids, use the counter name +#define DECODER_PKTS 1 +#define DECODER_BYTES 2 +#define DECODER_IPV4 3 +#define DECODER_IPV6 4 +#define DECODER_ETH 5 +#define DECODER_SLL 6 +#define DECODER_TCP 7 +#define DECODER_UDP 8 +#define DECODER_ICMPV4 9 +#define DECODER_ICMPV6 10 +#define DECODER_PPP 11 + /* per packet Pcap vars */ typedef struct PcapPacketVars_ { diff --git a/src/threadvars.h b/src/threadvars.h index 97db9b81dc..2387095fd4 100644 --- a/src/threadvars.h +++ b/src/threadvars.h @@ -6,6 +6,7 @@ //#include "source-nfq.h" #include "util-mpm.h" #include "tm-queues.h" +#include "counters.h" #define THV_USE 0x01 #define THV_KILL 0x02 @@ -31,6 +32,9 @@ typedef struct ThreadVars_ { char set_cpu_affinity; /* bool: 0 no, 1 yes */ int cpu_affinity; /* cpu or core to set affinity to */ + PerfContext pctx; + PerfCounterArray *pca; + struct ThreadVars_ *next; struct ThreadVars_ *prev; } ThreadVars; diff --git a/src/tm-threads.c b/src/tm-threads.c index b043522bd0..db6b6f4f89 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -17,7 +17,7 @@ static int SetCPUAffinity(int cpu); /* root of the threadvars list */ -static ThreadVars *tv_root; +ThreadVars *tv_root = NULL; typedef struct TmSlot_ { /* function pointers */ diff --git a/src/tm-threads.h b/src/tm-threads.h index c1e52028bc..202b3b0b96 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -1,6 +1,8 @@ #ifndef __TM_THREADS_H__ #define __TM_THREADS_H__ +extern ThreadVars *tv_root; + void Tm1SlotSetFunc(ThreadVars *, TmModule *, void *); void Tm2SlotSetFunc1(ThreadVars *, TmModule *, void *); void Tm2SlotSetFunc2(ThreadVars *, TmModule *, void *); diff --git a/src/tmqh-simple.c b/src/tmqh-simple.c index 6daa91dd77..c32f283916 100644 --- a/src/tmqh-simple.c +++ b/src/tmqh-simple.c @@ -25,6 +25,10 @@ Packet *TmqhInputSimple(ThreadVars *t) /* if we have no packets in queue, wait... */ pthread_cond_wait(&q->cond_q, &q->mutex_q); } + + if (t->pctx.perf_flag == 1) + PerfUpdateCounterArray(t->pca, &t->pctx, 0); + if (q->len > 0) { Packet *p = PacketDequeue(q); mutex_unlock(&q->mutex_q);