diff --git a/etc/schema.json b/etc/schema.json index f0a8ba5af9..a69bcec35e 100644 --- a/etc/schema.json +++ b/etc/schema.json @@ -1893,6 +1893,9 @@ "alerted": { "type": "boolean" }, + "elephant": { + "type": "boolean" + }, "bypass": { "type": "string" }, @@ -6301,6 +6304,10 @@ "description": "Total number of flows", "type": "integer" }, + "elephant": { + "description": "Total number of elephant flows", + "type": "integer" + }, "udp": { "description": "Number of UDP flows", "type": "integer" diff --git a/src/Makefile.am b/src/Makefile.am index 47fcf90ece..7b69b4f724 100755 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -535,6 +535,7 @@ noinst_HEADERS = \ util-lua-smtp.h \ util-lua-ssh.h \ util-lua-tls.h \ + util-flow-rate.h \ util-macset.h \ util-magic.h \ util-memcmp.h \ @@ -1101,6 +1102,7 @@ libsuricata_c_a_SOURCES = \ util-lua-ssh.c \ util-lua-tls.c \ util-macset.c \ + util-flow-rate.c \ util-magic.c \ util-mem.c \ util-memcmp.c \ diff --git a/src/decode.c b/src/decode.c index 1933745f69..3ba77b9229 100644 --- a/src/decode.c +++ b/src/decode.c @@ -659,6 +659,7 @@ void DecodeRegisterPerfCounters(DecodeThreadVars *dtv, ThreadVars *tv) dtv->counter_flow_icmp4 = StatsRegisterCounter("flow.icmpv4", tv); dtv->counter_flow_icmp6 = StatsRegisterCounter("flow.icmpv6", tv); dtv->counter_flow_tcp_reuse = StatsRegisterCounter("flow.tcp_reuse", tv); + dtv->counter_flow_elephant = StatsRegisterCounter("flow.elephant", tv); dtv->counter_flow_get_used = StatsRegisterCounter("flow.get_used", tv); dtv->counter_flow_get_used_eval = StatsRegisterCounter("flow.get_used_eval", tv); dtv->counter_flow_get_used_eval_reject = StatsRegisterCounter("flow.get_used_eval_reject", tv); diff --git a/src/decode.h b/src/decode.h index b98568a26f..e5147fd8ed 100644 --- a/src/decode.h +++ b/src/decode.h @@ -1004,6 +1004,7 @@ typedef struct DecodeThreadVars_ uint16_t counter_flow_icmp4; uint16_t counter_flow_icmp6; uint16_t counter_flow_tcp_reuse; + uint16_t counter_flow_elephant; uint16_t counter_flow_get_used; uint16_t counter_flow_get_used_eval; uint16_t counter_flow_get_used_eval_reject; diff --git a/src/flow-util.c b/src/flow-util.c index 9e90ae5be9..d9d60a681c 100644 --- a/src/flow-util.c +++ b/src/flow-util.c @@ -36,6 +36,7 @@ #include "util-var.h" #include "util-debug.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "flow-storage.h" #include "detect.h" @@ -202,6 +203,12 @@ void FlowInit(ThreadVars *tv, Flow *f, const Packet *p) FlowSetStorageById(f, MacSetGetFlowStorageID(), ms); } + if (FlowRateStorageEnabled()) { + DEBUG_VALIDATE_BUG_ON(FlowGetStorageById(f, FlowRateGetStorageID()) != NULL); + FlowRateStore *frs = FlowRateStoreInit(); + FlowSetStorageById(f, FlowRateGetStorageID(), frs); + } + SCFlowRunInitCallbacks(tv, f, p); SCReturn; diff --git a/src/flow.c b/src/flow.c index 07562abac7..4374c95556 100644 --- a/src/flow.c +++ b/src/flow.c @@ -53,6 +53,7 @@ #include "util-byte.h" #include "util-misc.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-debug.h" @@ -344,6 +345,30 @@ static inline void FlowUpdateTtlTC(Flow *f, uint8_t ttl) f->max_ttl_toclient = MAX(f->max_ttl_toclient, ttl); } +static inline void FlowUpdateFlowRate( + ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, int dir) +{ + if (FlowRateStorageEnabled()) { + /* No need to update the struct if flow is already marked as elephant flow */ + if (f->flags & FLOW_IS_ELEPHANT) + return; + FlowRateStore *frs = FlowGetStorageById(f, FlowRateGetStorageID()); + if (frs != NULL) { + FlowRateStoreUpdate(frs, p->ts, GET_PKT_LEN(p), dir); + bool fr_exceeds = FlowRateIsExceeding(frs, dir); + if (fr_exceeds) { + SCLogDebug("Flow rate for flow %p exceeds the configured values, marking it as an " + "elephant flow", + f); + f->flags |= FLOW_IS_ELEPHANT; + if (tv != NULL) { + StatsIncr(tv, dtv->counter_flow_elephant); + } + } + } + } +} + static inline void FlowUpdateEthernet( ThreadVars *tv, DecodeThreadVars *dtv, Flow *f, const Packet *p, bool toserver) { @@ -407,6 +432,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars if (pkt_dir == TOSERVER) { f->todstpktcnt++; f->todstbytecnt += GET_PKT_LEN(p); + FlowUpdateFlowRate(tv, dtv, f, p, TOSERVER); p->flowflags = FLOW_PKT_TOSERVER; if (!(f->flags & FLOW_TO_DST_SEEN)) { if (FlowUpdateSeenFlag(p)) { @@ -431,6 +457,7 @@ void FlowHandlePacketUpdate(Flow *f, Packet *p, ThreadVars *tv, DecodeThreadVars } else { f->tosrcpktcnt++; f->tosrcbytecnt += GET_PKT_LEN(p); + FlowUpdateFlowRate(tv, dtv, f, p, TOCLIENT); p->flowflags = FLOW_PKT_TOCLIENT; if (!(f->flags & FLOW_TO_SRC_SEEN)) { if (FlowUpdateSeenFlag(p)) { diff --git a/src/flow.h b/src/flow.h index cf083387a3..53d4c8bb51 100644 --- a/src/flow.h +++ b/src/flow.h @@ -55,7 +55,8 @@ typedef struct AppLayerParserState_ AppLayerParserState; /** next packet in toclient direction will act on updated app-layer state */ #define FLOW_TC_APP_UPDATE_NEXT BIT_U32(2) -// vacancy bit 3 +/** Flow is marked an elephant flow */ +#define FLOW_IS_ELEPHANT BIT_U32(3) // vacancy bit 4 diff --git a/src/output-json-flow.c b/src/output-json-flow.c index 1c4f1dcc3d..fa40453603 100644 --- a/src/output-json-flow.c +++ b/src/output-json-flow.c @@ -334,6 +334,9 @@ static void EveFlowLogJSON(OutputJsonThreadCtx *aft, SCJsonBuilder *jb, Flow *f) if (f->flags & FLOW_WRONG_THREAD) JB_SET_TRUE(jb, "wrong_thread"); + if (f->flags & FLOW_IS_ELEPHANT) + JB_SET_TRUE(jb, "elephant"); + if (f->flags & FLOW_ACTION_DROP) { JB_SET_STRING(jb, "action", "drop"); } else if (f->flags & FLOW_ACTION_PASS) { diff --git a/src/runmode-unittests.c b/src/runmode-unittests.c index 634c918435..f11cce5fb3 100644 --- a/src/runmode-unittests.c +++ b/src/runmode-unittests.c @@ -89,6 +89,7 @@ #include "util-byte.h" #include "util-proto-name.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-memrchr.h" #include "util-mpm-ac.h" @@ -205,6 +206,7 @@ static void RegisterUnittests(void) AppLayerUnittestsRegister(); StreamingBufferRegisterTests(); MacSetRegisterTests(); + FlowRateRegisterTests(); #ifdef OS_WIN32 Win32SyscallRegisterTests(); #endif diff --git a/src/suricata.c b/src/suricata.c index ebabf5068a..68aadaf406 100644 --- a/src/suricata.c +++ b/src/suricata.c @@ -126,6 +126,7 @@ #include "util-ioctl.h" #include "util-landlock.h" #include "util-macset.h" +#include "util-flow-rate.h" #include "util-misc.h" #include "util-mpm-hs.h" #include "util-path.h" @@ -2677,6 +2678,7 @@ int PostConfLoadedSetup(SCInstance *suri) RegisterFlowBypassInfo(); MacSetRegisterFlowStorage(); + FlowRateRegisterFlowStorage(); SigTableInit(); diff --git a/src/util-flow-rate.c b/src/util-flow-rate.c new file mode 100644 index 0000000000..44e74dbf7b --- /dev/null +++ b/src/util-flow-rate.c @@ -0,0 +1,629 @@ +/* Copyright (C) 2025 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Shivani Bhardwaj + * + */ + +#include "suricata-common.h" +#include "flow-storage.h" +#include "flow-util.h" +#include "flow-private.h" +#include "util-storage.h" +#include "conf.h" +#include "util-misc.h" +#include "util-byte.h" +#include "util-flow-rate.h" +#include "util-unittest.h" +#include "util-unittest-helper.h" + +FlowStorageId g_flowrate_storage_id = { .id = -1 }; + +FlowRateConfig flow_rate_config; + +static void FlowRateStoreFree(void *ptr) +{ + FlowRateStore *frs = (FlowRateStore *)ptr; + size_t total_free = 0; + if (frs == NULL) + return; + + for (int i = 0; i < 2; i++) { + if (frs->dir[i].buf != NULL) { + SCFree(frs->dir[i].buf); + total_free += (frs->dir[i].size * sizeof(uint64_t)); + } + } + + SCFree(frs); + total_free += sizeof(*frs); + (void)SC_ATOMIC_SUB(flow_memuse, total_free); +} + +void FlowRateRegisterFlowStorage(void) +{ + SCConfNode *root = SCConfGetNode("flow"); + if (root == NULL) + return; + + bool track_flow = false; + track_flow = SCConfNodeLookupChild(root, "rate-tracking") != NULL ? true : false; + if (!track_flow) + return; + + SCConfNode *node = SCConfGetNode("flow.rate-tracking"); + const char *val = SCConfNodeLookupChildValue(node, "bytes"); + if (val == NULL) { + FatalError("No value for flow tracking bytes"); + } + uint64_t bytes = 0; + if (ParseSizeStringU64(val, &bytes) < 0) { + FatalError("Invalid value for flow tracking bytes"); + } + flow_rate_config.bytes = bytes; + + val = SCConfNodeLookupChildValue(node, "interval"); + if (val == NULL) { + FatalError("No value for flow tracking interval"); + } + SCTime_t interval = SCTIME_INITIALIZER; + uint16_t secs = 0; + if ((StringParseUint16(&secs, 10, 0, val) < 0) || (secs == 0)) { + FatalError("Invalid value for flow tracking interval"); + } + flow_rate_config.interval = SCTIME_ADD_SECS(interval, secs); + + g_flowrate_storage_id = + FlowStorageRegister("flowrate", sizeof(void *), NULL, FlowRateStoreFree); +} + +bool FlowRateStorageEnabled(void) +{ + return (g_flowrate_storage_id.id != -1); +} + +FlowRateStore *FlowRateStoreInit(void) +{ + FlowRateStore *frs = NULL; + size_t total_memuse = 0; + size_t expected_memuse = (2 * flow_rate_config.interval.secs * sizeof(uint64_t)) + sizeof(*frs); + + if (!FLOW_CHECK_MEMCAP(expected_memuse)) { + return NULL; + } + frs = SCCalloc(1, sizeof(*frs)); + if (unlikely(frs == NULL)) { + return NULL; + } + + total_memuse += sizeof(*frs); + for (int i = 0; i < 2; i++) { + frs->dir[i].size = (uint16_t)flow_rate_config.interval.secs; + frs->dir[i].buf = SCCalloc(frs->dir[i].size, sizeof(uint64_t)); + if (unlikely(frs->dir[i].buf == NULL)) { + FlowRateStoreFree(frs); + return NULL; + } + frs->dir[i].start_ts = SCTIME_INITIALIZER; + frs->dir[i].last_ts = SCTIME_INITIALIZER; + total_memuse += (frs->dir[i].size * sizeof(uint64_t)); + } + DEBUG_VALIDATE_BUG_ON(total_memuse != expected_memuse); + (void)SC_ATOMIC_ADD(flow_memuse, total_memuse); + + return frs; +} + +FlowStorageId FlowRateGetStorageID(void) +{ + return g_flowrate_storage_id; +} + +static inline void FlowRateClearSumInRange( + FlowRateStore *frs, uint16_t start, uint16_t end, int direction) +{ + for (uint16_t i = start; i <= end; i++) { + uint64_t byte_count_at_i = frs->dir[direction].buf[i]; + frs->dir[direction].buf[i] = 0; + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < byte_count_at_i); + frs->dir[direction].sum -= byte_count_at_i; + } +} + +static inline void FlowRateStoreUpdateCurrentRing( + FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, uint16_t idx, int direction) +{ + if (idx > frs->dir[direction].last_idx + 1) { + /* Index is not the same as last or the next so, the ring must be flushed for the items + * in between and sum updated */ + FlowRateClearSumInRange(frs, frs->dir[direction].last_idx + 1, idx, direction); + frs->dir[direction].buf[idx] += pkt_len; + /* Update the total sum */ + frs->dir[direction].sum += pkt_len; + } else if ((idx == frs->dir[direction].last_idx) || (idx == frs->dir[direction].last_idx + 1)) { + /* Index matches the last updated index in the ring buffer or is the next index in the + * buffer */ + /* Add to the existing open time interval */ + frs->dir[direction].buf[idx] += pkt_len; + /* Update the total sum */ + frs->dir[direction].sum += pkt_len; + } else { + /* Index is revisited after a full round of the buffer */ + uint64_t prev_byte_count = frs->dir[direction].buf[idx]; + /* Overwrite the buffer */ + frs->dir[direction].buf[idx] = pkt_len; + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum < prev_byte_count); + /* Sum should get rid of previous count on the same index */ + frs->dir[direction].sum += pkt_len - prev_byte_count; + frs->dir[direction].start_ts = p_ts; + } + frs->dir[direction].last_idx = idx; +} + +static inline void FlowRateStoreFlushRing( + FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction) +{ + memset(frs->dir[direction].buf, 0, frs->dir[direction].size); + frs->dir[direction].last_idx = 0; + frs->dir[direction].start_ts = p_ts; + frs->dir[direction].buf[0] = pkt_len; + /* Overwrite the sum calculated so far */ + frs->dir[direction].sum = pkt_len; +} + +void FlowRateStoreUpdate(FlowRateStore *frs, SCTime_t p_ts, uint32_t pkt_len, int direction) +{ + if (frs->dir[direction].last_ts.secs == 0) { + /* Should only happen when the ring is first used */ + DEBUG_VALIDATE_BUG_ON(frs->dir[direction].sum > 0); + /* Initialize last_ts and start_ts with the first packet's timestamp */ + frs->dir[direction].last_ts = p_ts; + frs->dir[direction].start_ts = p_ts; + } + + SCTime_t start_ts = frs->dir[direction].start_ts; + uint16_t idx = (p_ts.secs - start_ts.secs) % frs->dir[direction].size; + /* Update start_ts in case of initiating the revisit of buffer */ + if ((frs->dir[direction].last_idx == frs->dir[direction].size - 1) && + (frs->dir[direction].last_idx != idx)) { + start_ts = p_ts; + if (idx != 0) { + /* Update the sum */ + FlowRateClearSumInRange(frs, 0, idx, direction); + /* Consider current packet a new start of the ring */ + idx = 0; + } + } + /* If the packet has come in the last open interval of time */ + if (p_ts.secs - start_ts.secs < frs->dir[direction].size) { + FlowRateStoreUpdateCurrentRing(frs, p_ts, pkt_len, idx, direction); + } else { + /* Packet arrived after one or more rounds of the entire buffer */ + /* Flush the entire buffer */ + FlowRateStoreFlushRing(frs, p_ts, pkt_len, direction); + } + /* In any case, update the last seen timestamp */ + frs->dir[direction].last_ts = p_ts; +} + +bool FlowRateIsExceeding(FlowRateStore *frs, int direction) +{ + if (frs->dir[direction].sum >= flow_rate_config.bytes) { + return true; + } + return false; +} + +#ifdef UNITTESTS + +/* Test to check update of the same buffer item */ +static int FlowRateTest01(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 100; + flow_rate_config.interval = (SCTime_t){ .secs = 10, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 10); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 92); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 139); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of all buffer items */ +static int FlowRateTest02(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of wrapping around ring buffer */ +static int FlowRateTest03(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 4; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 180); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of buffer if new pkt comes out of the window */ +static int FlowRateTest04(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 60; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 44); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p2->ts.secs); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check update of wrapping around ring buffer when the packet + * out of the window but also does not fall on the first index of the ring */ +static int FlowRateTest05(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 6; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 89); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check sum when packet is within the window but is coming after a gap */ +static int FlowRateTest06(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 4; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 180); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP); + p6->ts.secs = p1->ts.secs + 7; + FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 91); + FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + FAIL_IF(frs->dir[0].buf[1] != 0); + FAIL_IF(frs->dir[0].buf[2] != 0); + FAIL_IF(frs->dir[0].buf[3] != 48); + + FlowRateStoreFree(frs); + PASS; +} + +/* Test to check sum when two packets are back to back within the window but are coming after a gap + */ +static int FlowRateTest07(void) +{ + SC_ATOMIC_SET(flow_config.memcap, 10000); + flow_rate_config.bytes = 200; + flow_rate_config.interval = (SCTime_t){ .secs = 4, .usecs = 0 }; + FlowRateStore *frs = FlowRateStoreInit(); + FAIL_IF_NULL(frs); + for (int i = 0; i < 2; i++) { + FAIL_IF(frs->dir[i].size != 4); + FAIL_IF(frs->dir[i].sum != 0); + } + Packet *p1 = UTHBuildPacket((uint8_t *)"blahblah", 8, IPPROTO_TCP); + FlowRateStoreUpdate(frs, p1->ts, GET_PKT_LEN(p1), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 48); + FAIL_IF(frs->dir[0].last_ts.secs != p1->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 48); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p2 = UTHBuildPacket((uint8_t *)"DATA", 4, IPPROTO_TCP); + p2->ts.secs = p1->ts.secs + 1; + FlowRateStoreUpdate(frs, p2->ts, GET_PKT_LEN(p2), TOSERVER); + /* Total length of packet is 44 */ + FAIL_IF(frs->dir[0].sum != 92); + FAIL_IF(frs->dir[0].last_ts.secs != p2->ts.secs); + FAIL_IF(frs->dir[0].buf[1] != 44); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p3 = UTHBuildPacket((uint8_t *)"ABababa", 7, IPPROTO_TCP); + p3->ts.secs = p1->ts.secs + 2; + FlowRateStoreUpdate(frs, p3->ts, GET_PKT_LEN(p3), TOSERVER); + /* Total length of packet is 47 */ + FAIL_IF(frs->dir[0].sum != 139); + FAIL_IF(frs->dir[0].last_ts.secs != p3->ts.secs); + FAIL_IF(frs->dir[0].buf[2] != 47); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p4 = UTHBuildPacket((uint8_t *)"yoohoo", 6, IPPROTO_TCP); + p4->ts.secs = p1->ts.secs + 3; + FlowRateStoreUpdate(frs, p4->ts, GET_PKT_LEN(p4), TOSERVER); + /* Total length of packet is 46 */ + FAIL_IF(frs->dir[0].sum != 185); + FAIL_IF(frs->dir[0].last_ts.secs != p4->ts.secs); + FAIL_IF(frs->dir[0].buf[3] != 46); + FAIL_IF(frs->dir[0].start_ts.secs != p1->ts.secs); + + Packet *p5 = UTHBuildPacket((uint8_t *)"nmn", 3, IPPROTO_TCP); + p5->ts.secs = p1->ts.secs + 5; + FlowRateStoreUpdate(frs, p5->ts, GET_PKT_LEN(p5), TOSERVER); + /* Total length of packet is 43 */ + FAIL_IF(frs->dir[0].sum != 136); + FAIL_IF(frs->dir[0].last_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + + Packet *p6 = UTHBuildPacket((uint8_t *)"suricata", 8, IPPROTO_TCP); + p6->ts.secs = p1->ts.secs + 8; + FlowRateStoreUpdate(frs, p6->ts, GET_PKT_LEN(p6), TOSERVER); + /* Total length of packet is 48 */ + FAIL_IF(frs->dir[0].sum != 91); + FAIL_IF(frs->dir[0].last_ts.secs != p6->ts.secs); + FAIL_IF(frs->dir[0].start_ts.secs != p5->ts.secs); + FAIL_IF(frs->dir[0].buf[0] != 43); + FAIL_IF(frs->dir[0].buf[1] != 0); + FAIL_IF(frs->dir[0].buf[2] != 0); + FAIL_IF(frs->dir[0].buf[3] != 48); + + FlowRateStoreFree(frs); + PASS; +} + +void FlowRateRegisterTests(void) +{ + UtRegisterTest("FlowRateTest01", FlowRateTest01); + UtRegisterTest("FlowRateTest02", FlowRateTest02); + UtRegisterTest("FlowRateTest03", FlowRateTest03); + UtRegisterTest("FlowRateTest04", FlowRateTest04); + UtRegisterTest("FlowRateTest05", FlowRateTest05); + UtRegisterTest("FlowRateTest06", FlowRateTest06); + UtRegisterTest("FlowRateTest07", FlowRateTest07); +} +#endif diff --git a/src/util-flow-rate.h b/src/util-flow-rate.h new file mode 100644 index 0000000000..d4754cadb5 --- /dev/null +++ b/src/util-flow-rate.h @@ -0,0 +1,64 @@ +/* Copyright (C) 2025 Open Information Security Foundation + * + * You can copy, redistribute or modify this Program under the terms of + * the GNU General Public License version 2 as published by the Free + * Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301, USA. + */ + +/** + * \file + * + * \author Shivani Bhardwaj + */ + +#ifndef SURICATA_UTIL_FLOW_RATE_H +#define SURICATA_UTIL_FLOW_RATE_H + +typedef struct FlowRateConfig_ { + uint64_t bytes; + SCTime_t interval; +} FlowRateConfig; + +typedef struct FlowRateDirStore_ { + /* Ring buffer to store byte count per second in */ + uint64_t *buf; + /* Total sum of bytes per direction */ + uint64_t sum; + /* Last index that was updated in the buffer */ + uint16_t last_idx; + /* Size of the ring; should be same for both directions */ + uint16_t size; + /* start timestamp to define and track the beginning of buffer */ + SCTime_t start_ts; + /* last timestamp that was processed in the buffer */ + SCTime_t last_ts; +} FlowRateDirStore; + +typedef struct FlowRateStore_ { + FlowRateDirStore dir[2]; +} FlowRateStore; + +extern FlowRateConfig flow_rate_config; + +bool FlowRateStorageEnabled(void); +void FlowRateRegisterFlowStorage(void); +FlowRateStore *FlowRateStoreInit(void); +FlowStorageId FlowRateGetStorageID(void); +void FlowRateStoreUpdate(FlowRateStore *, SCTime_t, uint32_t, int); +bool FlowRateIsExceeding(FlowRateStore *, int); + +#ifdef UNITTESTS +void FlowRateRegisterTests(void); +#endif + +#endif diff --git a/suricata.yaml.in b/suricata.yaml.in index e83ec1bfa6..5d366283a7 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -1498,6 +1498,11 @@ flow: emergency-recovery: 30 #managers: 1 # default to one flow manager #recyclers: 1 # default to one flow recycler thread + # Track flows and count them as elephant flow if they exceed the rate defined + # by the byte count per interval configured below. + #rate-tracking: + # bytes: 1GiB + # interval: 10 # seconds is the only supported unit for interval so far # This option controls the use of VLAN ids in the flow (and defrag) # hashing. Normally this should be enabled, but in some (broken)