From 919377d4a50c98bfe6bcf17729d831bb501e5018 Mon Sep 17 00:00:00 2001 From: Eric Leblond Date: Tue, 11 Mar 2014 09:48:34 +0100 Subject: [PATCH] af-packet: synchronize reading start This patch is updating af-packet to discard packets that have been sent to a socket before all socket in a fanout group have been setup. Without this, there is no way to assure that all packets for a single flow will be treated by the same thread. Tests have been done on a system with an ixgbe network card. When using 'cluster_flow' load balancing and disactivating receive hash on the iface: ethtool -K IFACE rxhash off then suricata is behaving as expected and all packets for a single flow are treated by the same thread. For some unknown reason, this is not the case when using cluster_cpu. It seems that in that case the load balancing is not perfect on the card side. The rxhash offloading has a direct impact on the cluster_flow load balancing because load balancing is done by using a generic hash key attached to each skb. This hash can be computed by the network card or can be computed by the kernel. In the xase of a ixgbe network card, it seems there is some issue with the hash key for TCP. This explains why it is necessary to remove the rxhash offloading to have a correct behavior. This could also explain why cluster_cpu is currently failing because the card is using the same hash key computation to do the RSS queues load balancing. --- src/source-af-packet.c | 107 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 51ed693e41..654145a6d7 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -452,6 +452,11 @@ void AFPPeersListReachedInc() } } +static int AFPPeersListStarted() +{ + return !peerslist.turn; +} + /** * \brief Clean the global peers list. */ @@ -940,6 +945,107 @@ void AFPSwitchState(AFPThreadVars *ptv, int state) } } +static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv) +{ + struct sockaddr_ll from; + struct iovec iov; + struct msghdr msg; + struct timeval ts; + union { + struct cmsghdr cmsg; + char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; + } cmsg_buf; + + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + msg.msg_name = &from; + msg.msg_namelen = sizeof(from); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg_buf; + msg.msg_controllen = sizeof(cmsg_buf); + msg.msg_flags = 0; + + iov.iov_len = ptv->datalen; + iov.iov_base = ptv->data; + + recvmsg(ptv->socket, &msg, MSG_TRUNC); + + if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) { + /* FIXME */ + return -1; + } + + if ((ts.tv_sec > synctv->tv_sec) || + (ts.tv_sec >= synctv->tv_sec && + ts.tv_usec > synctv->tv_usec)) { + return 1; + } + return 0; +} + +static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv) +{ + union thdr h; + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + /* Read packet from ring */ + h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]); + if (h.raw == NULL) { + return -1; + } + + if (((time_t)h.h2->tp_sec > synctv->tv_sec) || + ((time_t)h.h2->tp_sec == synctv->tv_sec && + (suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) { + return 1; + } + + h.h2->tp_status = TP_STATUS_KERNEL; + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + } + + + return 0; +} + +static int AFPSynchronizeStart(AFPThreadVars *ptv) +{ + int r; + struct timeval synctv; + + /* Set timeval to end of the world */ + synctv.tv_sec = 0xffffffff; + synctv.tv_usec = 0xffffffff; + + while (1) { + if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) { + gettimeofday(&synctv, NULL); + } + if (ptv->flags & AFP_RING_MODE) { + r = AFPReadAndDiscardFromRing(ptv, &synctv); + } else { + r = AFPReadAndDiscard(ptv, &synctv); + } + SCLogDebug("Discarding on %s", ptv->tv->name); + switch (r) { + case 1: + SCLogInfo("Starting to read on %s", ptv->tv->name); + return 1; + case -1: + return r; + } + } + return 1; +} + /** * \brief Try to reopen socket * @@ -1001,6 +1107,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) } if (ptv->afp_state == AFP_STATE_UP) { SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket); + AFPSynchronizeStart(ptv); } fds.fd = ptv->socket;