script/eve-parity: add script for checking eve/keyword parity

Currently this script has two commands: "missing" and "having".

"missing" will show eve fields that do not map to any keywords.

"having" will show eve fields along with their keyword mappings,
while also validating that those keywords really exist.

Related to tickets: #6463, #4772
pull/12717/head
Jason Ish 5 months ago committed by Victor Julien
parent 115d7d3c6d
commit 861896ed39

@ -0,0 +1,164 @@
#! /usr/bin/env python3
#
# Tool for checking parity between the EVE schema and Suricata
# keywords.
#
# Usage: ./scripts/eve-parity.py [mapped-fields|unmapped-keywords|unmapped-fields]
#
# ## unmapped-keywords
#
# Display all known keywords that are not mapped to an EVE field.
#
# ## unmapped-fields
#
# Display all eve fields that do not have a keyword mapping.
#
# ## mapped-fields
#
# Display all EVE fields that have a keyword mapping.
import sys
import subprocess
import json
import argparse
def main():
    """Parse the command line and run the requested parity report.

    The single positional argument selects one of the three reports:
    "mapped-fields", "unmapped-keywords" or "unmapped-fields".
    """
    arg_parser = argparse.ArgumentParser(description="EVE Parity Check Tool")
    arg_parser.add_argument(
        "command", choices=["mapped-fields", "unmapped-keywords", "unmapped-fields"]
    )
    options = arg_parser.parse_args()

    # Both inputs are loaded up front, whichever report was requested.
    keywords = load_known_keywords()
    keys = load_schema()

    # Every report function takes the same (keywords, keys) pair.
    reports = {
        "mapped-fields": mapped_fields,
        "unmapped-keywords": unmapped_keywords,
        "unmapped-fields": unmapped_fields,
    }
    report = reports.get(options.command)
    if report is not None:
        report(keywords, keys)
def unmapped_keywords(keywords, keys):
    """Print, one per line and sorted, known keywords no EVE field maps to.

    keywords -- set of known keyword names.
    keys     -- dict of EVE field name -> per-field metadata dict; a
                non-empty "keywords" entry lists the keywords mapped
                to that field.
    """
    referenced = set()
    for meta in keys.values():
        mapped = meta.get("keywords")
        # Missing, empty or False "keywords" entries contribute nothing.
        if mapped:
            referenced.update(mapped)

    leftover = keywords - referenced
    for name in sorted(leftover):
        print(name)
def unmapped_fields(keywords, keys):
    """Print, one per line and sorted, EVE fields without a "keywords" entry.

    keywords -- unused; accepted so all report functions share a signature.
    keys     -- dict of EVE field name -> per-field metadata dict.

    Only the presence of the "keywords" key is tested, so a field whose
    "keywords" entry is empty or False is not reported.
    """
    without_mapping = [name for name, meta in keys.items() if "keywords" not in meta]
    for name in sorted(without_mapping):
        print(name)
def mapped_fields(keywords, keys):
    """Print each EVE field that has keywords as "field -> [kw1, kw2]".

    keywords -- set of known keyword names, used to validate the mappings.
    keys     -- dict of EVE field name -> per-field metadata dict.

    A mapped keyword that is not in ``keywords`` is reported on stderr;
    the field's mapping line is still printed afterwards.
    """
    for field, meta in keys.items():
        mapped = meta.get("keywords")
        if not mapped:
            # No (or an empty/False) keyword mapping: nothing to show.
            continue
        for keyword in mapped:
            if keyword in keywords:
                continue
            errprint("ERROR: Unknown keyword: {}".format(keyword))
        print("{} -> [{}]".format(field, ", ".join(mapped)))
def load_schema(filename="etc/schema.json"):
    """Flatten the EVE JSON schema into a dict keyed by dotted field name.

    filename -- path of the JSON schema to read; defaults to
                "etc/schema.json", relative to the current directory.

    Returns a dict mapping each dotted field path (for example "a.b.c")
    to a metadata dict:

    * string/integer/boolean/number properties map to their "suricata"
      object from the schema, or {} when they have none;
    * objects without "properties", arrays without "items", and arrays
      whose items have no "type" or no "properties" map to {}.

    Objects with "properties" and arrays of such items are descended
    into; array items share the array's own path. An object whose
    "suricata.keywords" is false is skipped together with everything
    below it.

    Raises Exception for an unresolvable "$ref" or an unsupported
    property type, and KeyError if a property has no "type".
    """
    # Use a context manager so the file handle is closed promptly instead
    # of being left to the garbage collector.
    with open(filename, encoding="utf-8") as schema_file:
        schema = json.load(schema_file)

    scalar_types = ("string", "integer", "boolean", "number")
    keys = {}

    # Explicit LIFO stack of (schema node, path to that node) still to visit.
    stack = [(schema, [])]
    while stack:
        current, path = stack.pop()
        for name, props in current["properties"].items():
            if "$ref" in props:
                ref = find_ref(schema, props["$ref"])
                if not ref:
                    raise Exception("$ref not found: {}".format(props["$ref"]))
                props = ref

            field_path = path + [name]
            prop_type = props["type"]

            if prop_type in scalar_types:
                # End of the line...
                keys[".".join(field_path)] = props.get("suricata", {})
            elif prop_type == "object":
                # An object can set "suricata.keywords" to false to
                # disable descending into it. For example, "stats".
                if props.get("suricata", {}).get("keywords") is False:
                    continue
                if "properties" in props:
                    stack.append((props, field_path))
                else:
                    # May want to warn that this object has no properties.
                    keys[".".join(field_path)] = {}
            elif prop_type == "array":
                items = props.get("items", {})
                if "type" in items and "properties" in items:
                    stack.append((items, field_path))
                else:
                    # May want to warn that this array has no items, or
                    # that its items have no properties.
                    keys[".".join(field_path)] = {}
            else:
                raise Exception("Unsupported type: {}".format(prop_type))
    return keys
def load_known_keywords(command=("./src/suricata", "--list-keywords=csv")):
    """Return the set of keyword names reported by Suricata.

    command -- argument vector to run; defaults to the locally built
               "./src/suricata" with "--list-keywords=csv".

    The command's output is read as one header line followed by
    ";"-separated records; the first column of each record is taken as
    the keyword name.

    Raises subprocess.CalledProcessError if the command exits non-zero,
    and OSError (e.g. FileNotFoundError) if it cannot be started.
    """
    output = subprocess.check_output(list(command))

    keywords = set()
    # Skip the first line, as it's a header line.
    for line in output.decode().splitlines()[1:]:
        name = line.split(";", 1)[0].strip()
        # str.split() never returns an empty list, so test the name itself;
        # otherwise blank lines would add "" as a bogus keyword.
        if name:
            keywords.add(name)
    return keywords
def errprint(*args, **kwargs):
    """Like print(), but write to standard error.

    Positional and keyword arguments are forwarded to print() unchanged.
    """
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def find_ref(schema: dict, ref: str) -> dict:
    """Resolve a local "#/a/b/c" style reference inside ``schema``.

    schema -- the root schema document.
    ref    -- "/"-separated reference; its first segment must be "#".

    Returns the node found by indexing ``schema`` with each remaining
    segment in turn. Raises Exception if the reference does not start at
    "#", and KeyError if a segment is missing.
    """
    root, *segments = ref.split("/")
    if root != "#":
        raise Exception("Unsupported reference: {}".format(ref))

    node = schema
    for segment in segments:
        node = node[segment]
    return node
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes
    # the argument to sys.exit().
    exit_status = main()
    sys.exit(exit_status)
Loading…
Cancel
Save