Threading update for tunneling and high load

remotes/origin/master-1.0.x
Victor Julien 17 years ago
parent 4b6a8715fd
commit 298d4be7bb

@ -27,9 +27,11 @@ Packet *PacketDequeue (PacketQueue *q) {
/* if the queue is empty there are no packets left.
* In that case we sleep and try again. */
if (q->len == 0) {
printf("PacketDequeue: queue is empty, waiting...\n");
usleep(100000); /* sleep 100ms */
return PacketDequeue(q);
// printf("PacketDequeue: queue is empty, waiting...\n");
// TmqDebugList();
// usleep(100000); /* sleep 100ms */
// return PacketDequeue(q);
return NULL;
}
/* pull the bottom packet from the queue */

@ -1,4 +1,6 @@
#include "vips.h"
#include "threads.h"
#include "tm-queues.h"
#define TMQ_MAX_QUEUES 256
@ -42,3 +44,13 @@ Tmq* TmqGetQueueByName(char *name) {
return NULL;
}
void TmqDebugList(void) {
u_int16_t i = 0;
for (i = 0; i < tmq_id; i++) {
/* get a lock accessing the len */
mutex_lock(&trans_q[tmqs[i].id].mutex_q);
printf("TmqDebugList: id %u, name \'%s\', len %u\n", tmqs[i].id, tmqs[i].name, trans_q[tmqs[i].id].len);
mutex_unlock(&trans_q[tmqs[i].id].mutex_q);
}
}

@ -186,19 +186,23 @@ void *TmThreadsSlot1(void *td) {
memset(&s1->slot1_pq, 0, sizeof(PacketQueue));
while(run) {
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s1->Slot1Func(tv, p, s1->slot1_data, &s1->slot1_pq);
while (s1->slot1_pq.len > 0) {
Packet *extra = PacketDequeue(&s1->slot1_pq);
tv->tmqh_out(tv, extra);
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s1->slot1_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
/* output the packet */
tv->tmqh_out(tv, p);
}
@ -242,15 +246,36 @@ void *TmThreadsSlot2(void *td) {
}
while(run) {
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
r = s2->Slot1Func(tv, p, s2->slot1_data, &s2->slot1_pq);
while (s2->slot1_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s2->slot1_pq);
r = s2->Slot2Func(tv, extra_p, s2->slot2_data, &s2->slot2_pq);
while (s2->slot2_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s2->slot2_pq);
tv->tmqh_out(tv, extra_p2);
}
tv->tmqh_out(tv, extra_p);
}
r = s2->Slot2Func(tv, p, s2->slot2_data, &s2->slot2_pq);
while (s2->slot2_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s2->slot2_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
/* output the packet */
tv->tmqh_out(tv, p);
}
@ -306,16 +331,61 @@ void *TmThreadsSlot3(void *td) {
}
while(run) {
/* input a packet */
p = tv->tmqh_in(tv);
if (p == NULL) {
//printf("%s: TmThreadsSlot1: p == NULL\n", tv->name);
} else {
/* slot 1 */
r = s3->Slot1Func(tv, p, s3->slot1_data, &s3->slot1_pq);
while (s3->slot1_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s3->slot1_pq);
r = s3->Slot2Func(tv, extra_p, s3->slot2_data, &s3->slot2_pq);
while (s3->slot2_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s3->slot2_pq);
r = s3->Slot3Func(tv, extra_p2, s3->slot3_data, &s3->slot3_pq);
while (s3->slot3_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p3 = PacketDequeue(&s3->slot3_pq);
tv->tmqh_out(tv, extra_p3);
}
tv->tmqh_out(tv, extra_p2);
}
tv->tmqh_out(tv, extra_p);
}
/* slot 2 */
r = s3->Slot2Func(tv, p, s3->slot2_data, &s3->slot2_pq);
while (s3->slot2_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s3->slot2_pq);
r = s3->Slot3Func(tv, extra_p, s3->slot3_data, &s3->slot3_pq);
while (s3->slot3_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p2 = PacketDequeue(&s3->slot3_pq);
tv->tmqh_out(tv, extra_p2);
}
tv->tmqh_out(tv, extra_p);
}
/* slot 3 */
r = s3->Slot3Func(tv, p, s3->slot3_data, &s3->slot3_pq);
while (s3->slot3_pq.len > 0) {
/* handle new packets from this func */
Packet *extra_p = PacketDequeue(&s3->slot3_pq);
tv->tmqh_out(tv, extra_p);
}
//printf("%s: TmThreadsSlot1: p %p, r %d\n", tv->name, p, r);
/* XXX handle error */
/* output the packet */
tv->tmqh_out(tv, p);
}

@ -42,7 +42,10 @@
#endif /* NFQ */
#include "respond-reject.h"
#include "flow.h"
#include "flow-var.h"
#include "pkt-var.h"
#include "util-cidr.h"
#include "util-unittest.h"
@ -80,9 +83,20 @@ setup_signal_handler(int sig, void (*handler)())
Packet *SetupPkt (void)
{
mutex_lock(&packet_q.mutex_q);
Packet *p = PacketDequeue(&packet_q);
mutex_unlock(&packet_q.mutex_q);
Packet *p = NULL;
do {
mutex_lock(&packet_q.mutex_q);
p = PacketDequeue(&packet_q);
mutex_unlock(&packet_q.mutex_q);
if (p == NULL) {
//TmqDebugList();
usleep(1000); /* sleep 1ms */
/* XXX check for recv'd signals, so
* we can exit on signals received */
}
} while (p == NULL);
CLEAR_PACKET(p);
return p;
@ -93,9 +107,20 @@ Packet *TunnelPktSetup(ThreadVars *t, Packet *parent, u_int8_t *pkt, u_int16_t l
//printf("TunnelPktSetup: pkt %p, len %u, proto %u\n", pkt, len, proto);
/* get us a packet */
mutex_lock(&packet_q.mutex_q);
Packet *p = PacketDequeue(&packet_q);
mutex_unlock(&packet_q.mutex_q);
Packet *p = NULL;
do {
mutex_lock(&packet_q.mutex_q);
p = PacketDequeue(&packet_q);
mutex_unlock(&packet_q.mutex_q);
if (p == NULL) {
//TmqDebugList();
usleep(1000); /* sleep 1ms */
/* XXX check for recv'd signals, so
* we can exit on signals received */
}
} while (p == NULL);
mutex_lock(&mutex_pending);
pending++;
@ -124,111 +149,6 @@ Packet *TunnelPktSetup(ThreadVars *t, Packet *parent, u_int8_t *pkt, u_int16_t l
return p;
}
/* this function should only be called for tunnel packets
* ( I could also add a check for that here, but better do
* that at the caller, it saves us a functioncall for all
* non-tunnel packets)
*
* the problem we have is this: we reinject a pseudo packet
* into the pickup queue when we encounter a tunnel. This way
* we can independently inspect both the raw packet and any
* tunneled packet. We can however, reinject only one, and
* we can only do it when all are inspected. This is why
* all packets that are done set the RTV (Ready To Verdict)
* flag. Each time a packet is done, it checks if it is the
* last one. If not, we do nothing except return it to the
* memory pool. If we have handled everything, verdict this
* one.
*
*/
#if 0
static Packet * VerdictTunnelPacket(Packet *p) {
char verdict = 1;
Packet *vp = NULL;
INCR_PKT_RTV(p);
pthread_mutex_t *m = p->root ? &p->root->mutex_rtv_cnt : &p->mutex_rtv_cnt;
mutex_lock(m);
/* if there are more tunnel packets than ready to verdict packets,
* we won't verdict this one */
if ((PKT_TPR(p)+1) > PKT_RTV(p)) {
verdict = 0;
}
mutex_unlock(m);
/* don't set a verdict, we are not done yet with all packets */
if (verdict == 0) {
/* if this is not the root, we don't need it any longer */
if (!(IS_TUNNEL_ROOT_PKT(p))) {
mutex_lock(&packet_q.mutex_q);
PacketEnqueue(&packet_q, p);
mutex_unlock(&packet_q.mutex_q);
}
return NULL;
}
/* okay, we are going to set a verdict */
/* just verdict this one if it is the root */
if (IS_TUNNEL_ROOT_PKT(p)) {
return p;
}
/* not a tunnel root, so verdict p->root and get p
* into the packet_q */
vp = p->root;
mutex_lock(&packet_q.mutex_q);
PacketEnqueue(&packet_q, p);
mutex_unlock(&packet_q.mutex_q);
return vp;
}
#endif
#if 0
void *DetectThread(void *td) {
ThreadVars *th_v = (ThreadVars *)td;
int run = 1;
u_int32_t cnt = 0;
printf("DetectThread[%d] started... th_v %p\n", th_v->tid, th_v);
while(run) {
Packet *p = th_v->tmqh_in(th_v);
if (p == NULL) {
if (threadflags & VIPS_KILLDETECT)
run = 0;
} else {
#ifdef COUNTERS
cnt++;
#endif /* COUNTERS */
SigMatchSignatures(th_v, p);
/* handle normal packets and packets containing tunnels
* differently. Normal packets are just forwarded to the
* next queue. Tunnel packets need more care. */
if (!(IS_TUNNEL_PKT(p))) {
th_v->tmqh_out(th_v, p);
} else {
/* verdict the packet VerdictTunnelPacket returns. The
* function handles the rest */
Packet *vp = VerdictTunnelPacket(p);
if (vp != NULL) {
th_v->tmqh_out(th_v, p);
}
}
}
}
printf("DetectThread[%d] cnt %u\n", th_v->tid, cnt);
printf("DetectThread[%d] ended...\n", th_v->tid);
pthread_exit((void *) 0);
}
#endif
int main(int argc, char **argv)
{
int rc;
@ -288,12 +208,14 @@ int main(int argc, char **argv)
printf("Preallocating packets... packet size %u\n", sizeof(Packet));
int i = 0;
for (i = 0; i < MAX_PENDING; i++) {
/* XXX pkt alloc function */
Packet *p = malloc(sizeof(Packet));
if (p == NULL) {
printf("ERROR: malloc failed: %s\n", strerror(errno));
exit(1);
}
p->pktvar = NULL;
CLEAR_TCP_PACKET(p);
CLEAR_PACKET(p);
@ -373,9 +295,6 @@ int main(int argc, char **argv)
}
Tm1SlotSetFunc(tv_detect1,tm_module);
/* XXX this needs an api way of doing this */
//PatternMatcherThreadInit(tv_detect1);
if (TmThreadSpawn(tv_detect1) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
@ -393,9 +312,6 @@ int main(int argc, char **argv)
}
Tm1SlotSetFunc(tv_detect2,tm_module);
/* XXX this needs an api way of doing this */
//PatternMatcherThreadInit(tv_detect2);
if (TmThreadSpawn(tv_detect2) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
@ -430,7 +346,6 @@ int main(int argc, char **argv)
}
Tm1SlotSetFunc(tv_rreject,tm_module);
/* XXX this needs an api way of doing this */
if (TmThreadSpawn(tv_rreject) != 0) {
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
@ -484,6 +399,7 @@ int main(int argc, char **argv)
printf("ERROR: TmThreadSpawn failed\n");
exit(1);
}
/*
ThreadVars *tv_unifiedalert = TmThreadCreate("AlertUnifiedAlert","alert-queue3","simple","packetpool","packetpool","1slot");
if (tv_unifiedalert == NULL) {

Loading…
Cancel
Save