| 
									
										
										
										
											2019-11-15 12:24:57 +01:00
										 |  |  |  | #!/usr/bin/python3 | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | # This program is free software; you can redistribute it and/or modify it under | 
					
						
							|  |  |  |  | # the terms of the GNU Lesser General Public License as published by the Free | 
					
						
							|  |  |  |  | # Software Foundation; either version 3 of the License, or (at your option) any | 
					
						
							|  |  |  |  | # later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text | 
					
						
							|  |  |  |  | # of the license. | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | __author__ = 'Bastien Nocera' | 
					
						
							|  |  |  |  | __email__ = 'hadess@hadess.net' | 
					
						
							|  |  |  |  | __copyright__ = '(c) 2019 Red Hat Inc.' | 
					
						
							|  |  |  |  | __license__ = 'LGPL 3+' | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  | import unittest | 
					
						
							|  |  |  |  | import sys | 
					
						
							|  |  |  |  | import subprocess | 
					
						
							|  |  |  |  | import fcntl | 
					
						
							|  |  |  |  | import os | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-07 15:45:06 +01:00
										 |  |  |  | import taptestrunner | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  | try: | 
					
						
							|  |  |  |  |     # Do all non-standard imports here so we can skip the tests if any | 
					
						
							|  |  |  |  |     # needed packages are not available. | 
					
						
							|  |  |  |  |     import dbus | 
					
						
							|  |  |  |  |     import dbus.mainloop.glib | 
					
						
							|  |  |  |  |     import dbusmock | 
					
						
							|  |  |  |  |     from gi.repository import GLib | 
					
						
							|  |  |  |  |     from gi.repository import Gio | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |     class TestLowMemoryMonitor(dbusmock.DBusTestCase): | 
					
						
							|  |  |  |  |         '''Test GMemoryMonitorDBus''' | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         @classmethod | 
					
						
							|  |  |  |  |         def setUpClass(klass): | 
					
						
							|  |  |  |  |             klass.start_system_bus() | 
					
						
							|  |  |  |  |             klass.dbus_con = klass.get_dbus(True) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         def setUp(self): | 
					
						
							|  |  |  |  |             try: | 
					
						
							|  |  |  |  |                 Gio.MemoryMonitor | 
					
						
							|  |  |  |  |             except AttributeError: | 
					
						
							|  |  |  |  |                 raise unittest.SkipTest('Low memory monitor not in ' | 
					
						
							|  |  |  |  |                                         'introspection data. Requires ' | 
					
						
							|  |  |  |  |                                         'GObject-Introspection ≥ 1.63.2') | 
					
						
							|  |  |  |  |             try: | 
					
						
							|  |  |  |  |                 (self.p_mock, self.obj_lmm) = self.spawn_server_template( | 
					
						
							|  |  |  |  |                     'low_memory_monitor', {}, stdout=subprocess.PIPE) | 
					
						
							|  |  |  |  |             except ModuleNotFoundError: | 
					
						
							|  |  |  |  |                 raise unittest.SkipTest("Low memory monitor dbusmock template not " | 
					
						
							|  |  |  |  |                                         "found. Requires dbusmock ≥ 0.18.4.") | 
					
						
							|  |  |  |  |             # set log to nonblocking | 
					
						
							|  |  |  |  |             flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL) | 
					
						
							|  |  |  |  |             fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) | 
					
						
							|  |  |  |  |             self.last_warning = -1 | 
					
						
							|  |  |  |  |             self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE) | 
					
						
							| 
									
										
										
										
											2024-01-22 13:59:12 +00:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  |             try: | 
					
						
							|  |  |  |  |                 self.wait_for_bus_object('org.freedesktop.LowMemoryMonitor', | 
					
						
							|  |  |  |  |                                         '/org/freedesktop/LowMemoryMonitor', | 
					
						
							|  |  |  |  |                                         system_bus=True) | 
					
						
							|  |  |  |  |             except: | 
					
						
							|  |  |  |  |                 raise | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  |             self.memory_monitor = Gio.MemoryMonitor.dup_default() | 
					
						
							| 
									
										
										
										
											2024-01-22 13:59:49 +00:00
										 |  |  |  |             assert("GMemoryMonitorDBus" in str(self.memory_monitor)) | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  |             self.memory_monitor.connect("low-memory-warning", self.memory_warning_cb) | 
					
						
							|  |  |  |  |             self.mainloop = GLib.MainLoop() | 
					
						
							|  |  |  |  |             self.main_context = self.mainloop.get_context() | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-22 14:03:50 +00:00
										 |  |  |  |             # The LowMemoryMonitor API is stateless: it doesn’t expose any | 
					
						
							|  |  |  |  |             # properties, just a warning signal. Emit the signal in a loop until | 
					
						
							|  |  |  |  |             # the GMemoryMonitor instance has initialised and synchronised to | 
					
						
							|  |  |  |  |             # the right state. | 
					
						
							|  |  |  |  |             def emit_warning(level): | 
					
						
							|  |  |  |  |                 self.dbusmock.EmitWarning(level) | 
					
						
							|  |  |  |  |                 return GLib.SOURCE_CONTINUE | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             idle_id = GLib.idle_add(emit_warning, 0) | 
					
						
							|  |  |  |  |             while self.last_warning != 0: | 
					
						
							|  |  |  |  |                 self.main_context.iteration(True) | 
					
						
							|  |  |  |  |             GLib.source_remove(idle_id) | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  |         def tearDown(self): | 
					
						
							|  |  |  |  |             self.p_mock.terminate() | 
					
						
							|  |  |  |  |             self.p_mock.wait() | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-22 13:56:05 +00:00
										 |  |  |  |         def assertEventually(self, condition, message=None, timeout=5): | 
					
						
							| 
									
										
										
										
											2021-08-11 15:38:12 +02:00
										 |  |  |  |             '''Assert that condition function eventually returns True.
 | 
					
						
							|  |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-01-22 13:56:05 +00:00
										 |  |  |  |             Timeout is in seconds, defaulting to 5 seconds. message is | 
					
						
							| 
									
										
										
										
											2021-08-11 15:38:12 +02:00
										 |  |  |  |             printed on failure. | 
					
						
							|  |  |  |  |             '''
 | 
					
						
							| 
									
										
										
										
											2024-01-22 13:56:05 +00:00
										 |  |  |  |             if not message: | 
					
						
							|  |  |  |  |                 message = 'timed out waiting for ' + str(condition) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             def timed_out_cb(message): | 
					
						
							|  |  |  |  |                 self.fail(message) | 
					
						
							|  |  |  |  |                 return GLib.SOURCE_REMOVE | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             timeout_source = GLib.timeout_source_new_seconds(timeout) | 
					
						
							|  |  |  |  |             timeout_source.set_callback(timed_out_cb, message) | 
					
						
							|  |  |  |  |             timeout_source.attach(self.main_context) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             while not condition(): | 
					
						
							|  |  |  |  |                 self.main_context.iteration(True) | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             timeout_source.destroy() | 
					
						
							| 
									
										
										
										
											2021-08-11 15:38:12 +02:00
										 |  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  |         def memory_warning_cb(self, monitor, level): | 
					
						
							| 
									
										
										
										
											2024-01-22 11:22:45 +00:00
										 |  |  |  |             print("Received memory warning signal, level", level) | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  |             self.last_warning = level | 
					
						
							|  |  |  |  |             self.main_context.wakeup() | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |         def test_low_memory_warning_signal(self): | 
					
						
							|  |  |  |  |             '''LowMemoryWarning signal''' | 
					
						
							|  |  |  |  | 
 | 
					
						
							|  |  |  |  |             self.dbusmock.EmitWarning(100) | 
					
						
							|  |  |  |  |             # Wait 2 seconds or until warning | 
					
						
							| 
									
										
										
										
											2024-01-22 13:56:05 +00:00
										 |  |  |  |             self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 2) | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  |             self.dbusmock.EmitWarning(255) | 
					
						
							|  |  |  |  |             # Wait 2 seconds or until warning | 
					
						
							| 
									
										
										
										
											2024-01-22 13:56:05 +00:00
										 |  |  |  |             self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 2) | 
					
						
							| 
									
										
										
										
											2020-04-09 15:28:12 +01:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  | except ImportError as e: | 
					
						
							|  |  |  |  |     @unittest.skip("Cannot import %s" % e.name) | 
					
						
							|  |  |  |  |     class TestLowMemoryMonitor(unittest.TestCase): | 
					
						
							|  |  |  |  |         def test_low_memory_warning_signal(self): | 
					
						
							|  |  |  |  |             pass | 
					
						
							| 
									
										
										
										
											2019-11-15 12:24:57 +01:00
										 |  |  |  | 
 | 
					
						
							|  |  |  |  | if __name__ == '__main__': | 
					
						
							| 
									
										
										
										
											2020-04-07 15:45:06 +01:00
										 |  |  |  |     unittest.main(testRunner=taptestrunner.TAPTestRunner()) |