mirror of
				https://gitlab.gnome.org/GNOME/glib.git
				synced 2025-11-04 01:58:54 +01:00 
			
		
		
		
	This was my mistake in commit 67a9fbf1fa.
Signed-off-by: Philip Withnall <pwithnall@gnome.org>
Helps: #3237
		
	
		
			
				
	
	
		
			144 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			144 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/python3
 | 
						||
 | 
						||
# This program is free software; you can redistribute it and/or modify it under
 | 
						||
# the terms of the GNU Lesser General Public License as published by the Free
 | 
						||
# Software Foundation; either version 3 of the License, or (at your option) any
 | 
						||
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
 | 
						||
# of the license.
 | 
						||
 | 
						||
__author__ = 'Bastien Nocera'
 | 
						||
__email__ = 'hadess@hadess.net'
 | 
						||
__copyright__ = '(c) 2019 Red Hat Inc.'
 | 
						||
__license__ = 'LGPL 3+'
 | 
						||
 | 
						||
import unittest
 | 
						||
import sys
 | 
						||
import subprocess
 | 
						||
import fcntl
 | 
						||
import os
 | 
						||
 | 
						||
import taptestrunner
 | 
						||
 | 
						||
try:
 | 
						||
    # Do all non-standard imports here so we can skip the tests if any
 | 
						||
    # needed packages are not available.
 | 
						||
    import dbus
 | 
						||
    import dbus.mainloop.glib
 | 
						||
    import dbusmock
 | 
						||
    from gi.repository import GLib
 | 
						||
    from gi.repository import Gio
 | 
						||
 | 
						||
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 | 
						||
 | 
						||
    XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal"
 | 
						||
 | 
						||
    class TestLowMemoryMonitorPortal(dbusmock.DBusTestCase):
 | 
						||
        '''Test GMemoryMonitorPortal'''
 | 
						||
 | 
						||
        @classmethod
        def setUpClass(klass):
            '''Start the D-Bus buses that every test in this class relies on.

            A system bus is started and a connection to it is stored on the
            class as ``dbus_con``; a session bus is started afterwards.
            '''
            klass.start_system_bus()
            # NOTE(review): the True argument presumably selects the system
            # bus rather than the session bus; confirm against python-dbusmock.
            want_system_bus = True
            klass.dbus_con = klass.get_dbus(want_system_bus)

            # xdg-desktop-portal needs a session bus to run on.
            klass.start_session_bus()
 | 
						||
 | 
						||
        def setUp(self):
            '''Start the mock service and the portal, then sync GMemoryMonitor.

            Steps, in order:

            * skip if ``Gio.MemoryMonitor`` is missing from the introspection
              data, if the ``low_memory_monitor`` dbusmock template is
              missing, or if the xdg-desktop-portal binary cannot be started;
            * spawn the mock and switch its stdout pipe to non-blocking mode;
            * launch xdg-desktop-portal and wait for its bus object;
            * create the default ``Gio.MemoryMonitor`` with portals forced on
              and connect ``portal_memory_warning_cb`` to its
              ``low-memory-warning`` signal;
            * keep emitting warning level 0 from the mock until the monitor
              reports it, so every test starts from a known state.

            Every spawned process is registered with ``addCleanup()`` as soon
            as it exists, because ``tearDown()`` is not run when ``setUp()``
            raises (including when it raises ``SkipTest``).

            Raises:
                unittest.SkipTest: if a required component is unavailable.
            '''
            if not hasattr(Gio, 'MemoryMonitor'):
                raise unittest.SkipTest('Low memory monitor not in '
                                        'introspection data. Requires '
                                        'GObject-Introspection ≥ 1.63.2')
            try:
                (self.p_mock, self.obj_lmm) = self.spawn_server_template(
                    'low_memory_monitor', {}, stdout=subprocess.PIPE)
            except ModuleNotFoundError:
                raise unittest.SkipTest("Low memory monitor dbusmock template not "
                                        "found. Requires dbusmock ≥ 0.18.4.")
            # Cleanups run in reverse registration order: terminate, wait,
            # then close the pipe. Repeating them after tearDown() is harmless.
            self.addCleanup(self.p_mock.stdout.close)
            self.addCleanup(self.p_mock.wait)
            self.addCleanup(self.p_mock.terminate)

            # set log to nonblocking
            flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
            fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            self.last_warning = -1
            self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE)
            try:
                self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH])
            except FileNotFoundError:
                raise unittest.SkipTest("xdg-desktop-portal not available")
            # Same reverse-order trick as above: terminate first, then reap.
            self.addCleanup(self.xdp.wait)
            self.addCleanup(self.xdp.terminate)

            # Any exception raised here propagates unchanged and fails setUp().
            self.wait_for_bus_object('org.freedesktop.portal.Desktop',
                                     '/org/freedesktop/portal/desktop')
            # Debugging aid: uncomment to watch the portal's D-Bus traffic.
            # subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop'])

            os.environ['GIO_USE_PORTALS'] = "1"
            self.memory_monitor = Gio.MemoryMonitor.dup_default()
            # A unittest assertion rather than a bare assert, so the check is
            # still performed when Python runs with -O.
            self.assertIn("GMemoryMonitorPortal", str(self.memory_monitor))
            self.memory_monitor.connect("low-memory-warning", self.portal_memory_warning_cb)
            self.mainloop = GLib.MainLoop()
            self.main_context = self.mainloop.get_context()

            # The LowMemoryMonitor API is stateless: it doesn’t expose any
            # properties, just a warning signal. Emit the signal in a loop until
            # the GMemoryMonitor instance has initialised and synchronised to
            # the right state.
            def emit_warning(level):
                self.dbusmock.EmitWarning(level)
                return GLib.SOURCE_CONTINUE

            idle_id = GLib.idle_add(emit_warning, 0)
            try:
                # NOTE: this loop has no timeout of its own; it only ends once
                # portal_memory_warning_cb() has recorded level 0.
                while self.last_warning != 0:
                    self.main_context.iteration(True)
            finally:
                # Stop re-emitting even if an iteration raised.
                GLib.source_remove(idle_id)
 | 
						||
 | 
						||
        def tearDown(self):
            '''Stop the helper processes started by setUp().

            Both the xdg-desktop-portal process (``self.xdp``) and the mock
            service (``self.p_mock``) are terminated and reaped, so neither
            outlives the test. The mock is stopped even if stopping the
            portal raises.
            '''
            try:
                self.xdp.terminate()
                self.xdp.wait()
            finally:
                self.p_mock.terminate()
                self.p_mock.wait()
 | 
						||
 | 
						||
        def assertEventually(self, condition, message=None, timeout=5):
            '''Assert that condition function eventually returns True.

            Iterates ``self.main_context`` until ``condition()`` is true.
            Timeout is in seconds, defaulting to 5 seconds. message is
            printed on failure; if it is empty, a default message naming the
            condition is used.

            The timeout callback only sets a flag, and the failure is raised
            from this method's own frame. An exception raised inside a GLib
            source callback is reported by PyGObject but not propagated to
            the code iterating the main context, so failing from the
            callback would not fail the test.
            '''
            if not message:
                message = 'timed out waiting for ' + str(condition)

            timed_out = False

            def timed_out_cb(*_user_data):
                nonlocal timed_out
                timed_out = True
                return GLib.SOURCE_REMOVE

            timeout_source = GLib.timeout_source_new_seconds(timeout)
            timeout_source.set_callback(timed_out_cb, message)
            timeout_source.attach(self.main_context)

            try:
                # The condition is re-checked before the flag, so a condition
                # that becomes true just as the timeout fires still passes.
                while not condition():
                    if timed_out:
                        self.fail(message)
                    self.main_context.iteration(True)
            finally:
                # Always detach the timeout, including on failure or when
                # condition() itself raises.
                timeout_source.destroy()
 | 
						||
 | 
						||
        def portal_memory_warning_cb(self, monitor, level):
            '''Handle the memory monitor's "low-memory-warning" signal.

            Stores the reported level in ``self.last_warning`` and wakes up
            ``self.main_context`` so that a blocking iteration returns and
            the waiting loop can re-check it. ``monitor`` is unused.
            '''
            self.last_warning = level
            context = self.main_context
            context.wakeup()
 | 
						||
 | 
						||
        def test_low_memory_warning_portal_signal(self):
 | 
						||
            '''LowMemoryWarning signal'''
 | 
						||
 | 
						||
            self.dbusmock.EmitWarning(100)
 | 
						||
            # Wait 2 seconds or until warning
 | 
						||
            self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 2)
 | 
						||
 | 
						||
            self.dbusmock.EmitWarning(255)
 | 
						||
            # Wait 2 seconds or until warning
 | 
						||
            self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 2)
 | 
						||
 | 
						||
except ImportError as e:
 | 
						||
    # Stand-in test case used when one of the optional imports failed: it
    # keeps the same class and test names, but is marked as skipped with the
    # name of the module that could not be imported.
    @unittest.skip("Cannot import %s" % (e.name,))
    class TestLowMemoryMonitorPortal(unittest.TestCase):
        def test_low_memory_warning_portal_signal(self):
            # Intentionally empty; the class-level skip reports the reason.
            return None
 | 
						||
 | 
						||
# When executed as a script, run the tests with the TAP test runner.
if __name__ == '__main__':
    tap_runner = taptestrunner.TAPTestRunner()
    unittest.main(testRunner=tap_runner)
 |