af-packet: synchronize reading start

This patch updates af-packet to discard packets that have been
sent to a socket before all sockets in a fanout group have been set up.
Without this, there is no way to ensure that all packets for a single
flow will be treated by the same thread.

Tests have been done on a system with an ixgbe network card. When using
'cluster_flow' load balancing and deactivating receive hash on the iface:
 ethtool -K IFACE rxhash off
then suricata is behaving as expected and all packets for a single flow
are treated by the same thread.

For some unknown reason, this is not the case when using cluster_cpu. It
seems that in that case the load balancing is not perfect on the card side.

The rxhash offloading has a direct impact on the cluster_flow load balancing
because load balancing is done by using a generic hash key attached to
each skb. This hash can be computed by the network card or can be
computed by the kernel. In the case of an ixgbe network card, it seems there
is some issue with the hash key for TCP. This explains why it is necessary to
remove the rxhash offloading to have a correct behavior. This could also
explain why cluster_cpu is currently failing because the card is using the
same hash key computation to do the RSS queues load balancing.
pull/898/head
Eric Leblond 12 years ago
parent 70efc66e33
commit 919377d4a5

@ -452,6 +452,11 @@ void AFPPeersListReachedInc()
}
}
/**
 * \brief Tell whether every peer in the fanout group has finished setup.
 *
 * \retval 1 all peers have taken their turn (turn counter is zero)
 * \retval 0 at least one peer has not started yet
 */
static int AFPPeersListStarted()
{
    return (peerslist.turn == 0) ? 1 : 0;
}
/**
* \brief Clean the global peers list.
*/
@ -940,6 +945,107 @@ void AFPSwitchState(AFPThreadVars *ptv, int state)
}
}
/**
 * \brief Read one packet from an AF_PACKET socket and throw it away.
 *
 * Used during startup synchronization: packets queued on a socket before
 * every socket of the fanout group was set up must be discarded so that
 * per-flow load balancing is consistent across threads.
 *
 * \param ptv    thread vars holding the socket and a scratch buffer
 * \param synctv synchronization barrier time; a packet stamped after it
 *               ends the discard phase
 *
 * \retval 1 packet newer than synctv seen, or engine is stopping: stop discarding
 * \retval 0 stale packet discarded, keep going
 * \retval -1 error reading or timestamping the packet
 */
static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv)
{
    struct sockaddr_ll from;
    struct iovec iov;
    struct msghdr msg;
    struct timeval ts;
    union {
        struct cmsghdr cmsg;
        char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))];
    } cmsg_buf;

    if (unlikely(suricata_ctl_flags != 0)) {
        /* Engine is shutting down: abort the synchronization loop. */
        return 1;
    }

    msg.msg_name = &from;
    msg.msg_namelen = sizeof(from);
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_control = &cmsg_buf;
    msg.msg_controllen = sizeof(cmsg_buf);
    msg.msg_flags = 0;

    iov.iov_len = ptv->datalen;
    iov.iov_base = ptv->data;

    /* Drain one packet; MSG_TRUNC makes the kernel report the real length
     * even when it exceeds our buffer. The payload itself is discarded.
     * The return value must be checked: on failure SIOCGSTAMP below would
     * otherwise report the timestamp of a previously received packet. */
    if (recvmsg(ptv->socket, &msg, MSG_TRUNC) == -1) {
        return -1;
    }

    if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) {
        /* FIXME */
        return -1;
    }

    /* Packet was received after the synchronization barrier: done. */
    if ((ts.tv_sec > synctv->tv_sec) ||
        (ts.tv_sec == synctv->tv_sec &&
         ts.tv_usec > synctv->tv_usec)) {
        return 1;
    }
    return 0;
}
/**
 * \brief Discard one packet from the AF_PACKET mmap ring during startup sync.
 *
 * Ring-mode counterpart of AFPReadAndDiscard(): inspects the frame at the
 * current ring offset, compares its kernel timestamp against the
 * synchronization barrier and, if the packet predates it, hands the frame
 * back to the kernel and advances the ring offset.
 *
 * \param ptv    thread vars holding the ring buffer and current offset
 * \param synctv synchronization barrier time
 *
 * \retval 1 frame newer than synctv seen, or engine is stopping: stop discarding
 * \retval 0 stale frame returned to the kernel, keep going
 * \retval -1 no frame available at the current ring offset
 */
static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv)
{
union thdr h;
if (unlikely(suricata_ctl_flags != 0)) {
/* Engine is shutting down: abort the synchronization loop. */
return 1;
}
/* Read packet from ring */
h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]);
if (h.raw == NULL) {
return -1;
}
/* tp_sec/tp_nsec are the kernel's receive timestamp for this frame;
 * nsec is converted to usec for comparison with the timeval barrier. */
if (((time_t)h.h2->tp_sec > synctv->tv_sec) ||
((time_t)h.h2->tp_sec == synctv->tv_sec &&
(suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) {
return 1;
}
/* Hand the frame back to the kernel without processing it. */
h.h2->tp_status = TP_STATUS_KERNEL;
/* Advance to the next frame, wrapping at the end of the ring. */
if (++ptv->frame_offset >= ptv->req.tp_frame_nr) {
ptv->frame_offset = 0;
}
return 0;
}
/**
 * \brief Synchronize this capture thread with the rest of the fanout group.
 *
 * Discards every packet received before all sockets of the fanout group
 * were set up. Until the peers list reports that every peer has started,
 * synctv stays at its "end of the world" sentinel so every packet compares
 * as stale and is dropped; once all peers are up, synctv is set to the
 * current time and the loop exits on the first packet newer than it.
 *
 * \param ptv thread vars of the capture thread being synchronized
 *
 * \retval 1 synchronization completed, thread may start real capture
 * \retval -1 read error during synchronization
 */
static int AFPSynchronizeStart(AFPThreadVars *ptv)
{
int r;
struct timeval synctv;
/* Set timeval to end of the world */
synctv.tv_sec = 0xffffffff;
synctv.tv_usec = 0xffffffff;
while (1) {
/* 0xffffffff doubles as a "barrier not yet taken" flag: take the
 * barrier timestamp exactly once, when all peers have started.
 * NOTE(review): assumes time_t is wide enough that 0xffffffff is not
 * a reachable real timestamp — verify on 32-bit time_t platforms. */
if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) {
gettimeofday(&synctv, NULL);
}
if (ptv->flags & AFP_RING_MODE) {
r = AFPReadAndDiscardFromRing(ptv, &synctv);
} else {
r = AFPReadAndDiscard(ptv, &synctv);
}
SCLogDebug("Discarding on %s", ptv->tv->name);
switch (r) {
case 1:
/* Packet newer than the barrier: synchronization done. */
SCLogInfo("Starting to read on %s", ptv->tv->name);
return 1;
case -1:
/* Read error: propagate to the caller. */
return r;
}
/* r == 0: stale packet discarded, keep draining. */
}
return 1;
}
/**
* \brief Try to reopen socket
*
@ -1001,6 +1107,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
}
if (ptv->afp_state == AFP_STATE_UP) {
SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket);
AFPSynchronizeStart(ptv);
}
fds.fd = ptv->socket;

Loading…
Cancel
Save