streaming: use error codes to indicate error reason

pull/9021/head
Victor Julien 2 years ago
parent c3ee3d513f
commit 06419cecbc

@ -21,6 +21,7 @@
#include "util-print.h"
#include "util-validate.h"
#include "util-debug.h"
#include "util-error.h"
static void ListRegions(StreamingBuffer *sb);
@ -191,21 +192,26 @@ static StreamingBufferRegion *FindRightEdge(const StreamingBuffer *sb,
return candidate;
}
/** \note uses sc_errno to give error details */
static inline StreamingBufferRegion *InitBufferRegion(
StreamingBuffer *sb, const StreamingBufferConfig *cfg, const uint32_t min_size)
{
if (sb->regions == USHRT_MAX || (cfg->max_regions != 0 && sb->regions >= cfg->max_regions)) {
SCLogDebug("max regions reached");
sc_errno = SC_ELIMIT;
return NULL;
}
StreamingBufferRegion *aux_r = CALLOC(cfg, 1, sizeof(*aux_r));
if (aux_r == NULL)
if (aux_r == NULL) {
sc_errno = SC_ENOMEM;
return NULL;
}
aux_r->buf = CALLOC(cfg, 1, MAX(cfg->buf_size, min_size));
if (aux_r->buf == NULL) {
FREE(cfg, aux_r, sizeof(*aux_r));
sc_errno = SC_ENOMEM;
return NULL;
}
aux_r->buf_size = MAX(cfg->buf_size, min_size);
@ -218,30 +224,34 @@ static inline int InitBuffer(StreamingBuffer *sb, const StreamingBufferConfig *c
{
sb->region.buf = CALLOC(cfg, 1, cfg->buf_size);
if (sb->region.buf == NULL) {
return -1;
return SC_ENOMEM;
}
sb->region.buf_size = cfg->buf_size;
return 0;
return SC_OK;
}
StreamingBuffer *StreamingBufferInit(const StreamingBufferConfig *cfg)
{
StreamingBuffer *sb = CALLOC(cfg, 1, sizeof(StreamingBuffer));
if (sb != NULL) {
sb->region.buf_size = cfg->buf_size;
sb->regions = sb->max_regions = 1;
if (sb == NULL) {
sc_errno = SC_ENOMEM;
return NULL;
}
if (cfg->buf_size > 0) {
if (InitBuffer(sb, cfg) == 0) {
return sb;
}
FREE(cfg, sb, sizeof(StreamingBuffer));
/* implied buf_size == 0 */
} else {
return sb;
}
sb->region.buf_size = cfg->buf_size;
sb->regions = sb->max_regions = 1;
if (cfg->buf_size == 0) {
return sb;
}
return NULL;
int r = InitBuffer(sb, cfg);
if (r != SC_OK) {
FREE(cfg, sb, sizeof(StreamingBuffer));
sc_errno = r;
return NULL;
}
return sb;
}
void StreamingBufferClear(StreamingBuffer *sb, const StreamingBufferConfig *cfg)
@ -305,7 +315,7 @@ static int WARN_UNUSED SBBInit(StreamingBuffer *sb, const StreamingBufferConfig
/* need to set up 2: existing data block and new data block */
StreamingBufferBlock *sbb = CALLOC(cfg, 1, sizeof(*sbb));
if (sbb == NULL) {
return -1;
return SC_ENOMEM;
}
sbb->offset = sb->region.stream_offset;
sbb->len = sb->region.buf_offset;
@ -315,7 +325,7 @@ static int WARN_UNUSED SBBInit(StreamingBuffer *sb, const StreamingBufferConfig
StreamingBufferBlock *sbb2 = CALLOC(cfg, 1, sizeof(*sbb2));
if (sbb2 == NULL) {
return -1;
return SC_ENOMEM;
}
sbb2->offset = region->stream_offset + rel_offset;
sbb2->len = data_len;
@ -326,13 +336,13 @@ static int WARN_UNUSED SBBInit(StreamingBuffer *sb, const StreamingBufferConfig
sb->sbb_size += sbb2->len;
if (SBB_RB_INSERT(&sb->sbb_tree, sbb2) != NULL) {
FREE(cfg, sbb2, sizeof(*sbb2));
return -1;
return SC_EINVAL;
}
#ifdef DEBUG
SBBPrintList(sb);
#endif
return 0;
return SC_OK;
}
/* setup with leading gap
@ -345,8 +355,9 @@ static int WARN_UNUSED SBBInitLeadingGap(StreamingBuffer *sb, const StreamingBuf
DEBUG_VALIDATE_BUG_ON(!RB_EMPTY(&sb->sbb_tree));
StreamingBufferBlock *sbb = CALLOC(cfg, 1, sizeof(*sbb));
if (sbb == NULL)
return -1;
if (sbb == NULL) {
return SC_ENOMEM;
}
sbb->offset = offset;
sbb->len = data_len;
@ -358,7 +369,7 @@ static int WARN_UNUSED SBBInitLeadingGap(StreamingBuffer *sb, const StreamingBuf
#ifdef DEBUG
SBBPrintList(sb);
#endif
return 0;
return SC_OK;
}
static inline void ConsolidateFwd(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
@ -565,8 +576,9 @@ static int SBBUpdate(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
SCLogDebug("* inserting: %u/%u", rel_offset, len);
StreamingBufferBlock *sbb = CALLOC(cfg, 1, sizeof(*sbb));
if (sbb == NULL)
return -1;
if (sbb == NULL) {
return SC_ENOMEM;
}
sbb->offset = region->stream_offset + rel_offset;
sbb->len = len;
@ -576,7 +588,7 @@ static int SBBUpdate(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
SCLogDebug("* insert failed: exact match in tree with %p %" PRIu64 "/%u", res, res->offset,
res->len);
FREE(cfg, sbb, sizeof(StreamingBufferBlock));
return 0;
return SC_OK;
}
sb->sbb_size += len; // may adjust based on consolidation below
@ -594,7 +606,7 @@ static int SBBUpdate(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
SCLogDebug("insert at region head");
region->buf_offset = sbb->len;
}
return 0;
return SC_OK;
}
static void SBBFree(StreamingBuffer *sb, const StreamingBufferConfig *cfg)
@ -685,7 +697,7 @@ static inline int WARN_UNUSED GrowRegionToSize(StreamingBuffer *sb,
size, BIT_U32(30));
g2s_warn_once = true;
}
return -1;
return SC_ELIMIT;
}
/* try to grow in multiples of cfg->buf_size */
@ -694,7 +706,7 @@ static inline int WARN_UNUSED GrowRegionToSize(StreamingBuffer *sb,
void *ptr = REALLOC(cfg, region->buf, region->buf_size, grow);
if (ptr == NULL) {
return -1;
return SC_ENOMEM;
}
/* for safe printing and general caution, lets memset the
* new data to 0 */
@ -710,7 +722,7 @@ static inline int WARN_UNUSED GrowRegionToSize(StreamingBuffer *sb,
sb->buf_size_max = region->buf_size;
}
#endif
return 0;
return SC_OK;
}
static int WARN_UNUSED GrowToSize(
@ -1048,10 +1060,10 @@ int StreamingBufferAppend(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
if (!DATA_FITS(sb, data_len)) {
if (sb->region.buf_size == 0) {
if (GrowToSize(sb, cfg, data_len) != 0)
if (GrowToSize(sb, cfg, data_len) != SC_OK)
return -1;
} else {
if (GrowToSize(sb, cfg, sb->region.buf_offset + data_len) != 0)
if (GrowToSize(sb, cfg, sb->region.buf_offset + data_len) != SC_OK)
return -1;
}
}
@ -1083,10 +1095,10 @@ int StreamingBufferAppendNoTrack(StreamingBuffer *sb, const StreamingBufferConfi
if (!DATA_FITS(sb, data_len)) {
if (sb->region.buf_size == 0) {
if (GrowToSize(sb, cfg, data_len) != 0)
if (GrowToSize(sb, cfg, data_len) != SC_OK)
return -1;
} else {
if (GrowToSize(sb, cfg, sb->region.buf_offset + data_len) != 0)
if (GrowToSize(sb, cfg, sb->region.buf_offset + data_len) != SC_OK)
return -1;
}
}
@ -1181,6 +1193,7 @@ static void ListRegions(StreamingBuffer *sb)
/** \internal
* \brief process insert by consolidating the affected regions into one
* \note sets sc_errno
*/
static StreamingBufferRegion *BufferInsertAtRegionConsolidate(StreamingBuffer *sb,
const StreamingBufferConfig *cfg, StreamingBufferRegion *dst,
@ -1188,6 +1201,7 @@ static StreamingBufferRegion *BufferInsertAtRegionConsolidate(StreamingBuffer *s
const uint64_t data_offset, const uint32_t data_len, StreamingBufferRegion *prev,
uint32_t dst_buf_size)
{
int retval;
#ifdef DEBUG
const uint64_t data_re = data_offset + data_len;
SCLogDebug("sb %p dst %p src_start %p src_end %p data_offset %" PRIu64
@ -1209,8 +1223,10 @@ static StreamingBufferRegion *BufferInsertAtRegionConsolidate(StreamingBuffer *s
SCLogDebug("old_size %u, old_offset %u, dst_copy_offset %u", old_size, old_offset,
dst_copy_offset);
#endif
if (GrowRegionToSize(sb, cfg, dst, dst_size) != 0)
if ((retval = GrowRegionToSize(sb, cfg, dst, dst_size)) != SC_OK) {
sc_errno = retval;
return NULL;
}
SCLogDebug("resized to %u -> %u", dst_size, dst->buf_size);
/* validate that the size is exactly what we asked for */
DEBUG_VALIDATE_BUG_ON(dst_size != dst->buf_size);
@ -1308,6 +1324,7 @@ static StreamingBufferRegion *BufferInsertAtRegionConsolidate(StreamingBuffer *s
return dst;
}
/** \note sets sc_errno */
static StreamingBufferRegion *BufferInsertAtRegionDo(StreamingBuffer *sb,
const StreamingBufferConfig *cfg, const uint64_t offset, const uint32_t len)
{
@ -1323,14 +1340,17 @@ static StreamingBufferRegion *BufferInsertAtRegionDo(StreamingBuffer *sb,
SCLogDebug("start region %p/%" PRIu64 "/%u", start, start->stream_offset, start->buf_size);
StreamingBufferRegion *big = FindLargestRegionForOffset(sb, cfg, start, offset, len);
DEBUG_VALIDATE_BUG_ON(big == NULL);
if (big == NULL)
if (big == NULL) {
sc_errno = SC_EINVAL;
return NULL;
}
SCLogDebug("big region %p/%" PRIu64 "/%u", big, big->stream_offset, big->buf_size);
StreamingBufferRegion *end = FindRightEdge(sb, cfg, big, offset, len);
DEBUG_VALIDATE_BUG_ON(end == NULL);
if (end == NULL)
if (end == NULL) {
sc_errno = SC_EINVAL;
return NULL;
}
insert_adjusted_re = MAX(insert_adjusted_re, (end->stream_offset + end->buf_size));
uint32_t new_buf_size =
@ -1349,6 +1369,7 @@ static StreamingBufferRegion *BufferInsertAtRegionDo(StreamingBuffer *sb,
}
SCLogDebug("end region %p/%" PRIu64 "/%u", end, end->stream_offset, end->buf_size);
/* sets sc_errno */
StreamingBufferRegion *ret = BufferInsertAtRegionConsolidate(
sb, cfg, big, start, end, offset, len, start_prev, new_buf_size);
return ret;
@ -1365,6 +1386,7 @@ static StreamingBufferRegion *BufferInsertAtRegionDo(StreamingBuffer *sb,
SCLogDebug("no matching region found, append to %p (%s)", append,
append == &sb->region ? "main" : "aux");
/* sets sc_errno */
StreamingBufferRegion *add = InitBufferRegion(sb, cfg, len);
if (add == NULL)
return NULL;
@ -1382,6 +1404,8 @@ static StreamingBufferRegion *BufferInsertAtRegionDo(StreamingBuffer *sb,
* Will find an existing region, expand it if needed. If no existing region exists or is
* a good fit, it will try to set up a new region. If the region then overlaps or gets
* too close to the next, merge them.
*
* \note sets sc_errno
*/
static StreamingBufferRegion *BufferInsertAtRegion(StreamingBuffer *sb,
const StreamingBufferConfig *cfg, const uint8_t *data, const uint32_t data_len,
@ -1400,12 +1424,17 @@ static StreamingBufferRegion *BufferInsertAtRegion(StreamingBuffer *sb,
"data_offset %" PRIu64
", data_len %u intersects with main region, no next or way before next region",
data_offset, data_len);
if (sb->region.buf == NULL)
if (InitBuffer(sb, cfg) == -1) // TODO init with size
if (sb->region.buf == NULL) {
int r;
if ((r = InitBuffer(sb, cfg)) != SC_OK) { // TODO init with size
sc_errno = r;
return NULL;
}
}
return &sb->region;
}
} else if (sb->region.next == NULL) {
/* InitBufferRegion sets sc_errno */
StreamingBufferRegion *aux_r = sb->region.next = InitBufferRegion(sb, cfg, data_len);
if (aux_r == NULL)
return NULL;
@ -1415,6 +1444,7 @@ static StreamingBufferRegion *BufferInsertAtRegion(StreamingBuffer *sb,
aux_r->stream_offset, aux_r->buf_size);
return aux_r;
}
/* BufferInsertAtRegionDo sets sc_errno */
StreamingBufferRegion *blob = BufferInsertAtRegionDo(sb, cfg, data_offset, data_len);
SCLogDebug("blob %p (%s)", blob, blob == &sb->region ? "main" : "aux");
return blob;
@ -1423,21 +1453,23 @@ static StreamingBufferRegion *BufferInsertAtRegion(StreamingBuffer *sb,
/**
* \param offset offset relative to StreamingBuffer::stream_offset
*
* \return 0 in case of success
* \return -1 on memory allocation errors
* \return negative value on other errors
* \return SC_OK in case of success
* \return SC_E* errors otherwise
*/
int StreamingBufferInsertAt(StreamingBuffer *sb, const StreamingBufferConfig *cfg,
StreamingBufferSegment *seg, const uint8_t *data, uint32_t data_len, uint64_t offset)
{
int r;
BUG_ON(seg == NULL);
DEBUG_VALIDATE_BUG_ON(offset < sb->region.stream_offset);
if (offset < sb->region.stream_offset)
return -2;
if (offset < sb->region.stream_offset) {
return SC_EINVAL;
}
StreamingBufferRegion *region = BufferInsertAtRegion(sb, cfg, data, data_len, offset);
if (region == NULL) {
return -1;
return sc_errno;
}
const bool region_is_main = region == &sb->region;
@ -1447,8 +1479,8 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, const StreamingBufferConfig *cf
uint32_t rel_offset = offset - region->stream_offset;
if (!DATA_FITS_AT_OFFSET(region, data_len, rel_offset)) {
if (GrowToSize(sb, cfg, (rel_offset + data_len)) != 0)
return -1;
if ((r = GrowToSize(sb, cfg, (rel_offset + data_len))) != SC_OK)
return r;
}
DEBUG_VALIDATE_BUG_ON(!DATA_FITS_AT_OFFSET(region, data_len, rel_offset));
@ -1497,26 +1529,26 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, const StreamingBufferConfig *cf
} else if (sb->region.buf_offset) {
SCLogDebug("beyond expected offset: SBBInit");
/* existing data, but there is a gap between us */
if (SBBInit(sb, cfg, region, rel_offset, data_len) < 0)
return -1;
if ((r = SBBInit(sb, cfg, region, rel_offset, data_len)) != SC_OK)
return r;
} else {
/* gap before data in empty list */
SCLogDebug("empty sbb list: invoking SBBInitLeadingGap");
if (SBBInitLeadingGap(sb, cfg, region, offset, data_len) < 0)
return -1;
if ((r = SBBInitLeadingGap(sb, cfg, region, offset, data_len)) != SC_OK)
return r;
}
}
} else {
if (sb->region.buf_offset) {
/* existing data, but there is a gap between us */
SCLogDebug("empty sbb list, no data in main: use SBBInit");
if (SBBInit(sb, cfg, region, rel_offset, data_len) < 0)
return -1;
if ((r = SBBInit(sb, cfg, region, rel_offset, data_len)) != SC_OK)
return r;
} else {
/* gap before data in empty list */
SCLogDebug("empty sbb list: invoking SBBInitLeadingGap");
if (SBBInitLeadingGap(sb, cfg, region, offset, data_len) < 0)
return -1;
if ((r = SBBInitLeadingGap(sb, cfg, region, offset, data_len)) != SC_OK)
return r;
}
if (rel_offset == region->buf_offset) {
SCLogDebug("pre region->buf_offset %u", region->buf_offset);
@ -1527,8 +1559,8 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, const StreamingBufferConfig *cf
} else {
SCLogDebug("updating sbb tree");
/* already have blocks, so append new block based on new data */
if (SBBUpdate(sb, cfg, region, rel_offset, data_len) < 0)
return -1;
if ((r = SBBUpdate(sb, cfg, region, rel_offset, data_len)) != SC_OK)
return r;
}
BUG_ON(!region_is_main && sb->head == NULL);
@ -1537,7 +1569,7 @@ int StreamingBufferInsertAt(StreamingBuffer *sb, const StreamingBufferConfig *cf
BUG_ON(offset + data_len > sb->region.stream_offset + sb->region.buf_offset);
}
return 0;
return SC_OK;
}
int StreamingBufferSegmentIsBeforeWindow(const StreamingBuffer *sb,

Loading…
Cancel
Save