Fix app layer proto detection code not being thread safe.

remotes/origin/master-1.0.x
Victor Julien 16 years ago
parent fde948f488
commit c1283a6628

@ -39,11 +39,6 @@
#define INSPECT_BYTES 32 #define INSPECT_BYTES 32
#define ALP_DETECT_MAX 256 #define ALP_DETECT_MAX 256
typedef struct AlpProtoDetectDirectionThread_ {
MpmThreadCtx mpm_ctx;
PatternMatcherQueue pmq;
} AlpProtoDetectDirectionThread;
typedef struct AlpProtoDetectDirection_ { typedef struct AlpProtoDetectDirection_ {
MpmCtx mpm_ctx; MpmCtx mpm_ctx;
uint32_t id; uint32_t id;
@ -53,18 +48,13 @@ typedef struct AlpProtoDetectDirection_ {
limit the search */ limit the search */
} AlpProtoDetectDirection; } AlpProtoDetectDirection;
typedef struct AlpProtoDetectThreadCtx_ {
AlpProtoDetectDirectionThread toserver;
AlpProtoDetectDirectionThread toclient;
} AlpProtoDetectThreadCtx;
typedef struct AlpProtoDetectCtx_ { typedef struct AlpProtoDetectCtx_ {
AlpProtoDetectDirection toserver; AlpProtoDetectDirection toserver;
AlpProtoDetectDirection toclient; AlpProtoDetectDirection toclient;
} AlpProtoDetectCtx; } AlpProtoDetectCtx;
static AlpProtoDetectCtx alp_proto_ctx; static AlpProtoDetectCtx alp_proto_ctx;
static AlpProtoDetectThreadCtx alp_proto_tctx; //static AlpProtoDetectThreadCtx alp_proto_tctx;
void AlpProtoInit(AlpProtoDetectCtx *ctx) { void AlpProtoInit(AlpProtoDetectCtx *ctx) {
@ -139,6 +129,12 @@ void AlpProtoFinalizeThread(AlpProtoDetectCtx *ctx, AlpProtoDetectThreadCtx *tct
} }
} }
/** \brief to be called by ReassemblyThreadInit
* \todo this is a hack, we need a proper place to store the global ctx */
void AlpProtoFinalize2Thread(AlpProtoDetectThreadCtx *tctx) {
return AlpProtoFinalizeThread(&alp_proto_ctx, tctx);
}
void AlpProtoFinalizeGlobal(AlpProtoDetectCtx *ctx) { void AlpProtoFinalizeGlobal(AlpProtoDetectCtx *ctx) {
if (ctx == NULL) if (ctx == NULL)
return; return;
@ -226,7 +222,7 @@ void AppLayerDetectProtoThreadInit(void) {
AlpProtoAdd(&alp_proto_ctx, IPPROTO_TCP, ALPROTO_DCERPC, "|05 00|", 2, 0, STREAM_TOSERVER); AlpProtoAdd(&alp_proto_ctx, IPPROTO_TCP, ALPROTO_DCERPC, "|05 00|", 2, 0, STREAM_TOSERVER);
AlpProtoFinalizeGlobal(&alp_proto_ctx); AlpProtoFinalizeGlobal(&alp_proto_ctx);
AlpProtoFinalizeThread(&alp_proto_ctx, &alp_proto_tctx); //AlpProtoFinalizeThread(&alp_proto_ctx, &alp_proto_tctx);
} }
uint16_t AppLayerDetectGetProto(AlpProtoDetectCtx *ctx, AlpProtoDetectThreadCtx *tctx, uint8_t *buf, uint16_t buflen, uint8_t flags) { uint16_t AppLayerDetectGetProto(AlpProtoDetectCtx *ctx, AlpProtoDetectThreadCtx *tctx, uint8_t *buf, uint16_t buflen, uint8_t flags) {
@ -330,7 +326,7 @@ end:
return proto; return proto;
} }
int AppLayerHandleMsg(StreamMsg *smsg, char need_lock) int AppLayerHandleMsg(AlpProtoDetectThreadCtx *dp_ctx, StreamMsg *smsg, char need_lock)
{ {
SCEnter(); SCEnter();
uint16_t alproto = ALPROTO_UNKNOWN; uint16_t alproto = ALPROTO_UNKNOWN;
@ -352,7 +348,7 @@ int AppLayerHandleMsg(StreamMsg *smsg, char need_lock)
//PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len); //PrintRawDataFp(stdout, smsg->init.data, smsg->init.data_len);
//printf("=> Init Stream Data -- end\n"); //printf("=> Init Stream Data -- end\n");
alproto = AppLayerDetectGetProto(&alp_proto_ctx, &alp_proto_tctx, alproto = AppLayerDetectGetProto(&alp_proto_ctx, dp_ctx,
smsg->data.data, smsg->data.data_len, smsg->flags); smsg->data.data, smsg->data.data_len, smsg->flags);
if (alproto != ALPROTO_UNKNOWN) { if (alproto != ALPROTO_UNKNOWN) {
/* store the proto and setup the L7 data array */ /* store the proto and setup the L7 data array */
@ -395,7 +391,7 @@ int AppLayerHandleMsg(StreamMsg *smsg, char need_lock)
SCReturnInt(r); SCReturnInt(r);
} }
#if 0
void *AppLayerDetectProtoThread(void *td) void *AppLayerDetectProtoThread(void *td)
{ {
ThreadVars *tv = (ThreadVars *)td; ThreadVars *tv = (ThreadVars *)td;
@ -448,7 +444,7 @@ void AppLayerDetectProtoThreadSpawn()
#endif #endif
return; return;
} }
#endif
#ifdef UNITTESTS #ifdef UNITTESTS
int AlpDetectTest01(void) { int AlpDetectTest01(void) {

@ -3,7 +3,17 @@
#include "stream.h" #include "stream.h"
int AppLayerHandleMsg(StreamMsg *smsg, char); typedef struct AlpProtoDetectDirectionThread_ {
MpmThreadCtx mpm_ctx;
PatternMatcherQueue pmq;
} AlpProtoDetectDirectionThread;
typedef struct AlpProtoDetectThreadCtx_ {
AlpProtoDetectDirectionThread toserver;
AlpProtoDetectDirectionThread toclient;
} AlpProtoDetectThreadCtx;
int AppLayerHandleMsg(AlpProtoDetectThreadCtx *, StreamMsg *smsg, char);
void *AppLayerDetectProtoThread(void *td); void *AppLayerDetectProtoThread(void *td);
void AppLayerDetectProtoThreadInit(void); void AppLayerDetectProtoThreadInit(void);
@ -11,5 +21,7 @@ void AppLayerDetectProtoThreadInit(void);
void AppLayerDetectProtoThreadSpawn(void); void AppLayerDetectProtoThreadSpawn(void);
void AlpDetectRegisterTests(void); void AlpDetectRegisterTests(void);
void AlpProtoFinalize2Thread(AlpProtoDetectThreadCtx *);
#endif /* __APP_LAYER_DETECT_PROTO_H__ */ #endif /* __APP_LAYER_DETECT_PROTO_H__ */

@ -32,7 +32,6 @@
#include "stream.h" #include "stream.h"
#include "app-layer-detect-proto.h"
#include "util-debug.h" #include "util-debug.h"
//#define DEBUG //#define DEBUG
@ -185,6 +184,7 @@ TcpReassemblyThreadCtx *StreamTcpReassembleInitThreadCtx(void)
memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx)); memset(ra_ctx, 0x00, sizeof(TcpReassemblyThreadCtx));
ra_ctx->stream_q = StreamMsgQueueGetNew(); ra_ctx->stream_q = StreamMsgQueueGetNew();
AlpProtoFinalize2Thread(&ra_ctx->dp_ctx);
return ra_ctx; return ra_ctx;
} }
@ -1502,7 +1502,7 @@ int StreamTcpReassembleProcessAppLayer(TcpReassemblyThreadCtx *ra_ctx)
/** Handle the stream msg. No need to use locking, flow is already /** Handle the stream msg. No need to use locking, flow is already
* locked */ * locked */
r = AppLayerHandleMsg(smsg, FALSE); r = AppLayerHandleMsg(&ra_ctx->dp_ctx, smsg, FALSE);
if (r < 0) if (r < 0)
break; break;
} while (ra_ctx->stream_q->len > 0); } while (ra_ctx->stream_q->len > 0);

@ -9,6 +9,7 @@
#define __STREAM_TCP_REASSEMBLE_H__ #define __STREAM_TCP_REASSEMBLE_H__
#include "stream.h" #include "stream.h"
#include "app-layer-detect-proto.h"
/** Supported OS list and default OS policy is BSD */ /** Supported OS list and default OS policy is BSD */
enum enum
@ -31,6 +32,7 @@ enum
typedef struct TcpReassemblyThreadCtx_ { typedef struct TcpReassemblyThreadCtx_ {
StreamMsgQueue *stream_q; StreamMsgQueue *stream_q;
AlpProtoDetectThreadCtx dp_ctx; /**< proto detection thread data */
} TcpReassemblyThreadCtx; } TcpReassemblyThreadCtx;
#define OS_POLICY_DEFAULT OS_POLICY_BSD #define OS_POLICY_DEFAULT OS_POLICY_BSD

@ -42,7 +42,7 @@ typedef struct StreamTcpThread_ {
uint16_t counter_tcp_sessions; uint16_t counter_tcp_sessions;
TcpReassemblyThreadCtx *ra_ctx; TcpReassemblyThreadCtx *ra_ctx; /**< tcp reassembly thread data */
} StreamTcpThread; } StreamTcpThread;
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *); TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *);

Loading…
Cancel
Save