2019-11-15 12:24:57 +01:00
|
|
|
#!/usr/bin/python3
|
|
|
|
|
|
|
|
# This program is free software; you can redistribute it and/or modify it under
|
|
|
|
# the terms of the GNU Lesser General Public License as published by the Free
|
|
|
|
# Software Foundation; either version 3 of the License, or (at your option) any
|
|
|
|
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
|
|
|
|
# of the license.
|
|
|
|
|
|
|
|
__author__ = 'Bastien Nocera'
|
|
|
|
__email__ = 'hadess@hadess.net'
|
|
|
|
__copyright__ = '(c) 2019 Red Hat Inc.'
|
|
|
|
__license__ = 'LGPL 3+'
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
import sys
|
|
|
|
import subprocess
|
2023-01-18 11:15:16 +01:00
|
|
|
import tempfile
|
2019-11-15 12:24:57 +01:00
|
|
|
import fcntl
|
|
|
|
import os
|
|
|
|
import time
|
|
|
|
|
2020-04-09 15:28:12 +01:00
|
|
|
try:
|
|
|
|
# Do all non-standard imports here so we can skip the tests if any
|
|
|
|
# needed packages are not available.
|
|
|
|
import dbus
|
|
|
|
import dbus.mainloop.glib
|
|
|
|
import dbusmock
|
|
|
|
from gi.repository import GLib
|
|
|
|
from gi.repository import Gio
|
|
|
|
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
|
|
|
|
class TestLowMemoryMonitor(dbusmock.DBusTestCase):
|
|
|
|
'''Test GMemoryMonitorDBus'''
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def setUpClass(klass):
|
|
|
|
klass.start_system_bus()
|
|
|
|
klass.dbus_con = klass.get_dbus(True)
|
|
|
|
|
2023-01-18 11:15:16 +01:00
|
|
|
klass.workdir = tempfile.mkdtemp(prefix='memory-monitor-dbus')
|
|
|
|
klass.start_monitor()
|
|
|
|
klass.addClassCleanup(klass.stop_monitor)
|
|
|
|
|
2020-04-09 15:28:12 +01:00
|
|
|
def setUp(self):
|
|
|
|
try:
|
|
|
|
Gio.MemoryMonitor
|
|
|
|
except AttributeError:
|
|
|
|
raise unittest.SkipTest('Low memory monitor not in '
|
|
|
|
'introspection data. Requires '
|
|
|
|
'GObject-Introspection ≥ 1.63.2')
|
|
|
|
try:
|
|
|
|
(self.p_mock, self.obj_lmm) = self.spawn_server_template(
|
|
|
|
'low_memory_monitor', {}, stdout=subprocess.PIPE)
|
|
|
|
except ModuleNotFoundError:
|
|
|
|
raise unittest.SkipTest("Low memory monitor dbusmock template not "
|
|
|
|
"found. Requires dbusmock ≥ 0.18.4.")
|
|
|
|
# set log to nonblocking
|
|
|
|
flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
|
|
|
|
fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
|
|
|
|
self.last_warning = -1
|
|
|
|
self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE)
|
|
|
|
self.memory_monitor = Gio.MemoryMonitor.dup_default()
|
|
|
|
self.memory_monitor.connect("low-memory-warning", self.memory_warning_cb)
|
|
|
|
self.mainloop = GLib.MainLoop()
|
|
|
|
self.main_context = self.mainloop.get_context()
|
|
|
|
|
2023-01-18 11:15:16 +01:00
|
|
|
@classmethod
|
|
|
|
def start_monitor(klass):
|
|
|
|
'''Start dbus-monitor'''
|
|
|
|
|
|
|
|
# You can rename the log file to *.log if you want to see it on test
|
|
|
|
# case failures
|
|
|
|
klass.monitor_log = open(os.path.join(klass.workdir, 'dbus-monitor.log'), 'wb', buffering=0)
|
2023-01-18 11:26:33 +01:00
|
|
|
klass.monitor = subprocess.Popen(['dbus-monitor', '--monitor', '--system'],
|
2023-01-18 11:15:16 +01:00
|
|
|
stdout=klass.monitor_log,
|
|
|
|
stderr=subprocess.STDOUT)
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def stop_monitor(klass):
|
|
|
|
'''Stop dbus-monitor'''
|
|
|
|
|
|
|
|
assert klass.monitor
|
|
|
|
klass.stop_process(klass.monitor)
|
|
|
|
|
|
|
|
klass.monitor_log.flush()
|
|
|
|
klass.monitor_log.close()
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def stop_process(cls, proc, timeout=1):
|
|
|
|
proc.terminate()
|
|
|
|
try:
|
|
|
|
proc.wait(timeout)
|
|
|
|
except:
|
|
|
|
print("Killing %d (%s) after timeout of %f seconds" % (proc.pid, proc.args[0], timeout))
|
|
|
|
proc.kill()
|
|
|
|
proc.wait()
|
|
|
|
|
2020-04-09 15:28:12 +01:00
|
|
|
def tearDown(self):
|
|
|
|
self.p_mock.terminate()
|
|
|
|
self.p_mock.wait()
|
|
|
|
|
2023-01-17 16:51:53 +01:00
|
|
|
def assertLevelEventually(self, expected_level, timeout=50):
|
|
|
|
'''Assert that the expected level is eventually reached.
|
2021-08-11 15:38:12 +02:00
|
|
|
|
2023-01-17 16:51:53 +01:00
|
|
|
Timeout is in deciseconds, defaulting to 50 (5 seconds).
|
2021-08-11 15:38:12 +02:00
|
|
|
'''
|
|
|
|
while timeout >= 0:
|
|
|
|
context = GLib.MainContext.default()
|
|
|
|
while context.iteration(False):
|
|
|
|
pass
|
2023-01-17 16:51:53 +01:00
|
|
|
if self.last_warning == expected_level:
|
2021-08-11 15:38:12 +02:00
|
|
|
break
|
|
|
|
timeout -= 1
|
|
|
|
time.sleep(0.1)
|
|
|
|
else:
|
2023-01-17 16:51:53 +01:00
|
|
|
self.fail(f'timed out waiting for expected level {expected_level}, last warning is {self.last_warning}')
|
2021-08-11 15:38:12 +02:00
|
|
|
|
2020-04-09 15:28:12 +01:00
|
|
|
def memory_warning_cb(self, monitor, level):
|
|
|
|
self.last_warning = level
|
|
|
|
self.main_context.wakeup()
|
|
|
|
|
|
|
|
def test_low_memory_warning_signal(self):
|
|
|
|
'''LowMemoryWarning signal'''
|
|
|
|
|
|
|
|
# Wait 2 seconds
|
|
|
|
timeout = 2
|
|
|
|
while timeout > 0:
|
|
|
|
time.sleep(0.5)
|
|
|
|
timeout -= 0.5
|
|
|
|
self.main_context.iteration(False)
|
|
|
|
|
|
|
|
self.dbusmock.EmitWarning(100)
|
2023-01-17 16:51:53 +01:00
|
|
|
# Wait 5 seconds or until warning
|
|
|
|
self.assertLevelEventually(100)
|
2020-04-09 15:28:12 +01:00
|
|
|
|
|
|
|
self.dbusmock.EmitWarning(255)
|
2023-01-17 16:51:53 +01:00
|
|
|
# Wait 5 seconds or until warning
|
|
|
|
self.assertLevelEventually(255)
|
2020-04-09 15:28:12 +01:00
|
|
|
|
|
|
|
except ImportError as e:
|
|
|
|
@unittest.skip("Cannot import %s" % e.name)
|
|
|
|
class TestLowMemoryMonitor(unittest.TestCase):
|
|
|
|
def test_low_memory_warning_signal(self):
|
|
|
|
pass
|
2019-11-15 12:24:57 +01:00
|
|
|
|
|
|
|
if __name__ == '__main__':
|
2023-01-18 10:52:00 +01:00
|
|
|
unittest.main(testRunner=unittest.TextTestRunner(stream=sys.stdout, verbosity=2))
|