We now inspect timed out streams + streams not processed as yet, at engine shutdown

remotes/origin/master-1.1.x
Anoop Saldanha 14 years ago committed by Victor Julien
parent 56432cee16
commit c365bafbf6

@ -44,6 +44,10 @@
#include "flow-var.h"
#include "flow-private.h"
#include "stream-tcp-private.h"
#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"
#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-byte.h"
@ -81,6 +85,13 @@ int FlowKill (FlowQueue *);
/* Run mode selected at suricata.c */
extern int run_mode;
static TmSlot *stream_pseudo_pkt_stream_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_stream_TV = NULL;
static TmSlot *stream_pseudo_pkt_detect_tm_slot = NULL;
static ThreadVars *stream_pseudo_pkt_detect_TV = NULL;
static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL;
/** \brief Initialize the l7data ptr in the Flow session used by the L7 Modules
* for data storage.
*
@ -994,6 +1005,261 @@ void FlowShutdown(void) {
}
}
/**
* \internal
* \brief Forces reassembly for flows that need it.
*
* Please note we don't use locks anywhere. This function is to be
* called right when the engine is not doing anything.
*
* \param q The queue to process flows from.
*/
static inline void FlowForceReassemblyForQ(FlowQueue *q)
{
Flow *f;
TcpSession *ssn;
/* no locks needed, since the engine is virtually dead.
* We are the kings here */
/* get the topmost flow from the QUEUE */
f = q->top;
/* looks like we have no flows in this queue */
if (f == NULL) {
return;
}
/* we need to loop through all the flows in the queue */
while (f != NULL) {
/* We use this packet just for reassembly purpose */
Packet reassemble_p;
memset(&reassemble_p, 0, sizeof(Packet));
/* Get the tcp session for the flow */
ssn = (TcpSession *)f->protoctx;
/* \todo Also skip flows that shouldn't be inspected */
if (ssn == NULL) {
f = f->lnext;
continue;
}
/* ah ah! We have some unattended toserver segments */
if (ssn->client.seg_list != NULL) {
StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
ssn->client.last_ack = (ssn->client.seg_list_tail->seq +
ssn->client.seg_list_tail->payload_len);
reassemble_p.flow = f;
reassemble_p.flags |= PKT_PSEUDO_STREAM_END;
StreamTcpReassembleHandleSegment(stream_pseudo_pkt_detect_TV,
stt->ra_ctx, ssn, &ssn->server,
&reassemble_p, NULL);
StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
}
/* oh oh! We have some unattended toclient segments */
if (ssn->server.seg_list != NULL) {
StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data;
ssn->server.last_ack = (ssn->server.seg_list_tail->seq +
ssn->server.seg_list_tail->payload_len);
reassemble_p.flow = f;
reassemble_p.flags |= PKT_PSEUDO_STREAM_END;
StreamTcpReassembleHandleSegment(stream_pseudo_pkt_detect_TV,
stt->ra_ctx, ssn, &ssn->client,
&reassemble_p, NULL);
StreamTcpReassembleProcessAppLayer(stt->ra_ctx);
}
/* insert a pseudo packet in the toserver direction */
if (ssn->client.seg_list != NULL) {
Packet *p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
printf("packet is NULL\n");
exit(0);
}
p->proto = IPPROTO_TCP;
p->flow = f;
p->flags |= PKT_STREAM_EST;
p->flags |= PKT_STREAM_EOF;
p->flags |= PKT_HAS_FLOW;
p->flags |= PKT_PSEUDO_STREAM_END;
p->flowflags |= FLOW_PKT_TOSERVER;
p->flowflags |= FLOW_PKT_ESTABLISHED;
COPY_ADDRESS(&f->src, &p->src);
COPY_ADDRESS(&f->dst, &p->dst);
p->sp = f->sp;
p->dp = f->dp;
p->payload = NULL;
p->payload_len = 0;
/* set the ip header */
p->ip4h = (IPV4Hdr *)p->pkt;
/* version 4 and length 20 bytes for the tcp header */
p->ip4h->ip_verhl = 0x45;
p->ip4h->ip_len = htons(40);
p->ip4h->ip_proto = IPPROTO_TCP;
p->ip4h->ip_src.s_addr = f->src.addr_data32[0];
p->ip4h->ip_dst.s_addr = f->dst.addr_data32[0];
/* set the tcp header */
p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 20);
p->tcph->th_sport = htons(f->sp);
p->tcph->th_dport = htons(f->dp);
if (stream_pseudo_pkt_detect_prev_TV != NULL) {
stream_pseudo_pkt_detect_prev_TV->
tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}
if (stream_pseudo_pkt_detect_TV != NULL) {
stream_pseudo_pkt_detect_TV->
tmqh_out(stream_pseudo_pkt_detect_TV, p);
}
}
} /* if (ssn->client.seg_list != NULL) */
if (ssn->server.seg_list != NULL) {
Packet *p = PacketGetFromQueueOrAlloc();
if (p == NULL) {
printf("packet is NULL\n");
exit(0);
}
p->proto = IPPROTO_TCP;
p->flow = f;
p->flags |= PKT_STREAM_EST;
p->flags |= PKT_STREAM_EOF;
p->flags |= PKT_HAS_FLOW;
p->flags |= PKT_PSEUDO_STREAM_END;
p->flowflags |= FLOW_PKT_TOCLIENT;
p->flowflags |= FLOW_PKT_ESTABLISHED;
COPY_ADDRESS(&f->dst, &p->src);
COPY_ADDRESS(&f->src, &p->dst);
p->dp = f->sp;
p->sp = f->dp;
p->payload = NULL;
p->payload_len = 0;
/* set the ip header */
p->ip4h = (IPV4Hdr *)p->pkt;
/* version 4 and length 20 bytes for the tcp header */
p->ip4h->ip_verhl = 0x45;
p->ip4h->ip_len = htons(40);
p->ip4h->ip_proto = IPPROTO_TCP;
p->ip4h->ip_src.s_addr = f->dst.addr_data32[0];
p->ip4h->ip_dst.s_addr = f->src.addr_data32[0];
/* set the tcp header */
p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 20);
p->tcph->th_sport = htons(f->dp);
p->tcph->th_dport = htons(f->sp);
if (stream_pseudo_pkt_detect_prev_TV != NULL) {
stream_pseudo_pkt_detect_prev_TV->
tmqh_out(stream_pseudo_pkt_detect_prev_TV, p);
} else {
TmSlot *s = stream_pseudo_pkt_detect_tm_slot;
while (s != NULL) {
s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq,
&s->slot_post_pq);
s = s->slot_next;
}
if (stream_pseudo_pkt_detect_TV != NULL) {
stream_pseudo_pkt_detect_TV->
tmqh_out(stream_pseudo_pkt_detect_TV, p);
}
}
} /* if (ssn->server.seg_list != NULL) */
/* next flow in the queue */
f = f->lnext;
} /* while (f != NULL) */
return;
}
/**
* \brief Force reassembly for all the flows that have unprocessed segments.
*/
void FlowForceReassembly(void)
{
/* Do remember. We need to have packet acquired disabled by now */
/** ----- Part 1 ----- **/
/* First we need to kill the flow manager thread */
ThreadVars *tv = NULL;
SCMutexLock(&tv_root_lock);
/* flow manager thread(s) is/are a part of mgmt threads */
tv = tv_root[TVT_MGMT];
while (tv != NULL) {
if (strcasecmp(tv->name, "FlowManagerThread") == 0)
break;
tv = tv->next;
}
/* not possible, unless someone decides to rename FlowManagerThread */
if (tv == NULL) {
SCMutexUnlock(&tv_root_lock);
abort();
}
TmThreadsSetFlag(tv, THV_KILL);
/* be sure it has shut down */
while (!TmThreadsCheckFlag(tv, THV_CLOSED)) {
usleep(100);
}
SCMutexUnlock(&tv_root_lock);
/** ----- Part 2 ----- **/
/* Check if all threads are idle. We need this so that we have all
* packets freeds. As a consequence, no flows are in use */
SCMutexLock(&tv_root_lock);
/* all receive threads are part of packet processing threads */
tv = tv_root[TVT_PPT];
/* we are doing this in order receive -> decode -> ... -> log */
while (tv != NULL) {
if (tv->inq != NULL) {
/* we wait till we dry out all the inq packets, before we
* kill this thread. Do note that you should have disabled
* packet acquire by now using TmThreadDisableReceiveThreads()*/
if (!(strlen(tv->inq->name) == strlen("packetpool") &&
strcasecmp(tv->inq->name, "packetpool") == 0)) {
PacketQueue *q = &trans_q[tv->inq->id];
while (q->len != 0) {
usleep(100);
}
}
}
tv = tv->next;
}
SCMutexUnlock(&tv_root_lock);
/** ----- Part 3 ----- **/
/* Carry out flow reassembly for unattended flows */
FlowForceReassemblyForQ(&flow_new_q[FLOW_PROTO_TCP]);
FlowForceReassemblyForQ(&flow_est_q[FLOW_PROTO_TCP]);
FlowForceReassemblyForQ(&flow_close_q[FLOW_PROTO_TCP]);
//exit(EXIT_FAILURE);
}
/** \brief Thread that manages the various queue's and removes timed out flows.
* \param td ThreadVars casted to void ptr
*
@ -1019,6 +1285,46 @@ void *FlowManagerThread(void *td)
memset(&ts, 0, sizeof(ts));
/* get StreamTCP TM's slot and TV containing this slot */
stream_pseudo_pkt_stream_tm_slot = TmSlotGetSlotForTM(TMM_STREAMTCP);
if (stream_pseudo_pkt_stream_tm_slot == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the slot for STREAMTCP TM");
exit(EXIT_FAILURE);
}
stream_pseudo_pkt_stream_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_stream_tm_slot);
if (stream_pseudo_pkt_stream_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing STREAMTCP TM slot");
exit(EXIT_FAILURE);
}
/* get detect TM's slot and TV containing this slot */
stream_pseudo_pkt_detect_tm_slot = TmSlotGetSlotForTM(TMM_DETECT);
if (stream_pseudo_pkt_detect_tm_slot == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve a slot for DETECT TM");
exit(EXIT_FAILURE);
}
stream_pseudo_pkt_detect_TV =
TmThreadsGetTVContainingSlot(stream_pseudo_pkt_detect_tm_slot);
if (stream_pseudo_pkt_detect_TV == NULL) {
/* yes, this is fatal! */
SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
"retrieve the TV containing the Detect TM slot");
exit(EXIT_FAILURE);
}
if (stream_pseudo_pkt_detect_TV->tm_slots == stream_pseudo_pkt_detect_tm_slot) {
stream_pseudo_pkt_detect_prev_TV = stream_pseudo_pkt_detect_TV->prev;
}
if (stream_pseudo_pkt_detect_TV->next == NULL) {
stream_pseudo_pkt_detect_TV = NULL;
}
/* set the thread name */
SCSetThreadName(th_v->name);
SCLogDebug("%s started...", th_v->name);

@ -267,6 +267,8 @@ int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *));
int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *));
void FlowUpdateQueue(Flow *);
void FlowForceReassembly(void);
static inline void FlowLockSetNoPacketInspectionFlag(Flow *);
static inline void FlowSetNoPacketInspectionFlag(Flow *);
static inline void FlowLockSetNoPayloadInspectionFlag(Flow *);

@ -82,23 +82,6 @@
#define STREAMTCP_EMERG_EST_TIMEOUT 300
#define STREAMTCP_EMERG_CLOSED_TIMEOUT 20
typedef struct StreamTcpThread_ {
uint64_t pkts;
/** queue for pseudo packet(s) that were created in the stream
* process and need further handling. Currently only used when
* receiving (valid) RST packets */
PacketQueue pseudo_queue;
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;
/** pseudo packets processed */
uint16_t counter_tcp_pseudo;
TcpReassemblyThreadCtx *ra_ctx; /**< tcp reassembly thread data */
} StreamTcpThread;
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);
TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **);
TmEcode StreamTcpThreadDeinit(ThreadVars *, void *);

@ -65,6 +65,24 @@ typedef struct TcpStreamCnf_ {
uint8_t flags;
} TcpStreamCnf;
typedef struct StreamTcpThread_ {
uint64_t pkts;
/** queue for pseudo packet(s) that were created in the stream
* process and need further handling. Currently only used when
* receiving (valid) RST packets */
PacketQueue pseudo_queue;
uint16_t counter_tcp_sessions;
/** sessions not picked up because memcap was reached */
uint16_t counter_tcp_ssn_memcap;
/** pseudo packets processed */
uint16_t counter_tcp_pseudo;
/** tcp reassembly thread data */
TcpReassemblyThreadCtx *ra_ctx;
} StreamTcpThread;
TcpStreamCnf stream_config;
void TmModuleStreamTcpRegister (void);
void StreamTcpInitConfig (char);

@ -1566,6 +1566,8 @@ int main(int argc, char **argv)
/* Disable packet acquire thread first */
TmThreadDisableReceiveThreads();
FlowForceReassembly();
TmThreadKillThreads();
SCPerfReleaseResources();
FlowShutdown();

@ -761,6 +761,34 @@ error:
return TM_ECODE_FAILED;
}
ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot)
{
ThreadVars *tv;
int i;
SCMutexLock(&tv_root_lock);
for (i = 0; i < TVT_MAX; i++) {
tv = tv_root[i];
while (tv) {
TmSlot *slots = tv->tm_slots;
while (slots != NULL) {
if (slots == tm_slot) {
SCMutexUnlock(&tv_root_lock);
return tv;
}
slots = slots->slot_next;
}
tv = tv->next;
}
}
SCMutexUnlock(&tv_root_lock);
return NULL;
}
/**
* \brief Appends a new entry to the slots.
*

@ -118,6 +118,7 @@ void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *, const char *);
void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *);
void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv);
ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
void TmThreadDisableReceiveThreads(void);
#if 0

Loading…
Cancel
Save