diff --git a/src/flow.c b/src/flow.c
index df5d757455..a9bf08d8cd 100644
--- a/src/flow.c
+++ b/src/flow.c
@@ -92,6 +92,10 @@ static TmSlot *stream_pseudo_pkt_detect_tm_slot = NULL;
 static ThreadVars *stream_pseudo_pkt_detect_TV = NULL;
 static ThreadVars *stream_pseudo_pkt_detect_prev_TV = NULL;
 
+static TmSlot *stream_pseudo_pkt_decode_tm_slot = NULL;
+
+static ThreadVars *flow_manager_TV = NULL;
+
 /** \brief Initialize the l7data ptr in the Flow session used by the L7 Modules
  * for data storage.
  *
@@ -189,6 +193,275 @@ static uint64_t prune_no_timeout = 0;
 static uint64_t prune_usecnt = 0;
 #endif
 
+/**
+ * \internal
+ * \brief Builds a payload-less TCP pseudo packet for one direction of a flow.
+ *
+ *        The packet gets the flow's addresses and ports, a minimal IPv4 or
+ *        IPv6 header and a TCP header in which only the ports are set.  The
+ *        remaining TCP header fields are left for the caller to fill in.
+ *
+ * \param direction 0 for a toserver packet; toclient otherwise.
+ * \param f         Pointer to the flow the packet is created for.
+ *
+ * \retval p Pointer to the pseudo packet; NULL if no packet could be obtained.
+ */
+static inline Packet *FFRPseudoPacketSetup(int direction, Flow *f)
+{
+    Packet *p = PacketGetFromQueueOrAlloc();
+    if (p == NULL)
+        return NULL;
+
+    p->proto = IPPROTO_TCP;
+    p->flow = f;
+    p->flags |= PKT_STREAM_EST;
+    p->flags |= PKT_STREAM_EOF;
+    p->flags |= PKT_HAS_FLOW;
+    p->flags |= PKT_PSEUDO_STREAM_END;
+    if (direction == 0)
+        p->flowflags |= FLOW_PKT_TOSERVER;
+    else
+        p->flowflags |= FLOW_PKT_TOCLIENT;
+    p->flowflags |= FLOW_PKT_ESTABLISHED;
+    if (direction == 0) {
+        COPY_ADDRESS(&f->src, &p->src);
+        COPY_ADDRESS(&f->dst, &p->dst);
+        p->sp = f->sp;
+        p->dp = f->dp;
+    } else {
+        COPY_ADDRESS(&f->src, &p->dst);
+        COPY_ADDRESS(&f->dst, &p->src);
+        p->sp = f->dp;
+        p->dp = f->sp;
+    }
+    p->payload = NULL;
+    p->payload_len = 0;
+    if (f->src.family == AF_INET) {
+        /* set the ip header */
+        p->ip4h = (IPV4Hdr *)p->pkt;
+        /* version 4 and a header length of 5 words (20 bytes) */
+        p->ip4h->ip_verhl = 0x45;
+        p->ip4h->ip_tos = 0;
+        /* 20 bytes of ip header followed by 20 bytes of tcp header */
+        p->ip4h->ip_len = htons(40);
+        p->ip4h->ip_id = 0;
+        p->ip4h->ip_off = 0;
+        p->ip4h->ip_ttl = 64;
+        p->ip4h->ip_proto = IPPROTO_TCP;
+        /* the ip header checksum (ip_csum) is not set here */
+        if (direction == 0) {
+            p->ip4h->ip_src.s_addr = f->src.addr_data32[0];
+            p->ip4h->ip_dst.s_addr = f->dst.addr_data32[0];
+        } else {
+            p->ip4h->ip_src.s_addr = f->dst.addr_data32[0];
+            p->ip4h->ip_dst.s_addr = f->src.addr_data32[0];
+        }
+
+        /* set the tcp header */
+        p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 20);
+    } else {
+        int i;
+
+        /* set the ip header */
+        p->ip6h = (IPV6Hdr *)p->pkt;
+        /* NOTE(review): the flow label is cleared before the version is set
+         * in case s_ip6_vfc aliases the first byte of s_ip6_flow, as in the
+         * BSD ip6_hdr layout -- confirm against the IPV6Hdr definition */
+        p->ip6h->s_ip6_flow = 0;
+        /* version 6 */
+        p->ip6h->s_ip6_vfc = 0x60;
+        p->ip6h->s_ip6_nxt = IPPROTO_TCP;
+        p->ip6h->s_ip6_plen = htons(20);
+        p->ip6h->s_ip6_hlim = 64;
+        /* copy all four 32 bit words of each address, not just the first */
+        for (i = 0; i < 4; i++) {
+            if (direction == 0) {
+                p->ip6h->ip6_src[i] = f->src.addr_data32[i];
+                p->ip6h->ip6_dst[i] = f->dst.addr_data32[i];
+            } else {
+                p->ip6h->ip6_src[i] = f->dst.addr_data32[i];
+                p->ip6h->ip6_dst[i] = f->src.addr_data32[i];
+            }
+        }
+
+        /* set the tcp header */
+        p->tcph = (TCPHdr *)((uint8_t *)p->pkt + 40);
+    }
+    if (direction == 0) {
+        p->tcph->th_sport = htons(f->sp);
+        p->tcph->th_dport = htons(f->dp);
+    } else {
+        p->tcph->th_sport = htons(f->dp);
+        p->tcph->th_dport = htons(f->sp);
+    }
+
+    return p;
+}
+
+/**
+ * \internal
+ * \brief Completes the TCP header of a pseudo packet as a bare ACK.
+ *
+ * \param p   Pseudo packet obtained from FFRPseudoPacketSetup().
+ * \param f   Flow the packet was created for; only its address family is read.
+ * \param seq Sequence number in host byte order.
+ * \param ack Acknowledgement number in host byte order.
+ */
+static inline void FFRPseudoPacketSetTcpAck(Packet *p, Flow *f, uint32_t seq,
+                                            uint32_t ack)
+{
+    /* TCP seq and ack numbers are 32 bit: htonl(), htons() would truncate */
+    p->tcph->th_seq = htonl(seq);
+    p->tcph->th_ack = htonl(ack);
+    /* data offset of 5 words: a 20 byte tcp header without options */
+    p->tcph->th_offx2 = 0x50;
+    p->tcph->th_flags |= TH_ACK;
+    p->tcph->th_win = 10;
+    p->tcph->th_urp = 0;
+    /* p->ip4h is only set up for AF_INET flows, so it can't be used to
+     * checksum an ipv6 pseudo packet.
+     * \todo checksum over the ipv6 pseudo header */
+    if (f->src.family == AF_INET) {
+        p->tcph->th_sum = TCPCalculateChecksum((uint16_t *)&(p->ip4h->ip_src),
+                                               (uint16_t *)p->tcph, 20);
+    }
+}
+
+/**
+ * \internal
+ * \brief Forces reassembly for flow if it needs it.
+ *
+ *        The function requires flow to be locked beforehand.  Whenever 1 is
+ *        returned, the flow mutex (f->m) and the flow bucket spinlock
+ *        (f->fb->s) have been released here; when 0 is returned neither has
+ *        been touched.
+ *
+ * \param tv Pointer to the calling thread's ThreadVars.  Pseudo packets are
+ *           only injected if this is the flow manager thread.
+ * \param f  Pointer to the flow.
+ *
+ * \retval 0 This flow doesn't need any reassembly processing; 1 otherwise.
+ */
+static inline int FlowForceReassemblyForFlowV2(ThreadVars *tv, Flow *f)
+{
+    TcpSession *ssn;
+
+    int client_ok = 1;
+    int server_ok = 1;
+
+    /* looks like we have no flows in this queue */
+    if (f == NULL || f->flags & FLOW_TIMEOUT_REASSEMBLY_DONE) {
+        return 0;
+    }
+
+    /* Get the tcp session for the flow */
+    ssn = (TcpSession *)f->protoctx;
+    /* \todo Also skip flows that shouldn't be inspected */
+    if (ssn == NULL) {
+        return 0;
+    }
+
+    if (ssn->client.seg_list == NULL ||
+        (ssn->client.seg_list_tail->flags & SEGMENTTCP_FLAG_RAW_PROCESSED &&
+         ssn->client.seg_list_tail->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED &&
+         ssn->toserver_smsg_head == NULL)) {
+        client_ok = 0;
+    }
+    if (ssn->server.seg_list == NULL ||
+        (ssn->server.seg_list_tail->flags & SEGMENTTCP_FLAG_RAW_PROCESSED &&
+         ssn->server.seg_list_tail->flags & SEGMENTTCP_FLAG_APPLAYER_PROCESSED &&
+         ssn->toclient_smsg_head == NULL)) {
+        server_ok = 0;
+    }
+
+    /* nothing to do */
+    if (client_ok == 0 && server_ok == 0) {
+        return 0;
+    }
+
+    /* Only the flow manager thread has the ability to force reassembly on
+     * to-be pruned flows, and it doesn't do so once the engine control flags
+     * are set.  Bail out before the flow is modified, so that no use count
+     * and no lock is left behind. */
+    if (!(tv != NULL && tv == flow_manager_TV) || suricata_ctl_flags != 0) {
+        SCSpinUnlock(&f->fb->s);
+        SCMutexUnlock(&f->m);
+        return 1;
+    }
+
+    f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
+
+    /* we need to incr use counts before we unlock the flow */
+    if (client_ok == 1) {
+        FlowIncrUsecnt(f);
+    }
+    if (server_ok == 1) {
+        FlowIncrUsecnt(f);
+    }
+
+    /* move this unlock after the stream reassemble call */
+    SCSpinUnlock(&f->fb->s);
+
+    Packet *p1 = NULL, *p2 = NULL, *p3 = NULL;
+
+    if (client_ok == 1) {
+        /* toclient ack for all the client data we hold */
+        p1 = FFRPseudoPacketSetup(1, f);
+        if (p1 == NULL)
+            exit(EXIT_FAILURE);
+        FFRPseudoPacketSetTcpAck(p1, f, ssn->server.next_seq,
+                                 ssn->client.seg_list_tail->seq +
+                                 ssn->client.seg_list_tail->payload_len);
+
+        p2 = FFRPseudoPacketSetup(0, f);
+        if (p2 == NULL) {
+            exit(EXIT_FAILURE);
+        }
+
+        if (server_ok == 1) {
+            FFRPseudoPacketSetTcpAck(p2, f, ssn->client.next_seq,
+                                     ssn->server.seg_list_tail->seq +
+                                     ssn->server.seg_list_tail->payload_len);
+
+            p3 = FFRPseudoPacketSetup(1, f);
+            if (p3 == NULL) {
+                exit(EXIT_FAILURE);
+            }
+            FFRPseudoPacketSetTcpAck(p3, f, ssn->server.next_seq,
+                                     ssn->client.seg_list_tail->seq +
+                                     ssn->client.seg_list_tail->payload_len);
+        } else {
+            FFRPseudoPacketSetTcpAck(p2, f, ssn->client.next_seq,
+                                     ssn->server.last_ack);
+        }
+    } else {
+        p1 = FFRPseudoPacketSetup(0, f);
+        if (p1 == NULL) {
+            exit(EXIT_FAILURE);
+        }
+        FFRPseudoPacketSetTcpAck(p1, f, ssn->client.next_seq,
+                                 ssn->server.seg_list_tail->seq +
+                                 ssn->server.seg_list_tail->payload_len);
+
+        p2 = FFRPseudoPacketSetup(1, f);
+        if (p2 == NULL) {
+            exit(EXIT_FAILURE);
+        }
+        FFRPseudoPacketSetTcpAck(p2, f, ssn->server.next_seq,
+                                 ssn->client.last_ack);
+    }
+    SCMutexUnlock(&f->m);
+
+    SCMutexLock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
+    PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p1);
+    PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p2);
+    if (p3 != NULL)
+        PacketEnqueue(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq, p3);
+    SCMutexUnlock(&stream_pseudo_pkt_decode_tm_slot->slot_post_pq.mutex_q);
+
+    return 1;
+}
+
 /** FlowPrune
  *
  * Inspect top (last recently used) flow from the queue and see if
@@ -203,7 +476,7 @@ static uint64_t prune_usecnt = 0;
  * \retval 0 on error, failed block, nothing to prune
  * \retval 1 on successfully pruned one
  */
-static int FlowPrune (FlowQueue *q, struct timeval *ts)
+static int FlowPrune (ThreadVars *tv, FlowQueue *q, struct timeval *ts)
 {
     SCEnter();
     int mr = SCMutexTrylock(&q->mutex_q);
@@ -327,6 +600,10 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
         return 0;
     }
 
+    if (FlowForceReassemblyForFlowV2(tv, f) == 1) {
+        return 0;
+    }
+
     /* this should not be possible */
     BUG_ON(SC_ATOMIC_GET(f->use_cnt) > 0);
 
@@ -359,11 +636,11 @@ static int FlowPrune (FlowQueue *q, struct timeval *ts)
  * \param timeout timeout to consider
  * \retval cnt number of flows that are timed out
  */
-static uint32_t FlowPruneFlowQueue(FlowQueue *q, struct timeval *ts)
+static uint32_t FlowPruneFlowQueue(ThreadVars *tv, FlowQueue *q, struct timeval *ts)
 {
     SCEnter();
     uint32_t cnt = 0;
-    while(FlowPrune(q, ts)) { cnt++; }
+    while(FlowPrune(tv, q, ts)) { cnt++; }
     return cnt;
 }
 
@@ -570,7 +847,7 @@ static uint32_t FlowPruneFlowQueueCnt(FlowQueue *q, struct timeval *ts, int try_
     SCEnter();
     uint32_t cnt = 0;
     while (try_cnt--) {
-        cnt += FlowPrune(q, ts);
+        cnt += FlowPrune(NULL, q, ts);
     }
     return cnt;
 }
@@ -1339,6 +1616,8 @@ void *FlowManagerThread(void *td)
 
     memset(&ts, 0, sizeof(ts));
 
+    flow_manager_TV = th_v;
+
     /* get StreamTCP TM's slot and TV containing this slot */
     stream_pseudo_pkt_stream_tm_slot = TmSlotGetSlotForTM(TMM_STREAMTCP);
     if (stream_pseudo_pkt_stream_tm_slot == NULL) {
@@ -1379,6 +1658,14 @@ void *FlowManagerThread(void *td)
         stream_pseudo_pkt_detect_TV = NULL;
     }
 
+    stream_pseudo_pkt_decode_tm_slot = TmThreadGetFirstTmSlotForPartialPattern("decode");
+    if (stream_pseudo_pkt_decode_tm_slot == NULL) {
+        /* yes, this is fatal! */
+        SCLogError(SC_ERR_TM_MODULES_ERROR, "Looks like we have failed to "
+                   "retrieve the slot for DECODE TM");
+        exit(EXIT_FAILURE);
+    }
+
     /* set the thread name */
     SCSetThreadName(th_v->name);
     SCLogDebug("%s started...", th_v->name);
@@ -1417,21 +1704,21 @@ void *FlowManagerThread(void *td)
             int i;
             for (i = 0; i < FLOW_PROTO_MAX; i++) {
                 /* prune closing list */
-                nowcnt = FlowPruneFlowQueue(&flow_close_q[i], &ts);
+                nowcnt = FlowPruneFlowQueue(th_v, &flow_close_q[i], &ts);
                 if (nowcnt) {
                     SCLogDebug("Pruned %" PRIu32 " closing flows...", nowcnt);
                     closing_cnt += nowcnt;
                 }
 
                 /* prune new list */
-                nowcnt = FlowPruneFlowQueue(&flow_new_q[i], &ts);
+                nowcnt = FlowPruneFlowQueue(th_v, &flow_new_q[i], &ts);
                 if (nowcnt) {
                     SCLogDebug("Pruned %" PRIu32 " new flows...", nowcnt);
                     new_cnt += nowcnt;
                 }
 
                 /* prune established list */
-                nowcnt = FlowPruneFlowQueue(&flow_est_q[i], &ts);
+                nowcnt = FlowPruneFlowQueue(th_v, &flow_est_q[i], &ts);
                 if (nowcnt) {
                     SCLogDebug("Pruned %" PRIu32 " established flows...", nowcnt);
                     established_cnt += nowcnt;
@@ -1953,7 +2240,7 @@ static int FlowTestPrune(Flow *f, struct timeval *ts) {
     }
 
     SCLogDebug("calling FlowPrune");
-    FlowPrune(q, ts);
+    FlowPrune(NULL, q, ts);
     if (q->len != 0) {
         printf("Failed in prunning the flow: ");
         goto error;
diff --git a/src/flow.h b/src/flow.h
index 0a33ce3891..2c20014ace 100644
--- a/src/flow.h
+++ b/src/flow.h
@@ -89,6 +89,7 @@
 #define FLOW_TC_PP_ALPROTO_DETECT_DONE    0x00200000
 /* Both pattern matcher and probing parser alproto detection done */
 #define FLOW_TC_PM_PP_ALPROTO_DETECT_DONE 0x00400000
+#define FLOW_TIMEOUT_REASSEMBLY_DONE      0x00800000
 
 /* pkt flow flags */
 #define FLOW_PKT_TOSERVER 0x01
diff --git a/src/stream-tcp.c b/src/stream-tcp.c
index 11bb40957f..fac765e4a9 100644
--- a/src/stream-tcp.c
+++ b/src/stream-tcp.c
@@ -529,7 +529,7 @@ TcpSession *StreamTcpNewSession (Packet *p)
 static void StreamTcpPacketSetState(Packet *p, TcpSession *ssn,
                                     uint8_t state)
 {
-    if (state == ssn->state)
+    if (state == ssn->state || PKT_IS_PSEUDOPKT(p))
         return;
 
     ssn->state = state;
diff --git a/src/tm-threads.c b/src/tm-threads.c
index 51d82c78ff..3dabca835f 100644
--- a/src/tm-threads.c
+++ b/src/tm-threads.c
@@ -162,7 +162,9 @@ void *TmThreadsSlot1NoIn(void *td)
 
         /* handle post queue */
         while (s->slot_post_pq.top != NULL) {
+            SCMutexLock(&s->slot_post_pq.mutex_q);
             Packet *extra_p = PacketDequeue(&s->slot_post_pq);
+            SCMutexUnlock(&s->slot_post_pq.mutex_q);
             if (extra_p != NULL)
                 tv->tmqh_out(tv, extra_p);
         }
@@ -402,7 +404,9 @@ void *TmThreadsSlot1(void *td)
 
             while (s->slot_post_pq.top != NULL) {
                 /* handle new packets from this func */
+                SCMutexLock(&s->slot_post_pq.mutex_q);
                 Packet *extra_p = PacketDequeue(&s->slot_post_pq);
+                SCMutexUnlock(&s->slot_post_pq.mutex_q);
                 if (extra_p != NULL) {
                     tv->tmqh_out(tv, extra_p);
                 }
@@ -663,7 +667,9 @@ void *TmThreadsSlotVar(void *td)
             TmSlot *slot;
             for (slot = s; slot != NULL; slot = slot->slot_next) {
                 while (slot->slot_post_pq.top != NULL) {
+                    SCMutexLock(&slot->slot_post_pq.mutex_q);
                     Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
+                    SCMutexUnlock(&slot->slot_post_pq.mutex_q);
                     if (extra_p == NULL)
                         break;
 
@@ -1439,6 +1445,45 @@ void TmThreadDisableReceiveThreads(void)
     return;
 }
 
+/**
+ * \brief Returns the first slot, among the packet processing threads, whose
+ *        TM module name contains the given pattern (case insensitive).
+ *
+ * \param tm_name Partial TM module name to look for.
+ *
+ * \retval slots Pointer to the matching slot; NULL if no slot matches.
+ */
+TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *tm_name)
+{
+    ThreadVars *tv = NULL;
+    TmSlot *slots = NULL;
+
+    SCMutexLock(&tv_root_lock);
+
+    /* all receive threads are part of packet processing threads */
+    tv = tv_root[TVT_PPT];
+
+    while (tv) {
+        slots = tv->tm_slots;
+
+        while (slots != NULL) {
+            TmModule *tm = TmModuleGetById(slots->tm_id);
+
+            /* a slot whose module can't be resolved can't match */
+            if (tm != NULL && strcasestr(tm->name, tm_name) != NULL)
+                goto end;
+
+            slots = slots->slot_next;
+        }
+
+        tv = tv->next;
+    }
+
+ end:
+    SCMutexUnlock(&tv_root_lock);
+    return slots;
+}
+
 void TmThreadKillThreads(void) {
     ThreadVars *tv = NULL;
     int i = 0;
diff --git a/src/tm-threads.h b/src/tm-threads.h
index 43a921156b..271514a2c6 100644
--- a/src/tm-threads.h
+++ b/src/tm-threads.h
@@ -123,6 +123,7 @@ void TmThreadsMSMasterDisableSlaveAllSyncPts(ThreadVars *tv);
 
 ThreadVars *TmThreadsGetTVContainingSlot(TmSlot *);
 void TmThreadDisableReceiveThreads(void);
+TmSlot *TmThreadGetFirstTmSlotForPartialPattern(const char *);
 
 #if 0
 
@@ -153,7 +154,9 @@ void TmThreadDisableReceiveThreads(void);
         TmSlot *slot = (s);                                     \
         while (slot != NULL) {                                  \
             while (slot->slot_post_pq.top != NULL) {            \
+                SCMutexLock(&slot->slot_post_pq.mutex_q);       \
                 Packet *extra_p = PacketDequeue(&slot->slot_post_pq);\
+                SCMutexUnlock(&slot->slot_post_pq.mutex_q);     \
                 if (extra_p != NULL) {                          \
                     if (slot->slot_next != NULL) {              \
                         r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next); \
@@ -205,7 +208,9 @@ static inline TmEcode TmThreadsSlotProcessPkt(ThreadVars *tv, TmSlot *s, Packet
     TmSlot *slot = s;
     while (slot != NULL) {
         while (slot->slot_post_pq.top != NULL) {
+            SCMutexLock(&slot->slot_post_pq.mutex_q);
             Packet *extra_p = PacketDequeue(&slot->slot_post_pq);
+            SCMutexUnlock(&slot->slot_post_pq.mutex_q);
             if (extra_p != NULL) {
                 if (slot->slot_next != NULL) {
                     r = TmThreadsSlotVarRun(tv, extra_p, slot->slot_next);