diff --git a/src/flow.c b/src/flow.c index 0441eb4aff..5bf0da13e9 100644 --- a/src/flow.c +++ b/src/flow.c @@ -44,6 +44,10 @@ #include "flow-var.h" #include "flow-private.h" +#include "stream-tcp-private.h" +#include "stream-tcp-reassemble.h" +#include "stream-tcp.h" + #include "util-unittest.h" #include "util-unittest-helper.h" #include "util-byte.h" @@ -81,6 +85,13 @@ int FlowKill (FlowQueue *); /* Run mode selected at suricata.c */ extern int run_mode; +static TmSlot *stream_pseudo_pkt_stream_tm_slot = NULL; +static ThreadVars *stream_pseudo_pkt_stream_TV = NULL; + +static TmSlot *stream_pseudo_pkt_detect_tm_slot = NULL; +static ThreadVars *stream_pseudo_pkt_detect_TV = NULL; +static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL; + /** \brief Initialize the l7data ptr in the Flow session used by the L7 Modules * for data storage. * @@ -994,6 +1005,261 @@ void FlowShutdown(void) { } } +/** + * \internal + * \brief Forces reassembly for flows that need it. + * + * Please note we don't use locks anywhere. This function is to be + * called right when the engine is not doing anything. + * + * \param q The queue to process flows from. + */ +static inline void FlowForceReassemblyForQ(FlowQueue *q) +{ + Flow *f; + TcpSession *ssn; + + /* no locks needed, since the engine is virtually dead. + * We are the kings here */ + + /* get the topmost flow from the QUEUE */ + f = q->top; + /* looks like we have no flows in this queue */ + if (f == NULL) { + return; + } + + /* we need to loop through all the flows in the queue */ + while (f != NULL) { + /* We use this packet just for reassembly purpose */ + Packet reassemble_p; + memset(&reassemble_p, 0, sizeof(Packet)); + + /* Get the tcp session for the flow */ + ssn = (TcpSession *)f->protoctx; + + /* \todo Also skip flows that shouldn't be inspected */ + if (ssn == NULL) { + f = f->lnext; + continue; + } + + /* ah ah! We have some unattended toserver segments */ + if (ssn->client.seg_list != NULL) { + StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data; + + ssn->client.last_ack = (ssn->client.seg_list_tail->seq + + ssn->client.seg_list_tail->payload_len); + reassemble_p.flow = f; + reassemble_p.flags |= PKT_PSEUDO_STREAM_END; + StreamTcpReassembleHandleSegment(stream_pseudo_pkt_detect_TV, + stt->ra_ctx, ssn, &ssn->server, + &reassemble_p, NULL); + StreamTcpReassembleProcessAppLayer(stt->ra_ctx); + } + /* oh oh! We have some unattended toclient segments */ + if (ssn->server.seg_list != NULL) { + StreamTcpThread *stt = stream_pseudo_pkt_stream_tm_slot->slot_data; + + ssn->server.last_ack = (ssn->server.seg_list_tail->seq + + ssn->server.seg_list_tail->payload_len); + reassemble_p.flow = f; + reassemble_p.flags |= PKT_PSEUDO_STREAM_END; + StreamTcpReassembleHandleSegment(stream_pseudo_pkt_detect_TV, + stt->ra_ctx, ssn, &ssn->client, + &reassemble_p, NULL); + StreamTcpReassembleProcessAppLayer(stt->ra_ctx); + } + + /* insert a pseudo packet in the toserver direction */ + if (ssn->client.seg_list != NULL) { + Packet *p = PacketGetFromQueueOrAlloc(); + if (p == NULL) { + printf("packet is NULL\n"); + exit(0); + } + p->proto = IPPROTO_TCP; + p->flow = f; + p->flags |= PKT_STREAM_EST; + p->flags |= PKT_STREAM_EOF; + p->flags |= PKT_HAS_FLOW; + p->flags |= PKT_PSEUDO_STREAM_END; + p->flowflags |= FLOW_PKT_TOSERVER; + p->flowflags |= FLOW_PKT_ESTABLISHED; + COPY_ADDRESS(&f->src, &p->src); + COPY_ADDRESS(&f->dst, &p->dst); + p->sp = f->sp; + p->dp = f->dp; + p->payload = NULL; + p->payload_len = 0; + + /* set the ip header */ + p->ip4h = (IPV4Hdr *)p->pkt; + /* version 4 and length 20 bytes for the tcp header */ + p->ip4h->ip_verhl = 0x45; + p->ip4h->ip_len = htons(40); + p->ip4h->ip_proto = IPPROTO_TCP; + p->ip4h->ip_src.s_addr = f->src.addr_data32[0]; + p->ip4h->ip_dst.s_addr = f->dst.addr_data32[0]; + + /* set the tcp header */ + p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 20); + p->tcph->th_sport = htons(f->sp); + p->tcph->th_dport = htons(f->dp); + + if (stream_pseudo_pkt_detect_prev_TV != NULL) { + stream_pseudo_pkt_detect_prev_TV-> + tmqh_out(stream_pseudo_pkt_detect_prev_TV, p); + } else { + TmSlot *s = stream_pseudo_pkt_detect_tm_slot; + while (s != NULL) { + s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq, + &s->slot_post_pq); + s = s->slot_next; + } + + if (stream_pseudo_pkt_detect_TV != NULL) { + stream_pseudo_pkt_detect_TV-> + tmqh_out(stream_pseudo_pkt_detect_TV, p); + } + } + } /* if (ssn->client.seg_list != NULL) */ + if (ssn->server.seg_list != NULL) { + Packet *p = PacketGetFromQueueOrAlloc(); + if (p == NULL) { + printf("packet is NULL\n"); + exit(0); + } + p->proto = IPPROTO_TCP; + p->flow = f; + p->flags |= PKT_STREAM_EST; + p->flags |= PKT_STREAM_EOF; + p->flags |= PKT_HAS_FLOW; + p->flags |= PKT_PSEUDO_STREAM_END; + p->flowflags |= FLOW_PKT_TOCLIENT; + p->flowflags |= FLOW_PKT_ESTABLISHED; + COPY_ADDRESS(&f->dst, &p->src); + COPY_ADDRESS(&f->src, &p->dst); + p->dp = f->sp; + p->sp = f->dp; + p->payload = NULL; + p->payload_len = 0; + + /* set the ip header */ + p->ip4h = (IPV4Hdr *)p->pkt; + /* version 4 and length 20 bytes for the tcp header */ + p->ip4h->ip_verhl = 0x45; + p->ip4h->ip_len = htons(40); + p->ip4h->ip_proto = IPPROTO_TCP; + p->ip4h->ip_src.s_addr = f->dst.addr_data32[0]; + p->ip4h->ip_dst.s_addr = f->src.addr_data32[0]; + + /* set the tcp header */ + p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 20); + p->tcph->th_sport = htons(f->dp); + p->tcph->th_dport = htons(f->sp); + + if (stream_pseudo_pkt_detect_prev_TV != NULL) { + stream_pseudo_pkt_detect_prev_TV-> + tmqh_out(stream_pseudo_pkt_detect_prev_TV, p); + } else { + TmSlot *s = stream_pseudo_pkt_detect_tm_slot; + while (s != NULL) { + s->SlotFunc(NULL, p, s->slot_data, &s->slot_pre_pq, + &s->slot_post_pq); + s = s->slot_next; + } + + if (stream_pseudo_pkt_detect_TV != NULL) { + stream_pseudo_pkt_detect_TV-> + tmqh_out(stream_pseudo_pkt_detect_TV, p); + } + } + } /* if (ssn->server.seg_list != NULL) */ + + /* next flow in the queue */ + f = f->lnext; + } /* while (f != NULL) */ + + return; +} + +/** + * \brief Force reassembly for all the flows that have unprocessed segments. + */ +void FlowForceReassembly(void) +{ + /* Do remember. We need to have packet acquired disabled by now */ + + /** ----- Part 1 ----- **/ + /* First we need to kill the flow manager thread */ + + ThreadVars *tv = NULL; + + SCMutexLock(&tv_root_lock); + + /* flow manager thread(s) is/are a part of mgmt threads */ + tv = tv_root[TVT_MGMT]; + + while (tv != NULL) { + if (strcasecmp(tv->name, "FlowManagerThread") == 0) + break; + tv = tv->next; + } + + /* not possible, unless someone decides to rename FlowManagerThread */ + if (tv == NULL) { + SCMutexUnlock(&tv_root_lock); + abort(); + } + + TmThreadsSetFlag(tv, THV_KILL); + + /* be sure it has shut down */ + while (!TmThreadsCheckFlag(tv, THV_CLOSED)) { + usleep(100); + } + + SCMutexUnlock(&tv_root_lock); + + + /** ----- Part 2 ----- **/ + /* Check if all threads are idle. We need this so that we have all + * packets freeds. As a consequence, no flows are in use */ + + SCMutexLock(&tv_root_lock); + + /* all receive threads are part of packet processing threads */ + tv = tv_root[TVT_PPT]; + + /* we are doing this in order receive -> decode -> ... -> log */ + while (tv != NULL) { + if (tv->inq != NULL) { + /* we wait till we dry out all the inq packets, before we + * kill this thread. Do note that you should have disabled + * packet acquire by now using TmThreadDisableReceiveThreads()*/ + if (!(strlen(tv->inq->name) == strlen("packetpool") && + strcasecmp(tv->inq->name, "packetpool") == 0)) { + PacketQueue *q = &trans_q[tv->inq->id]; + while (q->len != 0) { + usleep(100); + } + } + } + tv = tv->next; + } + + SCMutexUnlock(&tv_root_lock); + + /** ----- Part 3 ----- **/ + /* Carry out flow reassembly for unattended flows */ + FlowForceReassemblyForQ(&flow_new_q[FLOW_PROTO_TCP]); + FlowForceReassemblyForQ(&flow_est_q[FLOW_PROTO_TCP]); + FlowForceReassemblyForQ(&flow_close_q[FLOW_PROTO_TCP]); + + //exit(EXIT_FAILURE); +} + /** \brief Thread that manages the various queue's and removes timed out flows. * \param td ThreadVars casted to void ptr * @@ -1019,6 +1285,46 @@ void *FlowManagerThread(void *td) memset(&ts, 0, sizeof(ts)); + /* get StreamTCP TM's slot and TV containing this slot */ + stream_pseudo_pkt_stream_tm_slot = TmSlotGetSlotForTM(TMM_STREAMTCP); + if (stream_pseudo_pkt_stream_tm_slot == NULL) { + /* yes, this is fatal! */ + SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to " + "retrieve the slot for STREAMTCP TM"); + exit(EXIT_FAILURE); + } + stream_pseudo_pkt_stream_TV = + TmThreadsGetTVContainingSlot(stream_pseudo_pkt_stream_tm_slot); + if (stream_pseudo_pkt_stream_TV == NULL) { + /* yes, this is fatal! */ + SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to " + "retrieve the TV containing STREAMTCP TM slot"); + exit(EXIT_FAILURE); + } + + /* get detect TM's slot and TV containing this slot */ + stream_pseudo_pkt_detect_tm_slot = TmSlotGetSlotForTM(TMM_DETECT); + if (stream_pseudo_pkt_detect_tm_slot == NULL) { + /* yes, this is fatal! */ + SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to " + "retrieve a slot for DETECT TM"); + exit(EXIT_FAILURE); + } + stream_pseudo_pkt_detect_TV = + TmThreadsGetTVContainingSlot(stream_pseudo_pkt_detect_tm_slot); + if (stream_pseudo_pkt_detect_TV == NULL) { + /* yes, this is fatal! */ + SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to " + "retrieve the TV containing the Detect TM slot"); + exit(EXIT_FAILURE); + } + if (stream_pseudo_pkt_detect_TV->tm_slots == stream_pseudo_pkt_detect_tm_slot) { + stream_pseudo_pkt_detect_prev_TV = stream_pseudo_pkt_detect_TV->prev; + } + if (stream_pseudo_pkt_detect_TV->next == NULL) { + stream_pseudo_pkt_detect_TV = NULL; + } + /* set the thread name */ SCSetThreadName(th_v->name); SCLogDebug("%s started...", th_v->name); diff --git a/src/flow.h b/src/flow.h index 731f6569c6..0a33ce3891 100644 --- a/src/flow.h +++ b/src/flow.h @@ -267,6 +267,8 @@ int FlowSetProtoFreeFunc (uint8_t , void (*Free)(void *)); int FlowSetFlowStateFunc (uint8_t , int (*GetProtoState)(void *)); void FlowUpdateQueue(Flow *); +void FlowForceReassembly(void); + static inline void FlowLockSetNoPacketInspectionFlag(Flow *); static inline void FlowSetNoPacketInspectionFlag(Flow *); static inline void FlowLockSetNoPayloadInspectionFlag(Flow *); diff --git a/src/stream-tcp.c b/src/stream-tcp.c index 519012bb28..11bb40957f 100644 --- a/src/stream-tcp.c +++ b/src/stream-tcp.c @@ -82,23 +82,6 @@ #define STREAMTCP_EMERG_EST_TIMEOUT 300 #define STREAMTCP_EMERG_CLOSED_TIMEOUT 20 -typedef struct StreamTcpThread_ { - uint64_t pkts; - - /** queue for pseudo packet(s) that were created in the stream - * process and need further handling. Currently only used when - * receiving (valid) RST packets */ - PacketQueue pseudo_queue; - - uint16_t counter_tcp_sessions; - /** sessions not picked up because memcap was reached */ - uint16_t counter_tcp_ssn_memcap; - /** pseudo packets processed */ - uint16_t counter_tcp_pseudo; - - TcpReassemblyThreadCtx *ra_ctx; /**< tcp reassembly thread data */ -} StreamTcpThread; - TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *); TmEcode StreamTcpThreadInit(ThreadVars *, void *, void **); TmEcode StreamTcpThreadDeinit(ThreadVars *, void *); diff --git a/src/stream-tcp.h b/src/stream-tcp.h index 47b18a0d1a..27dddc170f 100644 --- a/src/stream-tcp.h +++ b/src/stream-tcp.h @@ -65,6 +65,24 @@ typedef struct TcpStreamCnf_ { uint8_t flags; } TcpStreamCnf; +typedef struct StreamTcpThread_ { + uint64_t pkts; + + /** queue for pseudo packet(s) that were created in the stream + * process and need further handling. Currently only used when + * receiving (valid) RST packets */ + PacketQueue pseudo_queue; + + uint16_t counter_tcp_sessions; + /** sessions not picked up because memcap was reached */ + uint16_t counter_tcp_ssn_memcap; + /** pseudo packets processed */ + uint16_t counter_tcp_pseudo; + + /** tcp reassembly thread data */ + TcpReassemblyThreadCtx *ra_ctx; +} StreamTcpThread; + TcpStreamCnf stream_config; void TmModuleStreamTcpRegister (void); void StreamTcpInitConfig (char); diff --git a/src/suricata.c b/src/suricata.c index 7f8d9fdb6b..cc2a80fef2 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -1566,6 +1566,8 @@ int main(int argc, char **argv) /* Disable packet acquire thread first */ TmThreadDisableReceiveThreads(); + FlowForceReassembly(); + TmThreadKillThreads(); SCPerfReleaseResources(); FlowShutdown(); diff --git a/src/tm-threads.c b/src/tm-threads.c index 02ac828e58..7264c84cbd 100644 --- a/src/tm-threads.c +++ b/src/tm-threads.c @@ -761,6 +761,34 @@ error: return TM_ECODE_FAILED; } +ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *tm_slot) +{ + ThreadVars *tv; + int i; + + SCMutexLock(&tv_root_lock); + + for (i = 0; i < TVT_MAX; i++) { + tv = tv_root[i]; + + while (tv) { + TmSlot *slots = tv->tm_slots; + while (slots != NULL) { + if (slots == tm_slot) { + SCMutexUnlock(&tv_root_lock); + return tv; + } + slots = slots->slot_next; + } + tv = tv->next; + } + } + + SCMutexUnlock(&tv_root_lock); + + return NULL; +} + /** * \brief Appends a new entry to the slots. * diff --git a/src/tm-threads.h b/src/tm-threads.h index 7fcbceff10..2aa747debc 100644 --- a/src/tm-threads.h +++ b/src/tm-threads.h @@ -118,6 +118,7 @@ void TmThreadsMSMasterDisableSlaveSyncPt(ThreadVars *, const char *); void TmThreadsMSMasterReleaseSlaveAllSyncPts(ThreadVars *); void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv); +ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *); void TmThreadDisableReceiveThreads(void); #if 0