redis: implement XADD stream support

Ticket: #7082
pull/12026/head
Sascha Steinbiss 1 year ago committed by Victor Julien
parent 1860aa81e6
commit d3d9f1c395

@ -41,10 +41,11 @@ Output types::
# server: 127.0.0.1 # server: 127.0.0.1
# port: 6379 # port: 6379
# async: true ## if redis replies are read asynchronously # async: true ## if redis replies are read asynchronously
# mode: list ## possible values: list|lpush (default), rpush, channel|publish # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
# ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## lpush and rpush are using a Redis list. "list" is an alias for lpush
# ## publish is using a Redis channel. "channel" is an alias for publish # ## publish is using a Redis channel. "channel" is an alias for publish
# key: suricata ## key or channel to use (default to suricata) # ## xadd is using a Redis stream. "stream" is an alias for xadd
# key: suricata ## string denoting the key/channel/stream to use (default to suricata)
# Redis pipelining set up. This will enable to only do a query every # Redis pipelining set up. This will enable to only do a query every
# 'batch-size' events. This should lower the latency induced by network # 'batch-size' events. This should lower the latency induced by network
# connection at the cost of some memory. There is no flushing implemented # connection at the cost of some memory. There is no flushing implemented

@ -18,10 +18,11 @@ outputs:
# server: 127.0.0.1 # server: 127.0.0.1
# port: 6379 # port: 6379
# async: true ## if redis replies are read asynchronously # async: true ## if redis replies are read asynchronously
# mode: list ## possible values: list|lpush (default), rpush, channel|publish # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
# ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## lpush and rpush are using a Redis list. "list" is an alias for lpush
# ## publish is using a Redis channel. "channel" is an alias for publish # ## publish is using a Redis channel. "channel" is an alias for publish
# key: suricata ## key or channel to use (default to suricata) # ## xadd is using a Redis stream. "stream" is an alias for xadd
# key: suricata ## string denoting the key/channel/stream to use (default to suricata)
# Redis pipelining set up. This will enable to only do a query every # Redis pipelining set up. This will enable to only do a query every
# 'batch-size' events. This should lower the latency induced by network # 'batch-size' events. This should lower the latency induced by network
# connection at the cost of some memory. There is no flushing implemented # connection at the cost of some memory. There is no flushing implemented

@ -1,4 +1,4 @@
/* Copyright (C) 2007-2021 Open Information Security Foundation /* Copyright (C) 2007-2024 Open Information Security Foundation
* *
* You can copy, redistribute or modify this Program under the terms of * You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free * the GNU General Public License version 2 as published by the Free
@ -37,8 +37,11 @@
static const char * redis_lpush_cmd = "LPUSH"; static const char * redis_lpush_cmd = "LPUSH";
static const char * redis_rpush_cmd = "RPUSH"; static const char * redis_rpush_cmd = "RPUSH";
static const char * redis_publish_cmd = "PUBLISH"; static const char * redis_publish_cmd = "PUBLISH";
static const char *redis_xadd_cmd = "XADD";
static const char * redis_default_key = "suricata"; static const char * redis_default_key = "suricata";
static const char * redis_default_server = "127.0.0.1"; static const char * redis_default_server = "127.0.0.1";
static const char *redis_default_format = "%s %s %s";
static const char *redis_stream_format = "%s %s * eve %s";
static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx); static int SCConfLogReopenSyncRedis(LogFileCtx *log_ctx);
static void SCLogFileCloseRedis(LogFileCtx *log_ctx); static void SCLogFileCloseRedis(LogFileCtx *log_ctx);
@ -268,12 +271,8 @@ static int SCLogRedisWriteAsync(LogFileCtx *file_ctx, const char *string, size_t
return -1; return -1;
} }
redisAsyncCommand(ctx->async, redisAsyncCommand(ctx->async, SCRedisAsyncCommandCallback, file_ctx,
SCRedisAsyncCommandCallback, file_ctx->redis_setup.format, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
file_ctx,
"%s %s %s",
file_ctx->redis_setup.command,
file_ctx->redis_setup.key,
string); string);
event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK); event_base_loop(ctx->ev_base, EVLOOP_NONBLOCK);
@ -345,10 +344,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
/* synchronous mode */ /* synchronous mode */
if (file_ctx->redis_setup.batch_size) { if (file_ctx->redis_setup.batch_size) {
redisAppendCommand(redis, "%s %s %s", redisAppendCommand(redis, file_ctx->redis_setup.format, file_ctx->redis_setup.command,
file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
file_ctx->redis_setup.key,
string);
time_t now = time(NULL); time_t now = time(NULL);
if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) { if ((ctx->batch_count == file_ctx->redis_setup.batch_size) || (ctx->last_push < now)) {
redisReply *reply; redisReply *reply;
@ -374,9 +371,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
redis = ctx->sync; redis = ctx->sync;
if (redis) { if (redis) {
SCLogInfo("Reconnected to redis server"); SCLogInfo("Reconnected to redis server");
redisAppendCommand(redis, "%s %s %s", redisAppendCommand(redis, file_ctx->redis_setup.format,
file_ctx->redis_setup.command, file_ctx->redis_setup.command, file_ctx->redis_setup.key,
file_ctx->redis_setup.key,
string); string);
ctx->batch_count++; ctx->batch_count++;
return 0; return 0;
@ -395,10 +391,8 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
ctx->batch_count++; ctx->batch_count++;
} }
} else { } else {
redisReply *reply = redisCommand(redis, "%s %s %s", redisReply *reply = redisCommand(redis, file_ctx->redis_setup.format,
file_ctx->redis_setup.command, file_ctx->redis_setup.command, file_ctx->redis_setup.key, string);
file_ctx->redis_setup.key,
string);
/* We may lose the reply if disconnection happens*/ /* We may lose the reply if disconnection happens*/
if (reply) { if (reply) {
switch (reply->type) { switch (reply->type) {
@ -410,6 +404,10 @@ static int SCLogRedisWriteSync(LogFileCtx *file_ctx, const char *string)
SCLogDebug("Redis integer %lld", reply->integer); SCLogDebug("Redis integer %lld", reply->integer);
ret = 0; ret = 0;
break; break;
case REDIS_REPLY_STRING:
SCLogDebug("Redis string %s", reply->str);
ret = 0;
break;
default: default:
SCLogError("Redis default triggered with %d", reply->type); SCLogError("Redis default triggered with %d", reply->type);
SCConfLogReopenSyncRedis(file_ctx); SCConfLogReopenSyncRedis(file_ctx);
@ -519,14 +517,18 @@ int SCConfLogOpenRedis(ConfNode *redis_node, void *lf_ctx)
log_ctx->redis_setup.batch_size = 0; log_ctx->redis_setup.batch_size = 0;
} }
log_ctx->redis_setup.format = redis_default_format;
if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) { if (!strcmp(redis_mode, "list") || !strcmp(redis_mode,"lpush")) {
log_ctx->redis_setup.command = redis_lpush_cmd; log_ctx->redis_setup.command = redis_lpush_cmd;
} else if(!strcmp(redis_mode, "rpush")){ } else if(!strcmp(redis_mode, "rpush")){
log_ctx->redis_setup.command = redis_rpush_cmd; log_ctx->redis_setup.command = redis_rpush_cmd;
} else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) { } else if(!strcmp(redis_mode,"channel") || !strcmp(redis_mode,"publish")) {
log_ctx->redis_setup.command = redis_publish_cmd; log_ctx->redis_setup.command = redis_publish_cmd;
} else if (!strcmp(redis_mode, "stream") || !strcmp(redis_mode, "xadd")) {
log_ctx->redis_setup.command = redis_xadd_cmd;
log_ctx->redis_setup.format = redis_stream_format;
} else { } else {
FatalError("Invalid redis mode"); FatalError("Invalid redis mode: %s", redis_mode);
} }
/* store server params for reconnection */ /* store server params for reconnection */

@ -1,4 +1,4 @@
/* Copyright (C) 2016 Open Information Security Foundation /* Copyright (C) 2016-2024 Open Information Security Foundation
* *
* You can copy, redistribute or modify this Program under the terms of * You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free * the GNU General Public License version 2 as published by the Free
@ -38,6 +38,7 @@ enum RedisMode { REDIS_LIST, REDIS_CHANNEL };
typedef struct RedisSetup_ { typedef struct RedisSetup_ {
enum RedisMode mode; enum RedisMode mode;
const char *format;
const char *command; const char *command;
const char *key; const char *key;
const char *server; const char *server;

@ -112,10 +112,11 @@ outputs:
# server: 127.0.0.1 # server: 127.0.0.1
# port: 6379 # port: 6379
# async: true ## if redis replies are read asynchronously # async: true ## if redis replies are read asynchronously
# mode: list ## possible values: list|lpush (default), rpush, channel|publish # mode: list ## possible values: list|lpush (default), rpush, channel|publish, xadd|stream
# ## lpush and rpush are using a Redis list. "list" is an alias for lpush # ## lpush and rpush are using a Redis list. "list" is an alias for lpush
# ## publish is using a Redis channel. "channel" is an alias for publish # ## publish is using a Redis channel. "channel" is an alias for publish
# key: suricata ## key or channel to use (default to suricata) # ## xadd is using a Redis stream. "stream" is an alias for xadd
# key: suricata ## string denoting the key/channel/stream to use (default to suricata)
# Redis pipelining set up. This will enable to only do a query every # Redis pipelining set up. This will enable to only do a query every
# 'batch-size' events. This should lower the latency induced by network # 'batch-size' events. This should lower the latency induced by network
# connection at the cost of some memory. There is no flushing implemented # connection at the cost of some memory. There is no flushing implemented

Loading…
Cancel
Save