From d3d9f1c395c9096e9bf7b89996396b1301b8bdb3 Mon Sep 17 00:00:00 2001 From: Sascha Steinbiss Date: Tue, 11 Jun 2024 13:14:00 +0200 Subject: [PATCH] redis: implement XADD stream support Ticket: #7082 --- doc/userguide/output/eve/eve-json-output.rst | 5 ++- doc/userguide/partials/eve-log.yaml | 5 ++- src/util-log-redis.c | 40 ++++++++++---------- src/util-log-redis.h | 3 +- suricata.yaml.in | 5 ++- 5 files changed, 32 insertions(+), 26 deletions(-) diff --git a/doc/userguide/output/eve/eve-json-output.rst b/doc/userguide/output/eve/eve-json-output.rst index 7fc40783c2..26a010a7a9 100644 --- a/doc/userguide/output/eve/eve-json-output.rst +++ b/doc/userguide/output/eve/eve-json-output.rst @@ -41,10 +41,11 @@ Output types:: # server: 127.0.0.1 # port: 6379 # async: true ## if redis replies are read asynchronously - # mode: list ## possible values: list|lpush (default), rpush, channel|publish + # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream # ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## publish is using a Redis channel. "channel" is an alias for publish - # key: suricata ## key or channel to use (default to suricata) + # ## xadd is using a Redis stream. "stream" is an alias for xadd + # key: suricata ## string denoting the key/channel/stream to use (default to suricata) # Redis pipelining set up. This will enable to only do a query every # 'batch-size' events. This should lower the latency induced by network # connection at the cost of some memory. There is no flushing implemented diff --git a/doc/userguide/partials/eve-log.yaml b/doc/userguide/partials/eve-log.yaml index 05faf209d4..2030a5b68f 100644 --- a/doc/userguide/partials/eve-log.yaml +++ b/doc/userguide/partials/eve-log.yaml @@ -18,10 +18,11 @@ outputs: # server: 127.0.0.1 # port: 6379 # async: true ## if redis replies are read asynchronously - # mode: list ## possible values: list|lpush (default), rpush, channel|publish + # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream # ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## publish is using a Redis channel. "channel" is an alias for publish - # key: suricata ## key or channel to use (default to suricata) + # ## xadd is using a Redis stream. "stream" is an alias for xadd + # key: suricata ## string denoting the key/channel/stream to use (default to suricata) # Redis pipelining set up. This will enable to only do a query every # 'batch-size' events. This should lower the latency induced by network # connection at the cost of some memory. There is no flushing implemented diff --git a/src/util-log-redis.c b/src/util-log-redis.c index 5f590d2c69..ad48ab68ef 100644 --- a/src/util-log-redis.c +++ b/src/util-log-redis.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2021 Open Information Security Foundation +/* Copyright (C) 2007-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -37,8 +37,11 @@ static const char * redis_lpush_cmd = "LPUSH"; static const char * redis_rpush_cmd = "RPUSH"; static const char * redis_publish_cmd = "PUBLISH"; +static const char *redis_xadd_cmd = "XADD"; static const char * redis_default_key = "suricata"; static const char * redis_default_server = "127.0.0.1"; +static const char *redis_default_format = "%s %s %s"; +static const char *redis_stream_format = "%s %s * eve %s"; static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx); static void SCLogFileCloseRedis(LogFileCtx *log_ctx); @@ -268,12 +271,8 @@ static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t return -1; } - redisAsyncCommand(ctx->async, - SCRedisAsyncCommandCallback, - file_ctx, - "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, + redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx, + file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key, string); event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK); @@ -345,10 +344,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string) /* synchronous mode */ if (file_ctx->redis_setup.batch_size) { - redisAppendCommand(redis, "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, - string); + redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command, + file_ctx->redis_setup.key, string); time_t now = time(NULL); if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) { redisReply *reply; @@ -374,9 +371,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string) redis = ctx->sync; if (redis) { SCLogInfo("Reconnected to redis server"); - redisAppendCommand(redis, "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, + redisAppendCommand(redis, file_ctx->redis_setup.format, + file_ctx->redis_setup.command, file_ctx->redis_setup.key, string); ctx->batch_count++; return 0; @@ -395,10 +391,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string) ctx->batch_count++; } } else { - redisReply *reply = redisCommand(redis, "%s %s %s", - file_ctx->redis_setup.command, - file_ctx->redis_setup.key, - string); + redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format, + file_ctx->redis_setup.command, file_ctx->redis_setup.key, string); /* We may lose the reply if disconnection happens*/ if (reply) { switch (reply->type) { @@ -410,6 +404,10 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string) SCLogDebug("Redis integer %lld", reply->integer); ret = 0; break; + case REDIS_REPLY_STRING: + SCLogDebug("Redis string %s", reply->str); + ret = 0; + break; default: SCLogError("Redis default triggered with %d", reply->type); SCConfLogReopenSyncRedis(file_ctx); @@ -519,14 +517,18 @@ int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx) log_ctx->redis_setup.batch_size = 0; } + log_ctx->redis_setup.format = redis_default_format; if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) { log_ctx->redis_setup.command = redis_lpush_cmd; } else if(!strcmp(redis_mode, "rpush")){ log_ctx->redis_setup.command = redis_rpush_cmd; } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) { log_ctx->redis_setup.command = redis_publish_cmd; + } else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) { + log_ctx->redis_setup.command = redis_xadd_cmd; + log_ctx->redis_setup.format = redis_stream_format; } else { - FatalError("Invalid redis mode"); + FatalError("Invalid redis mode: %s", redis_mode); } /* store server params for reconnection */ diff --git a/src/util-log-redis.h b/src/util-log-redis.h index f53669a195..f6d069555e 100644 --- a/src/util-log-redis.h +++ b/src/util-log-redis.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2016 Open Information Security Foundation +/* Copyright (C) 2016-2024 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -38,6 +38,7 @@ enum RedisMode { REDIS_LIST, REDIS_CHANNEL }; typedef struct RedisSetup_ { enum RedisMode mode; + const char *format; const char *command; const char *key; const char *server; diff --git a/suricata.yaml.in b/suricata.yaml.in index 7bf4165c36..6b87db93b0 100644 --- a/suricata.yaml.in +++ b/suricata.yaml.in @@ -112,10 +112,11 @@ outputs: # server: 127.0.0.1 # port: 6379 # async: true ## if redis replies are read asynchronously - # mode: list ## possible values: list|lpush (default), rpush, channel|publish + # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream # ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## publish is using a Redis channel. "channel" is an alias for publish - # key: suricata ## key or channel to use (default to suricata) + # ## xadd is using a Redis stream. "stream" is an alias for xadd + # key: suricata ## string denoting the key/channel/stream to use (default to suricata) # Redis pipelining set up. This will enable to only do a query every # 'batch-size' events. This should lower the latency induced by network # connection at the cost of some memory. There is no flushing implemented