diff --git a/apgrep/actions.py b/apgrep/actions.py
new file mode 100644
index 0000000..d46c9bb
--- /dev/null
+++ b/apgrep/actions.py
@@ -0,0 +1,35 @@
+"""Log filtering actions used by the apgrep command line tool."""
+import fileinput
+
+from clfparser import CLFParser
+from ipaddress import ip_address, ip_network
+from apgrep.util import *
+
+
+def clf_grep(logfiles, cidr_pattern):
+    """Print the log lines whose client host lies inside a network.
+
+    Args:
+        logfiles: Names of the log files to scan, passed to get_lines.
+        cidr_pattern: Network to match against, as a CIDR string or an
+            ipaddress network object; host bits may be set.
+
+    Raises:
+        ValueError: If cidr_pattern is not a valid IPv4/IPv6 network.
+    """
+    # Parse the network once instead of once per log line.
+    network = ip_network(cidr_pattern, strict=False)
+    for line in get_lines(logfiles):
+        parts = CLFParser.logParts(rec=line, formatString='%h')
+        if not parts:
+            # NOTE(review): assumes logParts returns a sequence; an empty
+            # one means no %h value was extracted, so skip the line.
+            continue
+        try:
+            host = ip_address(parts[0])
+        except ValueError:
+            # %h is not an IP literal (e.g. a hostname), so it cannot match
+            # a CIDR range; skip the line instead of aborting the run.
+            continue
+        if host in network:
+            print(line, end='')
diff --git a/apgrep/cli.py b/apgrep/cli.py
new file mode 100644
index 0000000..67189e9
--- /dev/null
+++ b/apgrep/cli.py
@@ -0,0 +1,25 @@
+"""Command line entry point for apgrep."""
+import ipaddress
+from plumbum import cli
+from apgrep.actions import *
+from apgrep.util import cidr_mask
+
+
+# Command line application: prints the lines of the given log files whose
+# client address falls inside the network passed with -m/--match.
+class apGrep(cli.Application):
+
+    # Mandatory switch; cidr_mask is handed to SwitchAttr so the value
+    # reaches main() as a network object (host bits allowed).
+    cidr_pattern = cli.SwitchAttr(["-m", "--match"],
+                                  cidr_mask,
+                                  argname="CIDR",
+                                  help="IP address in CIDR notation, both IPv4 and IPv6 are accepted",
+                                  mandatory=True)
+
+    def main(self, *logfiles):
+        # Positional arguments are the log files to filter.
+        clf_grep(logfiles, self.cidr_pattern)
+
+
+if __name__ == "__main__":
+    apGrep.run()
diff --git a/apgrep/util.py b/apgrep/util.py
new file mode 100644
index 0000000..366916a
--- /dev/null
+++ b/apgrep/util.py
@@ -0,0 +1,48 @@
+"""Utility helpers shared by the apgrep modules."""
+import sys
+import fileinput
+from ipaddress import ip_address, ip_network
+
+
+def cidr_mask(address):
+    """Return the network described by ``address``.
+
+    Thin wrapper around ``ip_network`` with ``strict=False`` so that an
+    address with host bits set (e.g. ``10.10.0.1/8``) is accepted.
+    """
+    return ip_network(address, strict=False)
+
+
+def addr_match(address, network):
+    """Return True if ``address`` lies within ``network``.
+
+    Both arguments are parsed with the ``ipaddress`` module; host bits in
+    ``network`` are allowed.
+    """
+    return ip_address(address) in ip_network(network, strict=False)
+
+
+def get_lines(filenames):
+    """Yield every line of the given files as text.
+
+    Files are opened through ``fileinput`` with ``hook_compressed``; lines
+    that arrive as bytes are decoded as UTF-8, dropping undecodable bytes.
+    """
+    with fileinput.input(files=filenames, openhook=fileinput.hook_compressed) as fd:
+        for line in fd:
+            if isinstance(line, bytes):
+                line = line.decode('utf-8', errors="ignore")
+            yield line
+
+
+if __name__ == "__main__":
+    # Quick manual check of addr_match, then echo the files named on argv.
+    test_data = {
+        '127.0.0.1/16': '127.0.0.1',
+        '10.10.0.0/8': '8.8.8.8'
+    }
+    for net, ip in test_data.items():
+        print(f"netmask {net} matches ip {ip}: {addr_match(ip, net)}")
+
+    for line in get_lines(sys.argv[1:]):
+        print(line, end='')