suricatasc: Snug the processing of different commands

Since all of the commands were following the same procedure, namely,
split the input extract the arguments, throw the error if required
argument is missing else send the command over to suricata, put all of
this in one compact function alongwith a dictionary for specifications
for different commands, the name of the argument, the type and if it is
required or not.
Following fixups come with this commit:
- Code becomes really cozy
- Split errors on a few commands are well handled
- No redundant code
- More readability

References redmine ticket #2793
pull/3661/head
Shivani Bhardwaj 7 years ago committed by Victor Julien
parent 57285b54d5
commit bf37e3f5da

@ -0,0 +1,162 @@
argsd = {
"pcap-file": [
{
"name": "filename",
"required": 1,
},
{
"name": "output-dir",
"required": 1,
},
{
"name": "tenant",
"type": int,
"required": 0,
},
{
"name": "continuous",
"required": 0,
},
{
"name": "delete-when-done",
"required": 0,
},
],
"pcap-file-continuous": [
{
"name": "filename",
"required": 1,
},
{
"name": "output-dir",
"required": 1,
},
{
"name": "tenant",
"type": int,
"required": 0,
},
{
"name": "delete-when-done",
"required": 0,
},
],
"iface-stat": [
{
"name": "iface",
"required": 1,
},
],
"conf-get": [
{
"name": "variable",
"required": 1,
}
],
"unregister-tenant-handler": [
{
"name": "tenantid",
"required": 1,
},
{
"name": "htype",
"required": 1,
},
{
"name": "hargs",
"type": int,
"required": 0,
},
],
"register-tenant-handler": [
{
"name": "tenantid",
"required": 1,
},
{
"name": "htype",
"required": 1,
},
{
"name": "hargs",
"type": int,
"required": 0,
},
],
"unregister-tenant": [
{
"name": "id",
"type": int,
"required": 1,
},
],
"register-tenant": [
{
"name": "id",
"type": int,
"required": 1,
},
{
"name": "filename",
"required": 1,
},
],
"reload-tenant": [
{
"name": "id",
"type": int,
"required": 1,
},
{
"name": "filename",
"required": 1,
},
],
"add-hostbit": [
{
"name": "ipaddress",
"required": 1,
},
{
"name": "hostbit",
"required": 1,
},
{
"name": "expire",
"type": int,
"required": 1,
},
],
"remove-hostbit": [
{
"name": "ipaddress",
"required": 1,
},
{
"name": "hostbit",
"required": 1,
},
],
"list-hostbit": [
{
"name": "ipaddress",
"required": 1,
},
],
"memcap-set": [
{
"name": "config",
"required": 1,
},
{
"name": "memcap",
"required": 1,
},
],
"memcap-show": [
{
"name": "config",
"required": 1,
},
],
}

@ -23,8 +23,9 @@ from socket import socket, AF_UNIX, error
import select
import sys
SURICATASC_VERSION = "1.0"
from suricata.sc.specs import argsd
SURICATASC_VERSION = "1.0"
VERSION = "0.2"
INC_SIZE = 1024
@ -43,19 +44,19 @@ class SuricataException(Exception):
class SuricataNetException(SuricataException):
"""
Exception raised when network error occur.
Exception raised when a network error occurs
"""
class SuricataCommandException(SuricataException):
"""
Exception raised when command is not correct.
Exception raised when the command is incorrect
"""
class SuricataReturnException(SuricataException):
"""
Exception raised when return message is not correct.
Exception raised when return message is incorrect
"""
@ -78,9 +79,35 @@ class SuricataCompleter:
return None
return None
class SuricataSC:
def __init__(self, sck_path, verbose=False):
self.cmd_list = ['shutdown', 'quit', 'pcap-file', 'pcap-file-continuous', 'pcap-file-number', 'pcap-file-list', 'pcap-last-processed', 'pcap-interrupt', 'iface-list', 'iface-stat', 'register-tenant', 'unregister-tenant', 'register-tenant-handler', 'unregister-tenant-handler', 'add-hostbit', 'remove-hostbit', 'list-hostbit', 'memcap-set', 'memcap-show']
self.basic_commands = [
"shutdown",
"quit",
"pcap-file-number",
"pcap-file-list",
"pcap-last-processed",
"pcap-interrupt",
"iface-list",
]
self.fn_commands = [
"pcap-file ",
"pcap-file-continuous ",
"iface-stat",
"conf-get",
"unregister-tenant-handler",
"register-tenant-handler",
"unregister-tenant",
"register-tenant",
"reload-tenant",
"add-hostbit",
"remove-hostbit",
"list-hostbit",
"memcap-set",
"memcap-show",
]
self.cmd_list = self.basic_commands + self.fn_commands
self.sck_path = sck_path
self.verbose = verbose
self.socket = socket(AF_UNIX)
@ -100,7 +127,7 @@ class SuricataSC:
def send_command(self, command, arguments=None):
if command not in self.cmd_list and command != 'command-list':
raise SuricataCommandException("No such command: %s", command)
raise SuricataCommandException("Command not found: {}".format(command))
cmdmsg = {}
cmdmsg['command'] = command
@ -119,7 +146,6 @@ class SuricataSC:
cmdret = self.json_recv()
else:
cmdret = None
if not cmdret:
raise SuricataReturnException("Unable to get message from server")
@ -165,208 +191,36 @@ class SuricataSC:
self.cmd_list = cmdret["message"]["commands"]
self.cmd_list.append("quit")
def close(self):
self.socket.close()
def execute(self, command):
full_cmd = command.split()
cmd = full_cmd[0]
cmd_specs = argsd[cmd]
arguments = dict()
for c, spec in enumerate(cmd_specs, 1):
spec_type = str if "type" not in spec else spec["type"]
if spec["required"]:
try:
arguments[spec["name"]] = spec_type(full_cmd[c])
except IndexError:
raise SuricataCommandException("Missing arguments")
elif c < len(full_cmd):
arguments[spec["name"]] = spec_type(full_cmd[c])
return cmd, arguments
def parse_command(self, command):
arguments = None
if command.split(' ', 2)[0] in self.cmd_list:
if "pcap-file " in command:
try:
parts = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
cmd, filename, output = parts[0], parts[1], parts[2]
tenant = None
if len(parts) > 3:
tenant = parts[3]
continuous = None
if len(parts) > 4:
continuous = parts[4]
delete_when_done = None
if len(parts) > 5:
delete_when_done = parts[5]
if cmd != "pcap-file":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["filename"] = filename
arguments["output-dir"] = output
if tenant:
arguments["tenant"] = int(tenant)
if continuous:
arguments["continuous"] = continuous
if delete_when_done:
arguments["delete-when-done"] = delete_when_done
elif "pcap-file-continuous " in command:
try:
parts = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
cmd, filename, output = parts[0], parts[1], parts[2]
tenant = None
if len(parts) > 3:
tenant = parts[3]
delete_when_done = None
if len(parts) > 4:
delete_when_done = parts[4]
if cmd != "pcap-file":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["filename"] = filename
arguments["output-dir"] = output
arguments["continuous"] = True
if tenant:
arguments["tenant"] = int(tenant)
if delete_when_done:
arguments["delete-when-done"] = delete_when_done
elif "iface-stat" in command:
try:
[cmd, iface] = command.split(' ', 1)
except:
raise SuricataCommandException("Unable to split command '%s'" % (command))
if cmd != "iface-stat":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["iface"] = iface
elif "conf-get" in command:
try:
[cmd, variable] = command.split(' ', 1)
except:
raise SuricataCommandException("Unable to split command '%s'" % (command))
if cmd != "conf-get":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["variable"] = variable
elif "unregister-tenant-handler" in command:
try:
parts = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
cmd, tenantid, htype = parts[0], parts[1], parts[2]
hargs = None
if len(parts) > 3:
hargs = parts[3]
if cmd != "unregister-tenant-handler":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["id"] = int(tenantid)
arguments["htype"] = htype
if hargs:
arguments["hargs"] = int(hargs)
elif "register-tenant-handler" in command:
try:
parts = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
cmd, tenantid, htype = parts[0], parts[1], parts[2]
hargs = None
if len(parts) > 3:
hargs = parts[3]
if cmd != "register-tenant-handler":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["id"] = int(tenantid)
arguments["htype"] = htype
if hargs:
arguments["hargs"] = int(hargs)
elif "unregister-tenant" in command:
try:
[cmd, tenantid] = command.split(' ', 1)
except:
raise SuricataCommandException("Unable to split command '%s'" % (command))
if cmd != "unregister-tenant":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["id"] = int(tenantid)
elif "register-tenant" in command:
try:
[cmd, tenantid, filename] = command.split(' ', 2)
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "register-tenant":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["id"] = int(tenantid)
arguments["filename"] = filename
elif "reload-tenant" in command:
try:
[cmd, tenantid, filename] = command.split(' ', 2)
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "reload-tenant":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["id"] = int(tenantid)
arguments["filename"] = filename
elif "add-hostbit" in command:
try:
[cmd, ipaddress, hostbit, expire] = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "add-hostbit":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["ipaddress"] = ipaddress
arguments["hostbit"] = hostbit
arguments["expire"] = int(expire)
elif "remove-hostbit" in command:
try:
[cmd, ipaddress, hostbit] = command.split(' ', 2)
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "remove-hostbit":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["ipaddress"] = ipaddress
arguments["hostbit"] = hostbit
elif "list-hostbit" in command:
try:
[cmd, ipaddress] = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "list-hostbit":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["ipaddress"] = ipaddress
elif "memcap-set" in command:
try:
[cmd, config, memcap] = command.split(' ', 2)
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "memcap-set":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["config"] = config
arguments["memcap"] = memcap
elif "memcap-show" in command:
try:
[cmd, config] = command.split(' ')
except:
raise SuricataCommandException("Arguments to command '%s' is missing" % (command))
if cmd != "memcap-show":
raise SuricataCommandException("Invalid command '%s'" % (command))
else:
arguments = {}
arguments["config"] = config
cmd = command.split(maxsplit=2)[0]
if cmd in self.cmd_list:
if cmd in self.fn_commands:
cmd, arguments = getattr(self, "execute")(command=command)
else:
cmd = command
else:
raise SuricataCommandException("Unknown command '%s'" % (command))
return (cmd, arguments)
raise SuricataCommandException("Unknown command {}".format(command))
return cmd, arguments
def interactive(self):
print("Command list: " + ", ".join(self.cmd_list))
@ -382,7 +236,7 @@ class SuricataSC:
if command == "quit":
break
try:
(cmd, arguments) = self.parse_command(command)
cmd, arguments = self.parse_command(command)
except SuricataCommandException as err:
print(err)
continue

Loading…
Cancel
Save