diff --git a/src/decode.h b/src/decode.h index 2811c6964a..ee245aaa3d 100644 --- a/src/decode.h +++ b/src/decode.h @@ -203,6 +203,9 @@ typedef struct PktVar_ { uint16_t value_len; } PktVar; +/* forward declartion since Packet struct definition requires this */ +struct PacketQueue_; + typedef struct Packet_ { /* Addresses, Ports and protocol @@ -326,44 +329,16 @@ typedef struct Packet_ MpmCtx *cuda_mpm_ctx; MpmThreadCtx *cuda_mtc; - /* this mutex corresponds to the condition variable defined below it. - * this mutex would be used for the search phase of the mpm */ - SCMutex cuda_search_mutex_q; - /* we need this condition variable so that the cuda dispatcher thread - * can inform the client threads, when they are done with the pattern - * matching for the search phase */ - SCCondT cuda_search_cond_q; - - /* this mutex corresponds to the condition variable defined below it. - * this mutex would be used for the scan phase of the mpm */ - SCMutex cuda_scan_mutex_q; - /* we need this condition variable so that the cuda dispatcher thread - * can inform the client threads, when they are done with the pattern - * matching for the scan phase*/ - SCCondT cuda_scan_cond_q; - /* used to hold the match results. We can instead use a void *result * instead here. That way we can make them hold any result. *todo* */ uint16_t cuda_matches; - /* the client thread uses this flag to inform the dispatcher that it - * has woken up and has retrieved the results and that the dispatcher - * can now continue its operations */ - uint8_t cuda_done; /* indicates if the dispatcher should call the search or the scan phase * of the pattern matcher. We can instead use a void *cuda_data instead. * This way we can send any data across to the dispatcher */ uint8_t cuda_search; - /* indicates if the dispatcher should free this packet it has received. - * For some modules which don't send a packet to the dispatcher, we will - * have to create dummy Packets, fill them with data and send them over - * to the dispatcher. The callling thread can't free the Packet once - * it gets the results, since the dispatcher might still be accessing - * the final stages of the packets. Instead we make the dispatcher - * free the packet, if we want the packet to be destoryed after it has - * been used. A probable *todo* would be using a static Packet, in the - * the modules that requires a dummy Packet. That way we are not forced - * to create a new Packet every time using the expensive malloc() */ - uint8_t cuda_free_packet; + /* the dispatcher thread would pump the packet into this queue once it has + * processed the packet */ + struct PacketQueue_ *cuda_outq; #endif } Packet; diff --git a/src/detect-engine-mpm.c b/src/detect-engine-mpm.c index aca2e6f981..b319ab2d85 100644 --- a/src/detect-engine-mpm.c +++ b/src/detect-engine-mpm.c @@ -1,5 +1,6 @@ /* Multi pattern matcher */ +#include "suricata.h" #include "suricata-common.h" #include "decode.h" @@ -19,6 +20,8 @@ #include "detect-uricontent.h" #include "util-mpm-b2g-cuda.h" +#include "tmqh-simple.h" + #include "util-enum.h" #include "util-debug.h" @@ -88,18 +91,31 @@ uint32_t PacketPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, p->payload_len); SCReturnInt(ret); #else - p->cuda_done = 0; + /* if the user has enabled cuda support, but is not using the cuda mpm + * algo, then we shouldn't take the path of the dispatcher. Call the mpm + * directly */ + if (det_ctx->sgh->mpm_ctx->mpm_type != MPM_B2G_CUDA) { + uint32_t ret; + ret = mpm_table[det_ctx->sgh->mpm_ctx->mpm_type].Scan(det_ctx->sgh->mpm_ctx, + &det_ctx->mtc, + &det_ctx->pmq, + p->payload, + p->payload_len); + SCReturnInt(ret); + } + p->cuda_search = 0; - p->cuda_free_packet = 0; p->cuda_mpm_ctx = det_ctx->sgh->mpm_ctx; p->cuda_mtc = &det_ctx->mtc; p->cuda_pmq = &det_ctx->pmq; + /* this outq is unique to this detection thread instance. The dispatcher thread + * would use this queue to pump the packets back to this detection thread once + * it has processed the packet */ + p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]; B2gCudaPushPacketTo_tv_CMB2_RC(p); - SCMutexLock(&p->cuda_scan_mutex_q); - SCondWait(&p->cuda_scan_cond_q, &p->cuda_scan_mutex_q); - p->cuda_done = 1; - SCMutexUnlock(&p->cuda_scan_mutex_q); - SCReturnInt(p->cuda_matches); + /* wait on the detection thread instance */ + Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]); + SCReturnInt(out_p->cuda_matches); #endif } @@ -124,6 +140,16 @@ uint32_t UriPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, uri, uri_len); SCReturnInt(ret); #else + /* if the user has enabled cuda support, but is not using the cuda mpm + * algo, then we shouldn't take the path of the dispatcher. Call the mpm + * directly */ + if (det_ctx->sgh->mpm_uri_ctx->mpm_type != MPM_B2G_CUDA) { + ret = mpm_table[det_ctx->sgh->mpm_uri_ctx->mpm_type].Scan + (det_ctx->sgh->mpm_uri_ctx, &det_ctx->mtcu, &det_ctx->pmq, + uri, uri_len); + SCReturnInt(ret); + } + Packet *p = malloc(sizeof(Packet)); if (p == NULL) { SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory"); @@ -131,23 +157,19 @@ uint32_t UriPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, } memset(p, 0, sizeof(Packet)); - SCMutexInit(&p->cuda_scan_mutex_q, NULL); - SCCondInit(&p->cuda_scan_cond_q, NULL); - - //p->cuda_done = 0; - p->cuda_free_packet = 1; - //p->cuda_search = 0; p->cuda_mpm_ctx = det_ctx->sgh->mpm_uri_ctx; p->cuda_mtc = &det_ctx->mtcu; p->cuda_pmq = &det_ctx->pmq; p->payload = uri; p->payload_len = uri_len; + /* this outq is unique to this detection thread instance. The dispatcher thread + * would use this queue to pump the packets back to this detection thread once + * it has processed the packet */ + p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]; B2gCudaPushPacketTo_tv_CMB2_RC(p); - SCMutexLock(&p->cuda_scan_mutex_q); - SCondWait(&p->cuda_scan_cond_q, &p->cuda_scan_mutex_q); - SCMutexUnlock(&p->cuda_scan_mutex_q); - ret = p->cuda_matches; - p->cuda_done = 1; + Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]); + ret = out_p->cuda_matches; + free(p); SCReturnInt(ret); #endif } @@ -173,17 +195,30 @@ uint32_t PacketPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, p->payload_len); SCReturnInt(ret); #else + /* if the user has enabled cuda support, but is not using the cuda mpm + * algo, then we shouldn't take the path of the dispatcher. Call the mpm + * directly */ + if (det_ctx->sgh->mpm_ctx->mpm_type != MPM_B2G_CUDA) { + uint32_t ret; + ret = mpm_table[det_ctx->sgh->mpm_ctx->mpm_type].Search(det_ctx->sgh->mpm_ctx, + &det_ctx->mtc, + &det_ctx->pmq, + p->payload, + p->payload_len); + SCReturnInt(ret); + } + p->cuda_search = 1; - p->cuda_free_packet = 0; p->cuda_mpm_ctx = det_ctx->sgh->mpm_ctx; p->cuda_mtc = &det_ctx->mtc; p->cuda_pmq = &det_ctx->pmq; - SCMutexLock(&p->cuda_search_mutex_q); + /* this outq is unique to this detection thread instance. The dispatcher thread + * would use this queue to pump the packets back to this detection thread once + * it has processed the packet */ + p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]; B2gCudaPushPacketTo_tv_CMB2_RC(p); - SCondWait(&p->cuda_search_cond_q, &p->cuda_search_mutex_q); - p->cuda_done = 1; - SCMutexUnlock(&p->cuda_search_mutex_q); - SCReturnInt(p->cuda_matches); + Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]); + SCReturnInt(out_p->cuda_matches); #endif } @@ -206,6 +241,16 @@ uint32_t UriPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, uri_len); SCReturnInt(ret); #else + /* if the user has enabled cuda support, but is not using the cuda mpm + * algo, then we shouldn't take the path of the dispatcher. Call the mpm + * directly */ + if (det_ctx->sgh->mpm_uri_ctx->mpm_type != MPM_B2G_CUDA) { + ret = mpm_table[det_ctx->sgh->mpm_uri_ctx->mpm_type].Search + (det_ctx->sgh->mpm_uri_ctx, &det_ctx->mtcu, &det_ctx->pmq, uri, + uri_len); + SCReturnInt(ret); + } + Packet *p = malloc(sizeof(Packet)); if (p == NULL) { SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory"); @@ -213,23 +258,20 @@ uint32_t UriPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx, } memset(p, 0, sizeof(Packet)); - SCMutexInit(&p->cuda_search_mutex_q, NULL); - SCCondInit(&p->cuda_search_cond_q, NULL); - - //p->cuda_done = 0; - p->cuda_free_packet = 1; p->cuda_search = 1; p->cuda_mpm_ctx = det_ctx->sgh->mpm_uri_ctx; p->cuda_mtc = &det_ctx->mtcu; p->cuda_pmq = &det_ctx->pmq; p->payload = uri; p->payload_len = uri_len; + /* this outq is unique to this detection thread instance. The dispatcher thread + * would use this queue to pump the packets back to this detection thread once + * it has processed the packet */ + p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]; B2gCudaPushPacketTo_tv_CMB2_RC(p); - SCMutexLock(&p->cuda_search_mutex_q); - SCondWait(&p->cuda_search_cond_q, &p->cuda_search_mutex_q); - SCMutexUnlock(&p->cuda_search_mutex_q); - ret = p->cuda_matches; - p->cuda_done = 1; + Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]); + ret = out_p->cuda_matches; + free(p); SCReturnInt(ret); #endif diff --git a/src/detect-engine.c b/src/detect-engine.c index 05d0a5eb2c..a4e6cd6b05 100644 --- a/src/detect-engine.c +++ b/src/detect-engine.c @@ -22,6 +22,7 @@ #include "detect-engine-threshold.h" //#include "util-mpm.h" +#include "util-error.h" #include "util-hash.h" #include "util-debug.h" @@ -135,7 +136,50 @@ TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data) { SCPerfAddToClubbedTMTable(tv->name, &tv->sc_perf_pctx); *data = (void *)det_ctx; - //printf("DetectEngineThreadCtxInit: data %p det_ctx %p\n", *data, det_ctx); + +#ifdef __SC_CUDA_SUPPORT__ + if (PatternMatchDefaultMatcher() != MPM_B2G_CUDA) + return TM_ECODE_OK; + + Tmq *tmq; + /* we would prepend this name to the the tv name, to obtain the final unique + * detection thread queue name */ + char *cuda_outq_name = "cuda_mpm_rc_disp_outq"; + uint8_t disp_outq_name_len = (strlen(tv->name) + strlen(cuda_outq_name) + 1); + + char *disp_outq_name = malloc(disp_outq_name_len * sizeof(char)); + if (disp_outq_name == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory"); + exit(EXIT_FAILURE); + } + strcpy(disp_outq_name, tv->name); + strcpy(disp_outq_name + strlen(tv->name), cuda_outq_name); + disp_outq_name[disp_outq_name_len] = '\0'; + + tmq = TmqGetQueueByName(disp_outq_name); + if (tmq != NULL) { + SCLogError(SC_ERR_TMQ_ALREADY_REGISTERED, "A queue by the name \"%s\" " + "is already registered, which shouldn't be the case. Queue " + "name is duplicated. Please check if multiple instances of " + "detection module are given different names ", + disp_outq_name); + goto error; + } + tmq = TmqCreateQueue(disp_outq_name); + if (tmq == NULL) + goto error; + + /* hold the queue instane we create under this detection thread instance */ + det_ctx->cuda_mpm_rc_disp_outq = tmq; + det_ctx->cuda_mpm_rc_disp_outq->reader_cnt++; + det_ctx->cuda_mpm_rc_disp_outq->writer_cnt++; + + return TM_ECODE_OK; + + error: + return TM_ECODE_FAILED; +#endif + return TM_ECODE_OK; } diff --git a/src/detect.h b/src/detect.h index 641aebb1c2..505466b974 100644 --- a/src/detect.h +++ b/src/detect.h @@ -355,6 +355,12 @@ typedef struct DetectionEngineThreadCtx_ { DetectEngineIPOnlyThreadCtx io_ctx; DetectEngineCtx *de_ctx; + +#ifdef __SC_CUDA_SUPPORT__ + /* each detection thread would have it's own queue where the cuda dispatcher + * thread can dump the packets once it has processed them */ + Tmq *cuda_mpm_rc_disp_outq; +#endif } DetectEngineThreadCtx; /** \brief a single match condition for a signature */ diff --git a/src/suricata.c b/src/suricata.c index 29bd113fd9..1867b38085 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -194,13 +194,6 @@ Packet *SetupPkt (void) r = SCMutexInit(&p->mutex_rtv_cnt, NULL); -#ifdef __SC_CUDA_SUPPORT__ - SCMutexInit(&p->cuda_scan_mutex_q, NULL); - SCCondInit(&p->cuda_scan_cond_q, NULL); - - SCMutexInit(&p->cuda_search_mutex_q, NULL); - SCCondInit(&p->cuda_search_cond_q, NULL); -#endif SCLogDebug("allocated a new packet..."); } @@ -780,13 +773,6 @@ int main(int argc, char **argv) } memset(p, 0, sizeof(Packet)); SCMutexInit(&p->mutex_rtv_cnt, NULL); -#ifdef __SC_CUDA_SUPPORT__ - SCMutexInit(&p->cuda_scan_mutex_q, NULL); - SCCondInit(&p->cuda_scan_cond_q, NULL); - - SCMutexInit(&p->cuda_search_mutex_q, NULL); - SCCondInit(&p->cuda_search_cond_q, NULL); -#endif PacketEnqueue(&packet_q,p); } @@ -849,9 +835,11 @@ int main(int argc, char **argv) TmThreadPrioSummary("Suricata main()"); #ifdef __SC_CUDA_SUPPORT__ - /* start the dispatcher thread for this module */ - if (B2gCudaStartDispatcherThreadRC("SC_RULES_CONTENT_B2G_CUDA") == -1) - exit(EXIT_FAILURE); + if (PatternMatchDefaultMatcher() == MPM_B2G_CUDA) { + /* start the dispatcher thread for this module */ + if (B2gCudaStartDispatcherThreadRC("SC_RULES_CONTENT_B2G_CUDA") == -1) + exit(EXIT_FAILURE); + } #endif /* Spawn the flow manager thread */ diff --git a/src/tmqh-simple.c b/src/tmqh-simple.c index 4e0bf541ad..18ecdbf56c 100644 --- a/src/tmqh-simple.c +++ b/src/tmqh-simple.c @@ -50,3 +50,51 @@ void TmqhOutputSimple(ThreadVars *t, Packet *p) SCMutexUnlock(&q->mutex_q); } +/** + * \brief Public version of TmqhInputSimple from the tmqh-simple queue + * handler, except that it is a generic version that is directly + * tied to a PacketQueue instance. + * + * Retrieves a packet from the queue. If the queue is empty, it waits + * on the queue, till a packet is enqueued into the queue. + * + * \param q The PacketQueue instance to wait on. + * + * \retval p The returned packet from the queue. + */ +Packet *TmqhInputSimpleOnQ(PacketQueue *q) +{ + SCMutexLock(&q->mutex_q); + if (q->len == 0) { + /* if we have no packets in queue, wait... */ + SCondWait(&q->cond_q, &q->mutex_q); + } + + if (q->len > 0) { + Packet *p = PacketDequeue(q); + SCMutexUnlock(&q->mutex_q); + return p; + } else { + /* return NULL if we have no pkt. Should only happen on signals. */ + SCMutexUnlock(&q->mutex_q); + return NULL; + } +} + +/** + * \brief Public version of TmqhOutputSimple from the tmqh-simple queue + * handler, except that it is a generic version that is directly + * tied to a PacketQueue instance. + * + * Enqueues a packet into the packet queue. + * + * \param q The PacketQueue instance to enqueue the packet into. + * \param p The packet to be enqueued into the above queue. + */ +void TmqhOutputSimpleOnQ(PacketQueue *q, Packet *p) +{ + SCMutexLock(&q->mutex_q); + PacketEnqueue(q, p); + SCCondSignal(&q->cond_q); + SCMutexUnlock(&q->mutex_q); +} diff --git a/src/tmqh-simple.h b/src/tmqh-simple.h index a70f88e0f4..804e393746 100644 --- a/src/tmqh-simple.h +++ b/src/tmqh-simple.h @@ -3,6 +3,8 @@ #ifndef __TMQH_SIMPLE_H__ #define __TMQH_SIMPLE_H__ +Packet *TmqhInputSimpleOnQ(PacketQueue *); +void TmqhOutputSimpleOnQ(PacketQueue *, Packet *); void TmqhSimpleRegister (void); #endif /* __TMQH_SIMPLE_H__ */ diff --git a/src/util-error.c b/src/util-error.c index 2322661082..b62261a283 100644 --- a/src/util-error.c +++ b/src/util-error.c @@ -112,6 +112,7 @@ const char * SCErrorToString(SCError err) CASE_CODE (SC_ERR_TM_MODULES_ERROR); CASE_CODE (SC_ERR_B2G_CUDA_ERROR); CASE_CODE (SC_ERR_INVALID_YAML_CONF_ENTRY); + CASE_CODE (SC_ERR_TMQ_ALREADY_REGISTERED); default: return "UNKNOWN_ERROR"; } diff --git a/src/util-error.h b/src/util-error.h index 8f5c78154e..ca9a9ddbd0 100644 --- a/src/util-error.h +++ b/src/util-error.h @@ -129,6 +129,7 @@ typedef enum { SC_ERR_TM_MODULES_ERROR, SC_ERR_B2G_CUDA_ERROR, SC_ERR_INVALID_YAML_CONF_ENTRY, + SC_ERR_TMQ_ALREADY_REGISTERED, } SCError; const char *SCErrorToString(SCError); diff --git a/src/util-mpm-b2g-cuda.c b/src/util-mpm-b2g-cuda.c index 62f1470472..14bb16d557 100644 --- a/src/util-mpm-b2g-cuda.c +++ b/src/util-mpm-b2g-cuda.c @@ -29,6 +29,7 @@ #include "util-cuda.h" #include "tm-threads.h" #include "threads.h" +#include "tmqh-simple.h" /* macros decides if cuda is enabled for the platform or not */ #ifdef __SC_CUDA_SUPPORT__ @@ -1746,7 +1747,7 @@ uint32_t B2gCudaScanBNDMq(MpmCtx *mpm_ctx, MpmThreadCtx *mpm_thread_ctx, int i = 0; int host_offsets[UINT16_MAX]; - if (buflen < ctx->search_m) + if (buflen < ctx->scan_m) return 0; if (SCCudaMemAlloc(&cuda_buf, buflen * sizeof(char)) == -1) { @@ -2540,37 +2541,15 @@ TmEcode B2gCudaMpmDispatcher(ThreadVars *tv, Packet *p, void *data, p->cuda_pmq, p->payload, p->payload_len); - /* signal the client that the result is ready */ - SCCondSignal(&p->cuda_search_cond_q); - /* wait for the client indication that it has read the results. If the - * client still hasn't sent the indication, signal it again and do so - * every 50 microseconds */ - while (p->cuda_done == 0) { - SCCondSignal(&p->cuda_search_cond_q); - usleep(50); - } } else { p->cuda_matches = mpm_table[p->cuda_mpm_ctx->mpm_type].Scan(p->cuda_mpm_ctx, p->cuda_mtc, p->cuda_pmq, p->payload, p->payload_len); - /* signal the client that the result is ready */ - SCCondSignal(&p->cuda_scan_cond_q); - /* wait for the client indication that it has read the results. If the - * client still hasn't sent the indication, signal it again and do so - * every 50 microseconds */ - while (p->cuda_done == 0) { - SCCondSignal(&p->cuda_scan_cond_q); - usleep(50); - } } - p->cuda_done = 0; - - if (p->cuda_free_packet == 1) { - free(p); - } + TmqhOutputSimpleOnQ(p->cuda_outq, p); return TM_ECODE_OK; }