apparmor/aa-notify-more-arch-mr809.diff
2021-11-09 18:09:23 +00:00

189 lines
7.8 KiB
Diff
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This patch contains the code changes from
https://gitlab.com/apparmor/apparmor/-/merge_requests/809
It does NOT include the added unit tests because adding binary test files with a patch is too hard.
diff --git a/utils/aa-notify b/utils/aa-notify
index 91d0f3b9c240e1ff0fec8aa673ef70fa78cf33bc..024044a0c58ed4827502da66786acb4e9b54fc2f 100755
--- a/utils/aa-notify
+++ b/utils/aa-notify
@@ -34,7 +34,6 @@ import os
import re
import sys
import time
-import struct
import notify2
import psutil
import pwd
@@ -45,6 +44,7 @@ import apparmor.ui as aaui
import apparmor.config as aaconfig
from apparmor.common import DebugLogger, open_file_read
from apparmor.fail import enable_aa_exception_handler
+from apparmor.notify import get_last_login_timestamp
from apparmor.translations import init_translation
import LibAppArmor # C-library to parse one log line
@@ -61,48 +61,6 @@ def get_user_login():
return username
-def get_last_login_timestamp(username):
- '''Directly read wtmp and get last login for user as epoch timestamp'''
- timestamp = 0
- filename = '/var/log/wtmp'
- last_login = 0
-
- debug_logger.debug('Username: {}'.format(username))
-
- with open(filename, "rb") as wtmp_file:
- offset = 0
- wtmp_filesize = os.path.getsize(filename)
- debug_logger.debug('WTMP filesize: {}'.format(wtmp_filesize))
- while offset < wtmp_filesize:
- wtmp_file.seek(offset)
- offset += 384 # Increment for next entry
-
- type = struct.unpack("<L", wtmp_file.read(4))[0]
- debug_logger.debug('WTMP entry type: {}'.format(type))
-
- # Only parse USER lines
- if type == 7:
- # Read each item and move pointer forward
- pid = struct.unpack("<L", wtmp_file.read(4))[0]
- line = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
- id = wtmp_file.read(4).decode("utf-8", "replace").split('\0', 1)[0]
- user = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
- host = wtmp_file.read(256).decode("utf-8", "replace").split('\0', 1)[0]
- term = struct.unpack("<H", wtmp_file.read(2))[0]
- exit = struct.unpack("<H", wtmp_file.read(2))[0]
- session = struct.unpack("<L", wtmp_file.read(4))[0]
- timestamp = struct.unpack("<L", wtmp_file.read(4))[0]
- usec = struct.unpack("<L", wtmp_file.read(4))[0]
- entry = (pid, line, id, user, host, term, exit, session, timestamp, usec)
- debug_logger.debug('WTMP entry: {}'.format(entry))
-
- # Store login timestamp for requested user
- if user == username:
- last_login = timestamp
-
- # When loop is done, last value should be the latest login timestamp
- return last_login
-
def format_event(event, logsource):
output = []
diff --git a/utils/apparmor/notify.py b/utils/apparmor/notify.py
new file mode 100644
index 0000000000000000000000000000000000000000..1101a29346d79dd873c347fd12dd79cda1e1c786
--- /dev/null
+++ b/utils/apparmor/notify.py
@@ -0,0 +1,105 @@
+#! /usr/bin/python3
+# ----------------------------------------------------------------------
+# Copyright (C) 2018-2019 Otto Kekäläinen <otto@kekalainen.net>
+# Copyright (C) 2021 Christian Boltz
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of version 2 of the GNU General Public
+# License as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# ----------------------------------------------------------------------
+
+import os
+import struct
+
+from apparmor.common import AppArmorBug, DebugLogger
+
+debug_logger = DebugLogger('apparmor.notify')
+
+
+def sane_timestamp(timestamp):
+    ''' Tell whether timestamp lies in the range accepted for a wtmp entry.
+
+    Both bounds are inclusive; anything outside them gives False.
+    '''
+    earliest = 946681200  # 2000-01-01
+    latest = 2524604400  # 2050-01-01
+
+    return not (timestamp < earliest or timestamp > latest)
+
+def get_last_login_timestamp(username, filename='/var/log/wtmp'):
+    '''Directly read wtmp and get last login for user as epoch timestamp
+
+    Only USER (type 7) entries whose user field equals username count.
+    Returns the timestamp of the last such entry in filename, or 0 if the
+    file is (nearly) empty or has none. Raises AppArmorBug if none of the
+    known entry layouts yields a sane timestamp for the first entry.
+    '''
+    last_login = 0
+
+    debug_logger.debug('Username: {}'.format(username))
+
+    with open(filename, "rb") as wtmp_file:
+        wtmp_filesize = os.path.getsize(filename)
+        debug_logger.debug('WTMP filesize: {}'.format(wtmp_filesize))
+
+        if wtmp_filesize < 356:
+            return 0  # (nearly) empty wtmp file, no entries
+
+        # detect architecture based on utmp format differences
+        wtmp_file.seek(340)  # first possible timestamp position
+        timestamp_x86_64 = struct.unpack("<L", wtmp_file.read(4))[0]
+        timestamp_aarch64 = struct.unpack("<L", wtmp_file.read(4))[0]
+        timestamp_s390x = struct.unpack(">L", wtmp_file.read(4))[0]
+        debug_logger.debug('WTMP timestamps: x86_64 %s, aarch64 %s, s390x %s' % (timestamp_x86_64, timestamp_aarch64, timestamp_s390x))
+
+        if sane_timestamp(timestamp_x86_64):
+            endianness = '<'  # little endian
+            extra_offset_before = 0
+            extra_offset_after = 0
+        elif sane_timestamp(timestamp_aarch64):
+            endianness = '<'  # little endian
+            extra_offset_before = 4
+            extra_offset_after = 12
+        elif sane_timestamp(timestamp_s390x):
+            endianness = '>'  # big endian
+            extra_offset_before = 8
+            extra_offset_after = 8
+        else:
+            raise AppArmorBug('Your /var/log/wtmp is broken or has an unknown format. Please open a bugreport with /var/log/wtmp and the output of "last" attached!')
+
+        entry_size = 384 + extra_offset_before + extra_offset_after
+        # bytes read per USER entry; a shorter (truncated) tail is ignored
+        parsed_size = 348 + extra_offset_before + extra_offset_after
+        for offset in range(0, wtmp_filesize - parsed_size + 1, entry_size):
+            wtmp_file.seek(offset)
+            entry_type = struct.unpack('%sH' % endianness, wtmp_file.read(2))[0]
+            debug_logger.debug('WTMP entry type: {}'.format(entry_type))
+            wtmp_file.read(2)  # skip padding
+            if entry_type != 7:  # Only parse USER lines
+                continue
+            # Read each item and move pointer forward (detected byte order)
+            pid = struct.unpack('%sL' % endianness, wtmp_file.read(4))[0]
+            line = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
+            ut_id = wtmp_file.read(4).decode("utf-8", "replace").split('\0', 1)[0]
+            user = wtmp_file.read(32).decode("utf-8", "replace").split('\0', 1)[0]
+            host = wtmp_file.read(256).decode("utf-8", "replace").split('\0', 1)[0]
+            term = struct.unpack('%sH' % endianness, wtmp_file.read(2))[0]
+            exit_status = struct.unpack('%sH' % endianness, wtmp_file.read(2))[0]
+            session = struct.unpack('%sL' % endianness, wtmp_file.read(4))[0]
+            wtmp_file.read(extra_offset_before)  # reads nothing when it is 0
+            timestamp = struct.unpack('%sL' % endianness, wtmp_file.read(4))[0]
+            wtmp_file.read(extra_offset_after)  # reads nothing when it is 0
+            usec = struct.unpack('%sL' % endianness, wtmp_file.read(4))[0]
+            entry = (pid, line, ut_id, user, host, term, exit_status, session, timestamp, usec)
+            debug_logger.debug('WTMP entry: {}'.format(entry))
+            if user == username:  # Store login timestamp for requested user
+                last_login = timestamp
+
+    # When loop is done, last value should be the latest login timestamp
+    return last_login