#!/usr/bin/python3 # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU Lesser General Public License as published by the Free # Software Foundation; either version 3 of the License, or (at your option) any # later version. See http://www.gnu.org/copyleft/lgpl.html for the full text # of the license. __author__ = 'Bastien Nocera' __email__ = 'hadess@hadess.net' __copyright__ = '(c) 2019 Red Hat Inc.' __license__ = 'LGPL 3+' import unittest import sys import subprocess import fcntl import os import taptestrunner try: # Do all non-standard imports here so we can skip the tests if any # needed packages are not available. import dbus import dbus.mainloop.glib import dbusmock from gi.repository import GLib from gi.repository import Gio dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal" class TestLowMemoryMonitorPortal(dbusmock.DBusTestCase): '''Test GMemoryMonitorPortal''' @classmethod def setUpClass(klass): klass.start_system_bus() klass.dbus_con = klass.get_dbus(True) # Start session bus so that xdg-desktop-portal can run on it klass.start_session_bus() def setUp(self): try: Gio.MemoryMonitor except AttributeError: raise unittest.SkipTest('Low memory monitor not in ' 'introspection data. Requires ' 'GObject-Introspection ≥ 1.63.2') try: (self.p_mock, self.obj_lmm) = self.spawn_server_template( 'low_memory_monitor', {}, stdout=subprocess.PIPE) except ModuleNotFoundError: raise unittest.SkipTest("Low memory monitor dbusmock template not " "found. Requires dbusmock ≥ 0.18.4.") # set log to nonblocking flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL) fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK) self.last_warning = -1 self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE) try: self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH]) except FileNotFoundError: raise unittest.SkipTest("xdg-desktop-portal not available") try: self.wait_for_bus_object('org.freedesktop.portal.Desktop', '/org/freedesktop/portal/desktop') except: raise # subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop']) os.environ['GIO_USE_PORTALS'] = "1" self.memory_monitor = Gio.MemoryMonitor.dup_default() assert("GMemoryMonitorPortal" in str(self.memory_monitor)) self.memory_monitor.connect("low-memory-warning", self.portal_memory_warning_cb) self.mainloop = GLib.MainLoop() self.main_context = self.mainloop.get_context() # The LowMemoryMonitor API is stateless: it doesn’t expose any # properties, just a warning signal. Emit the signal in a loop until # the GMemoryMonitor instance has initialised and synchronised to # the right state. def emit_warning(level): self.dbusmock.EmitWarning(level) return GLib.SOURCE_CONTINUE idle_id = GLib.idle_add(emit_warning, 0) while self.last_warning != 0: self.main_context.iteration(True) GLib.source_remove(idle_id) def tearDown(self): self.p_mock.terminate() self.p_mock.wait() def assertEventually(self, condition, message=None, timeout=5): '''Assert that condition function eventually returns True. Timeout is in seconds, defaulting to 5 seconds. message is printed on failure. ''' if not message: message = 'timed out waiting for ' + str(condition) def timed_out_cb(message): self.fail(message) return GLib.SOURCE_REMOVE timeout_source = GLib.timeout_source_new_seconds(timeout) timeout_source.set_callback(timed_out_cb, message) timeout_source.attach(self.main_context) while not condition(): self.main_context.iteration(True) timeout_source.destroy() def portal_memory_warning_cb(self, monitor, level): self.last_warning = level self.main_context.wakeup() def test_low_memory_warning_portal_signal(self): '''LowMemoryWarning signal''' self.dbusmock.EmitWarning(100) # Wait 2 seconds or until warning self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 2) self.dbusmock.EmitWarning(255) # Wait 2 seconds or until warning self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 2) except ImportError as e: @unittest.skip("Cannot import %s" % e.name) class TestLowMemoryMonitorPortal(unittest.TestCase): def test_low_memory_warning_portal_signal(self): pass if __name__ == '__main__': unittest.main(testRunner=taptestrunner.TAPTestRunner())