signal the post pq if possible, whenever pseudo packets are injected into engine flow. Also carry out post pq processing irrespective of packet retrieval from the flow.

remotes/origin/master-1.1.x
Anoop Saldanha 14 years ago committed by Victor Julien
parent fd9bacb02d
commit a559bfc165

@ -461,6 +461,9 @@ static inline int FlowForceReassemblyForFlowV2(ThreadVars *tv, Flow *f)
if (p3 != NULL) if (p3 != NULL)
PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3); PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3);
SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q); SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
if (stream_pseudo_pkt_decode_TV->inq != NULL) {
SCCondSignal(&trans_q[stream_pseudo_pkt_decode_TV->inq->id].cond_q);
}
return 1; return 1;
} }
@ -1762,6 +1765,14 @@ void *FlowManagerThread(void *td)
"retrieve the slot for DECODE TM"); "retrieve the slot for DECODE TM");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
stream_pseudo_pkt_decode_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_decode_tm_slot);
if (stream_pseudo_pkt_decode_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing the Decode TM slot");
exit(EXIT_FAILURE);
}
/* set the thread name */ /* set the thread name */
SCSetThreadName(th_v->name); SCSetThreadName(th_v->name);

@ -401,15 +401,14 @@ void *TmThreadsSlot1(void *td)
/* output the packet */ /* output the packet */
tv->tmqh_out(tv, p); tv->tmqh_out(tv, p);
}
while (s->slot_post_pq.top != NULL) { while (s->slot_post_pq.top != NULL) {
/* handle new packets from this func */ /* handle new packets from this func */
SCMutexLock(&s->slot_post_pq.mutex_q); SCMutexLock(&s->slot_post_pq.mutex_q);
Packet *extra_p = PacketDequeue(&s->slot_post_pq); Packet *extra_p = PacketDequeue(&s->slot_post_pq);
SCMutexUnlock(&s->slot_post_pq.mutex_q); SCMutexUnlock(&s->slot_post_pq.mutex_q);
if (extra_p != NULL) { if (extra_p != NULL) {
tv->tmqh_out(tv, extra_p); tv->tmqh_out(tv, extra_p);
}
} }
} }
@ -663,29 +662,29 @@ void *TmThreadsSlotVar(void *td)
/* output the packet */ /* output the packet */
tv->tmqh_out(tv, p); tv->tmqh_out(tv, p);
/* now handle the post_pq packets */ } /* if (p != NULL) */
TmSlot *slot; /* now handle the post_pq packets */
for (slot = s; slot != NULL; slot = slot->slot_next) { TmSlot *slot;
while (slot->slot_post_pq.top != NULL) { for (slot = s; slot != NULL; slot = slot->slot_next) {
SCMutexLock(&slot->slot_post_pq.mutex_q); while (slot->slot_post_pq.top != NULL) {
Packet *extra_p = PacketDequeue(&slot->slot_post_pq); SCMutexLock(&slot->slot_post_pq.mutex_q);
SCMutexUnlock(&slot->slot_post_pq.mutex_q); Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
if (extra_p == NULL) SCMutexUnlock(&slot->slot_post_pq.mutex_q);
if (extra_p == NULL)
break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break; break;
if (slot->slot_next != NULL) {
r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);
if (r == TM_ECODE_FAILED) {
TmqhOutputPacketpool(tv, extra_p);
TmThreadsSetFlag(tv, THV_FAILED);
break;
}
} }
/* output the packet */ }
tv->tmqh_out(tv, extra_p); /* output the packet */
} /* while (slot->slot_post_pq.top != NULL) */ tv->tmqh_out(tv, extra_p);
} /* for (slot = s; slot != NULL; slot = slot->slot_next) */ } /* while (slot->slot_post_pq.top != NULL) */
} /* if (p != NULL) */ } /* for (slot = s; slot != NULL; slot = slot->slot_next) */
if (TmThreadsCheckFlag(tv, THV_KILL)) { if (TmThreadsCheckFlag(tv, THV_KILL)) {
run = 0; run = 0;

Loading…
Cancel
Save