#! /usr/bin/env python3
"""
Fetch and generate ethercodes data for arpwatch

Usage: {appname} [-hVvfkt][-T sec][-O ouifile][-o outfile][-p spec]
       -h, --help           this message
       -V, --version        print version and exit
       -v, --verbose        verbose mode (cumulative)
       -f, --force          force operation
       -k, --keep           keep existing {ouifile}
       -t, --timestamp      print timestamp
       -T, --deltat sec     tolerance in timestamp comparison
                            (default: {deltat} sec.)
       -O, --ouifile file   IEEE.org oui.csv URL
                            (default: {ouifile})
       -o, --outfile file   arpwatch ethercodes file
                            (default: {outfile})
       -p, --patch spec     patch specfile with updated timestamp

Fetch current IEEE MA-L Assignments file (oui.csv) from IEEE.org,
and generate ethercodes.dat for arpwatch consumption.

Fetch oui.csv only, if the timestamp is newer (unless --force is given).
Similarly, generate ethercodes.dat only, if the timestamps don't match
(again, unless --force is given). Use option --keep to (re)generate
ethercodes.dat from an existing oui.csv.

Patch replaces every 8 digit pattern NNNNNNNN with the current timestamp
(YYYYMMDD) in the given file.

The timestamps of oui.csv fluctuate in a 2 seconds range(!). Therefore
compensate the fluctuation by taking a deltat tolerance factor into
account.

(c)2018 by {author} {email}
"""
# module metadata: copied into the gpar class below, which in turn supplies
# the values for the {author} and {email} placeholders of formatted messages
__version__ = '0.3'
__author__ = 'Hans-Peter Jansen'
__email__ = '<hpj@urpla.net>'
__license__ = 'MIT'
import os
import re
import csv
import sys
import time
import getopt
import traceback
import email.utils
import urllib.error
import urllib.parse
import urllib.request
# avoid encoding issues with print() in locale-less environments
# NOTE(review): PYTHONIOENCODING is normally evaluated at interpreter
# startup, so setting it here presumably affects child processes only and
# not the already opened sys.stdout/sys.stderr of this process -- confirm.
os.environ["PYTHONIOENCODING"] = "utf-8"
class gpar:
    """ global parameter class

    Never instantiated: the class attributes serve as the process wide
    configuration, and the class __dict__ supplies the values for the
    {name} placeholders in formatted messages.
    """
    # directory and program name as given on the command line
    appdir, appname = os.path.split(sys.argv[0])
    # an invocation via './' is resolved to the current working directory
    appdir = os.getcwd() if appdir == '.' else appdir
    pid = os.getpid()

    # module metadata
    version = __version__
    author = __author__
    email = __email__
    license = __license__

    # command line controlled settings
    loglevel = 0
    force = False
    keep = False
    timestamp = False
    deltat = 2.5
    ouifile = 'http://standards-oui.ieee.org/oui/oui.csv'
    outfile = 'ethercodes.dat'
    patchfile = None
def vout(lvl, msg):
    """ print msg on stdout, if lvl does not exceed the global loglevel

    msg may contain {name} placeholders, which are expanded from the
    attributes of the gpar class.

    Many callers hand over messages that are formatted already and may
    carry arbitrary data (HTTP headers, CSV rows). If such data contains
    braces, the expansion fails; the message is printed unexpanded then,
    instead of aborting the whole run because of a log message.
    """
    if lvl > gpar.loglevel:
        return
    try:
        text = msg.format(**gpar.__dict__)
    except (KeyError, IndexError, ValueError, AttributeError):
        # literal braces in pre-formatted data: print the message as is
        text = msg
    print(text, file=sys.stdout, flush=True)
def stderr(*msg):
    """ print all arguments on stderr and flush the stream immediately """
    print(*msg, file=sys.stderr, flush=True)
def exit(ret=0, msg=None, usage=False):
    """ terminate process with optional message and usage

    ret:   exit code of the process
    msg:   optional message, printed on stderr with the program name as
           prefix
    usage: if true, print the module docstring as usage text on stderr,
           with its {name} placeholders expanded from gpar

    Does not return: raises SystemExit via sys.exit().
    """
    if msg:
        stderr('{}: {}'.format(gpar.appname, msg))
    # the module docstring doubles as usage text; guard against a module
    # without docstring, where __doc__ is None
    if usage and __doc__:
        stderr(__doc__.format(**gpar.__dict__))
    sys.exit(ret)
def cmp_ts(t1, t2):
    """ compare timestamps while taking a global tolerance factor into
        account

        t1, t2: timestamps in seconds

        return True, if timestamps match, i.e. they differ by less than
        gpar.deltat seconds
    """
    # gpar.deltat may be a string, if it was taken verbatim from the
    # command line (-T sec): convert it in order to avoid a TypeError
    return abs(t1 - t2) < float(gpar.deltat)
def hex(x):
    """ return the integer value of the hexadecimal string x

        note: deliberately shadows the builtin hex() within this module
    """
    return int(x, base=16)


def hexstr(h):
    """ return the lower case hexadecimal notation of h without prefix """
    return '{:x}'.format(h)
def code_key(val):
    """ return the colon formatted code key, if val is an exact 24 bit
        hexadecimal string (6 hex digits), and None otherwise

        Every octet is written in lower case without leading zeros:

        000000 -> 0:0:0
        ABCDEF -> ab:cd:ef
    """
    hexdigits = '0123456789abcdefABCDEF'
    # values of the wrong size or with foreign characters (e.g. the
    # column title of the CSV heading) are no code keys
    if len(val) != 6 or not all(c in hexdigits for c in val):
        return None
    octets = (val[:2], val[2:4], val[4:])
    return ':'.join(format(int(octet, 16), 'x') for octet in octets)
def append_qemu(content):
    """ return content with a MA-L record for the QEMU/KVM prefix 525400
        appended

        content: raw bytes in oui.csv format
    """
    qemu_record = b'MA-L,525400,QEMU/KVM,Cyberspace\x0d\x0a'
    content += qemu_record
    return content
def fetch_infile(infile):
    """ fetch gpar.ouifile into the local file infile, if necessary

    The remote file is fetched, if --force is given, if no local file
    exists, or if size or timestamp of the local file differ from the
    Content-Length and Last-Modified headers of the remote file.

    A fetched file gets the QEMU/KVM record appended, and is written with
    its records sorted by OUI below the unchanged heading line. The
    modification time of infile is set to the remote timestamp.

    return the Last-Modified date of the remote file as a time tuple
    (the result of email.utils.parsedate())
    """
    # check oui.csv parameter
    vout(1, 'check {ouifile}')
    # the context manager makes sure, that the HTTP response is closed
    with urllib.request.urlopen(gpar.ouifile) as req:
        header = req.info()
        vout(3, 'header info: {}'.format(header))
        ouisize = int(header['Content-Length'])
        vout(1, 'oui file size: {}'.format(ouisize))
        ouidate = header['Last-Modified']
        vout(1, 'oui file date: {}'.format(ouidate))
        ouidate = email.utils.parsedate(ouidate)
        ouitime = time.mktime(ouidate)
        vout(3, 'parsed oui file date: {} ({})'.format(
            time.asctime(ouidate), ouitime))

        # check, if local oui.csv is outdated
        # NOTE(review): the local file is written with the QEMU record
        # appended, so its size presumably never equals the remote
        # Content-Length, which would trigger a fetch on every run --
        # confirm
        fetchoui = False
        if gpar.force:
            fetchoui = True
        elif not os.path.exists(infile):
            vout(1, 'no local file {infile} found')
            fetchoui = True
        elif os.path.getsize(infile) != ouisize:
            vout(1, 'local file size differs: {} vs. {} remote'.format(
                os.path.getsize(infile), ouisize))
            fetchoui = True
        elif not cmp_ts(os.stat(infile).st_mtime, ouitime):
            vout(3, str(os.stat(infile).st_mtime))
            vout(3, str(ouitime))
            mtime = time.localtime(os.stat(infile).st_mtime)
            otime = time.localtime(ouitime)
            vout(1, 'local file date differs: {} vs. {} remote'.format(
                time.asctime(mtime), time.asctime(otime)))
            fetchoui = True

        # fetch oui.csv
        if fetchoui:
            vout(1, 'fetch {ouifile}')
            content = append_qemu(req.read())
            # records are CRLF terminated: split at CR, and remove the
            # LF left over at the start of the following record
            lines = content.split(sep=b'\r')
            heading = lines.pop(0).replace(b'\n', b'', 1)
            # map the lower case OUI to the list of its records
            sort_dict = {}
            for line in lines:
                stripped = line.replace(b'\n', b'', 1)
                if not stripped:
                    continue
                items = stripped.split(b',', 3)
                if len(items) < 2:
                    # no OUI column: nothing to sort this record by
                    vout(1, 'skip malformed line')
                    continue
                OUI = items[1].decode('ascii').lower()
                sort_dict.setdefault(OUI, []).append(stripped)
            with open(infile, 'wb') as f:
                CRLF = b'\r\n'
                f.write(heading + CRLF)
                for OUI in sorted(sort_dict):
                    f.writelines(item + CRLF for item in sort_dict[OUI])
            # carry over the remote timestamp for later comparisons
            os.utime(infile, (ouitime, ouitime))
    return ouidate
def parse_csv(infile):
    """ parse the oui.csv file infile

    The second column of a row is the assignment (OUI), the third column
    the value stored for it. Rows without a valid code key (e.g. the
    heading) are ignored. If an assignment occurs more than once with
    differing values, a message is printed and the last value wins.

    return a dict, mapping colon formatted code keys to the third column
    """
    vout(1, 'parse {infile}')
    codes = {}
    with open(infile, newline='', encoding='utf-8') as f:
        for row in csv.reader(f):
            vout(3, str(row))
            # blank or truncated lines would raise an IndexError below
            if len(row) < 3:
                continue
            # generate
            code = code_key(row[1])
            if not code:
                continue
            value = row[2]
            if code in codes and codes[code] != value:
                vout(1, 'value {} exists already: "{}", "{}"'.format(
                    code, codes[code], value))
            codes[code] = value
    return codes
def patch_file(patchfile, timestamp):
    """ replace all 8 digit sequences in patchfile with timestamp

    patchfile: name of the file to patch in place (UTF-8 encoded)
    timestamp: replacement string

    The file is rewritten only, if the replacement changes its content.
    """
    vout(1, 'patch {}'.format(patchfile))
    with open(patchfile, 'r+', encoding='utf-8') as f:
        content = f.read()
        # raw string: '\d' is an invalid escape in a normal string literal
        patched, count = re.subn(r'\d{8}', timestamp, content)
        if count and content != patched:
            vout(1, '{} occurances replaced in {}'.format(count, patchfile))
            # rewrite in place; truncate in case the content got shorter
            f.seek(0)
            f.write(patched)
            f.truncate()
        else:
            vout(1, '{} unchanged'.format(patchfile))
def fetch_ethercodes():
    """ update the local oui.csv and generate the ethercodes file from it

    With --keep, an existing local oui.csv is used as is, and the
    generation is forced. Otherwise, oui.csv is fetched as needed.
    The ethercodes file is generated, if forced, missing, or if its
    timestamp does not match the timestamp of oui.csv. Finally, print
    the timestamp and patch a file with it, if requested.

    return 0 (used as exit code of the process)
    """
    # extract file argument from URL
    gpar.infile = os.path.basename(urllib.parse.urlparse(gpar.ouifile).path)
    # default: oui.csv
    infile = gpar.infile
    # default: ethercodes.dat
    outfile = gpar.outfile

    if gpar.keep:
        if os.path.exists(infile):
            # force generation of ethercodes.dat from existing oui.csv
            gpar.force = True
            ouidate = time.localtime(os.stat(infile).st_mtime)
        else:
            # exit() does not expand placeholders: format the name here
            exit(1, 'keep option selected, but no {}'.format(infile))
    else:
        ouidate = fetch_infile(infile)
    ouitime = time.mktime(ouidate)

    # check, if ethercodes.dat is outdated
    gencodes = False
    if gpar.force:
        gencodes = True
    elif not os.path.exists(outfile):
        vout(1, 'no local file {} found'.format(outfile))
        gencodes = True
    elif not cmp_ts(os.stat(outfile).st_mtime, ouitime):
        vout(3, str(os.stat(outfile).st_mtime))
        vout(3, str(ouitime))
        mtime = time.localtime(os.stat(outfile).st_mtime)
        otime = time.localtime(ouitime)
        vout(2, 'local file date differs: {} vs. {} remote'.format(
            time.asctime(mtime), time.asctime(otime)))
        gencodes = True

    # generate ethercodes.dat
    if gencodes:
        codes = parse_csv(infile)
        gpar.nrcodes = len(codes)
        vout(1, 'generate {outfile} with {nrcodes} entries')
        with open(outfile, 'w', newline='', encoding='utf-8') as f:
            for key in sorted(codes):
                f.write('%s\t%s\n' % (key, codes[key]))
        # mark the generated file with the timestamp of its source
        os.utime(outfile, (ouitime, ouitime))
        vout(1, 'successful')
    else:
        vout(1, 'code file {outfile} up to date already')

    timestamp = time.strftime('%Y%m%d', ouidate)
    if gpar.timestamp:
        vout(0, 'timestamp: {}'.format(timestamp))
    if gpar.patchfile is not None:
        patch_file(gpar.patchfile, timestamp)
    return 0
def main(argv=None):
    """ command line interface

    argv: list of command line arguments (default: sys.argv[1:])

    Evaluate the options into the gpar class and run fetch_ethercodes().
    Invalid options terminate the process with exit code 1 and the usage
    text, unexpected exceptions with exit code 2 and a traceback.

    return the result of fetch_ethercodes()
    """
    if argv is None:
        argv = sys.argv[1:]

    # yeah, oldschool, I know...
    try:
        # long options taking an argument need a trailing '='
        optlist, args = getopt.getopt(argv, 'hVvfktT:O:o:p:',
            ('help', 'version', 'verbose', 'force', 'keep', 'timestamp',
             'deltat=', 'ouifile=', 'outfile=', 'patch=')
        )
    except getopt.error as msg:
        exit(1, msg, True)

    for opt, par in optlist:
        if opt in ('-h', '--help'):
            exit(usage = True)
        elif opt in ('-V', '--version'):
            exit(msg = 'version %s' % gpar.version)
        elif opt in ('-v', '--verbose'):
            gpar.loglevel += 1
        elif opt in ('-f', '--force'):
            gpar.force = True
        elif opt in ('-k', '--keep'):
            gpar.keep = True
        elif opt in ('-t', '--timestamp'):
            gpar.timestamp = True
        elif opt in ('-T', '--deltat'):
            # the tolerance is compared with time differences: it must
            # be a number, not the string from the command line
            try:
                gpar.deltat = float(par)
            except ValueError:
                exit(1, 'invalid deltat value: %s' % par, True)
        elif opt in ('-O', '--ouifile'):
            gpar.ouifile = par
        elif opt in ('-o', '--outfile'):
            gpar.outfile = par
        elif opt in ('-p', '--patch'):
            gpar.patchfile = par

    try:
        return fetch_ethercodes()
    except Exception:
        exit(2, 'unexpected exception occured:\n%s' % traceback.format_exc())
if __name__ == '__main__':
    # run the command line interface, its result is the exit code
    sys.exit(main())
# vim: ts=8 shiftwidth=8 expandtab