From 6040016347fd64b4e91903d78f57b2f9673c6be7 Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Tue, 4 Sep 2012 12:14:58 +0200 Subject: [PATCH] af-packet: implement late open This patch implements "late open". On high performance system, it is needed to create the AF_PACKET just before reading to avoid overflow. Socket creation has to be done with respect to the order of thread creation to respect affinity settings. This patch adds a counter to AFPPeer to be ale to synchronize the initial socket creation. --- src/source-af-packet.c | 42 +++++++++++++++++++++++++++++++++--------- src/source-af-packet.h | 1 + 2 files changed, 34 insertions(+), 9 deletions(-) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 947f529ce6..fd89e39fea 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -249,6 +249,8 @@ typedef struct AFPPeersList_ { TAILQ_HEAD(, AFPPeer_) peers; /**< Head of list of fragments. */ int cnt; int peered; + int turn; /**< Next value for initialisation order */ + SC_ATOMIC_DECLARE(int, reached); /**< Counter used to synchronize start */ } AFPPeersList; /** @@ -293,6 +295,9 @@ TmEcode AFPPeersListInit() TAILQ_INIT(&peerslist.peers); peerslist.peered = 0; peerslist.cnt = 0; + peerslist.turn = 0; + SC_ATOMIC_INIT(peerslist.reached); + (void) SC_ATOMIC_SET(peerslist.reached, 0); SCReturnInt(TM_ECODE_OK); } @@ -338,6 +343,7 @@ TmEcode AFPPeersListAdd(AFPThreadVars *ptv) SC_ATOMIC_INIT(peer->if_idx); SC_ATOMIC_INIT(peer->state); peer->flags = ptv->flags; + peer->turn = peerslist.turn++; if (peer->flags & AFP_SOCK_PROTECT) { SCMutexInit(&peer->sock_protect, NULL); @@ -381,6 +387,18 @@ TmEcode AFPPeersListAdd(AFPThreadVars *ptv) SCReturnInt(TM_ECODE_OK); } +int AFPPeersListWaitTurn(AFPPeer *peer) +{ + if (peer->turn == SC_ATOMIC_GET(peerslist.reached)) + return 0; + return 1; +} + +void AFPPeersListReachedInc() +{ + (void)SC_ATOMIC_ADD(peerslist.reached, 1); +} + /** * \brief Clean the global peers list. */ @@ -870,6 +888,21 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) ptv->slot = s->slot_next; + if (ptv->afp_state == AFP_STATE_DOWN) { + /* Wait for our turn, threads before us must have opened the socket */ + while (AFPPeersListWaitTurn(ptv->mpeer)) { + usleep(1000); + } + r = AFPCreateSocket(ptv, ptv->iface, 1); + if (r < 0) { + SCLogError(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket"); + } + AFPPeersListReachedInc(); + } + if (ptv->afp_state == AFP_STATE_UP) { + SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket); + } + fds.fd = ptv->socket; fds.events = POLLIN; @@ -1443,15 +1476,6 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) { SCReturnInt(TM_ECODE_FAILED); } - r = AFPCreateSocket(ptv, ptv->iface, 1); - if (r < 0) { - SCLogError(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket"); - //SCFree(ptv); - //afpconfig->DerefFunc(afpconfig); - //SCReturnInt(TM_ECODE_OKFAILED); - } - - ptv->copy_mode = afpconfig->copy_mode; if (ptv->copy_mode != AFP_COPY_MODE_NONE) { strlcpy(ptv->out_iface, afpconfig->out_iface, AFP_IFACE_NAME_LENGTH); diff --git a/src/source-af-packet.h b/src/source-af-packet.h index 5548b7d266..0fcd2cb863 100644 --- a/src/source-af-packet.h +++ b/src/source-af-packet.h @@ -88,6 +88,7 @@ typedef struct AFPPeer_ { SC_ATOMIC_DECLARE(uint8_t, state); SCMutex sock_protect; int flags; + int turn; /**< Field used to store initialisation order. */ struct AFPPeer_ *peer; TAILQ_ENTRY(AFPPeer_) next; } AFPPeer;