app-layer: improve async and out of order txs

Free txs that are done out of order if we can. Some protocol
implementations have transactions running in parallel, where it is
possible that a tx that started later finishes earlier than other
transactions. Support freeing those.

Also improve handling on asynchronious transactions. If transactions
are unreplied, e.g. in the dns flood case, the parser may at some
point free transactions on it's own. Handle this case in
the app-layer engine so that the various tracking id's (inspect, log,
and 'min') are updated accordingly.

Next, free txs much more aggressively. Instead of freeing old txs
at the app-layer parsing stage, free all complete txs at the end
of the flow-worker. This frees txs much sooner in many cases.
pull/3182/head
Victor Julien 7 years ago
parent 3d9ade9c35
commit 7a96d18f36

@ -75,8 +75,6 @@
#include "runmodes.h"
static GetActiveTxIdFunc AppLayerGetActiveTxIdFuncPtr = NULL;
struct AppLayerParserThreadCtx_ {
void *alproto_local_storage[FLOW_PROTO_MAX][ALPROTO_MAX];
};
@ -196,14 +194,7 @@ void AppLayerParserStateFree(AppLayerParserState *pstate)
int AppLayerParserSetup(void)
{
SCEnter();
memset(&alp_ctx, 0, sizeof(alp_ctx));
/* set the default tx handler if none was set explicitly */
if (AppLayerGetActiveTxIdFuncPtr == NULL) {
RegisterAppLayerGetActiveTxIdFunc(AppLayerTransactionGetActiveDetectLog);
}
SCReturnInt(0);
}
@ -685,6 +676,11 @@ void AppLayerParserSetTransactionInspectId(const Flow *f, AppLayerParserState *p
const uint8_t ipproto = f->proto;
const AppProto alproto = f->alproto;
SCLogDebug("called: %s, tag_txs_as_inspected %s",direction==0?"toserver":"toclient",
tag_txs_as_inspected?"true":"false");
/* mark all txs as inspected if the applayer progress is
* at the 'end state'. */
for (; idx < total_txs; idx++) {
void *tx = AppLayerParserGetTx(ipproto, alproto, alstate, idx);
if (tx == NULL)
@ -705,23 +701,34 @@ void AppLayerParserSetTransactionInspectId(const Flow *f, AppLayerParserState *p
break;
}
pstate->inspect_id[direction] = idx;
SCLogDebug("inspect_id now %"PRIu64, pstate->inspect_id[direction]);
/* if necessary we flag all txs that are complete as 'inspected' */
/* if necessary we flag all txs that are complete as 'inspected'
* also move inspect_id forward. */
if (tag_txs_as_inspected) {
for (; idx < total_txs; idx++) {
bool check_inspect_id = false;
void *tx = AppLayerParserGetTx(ipproto, alproto, alstate, idx);
if (tx == NULL)
continue;
int state_progress = AppLayerParserGetStateProgress(ipproto, alproto, tx, flags);
if (state_progress >= state_done_progress) {
uint64_t detect_flags = AppLayerParserGetTxDetectFlags(ipproto, alproto, tx, flags);
if ((detect_flags & APP_LAYER_TX_INSPECTED_FLAG) == 0) {
detect_flags |= APP_LAYER_TX_INSPECTED_FLAG;
AppLayerParserSetTxDetectFlags(ipproto, alproto, tx, flags, detect_flags);
SCLogDebug("%p/%"PRIu64" out of order tx is done for direction %s. Flag %016"PRIx64,
tx, idx, flags & STREAM_TOSERVER ? "toserver" : "toclient", detect_flags);
if (tx == NULL) {
check_inspect_id = true;
} else {
int state_progress = AppLayerParserGetStateProgress(ipproto, alproto, tx, flags);
if (state_progress >= state_done_progress) {
uint64_t detect_flags = AppLayerParserGetTxDetectFlags(ipproto, alproto, tx, flags);
if ((detect_flags & APP_LAYER_TX_INSPECTED_FLAG) == 0) {
detect_flags |= APP_LAYER_TX_INSPECTED_FLAG;
AppLayerParserSetTxDetectFlags(ipproto, alproto, tx, flags, detect_flags);
SCLogDebug("%p/%"PRIu64" out of order tx is done for direction %s. Flag %016"PRIx64,
tx, idx, flags & STREAM_TOSERVER ? "toserver" : "toclient", detect_flags);
check_inspect_id = true;
}
}
}
if (check_inspect_id) {
SCLogDebug("%p/%"PRIu64" out of order tx. Update inspect_id? %"PRIu64, tx, idx, pstate->inspect_id[direction]);
if (pstate->inspect_id[direction]+1 == idx)
pstate->inspect_id[direction] = idx;
}
}
}
@ -775,81 +782,10 @@ FileContainer *AppLayerParserGetFiles(uint8_t ipproto, AppProto alproto,
SCReturnPtr(ptr, "FileContainer *");
}
/** \brief active TX retrieval for normal ops: so with detection and logging
*
* \retval tx_id lowest tx_id that still needs work */
uint64_t AppLayerTransactionGetActiveDetectLog(Flow *f, uint8_t flags)
{
AppLayerParserProtoCtx *p = &alp_ctx.ctxs[f->protomap][f->alproto];
uint64_t log_id = f->alparser->log_id;
uint64_t inspect_id = f->alparser->inspect_id[flags & STREAM_TOSERVER ? 0 : 1];
if (p->logger == true) {
return (log_id < inspect_id) ? log_id : inspect_id;
} else {
return inspect_id;
}
}
/** \brief active TX retrieval for logging only: so NO detection
*
* If the logger is enabled, we simply return the log_id here.
*
* Otherwise, we go look for the tx id. There probably is no point
* in running this function in that case though. With no detection
* and no logging, why run a parser in the first place?
**/
uint64_t AppLayerTransactionGetActiveLogOnly(Flow *f, uint8_t flags)
{
AppLayerParserProtoCtx *p = &alp_ctx.ctxs[f->protomap][f->alproto];
if (p->logger == true) {
uint64_t log_id = f->alparser->log_id;
SCLogDebug("returning %"PRIu64, log_id);
return log_id;
}
/* logger is disabled, return highest 'complete' tx id */
const uint64_t total_txs = AppLayerParserGetTxCnt(f, f->alstate);
uint64_t idx = f->alparser->min_id;
const int state_done_progress = AppLayerParserGetStateProgressCompletionStatus(f->alproto, flags);
for (; idx < total_txs; idx++) {
void *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, idx);
if (tx == NULL)
continue;
const int state_progress = AppLayerParserGetStateProgress(f->proto, f->alproto, tx, flags);
if (state_progress >= state_done_progress)
continue;
else
break;
}
SCLogDebug("returning %"PRIu64, idx);
return idx;
}
void RegisterAppLayerGetActiveTxIdFunc(GetActiveTxIdFunc FuncPtr)
{
//BUG_ON(AppLayerGetActiveTxIdFuncPtr != NULL);
AppLayerGetActiveTxIdFuncPtr = FuncPtr;
SCLogDebug("AppLayerGetActiveTxIdFuncPtr is now %p", AppLayerGetActiveTxIdFuncPtr);
}
/**
* \brief Get 'active' tx id, meaning the lowest id that still need work.
*
* \retval id tx id
*/
static uint64_t AppLayerTransactionGetActive(Flow *f, uint8_t flags)
{
BUG_ON(AppLayerGetActiveTxIdFuncPtr == NULL);
return AppLayerGetActiveTxIdFuncPtr(f, flags);
}
/**
* \brief remove obsolete (inspected and logged) transactions
*/
static void AppLayerParserTransactionsCleanup(Flow *f)
void AppLayerParserTransactionsCleanup(Flow *f)
{
DEBUG_ASSERT_FLOW_LOCKED(f);
@ -857,27 +793,79 @@ static void AppLayerParserTransactionsCleanup(Flow *f)
if (unlikely(p->StateTransactionFree == NULL))
return;
const uint64_t tx_id_ts = AppLayerTransactionGetActive(f, STREAM_TOSERVER);
const uint64_t tx_id_tc = AppLayerTransactionGetActive(f, STREAM_TOCLIENT);
uint64_t min = MIN(tx_id_ts, tx_id_tc);
if (min > 0) {
uint64_t x = f->alparser->min_id;
for ( ; x < min - 1; x++) {
void *tx = AppLayerParserGetTx(f->proto, f->alproto, f->alstate, x);
if (tx != 0) {
SCLogDebug("while freeing %"PRIu64", also free TX at %"PRIu64, min - 1, x);
p->StateTransactionFree(f->alstate, x);
}
const uint8_t ipproto = f->proto;
const AppProto alproto = f->alproto;
void * const alstate = f->alstate;
AppLayerParserState * const alparser = f->alparser;
if (alstate == NULL || alparser == NULL)
return;
const uint64_t min = alparser->min_id;
const uint64_t total_txs = AppLayerParserGetTxCnt(f, alstate);
const LoggerId logger_expectation = AppLayerParserProtocolGetLoggerBits(ipproto, alproto);
const int tx_end_state_ts = AppLayerParserGetStateProgressCompletionStatus(alproto, STREAM_TOSERVER);
const int tx_end_state_tc = AppLayerParserGetStateProgressCompletionStatus(alproto, STREAM_TOCLIENT);
SCLogDebug("checking %"PRIu64" txs from offset %"PRIu64, total_txs, min);
for (uint64_t i = min ; i < total_txs; i++) {
void * const tx = AppLayerParserGetTx(ipproto, alproto, alstate, i);
if (tx == NULL) {
SCLogDebug("%p/%"PRIu64" skipping: no tx", tx, i);
goto wrap_up;
}
SCLogDebug("%p/%"PRIu64" checking", tx, i);
SCLogDebug("freeing %"PRIu64" %p", min - 1, p->StateTransactionFree);
const int tx_progress_tc = AppLayerParserGetStateProgress(ipproto, alproto, tx, STREAM_TOCLIENT);
if (tx_progress_tc < tx_end_state_tc) {
SCLogDebug("%p/%"PRIu64" skipping: tc parser not done", tx, i);
continue;
}
const int tx_progress_ts = AppLayerParserGetStateProgress(ipproto, alproto, tx, STREAM_TOSERVER);
if (tx_progress_ts < tx_end_state_ts) {
SCLogDebug("%p/%"PRIu64" skipping: ts parser not done", tx, i);
continue;
}
if (f->sgh_toserver != NULL) {
uint64_t detect_flags_ts = AppLayerParserGetTxDetectFlags(ipproto, alproto, tx, STREAM_TOSERVER);
if (!(detect_flags_ts & APP_LAYER_TX_INSPECTED_FLAG)) {
SCLogDebug("%p/%"PRIu64" skipping: TS inspect not done: ts:%"PRIx64,
tx, i, detect_flags_ts);
continue;
}
}
if (f->sgh_toclient != NULL) {
uint64_t detect_flags_tc = AppLayerParserGetTxDetectFlags(ipproto, alproto, tx, STREAM_TOCLIENT);
if (!(detect_flags_tc & APP_LAYER_TX_INSPECTED_FLAG)) {
SCLogDebug("%p/%"PRIu64" skipping: TC inspect not done: tc:%"PRIx64,
tx, i, detect_flags_tc);
continue;
}
}
if (logger_expectation != 0) {
LoggerId tx_logged = AppLayerParserGetTxLogged(f, alstate, tx);
if (tx_logged != logger_expectation) {
SCLogDebug("%p/%"PRIu64" skipping: logging not done: want:%"PRIx32", have:%"PRIx32,
tx, i, logger_expectation, tx_logged);
continue;
}
}
if ((AppLayerParserGetTx(f->proto, f->alproto, f->alstate, min - 1))) {
p->StateTransactionFree(f->alstate, min - 1);
/* if we are here, the tx can be freed. */
p->StateTransactionFree(alstate, i);
SCLogDebug("%p/%"PRIu64" freed", tx, i);
wrap_up:
/* see if this tx is actually in order. If so, we need to bring all
* trackers up to date. */
SCLogDebug("%p/%"PRIu64" update f->alparser->min_id? %"PRIu64, tx, i, alparser->min_id);
if (i == alparser->min_id) {
uint64_t next_id = i + 1;
alparser->min_id = next_id;
alparser->inspect_id[0] = MAX(alparser->inspect_id[0], next_id);
alparser->inspect_id[1] = MAX(alparser->inspect_id[1], next_id);
alparser->log_id = MAX(alparser->log_id, next_id);
SCLogDebug("%p/%"PRIu64" updated f->alparser->min_id %"PRIu64, tx, i, alparser->min_id);
}
f->alparser->min_id = min - 1;
SCLogDebug("f->alparser->min_id %"PRIu64, f->alparser->min_id);
}
}
@ -1150,8 +1138,6 @@ int AppLayerParserParse(ThreadVars *tv, AppLayerParserThreadCtx *alp_tctx, Flow
}
}
}
/* next, see if we can get rid of transactions now */
AppLayerParserTransactionsCleanup(f);
/* stream truncated, inform app layer */
if (flags & STREAM_DEPTH)

@ -52,31 +52,6 @@ int AppLayerParserProtoIsRegistered(uint8_t ipproto, AppProto alproto);
/***** transaction handling *****/
/** \brief Function ptr type for getting active TxId from a flow
* Used by AppLayerTransactionGetActive.
*/
typedef uint64_t (*GetActiveTxIdFunc)(Flow *f, uint8_t flags);
/** \brief Register GetActiveTxId Function
*
*/
void RegisterAppLayerGetActiveTxIdFunc(GetActiveTxIdFunc FuncPtr);
/** \brief active TX retrieval for normal ops: so with detection and logging
*
* \retval tx_id lowest tx_id that still needs work
*
* This is the default function.
*/
uint64_t AppLayerTransactionGetActiveDetectLog(Flow *f, uint8_t flags);
/** \brief active TX retrieval for logging only ops
*
* \retval tx_id lowest tx_id that still needs work
*/
uint64_t AppLayerTransactionGetActiveLogOnly(Flow *f, uint8_t flags);
int AppLayerParserSetup(void);
void AppLayerParserPostStreamSetup(void);
int AppLayerParserDeSetup(void);
@ -258,7 +233,7 @@ void AppLayerParserStreamTruncated(uint8_t ipproto, AppProto alproto, void *alst
AppLayerParserState *AppLayerParserStateAlloc(void);
void AppLayerParserStateFree(AppLayerParserState *pstate);
void AppLayerParserTransactionsCleanup(Flow *f);
#ifdef DEBUG
void AppLayerParserStatePrintDetails(AppLayerParserState *pstate);

@ -39,6 +39,7 @@
#include "app-layer.h"
#include "detect-engine.h"
#include "output.h"
#include "app-layer-parser.h"
#include "util-validate.h"
@ -271,6 +272,9 @@ static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data, PacketQueue *pr
if (p->flow) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
/* run tx cleanup last */
AppLayerParserTransactionsCleanup(p->flow);
FLOWLOCK_UNLOCK(p->flow);
}

@ -2507,9 +2507,6 @@ static void PostConfLoadedDetectSetup(SCInstance *suri)
gettimeofday(&de_ctx->last_reload, NULL);
DetectEngineAddToMaster(de_ctx);
DetectEngineBumpVersion();
} else {
/* tell the app layer to consider only the log id */
RegisterAppLayerGetActiveTxIdFunc(AppLayerTransactionGetActiveLogOnly);
}
}

Loading…
Cancel
Save