rx TMs shouldn't return TM_ECODE_FAILED if engine is in shutdown mode + minor cleanup

pull/31/merge
Anoop Saldanha 14 years ago committed by Victor Julien
parent b057a20f10
commit 34581ce902

@ -510,14 +510,15 @@ static int AFPTryReopen(AFPThreadVars *ptv)
*/
TmEcode ReceiveAFPLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
uint16_t packet_q_len = 0;
AFPThreadVars *ptv = (AFPThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
struct pollfd fds;
int r;
TmSlot *s = (TmSlot *)slot;
SCEnter();
ptv->slot = s->slot_next;
fds.fd = ptv->socket;
fds.events = POLLIN;

@ -302,21 +302,21 @@ ReceiveErfDagThreadInit(ThreadVars *tv, void *initdata, void **data)
*/
TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
ErfDagThreadVars *dtv = (ErfDagThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
dtv->slot = s->slot_next;
uint32_t diff = 0;
int err;
uint8_t *top = NULL;
uint32_t pkts_read = 0;
TmSlot *s = (TmSlot *)slot;
SCEnter();
dtv->slot = s->slot_next;
while (1)
{
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
SCReturnInt(TM_ECODE_FAILED);
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
top = dag_advance_stream(dtv->dagfd, dtv->dagstream, &(dtv->btm));
@ -327,11 +327,11 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
dtv->btm = dtv->top;
}
continue;
}
else {
} else {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s when using dag_advance_stream",
dtv->dagstream, dtv->dagname);
"Failed to read from stream: %d, DAG: %s when "
"using dag_advance_stream",
dtv->dagstream, dtv->dagname);
SCReturnInt(TM_ECODE_FAILED);
}
}
@ -347,18 +347,14 @@ TmEcode ReceiveErfDagLoop(ThreadVars *tv, void *data, void *slot)
if (err == TM_ECODE_FAILED) {
SCLogError(SC_ERR_ERF_DAG_STREAM_READ_FAILED,
"Failed to read from stream: %d, DAG: %s",
dtv->dagstream, dtv->dagname);
"Failed to read from stream: %d, DAG: %s",
dtv->dagstream, dtv->dagname);
ReceiveErfDagCloseStream(dtv->dagfd, dtv->dagstream);
SCReturnInt(err);
SCReturnInt(TM_ECODE_FAILED);
}
SCLogDebug("Read %d records from stream: %d, DAG: %s",
pkts_read, dtv->dagstream, dtv->dagname);
}
if (suricata_ctl_flags != 0) {
SCReturnInt(TM_ECODE_FAILED);
pkts_read, dtv->dagstream, dtv->dagname);
}
SCReturnInt(TM_ECODE_OK);

@ -109,14 +109,14 @@ TmModuleDecodeErfFileRegister(void)
*/
TmEcode ReceiveErfFileLoop(ThreadVars *tv, void *data, void *slot)
{
Packet *p = NULL;
uint16_t packet_q_len = 0;
ErfFileThreadVars *etv = (ErfFileThreadVars *)data;
etv->slot = ((TmSlot *)slot)->slot_next;
Packet *p;
uint16_t packet_q_len = 0;
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}

@ -309,6 +309,8 @@ TmEcode ReceiveIPFW(ThreadVars *tv, Packet *p, void *data, PacketQueue *pq, Pack
TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
IPFWThreadVars *ptv = (IPFWThreadVars *)data;
IPFWQueueVars *nq = NULL;
uint8_t pkt[IP_MAXPACKET];
@ -317,13 +319,7 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
struct timeval IPFWts;
Packet *p = NULL;
uint16_t packet_q_len = 0;
SCEnter();
if (ptv == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT, "Null data pointer");
SCReturnInt(TM_ECODE_FAILED);
}
nq = IPFWGetQueue(ptv->ipfw_index);
if (nq == NULL) {
SCLogWarning(SC_ERR_INVALID_ARGUMENT, "Can't get thread variable");
@ -331,13 +327,9 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
}
SCLogInfo("Thread '%s' will run on port %d (item %d)",
tv->name,
nq->port_num,
ptv->ipfw_index);
tv->name, nq->port_num, ptv->ipfw_index);
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
@ -362,7 +354,6 @@ TmEcode ReceiveIPFWLoop(ThreadVars *tv, void *data, void *slot)
strerror(errno));
SCReturnInt(TM_ECODE_FAILED);
}
SCReturnInt(TM_ECODE_FAILED);
}
/* We have a packet to process */
memset (&IPFWts, 0, sizeof(struct timeval));

@ -196,24 +196,24 @@ TmEcode NapatechFeedThreadInit(ThreadVars *tv, void *initdata, void **data)
*/
TmEcode NapatechFeedLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
int32_t status;
int32_t caplen;
PCAP_HEADER *header;
uint8_t *frame;
uint16_t packet_q_len = 0;
NapatechThreadVars *ntv = (NapatechThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
ntv->slot = s->slot_next;
int r;
TmSlot *s = (TmSlot *)slot;
SCEnter();
ntv->slot = s->slot_next;
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
/* make sure we have at least one packet in the packet pool, to prevent
* us from alloc'ing packets at line rate */
do {
@ -233,11 +233,10 @@ TmEcode NapatechFeedLoop(ThreadVars *tv, void *data, void *slot)
* no frames currently available
*/
continue;
}
else if (status < 0) {
} else if (status < 0) {
SCLogError(SC_ERR_NAPATECH_FEED_NEXT_FAILED,
"Failed to read from Napatech feed %d:%d",
ntv->adapter_number, ntv->feed_number);
"Failed to read from Napatech feed %d:%d",
ntv->adapter_number, ntv->feed_number);
SCReturnInt(TM_ECODE_FAILED);
}
// beware that storelen is aligned; therefore, it may be larger than "caplen"
@ -260,8 +259,10 @@ TmEcode NapatechFeedLoop(ThreadVars *tv, void *data, void *slot)
SCReturnInt(TM_ECODE_FAILED);
}
TmThreadsSlotProcessPkt(ntv->tv, ntv->slot, p);
if (TmThreadsSlotProcessPkt(ntv->tv, ntv->slot, p) != TM_ECODE_OK) {
TmqhOutputPacketpool(ntv->tv, p);
SCReturnInt(TM_ECODE_FAILED);
}
}
SCReturnInt(TM_ECODE_OK);

@ -154,20 +154,20 @@ void PcapFileCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt) {
/**
* \brief Main PCAP file reading Loop function
*/
TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot) {
TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
uint16_t packet_q_len = 0;
PcapFileThreadVars *ptv = (PcapFileThreadVars *)data;
int r;
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
ptv->cb_result = TM_ECODE_OK;
int r;
SCEnter();
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
@ -182,10 +182,10 @@ TmEcode ReceivePcapFileLoop(ThreadVars *tv, void *data, void *slot) {
/* Right now we just support reading packets one at a time. */
r = pcap_dispatch(pcap_g.pcap_handle, (int)packet_q_len,
(pcap_handler)PcapFileCallbackLoop, (u_char *)ptv);
(pcap_handler)PcapFileCallbackLoop, (u_char *)ptv);
if (unlikely(r == -1)) {
SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
r, pcap_geterr(pcap_g.pcap_handle));
r, pcap_geterr(pcap_g.pcap_handle));
/* in the error state we just kill the engine */
EngineKill();

@ -256,19 +256,18 @@ void PcapCallbackLoop(char *user, struct pcap_pkthdr *h, u_char *pkt) {
*/
TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
uint16_t packet_q_len = 0;
PcapThreadVars *ptv = (PcapThreadVars *)data;
int r;
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
ptv->cb_result = TM_ECODE_OK;
int r;
SCEnter();
while (1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL)
{
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
@ -283,11 +282,11 @@ TmEcode ReceivePcapLoop(ThreadVars *tv, void *data, void *slot)
/* Right now we just support reading packets one at a time. */
r = pcap_dispatch(ptv->pcap_handle, (int)packet_q_len,
(pcap_handler)PcapCallbackLoop, (u_char *)ptv);
(pcap_handler)PcapCallbackLoop, (u_char *)ptv);
if (unlikely(r < 0)) {
int dbreak = 0;
SCLogError(SC_ERR_PCAP_DISPATCH, "error code %" PRId32 " %s",
r, pcap_geterr(ptv->pcap_handle));
r, pcap_geterr(ptv->pcap_handle));
do {
usleep(PCAP_RECONNECT_TIMEOUT);
if (suricata_ctl_flags != 0) {

@ -241,20 +241,19 @@ static inline void PfringProcessPacket(void *user, struct pfring_pkthdr *h, Pack
*/
TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
{
SCEnter();
uint16_t packet_q_len = 0;
PfringThreadVars *ptv = (PfringThreadVars *)data;
TmSlot *s = (TmSlot *)slot;
ptv->slot = s->slot_next;
Packet *p = NULL;
struct pfring_pkthdr hdr;
TmSlot *s = (TmSlot *)slot;
SCEnter();
ptv->slot = s->slot_next;
while(1) {
if (suricata_ctl_flags & SURICATA_STOP ||
suricata_ctl_flags & SURICATA_KILL) {
SCReturnInt(TM_ECODE_FAILED);
if (suricata_ctl_flags & (SURICATA_STOP || SURICATA_KILL)) {
SCReturnInt(TM_ECODE_OK);
}
/* make sure we have at least one packet in the packet pool, to prevent
@ -300,7 +299,7 @@ TmEcode ReceivePfringLoop(ThreadVars *tv, void *data, void *slot)
} else {
SCLogError(SC_ERR_PF_RING_RECV,"pfring_recv error %" PRId32 "", r);
TmqhOutputPacketpool(ptv->tv, p);
return TM_ECODE_FAILED;
SCReturnInt(TM_ECODE_FAILED);
}
SCPerfSyncCountersIfSignalled(tv, 0);
}

Loading…
Cancel
Save