Changed the way cuda dispatcher passes back results. Now each detection thread has it's own queue to which the dispatcher can pump packets back to the detect thread. Also, with cuda enabled and a non-cuda mpm being used, we won't create a dispatcher and instead call the b2g scan/search funtions directly instead of using the dispatcher.

remotes/origin/master-1.0.x
Anoop Saldanha 16 years ago committed by Victor Julien
parent c26e92733d
commit 8cf60d6645

@ -203,6 +203,9 @@ typedef struct PktVar_ {
uint16_t value_len;
} PktVar;
/* forward declartion since Packet struct definition requires this */
struct PacketQueue_;
typedef struct Packet_
{
/* Addresses, Ports and protocol
@ -326,44 +329,16 @@ typedef struct Packet_
MpmCtx *cuda_mpm_ctx;
MpmThreadCtx *cuda_mtc;
/* this mutex corresponds to the condition variable defined below it.
* this mutex would be used for the search phase of the mpm */
SCMutex cuda_search_mutex_q;
/* we need this condition variable so that the cuda dispatcher thread
* can inform the client threads, when they are done with the pattern
* matching for the search phase */
SCCondT cuda_search_cond_q;
/* this mutex corresponds to the condition variable defined below it.
* this mutex would be used for the scan phase of the mpm */
SCMutex cuda_scan_mutex_q;
/* we need this condition variable so that the cuda dispatcher thread
* can inform the client threads, when they are done with the pattern
* matching for the scan phase*/
SCCondT cuda_scan_cond_q;
/* used to hold the match results. We can instead use a void *result
* instead here. That way we can make them hold any result. *todo* */
uint16_t cuda_matches;
/* the client thread uses this flag to inform the dispatcher that it
* has woken up and has retrieved the results and that the dispatcher
* can now continue its operations */
uint8_t cuda_done;
/* indicates if the dispatcher should call the search or the scan phase
* of the pattern matcher. We can instead use a void *cuda_data instead.
* This way we can send any data across to the dispatcher */
uint8_t cuda_search;
/* indicates if the dispatcher should free this packet it has received.
* For some modules which don't send a packet to the dispatcher, we will
* have to create dummy Packets, fill them with data and send them over
* to the dispatcher. The callling thread can't free the Packet once
* it gets the results, since the dispatcher might still be accessing
* the final stages of the packets. Instead we make the dispatcher
* free the packet, if we want the packet to be destoryed after it has
* been used. A probable *todo* would be using a static Packet, in the
* the modules that requires a dummy Packet. That way we are not forced
* to create a new Packet every time using the expensive malloc() */
uint8_t cuda_free_packet;
/* the dispatcher thread would pump the packet into this queue once it has
* processed the packet */
struct PacketQueue_ *cuda_outq;
#endif
} Packet;

@ -1,5 +1,6 @@
/* Multi pattern matcher */
#include "suricata.h"
#include "suricata-common.h"
#include "decode.h"
@ -19,6 +20,8 @@
#include "detect-uricontent.h"
#include "util-mpm-b2g-cuda.h"
#include "tmqh-simple.h"
#include "util-enum.h"
#include "util-debug.h"
@ -88,18 +91,31 @@ uint32_t PacketPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
p->payload_len);
SCReturnInt(ret);
#else
p->cuda_done = 0;
/* if the user has enabled cuda support, but is not using the cuda mpm
* algo, then we shouldn't take the path of the dispatcher. Call the mpm
* directly */
if (det_ctx->sgh->mpm_ctx->mpm_type != MPM_B2G_CUDA) {
uint32_t ret;
ret = mpm_table[det_ctx->sgh->mpm_ctx->mpm_type].Scan(det_ctx->sgh->mpm_ctx,
&det_ctx->mtc,
&det_ctx->pmq,
p->payload,
p->payload_len);
SCReturnInt(ret);
}
p->cuda_search = 0;
p->cuda_free_packet = 0;
p->cuda_mpm_ctx = det_ctx->sgh->mpm_ctx;
p->cuda_mtc = &det_ctx->mtc;
p->cuda_pmq = &det_ctx->pmq;
/* this outq is unique to this detection thread instance. The dispatcher thread
* would use this queue to pump the packets back to this detection thread once
* it has processed the packet */
p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id];
B2gCudaPushPacketTo_tv_CMB2_RC(p);
SCMutexLock(&p->cuda_scan_mutex_q);
SCondWait(&p->cuda_scan_cond_q, &p->cuda_scan_mutex_q);
p->cuda_done = 1;
SCMutexUnlock(&p->cuda_scan_mutex_q);
SCReturnInt(p->cuda_matches);
/* wait on the detection thread instance */
Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]);
SCReturnInt(out_p->cuda_matches);
#endif
}
@ -124,6 +140,16 @@ uint32_t UriPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
uri, uri_len);
SCReturnInt(ret);
#else
/* if the user has enabled cuda support, but is not using the cuda mpm
* algo, then we shouldn't take the path of the dispatcher. Call the mpm
* directly */
if (det_ctx->sgh->mpm_uri_ctx->mpm_type != MPM_B2G_CUDA) {
ret = mpm_table[det_ctx->sgh->mpm_uri_ctx->mpm_type].Scan
(det_ctx->sgh->mpm_uri_ctx, &det_ctx->mtcu, &det_ctx->pmq,
uri, uri_len);
SCReturnInt(ret);
}
Packet *p = malloc(sizeof(Packet));
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory");
@ -131,23 +157,19 @@ uint32_t UriPatternScan(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
}
memset(p, 0, sizeof(Packet));
SCMutexInit(&p->cuda_scan_mutex_q, NULL);
SCCondInit(&p->cuda_scan_cond_q, NULL);
//p->cuda_done = 0;
p->cuda_free_packet = 1;
//p->cuda_search = 0;
p->cuda_mpm_ctx = det_ctx->sgh->mpm_uri_ctx;
p->cuda_mtc = &det_ctx->mtcu;
p->cuda_pmq = &det_ctx->pmq;
p->payload = uri;
p->payload_len = uri_len;
/* this outq is unique to this detection thread instance. The dispatcher thread
* would use this queue to pump the packets back to this detection thread once
* it has processed the packet */
p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id];
B2gCudaPushPacketTo_tv_CMB2_RC(p);
SCMutexLock(&p->cuda_scan_mutex_q);
SCondWait(&p->cuda_scan_cond_q, &p->cuda_scan_mutex_q);
SCMutexUnlock(&p->cuda_scan_mutex_q);
ret = p->cuda_matches;
p->cuda_done = 1;
Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]);
ret = out_p->cuda_matches;
free(p);
SCReturnInt(ret);
#endif
}
@ -173,17 +195,30 @@ uint32_t PacketPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
p->payload_len);
SCReturnInt(ret);
#else
/* if the user has enabled cuda support, but is not using the cuda mpm
* algo, then we shouldn't take the path of the dispatcher. Call the mpm
* directly */
if (det_ctx->sgh->mpm_ctx->mpm_type != MPM_B2G_CUDA) {
uint32_t ret;
ret = mpm_table[det_ctx->sgh->mpm_ctx->mpm_type].Search(det_ctx->sgh->mpm_ctx,
&det_ctx->mtc,
&det_ctx->pmq,
p->payload,
p->payload_len);
SCReturnInt(ret);
}
p->cuda_search = 1;
p->cuda_free_packet = 0;
p->cuda_mpm_ctx = det_ctx->sgh->mpm_ctx;
p->cuda_mtc = &det_ctx->mtc;
p->cuda_pmq = &det_ctx->pmq;
SCMutexLock(&p->cuda_search_mutex_q);
/* this outq is unique to this detection thread instance. The dispatcher thread
* would use this queue to pump the packets back to this detection thread once
* it has processed the packet */
p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id];
B2gCudaPushPacketTo_tv_CMB2_RC(p);
SCondWait(&p->cuda_search_cond_q, &p->cuda_search_mutex_q);
p->cuda_done = 1;
SCMutexUnlock(&p->cuda_search_mutex_q);
SCReturnInt(p->cuda_matches);
Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]);
SCReturnInt(out_p->cuda_matches);
#endif
}
@ -206,6 +241,16 @@ uint32_t UriPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
uri_len);
SCReturnInt(ret);
#else
/* if the user has enabled cuda support, but is not using the cuda mpm
* algo, then we shouldn't take the path of the dispatcher. Call the mpm
* directly */
if (det_ctx->sgh->mpm_uri_ctx->mpm_type != MPM_B2G_CUDA) {
ret = mpm_table[det_ctx->sgh->mpm_uri_ctx->mpm_type].Search
(det_ctx->sgh->mpm_uri_ctx, &det_ctx->mtcu, &det_ctx->pmq, uri,
uri_len);
SCReturnInt(ret);
}
Packet *p = malloc(sizeof(Packet));
if (p == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory");
@ -213,23 +258,20 @@ uint32_t UriPatternMatch(ThreadVars *tv, DetectEngineThreadCtx *det_ctx,
}
memset(p, 0, sizeof(Packet));
SCMutexInit(&p->cuda_search_mutex_q, NULL);
SCCondInit(&p->cuda_search_cond_q, NULL);
//p->cuda_done = 0;
p->cuda_free_packet = 1;
p->cuda_search = 1;
p->cuda_mpm_ctx = det_ctx->sgh->mpm_uri_ctx;
p->cuda_mtc = &det_ctx->mtcu;
p->cuda_pmq = &det_ctx->pmq;
p->payload = uri;
p->payload_len = uri_len;
/* this outq is unique to this detection thread instance. The dispatcher thread
* would use this queue to pump the packets back to this detection thread once
* it has processed the packet */
p->cuda_outq = &trans_q[det_ctx->cuda_mpm_rc_disp_outq->id];
B2gCudaPushPacketTo_tv_CMB2_RC(p);
SCMutexLock(&p->cuda_search_mutex_q);
SCondWait(&p->cuda_search_cond_q, &p->cuda_search_mutex_q);
SCMutexUnlock(&p->cuda_search_mutex_q);
ret = p->cuda_matches;
p->cuda_done = 1;
Packet *out_p = TmqhInputSimpleOnQ(&trans_q[det_ctx->cuda_mpm_rc_disp_outq->id]);
ret = out_p->cuda_matches;
free(p);
SCReturnInt(ret);
#endif

@ -22,6 +22,7 @@
#include "detect-engine-threshold.h"
//#include "util-mpm.h"
#include "util-error.h"
#include "util-hash.h"
#include "util-debug.h"
@ -135,7 +136,50 @@ TmEcode DetectEngineThreadCtxInit(ThreadVars *tv, void *initdata, void **data) {
SCPerfAddToClubbedTMTable(tv->name, &tv->sc_perf_pctx);
*data = (void *)det_ctx;
//printf("DetectEngineThreadCtxInit: data %p det_ctx %p\n", *data, det_ctx);
#ifdef __SC_CUDA_SUPPORT__
if (PatternMatchDefaultMatcher() != MPM_B2G_CUDA)
return TM_ECODE_OK;
Tmq *tmq;
/* we would prepend this name to the the tv name, to obtain the final unique
* detection thread queue name */
char *cuda_outq_name = "cuda_mpm_rc_disp_outq";
uint8_t disp_outq_name_len = (strlen(tv->name) + strlen(cuda_outq_name) + 1);
char *disp_outq_name = malloc(disp_outq_name_len * sizeof(char));
if (disp_outq_name == NULL) {
SCLogError(SC_ERR_MEM_ALLOC, "Error allocating memory");
exit(EXIT_FAILURE);
}
strcpy(disp_outq_name, tv->name);
strcpy(disp_outq_name + strlen(tv->name), cuda_outq_name);
disp_outq_name[disp_outq_name_len] = '\0';
tmq = TmqGetQueueByName(disp_outq_name);
if (tmq != NULL) {
SCLogError(SC_ERR_TMQ_ALREADY_REGISTERED, "A queue by the name \"%s\" "
"is already registered, which shouldn't be the case. Queue "
"name is duplicated. Please check if multiple instances of "
"detection module are given different names ",
disp_outq_name);
goto error;
}
tmq = TmqCreateQueue(disp_outq_name);
if (tmq == NULL)
goto error;
/* hold the queue instane we create under this detection thread instance */
det_ctx->cuda_mpm_rc_disp_outq = tmq;
det_ctx->cuda_mpm_rc_disp_outq->reader_cnt++;
det_ctx->cuda_mpm_rc_disp_outq->writer_cnt++;
return TM_ECODE_OK;
error:
return TM_ECODE_FAILED;
#endif
return TM_ECODE_OK;
}

@ -355,6 +355,12 @@ typedef struct DetectionEngineThreadCtx_ {
DetectEngineIPOnlyThreadCtx io_ctx;
DetectEngineCtx *de_ctx;
#ifdef __SC_CUDA_SUPPORT__
/* each detection thread would have it's own queue where the cuda dispatcher
* thread can dump the packets once it has processed them */
Tmq *cuda_mpm_rc_disp_outq;
#endif
} DetectEngineThreadCtx;
/** \brief a single match condition for a signature */

@ -194,13 +194,6 @@ Packet *SetupPkt (void)
r = SCMutexInit(&p->mutex_rtv_cnt, NULL);
#ifdef __SC_CUDA_SUPPORT__
SCMutexInit(&p->cuda_scan_mutex_q, NULL);
SCCondInit(&p->cuda_scan_cond_q, NULL);
SCMutexInit(&p->cuda_search_mutex_q, NULL);
SCCondInit(&p->cuda_search_cond_q, NULL);
#endif
SCLogDebug("allocated a new packet...");
}
@ -780,13 +773,6 @@ int main(int argc, char **argv)
}
memset(p, 0, sizeof(Packet));
SCMutexInit(&p->mutex_rtv_cnt, NULL);
#ifdef __SC_CUDA_SUPPORT__
SCMutexInit(&p->cuda_scan_mutex_q, NULL);
SCCondInit(&p->cuda_scan_cond_q, NULL);
SCMutexInit(&p->cuda_search_mutex_q, NULL);
SCCondInit(&p->cuda_search_cond_q, NULL);
#endif
PacketEnqueue(&packet_q,p);
}
@ -849,9 +835,11 @@ int main(int argc, char **argv)
TmThreadPrioSummary("Suricata main()");
#ifdef __SC_CUDA_SUPPORT__
/* start the dispatcher thread for this module */
if (B2gCudaStartDispatcherThreadRC("SC_RULES_CONTENT_B2G_CUDA") == -1)
exit(EXIT_FAILURE);
if (PatternMatchDefaultMatcher() == MPM_B2G_CUDA) {
/* start the dispatcher thread for this module */
if (B2gCudaStartDispatcherThreadRC("SC_RULES_CONTENT_B2G_CUDA") == -1)
exit(EXIT_FAILURE);
}
#endif
/* Spawn the flow manager thread */

@ -50,3 +50,51 @@ void TmqhOutputSimple(ThreadVars *t, Packet *p)
SCMutexUnlock(&q->mutex_q);
}
/**
* \brief Public version of TmqhInputSimple from the tmqh-simple queue
* handler, except that it is a generic version that is directly
* tied to a PacketQueue instance.
*
* Retrieves a packet from the queue. If the queue is empty, it waits
* on the queue, till a packet is enqueued into the queue.
*
* \param q The PacketQueue instance to wait on.
*
* \retval p The returned packet from the queue.
*/
Packet *TmqhInputSimpleOnQ(PacketQueue *q)
{
SCMutexLock(&q->mutex_q);
if (q->len == 0) {
/* if we have no packets in queue, wait... */
SCondWait(&q->cond_q, &q->mutex_q);
}
if (q->len > 0) {
Packet *p = PacketDequeue(q);
SCMutexUnlock(&q->mutex_q);
return p;
} else {
/* return NULL if we have no pkt. Should only happen on signals. */
SCMutexUnlock(&q->mutex_q);
return NULL;
}
}
/**
* \brief Public version of TmqhOutputSimple from the tmqh-simple queue
* handler, except that it is a generic version that is directly
* tied to a PacketQueue instance.
*
* Enqueues a packet into the packet queue.
*
* \param q The PacketQueue instance to enqueue the packet into.
* \param p The packet to be enqueued into the above queue.
*/
void TmqhOutputSimpleOnQ(PacketQueue *q, Packet *p)
{
SCMutexLock(&q->mutex_q);
PacketEnqueue(q, p);
SCCondSignal(&q->cond_q);
SCMutexUnlock(&q->mutex_q);
}

@ -3,6 +3,8 @@
#ifndef __TMQH_SIMPLE_H__
#define __TMQH_SIMPLE_H__
Packet *TmqhInputSimpleOnQ(PacketQueue *);
void TmqhOutputSimpleOnQ(PacketQueue *, Packet *);
void TmqhSimpleRegister (void);
#endif /* __TMQH_SIMPLE_H__ */

@ -112,6 +112,7 @@ const char * SCErrorToString(SCError err)
CASE_CODE (SC_ERR_TM_MODULES_ERROR);
CASE_CODE (SC_ERR_B2G_CUDA_ERROR);
CASE_CODE (SC_ERR_INVALID_YAML_CONF_ENTRY);
CASE_CODE (SC_ERR_TMQ_ALREADY_REGISTERED);
default:
return "UNKNOWN_ERROR";
}

@ -129,6 +129,7 @@ typedef enum {
SC_ERR_TM_MODULES_ERROR,
SC_ERR_B2G_CUDA_ERROR,
SC_ERR_INVALID_YAML_CONF_ENTRY,
SC_ERR_TMQ_ALREADY_REGISTERED,
} SCError;
const char *SCErrorToString(SCError);

@ -29,6 +29,7 @@
#include "util-cuda.h"
#include "tm-threads.h"
#include "threads.h"
#include "tmqh-simple.h"
/* macros decides if cuda is enabled for the platform or not */
#ifdef __SC_CUDA_SUPPORT__
@ -1746,7 +1747,7 @@ uint32_t B2gCudaScanBNDMq(MpmCtx *mpm_ctx, MpmThreadCtx *mpm_thread_ctx,
int i = 0;
int host_offsets[UINT16_MAX];
if (buflen < ctx->search_m)
if (buflen < ctx->scan_m)
return 0;
if (SCCudaMemAlloc(&cuda_buf, buflen * sizeof(char)) == -1) {
@ -2540,37 +2541,15 @@ TmEcode B2gCudaMpmDispatcher(ThreadVars *tv, Packet *p, void *data,
p->cuda_pmq,
p->payload,
p->payload_len);
/* signal the client that the result is ready */
SCCondSignal(&p->cuda_search_cond_q);
/* wait for the client indication that it has read the results. If the
* client still hasn't sent the indication, signal it again and do so
* every 50 microseconds */
while (p->cuda_done == 0) {
SCCondSignal(&p->cuda_search_cond_q);
usleep(50);
}
} else {
p->cuda_matches = mpm_table[p->cuda_mpm_ctx->mpm_type].Scan(p->cuda_mpm_ctx,
p->cuda_mtc,
p->cuda_pmq,
p->payload,
p->payload_len);
/* signal the client that the result is ready */
SCCondSignal(&p->cuda_scan_cond_q);
/* wait for the client indication that it has read the results. If the
* client still hasn't sent the indication, signal it again and do so
* every 50 microseconds */
while (p->cuda_done == 0) {
SCCondSignal(&p->cuda_scan_cond_q);
usleep(50);
}
}
p->cuda_done = 0;
if (p->cuda_free_packet == 1) {
free(p);
}
TmqhOutputSimpleOnQ(p->cuda_outq, p);
return TM_ECODE_OK;
}

Loading…
Cancel
Save