diff --git a/src/source-af-packet.c b/src/source-af-packet.c index 51ed693e41..654145a6d7 100644 --- a/src/source-af-packet.c +++ b/src/source-af-packet.c @@ -452,6 +452,11 @@ void AFPPeersListReachedInc() } } +static int AFPPeersListStarted() +{ + return !peerslist.turn; +} + /** * \brief Clean the global peers list. */ @@ -940,6 +945,107 @@ void AFPSwitchState(AFPThreadVars *ptv, int state) } } +static int AFPReadAndDiscard(AFPThreadVars *ptv, struct timeval *synctv) +{ + struct sockaddr_ll from; + struct iovec iov; + struct msghdr msg; + struct timeval ts; + union { + struct cmsghdr cmsg; + char buf[CMSG_SPACE(sizeof(struct tpacket_auxdata))]; + } cmsg_buf; + + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + msg.msg_name = &from; + msg.msg_namelen = sizeof(from); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = &cmsg_buf; + msg.msg_controllen = sizeof(cmsg_buf); + msg.msg_flags = 0; + + iov.iov_len = ptv->datalen; + iov.iov_base = ptv->data; + + recvmsg(ptv->socket, &msg, MSG_TRUNC); + + if (ioctl(ptv->socket, SIOCGSTAMP, &ts) == -1) { + /* FIXME */ + return -1; + } + + if ((ts.tv_sec > synctv->tv_sec) || + (ts.tv_sec >= synctv->tv_sec && + ts.tv_usec > synctv->tv_usec)) { + return 1; + } + return 0; +} + +static int AFPReadAndDiscardFromRing(AFPThreadVars *ptv, struct timeval *synctv) +{ + union thdr h; + + if (unlikely(suricata_ctl_flags != 0)) { + return 1; + } + + /* Read packet from ring */ + h.raw = (((union thdr **)ptv->frame_buf)[ptv->frame_offset]); + if (h.raw == NULL) { + return -1; + } + + if (((time_t)h.h2->tp_sec > synctv->tv_sec) || + ((time_t)h.h2->tp_sec == synctv->tv_sec && + (suseconds_t) (h.h2->tp_nsec / 1000) > synctv->tv_usec)) { + return 1; + } + + h.h2->tp_status = TP_STATUS_KERNEL; + if (++ptv->frame_offset >= ptv->req.tp_frame_nr) { + ptv->frame_offset = 0; + } + + + return 0; +} + +static int AFPSynchronizeStart(AFPThreadVars *ptv) +{ + int r; + struct timeval synctv; + + /* Set timeval to end of the world */ + synctv.tv_sec = 0xffffffff; + synctv.tv_usec = 0xffffffff; + + while (1) { + if (AFPPeersListStarted() && synctv.tv_sec == (time_t) 0xffffffff) { + gettimeofday(&synctv, NULL); + } + if (ptv->flags & AFP_RING_MODE) { + r = AFPReadAndDiscardFromRing(ptv, &synctv); + } else { + r = AFPReadAndDiscard(ptv, &synctv); + } + SCLogDebug("Discarding on %s", ptv->tv->name); + switch (r) { + case 1: + SCLogInfo("Starting to read on %s", ptv->tv->name); + return 1; + case -1: + return r; + } + } + return 1; +} + /** * \brief Try to reopen socket * @@ -1001,6 +1107,7 @@ TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot) } if (ptv->afp_state == AFP_STATE_UP) { SCLogInfo("Thread %s using socket %d", tv->name, ptv->socket); + AFPSynchronizeStart(ptv); } fds.fd = ptv->socket;