af-packet: implement late open

This patch implements "late open". On high performance system, it
is needed to create the AF_PACKET just before reading to avoid
overflow. Socket creation has to be done with respect to the order
of thread creation to respect affinity settings.
This patch adds a counter to AFPPeer to be ale to synchronize the
initial socket creation.
pull/57/merge
Eric Leblond 13 years ago committed by Victor Julien
parent 3bea3b39df
commit 6040016347

@ -249,6 +249,8 @@ typedef struct AFPPeersList_ {
TAILQ_HEAD(, AFPPeer_) peers; /**< Head of list of fragments. */
int cnt;
int peered;
int turn; /**< Next value for initialisation order */
SC_ATOMIC_DECLARE(int, reached); /**< Counter used to synchronize start */
} AFPPeersList;
/**
@ -293,6 +295,9 @@ TmEcode AFPPeersListInit()
TAILQ_INIT(&peerslist.peers);
peerslist.peered = 0;
peerslist.cnt = 0;
peerslist.turn = 0;
SC_ATOMIC_INIT(peerslist.reached);
(void) SC_ATOMIC_SET(peerslist.reached, 0);
SCReturnInt(TM_ECODE_OK);
}
@ -338,6 +343,7 @@ TmEcode AFPPeersListAdd(AFPThreadVars *ptv)
SC_ATOMIC_INIT(peer->if_idx);
SC_ATOMIC_INIT(peer->state);
peer->flags = ptv->flags;
peer->turn = peerslist.turn++;
if (peer->flags & AFP_SOCK_PROTECT) {
SCMutexInit(&peer->sock_protect, NULL);
@ -381,6 +387,18 @@ TmEcode AFPPeersListAdd(AFPThreadVars *ptv)
SCReturnInt(TM_ECODE_OK);
}
int AFPPeersListWaitTurn(AFPPeer *peer)
{
if (peer->turn == SC_ATOMIC_GET(peerslist.reached))
return 0;
return 1;
}
void AFPPeersListReachedInc()
{
(void)SC_ATOMIC_ADD(peerslist.reached, 1);
}
/**
* \brief Clean the global peers list.
*/
@ -870,6 +888,21 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
ptv->slot = s->slot_next;
if (ptv->afp_state == AFP_STATE_DOWN) {
/* Wait for our turn, threads before us must have opened the socket */
while (AFPPeersListWaitTurn(ptv->mpeer)) {
usleep(1000);
}
r = AFPCreateSocket(ptv, ptv->iface, 1);
if (r < 0) {
SCLogError(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket");
}
AFPPeersListReachedInc();
}
if (ptv->afp_state == AFP_STATE_UP) {
SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket);
}
fds.fd = ptv->socket;
fds.events = POLLIN;
@ -1443,15 +1476,6 @@ TmEcode ReceiveAFPThreadInit(ThreadVars *tv, void *initdata, void **data) {
SCReturnInt(TM_ECODE_FAILED);
}
r = AFPCreateSocket(ptv, ptv->iface, 1);
if (r < 0) {
SCLogError(SC_ERR_AFP_CREATE, "Couldn't init AF_PACKET socket");
//SCFree(ptv);
//afpconfig->DerefFunc(afpconfig);
//SCReturnInt(TM_ECODE_OKFAILED);
}
ptv->copy_mode = afpconfig->copy_mode;
if (ptv->copy_mode != AFP_COPY_MODE_NONE) {
strlcpy(ptv->out_iface, afpconfig->out_iface, AFP_IFACE_NAME_LENGTH);

@ -88,6 +88,7 @@ typedef struct AFPPeer_ {
SC_ATOMIC_DECLARE(uint8_t, state);
SCMutex sock_protect;
int flags;
int turn; /**< Field used to store initialisation order. */
struct AFPPeer_ *peer;
TAILQ_ENTRY(AFPPeer_) next;
} AFPPeer;

Loading…
Cancel
Save