mirror of
https://gitlab.gnome.org/GNOME/glib.git
synced 2025-01-19 10:46:14 +01:00
b2144afe28
This was my mistake in commit 67a9fbf1fa.
Signed-off-by: Philip Withnall <pwithnall@gnome.org>
Helps: #3237
144 lines
5.5 KiB
Python
Executable File
144 lines
5.5 KiB
Python
Executable File
#!/usr/bin/python3
|
||
|
||
# This program is free software; you can redistribute it and/or modify it under
|
||
# the terms of the GNU Lesser General Public License as published by the Free
|
||
# Software Foundation; either version 3 of the License, or (at your option) any
|
||
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
|
||
# of the license.
|
||
|
||
__author__ = 'Bastien Nocera'
|
||
__email__ = 'hadess@hadess.net'
|
||
__copyright__ = '(c) 2019 Red Hat Inc.'
|
||
__license__ = 'LGPL 3+'
|
||
|
||
import unittest
|
||
import sys
|
||
import subprocess
|
||
import fcntl
|
||
import os
|
||
|
||
import taptestrunner
|
||
|
||
try:
    # Do all non-standard imports here so we can skip the tests if any
    # needed packages are not available.
    import dbus
    import dbus.mainloop.glib
    import dbusmock
    from gi.repository import GLib
    from gi.repository import Gio

    # Make the GLib main loop the default one for dbus-python.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # NOTE(review): '@libexecdir@' looks like a build-time placeholder that
    # is substituted when this template is configured -- confirm.
    XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal"

    class TestLowMemoryMonitorPortal(dbusmock.DBusTestCase):
        '''Test GMemoryMonitorPortal'''

        @classmethod
        def setUpClass(klass):
            '''Start the system and session buses shared by all tests.'''
            klass.start_system_bus()
            klass.dbus_con = klass.get_dbus(True)
            # Start session bus so that xdg-desktop-portal can run on it
            klass.start_session_bus()

        @staticmethod
        def _stop_process(process):
            '''Terminate a child process, reap it and close its stdout pipe.'''
            process.terminate()
            process.wait()
            if process.stdout is not None:
                process.stdout.close()

        def setUp(self):
            '''Spawn the mock and the portal, then sync the memory monitor.

            Raises unittest.SkipTest if Gio.MemoryMonitor, the
            'low_memory_monitor' dbusmock template or the
            xdg-desktop-portal executable is not available.

            Every spawned process is registered with addCleanup(), so it is
            stopped even when this method skips or fails part-way through
            (tearDown() is not run in that case).
            '''
            try:
                Gio.MemoryMonitor
            except AttributeError:
                raise unittest.SkipTest('Low memory monitor not in '
                                        'introspection data. Requires '
                                        'GObject-Introspection ≥ 1.63.2')
            try:
                (self.p_mock, self.obj_lmm) = self.spawn_server_template(
                    'low_memory_monitor', {}, stdout=subprocess.PIPE)
            except ModuleNotFoundError:
                raise unittest.SkipTest("Low memory monitor dbusmock template not "
                                        "found. Requires dbusmock ≥ 0.18.4.")
            self.addCleanup(self._stop_process, self.p_mock)

            # set log to nonblocking
            flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
            fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            self.last_warning = -1
            self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE)

            try:
                self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH])
            except FileNotFoundError:
                raise unittest.SkipTest("xdg-desktop-portal not available")
            # Cleanups run in LIFO order: the portal is stopped before the
            # mock it talks to.
            self.addCleanup(self._stop_process, self.xdp)

            self.wait_for_bus_object('org.freedesktop.portal.Desktop',
                                     '/org/freedesktop/portal/desktop')
            # Debugging aid: watch the portal's D-Bus traffic with
            #   gdbus monitor --session --dest org.freedesktop.portal.Desktop

            # Must be set before the default monitor is instantiated below.
            os.environ['GIO_USE_PORTALS'] = "1"
            self.memory_monitor = Gio.MemoryMonitor.dup_default()
            # Use a unittest assertion rather than a bare assert, which is
            # stripped when Python runs with -O.
            self.assertIn("GMemoryMonitorPortal", str(self.memory_monitor))
            self.memory_monitor.connect("low-memory-warning", self.portal_memory_warning_cb)
            self.mainloop = GLib.MainLoop()
            self.main_context = self.mainloop.get_context()

            # The LowMemoryMonitor API is stateless: it doesn’t expose any
            # properties, just a warning signal. Emit the signal in a loop until
            # the GMemoryMonitor instance has initialised and synchronised to
            # the right state.
            def emit_warning(level):
                self.dbusmock.EmitWarning(level)
                return GLib.SOURCE_CONTINUE

            idle_id = GLib.idle_add(emit_warning, 0)
            try:
                while self.last_warning != 0:
                    self.main_context.iteration(True)
            finally:
                GLib.source_remove(idle_id)

        def assertEventually(self, condition, message=None, timeout=5):
            '''Assert that condition function eventually returns True.

            Timeout is in seconds, defaulting to 5 seconds. message is
            printed on failure.
            '''
            if not message:
                message = 'timed out waiting for ' + str(condition)

            timed_out = []

            def timed_out_cb(flag):
                # An exception raised inside a GLib callback does not
                # propagate to the code iterating the main context, so only
                # record the timeout here; the loop below reports it.
                flag.append(True)
                return GLib.SOURCE_REMOVE

            timeout_source = GLib.timeout_source_new_seconds(timeout)
            timeout_source.set_callback(timed_out_cb, timed_out)
            timeout_source.attach(self.main_context)

            try:
                while not condition():
                    if timed_out:
                        self.fail(message)
                    self.main_context.iteration(True)
            finally:
                # Drop the source on success and on failure alike.
                timeout_source.destroy()

        def portal_memory_warning_cb(self, monitor, level):
            '''Record the last warning level and wake the main context.'''
            self.last_warning = level
            self.main_context.wakeup()

        def test_low_memory_warning_portal_signal(self):
            '''LowMemoryWarning signal'''

            self.dbusmock.EmitWarning(100)
            # Wait 2 seconds or until warning
            self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 2)

            self.dbusmock.EmitWarning(255)
            # Wait 2 seconds or until warning
            self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 2)

except ImportError as e:
    # A required non-standard module is missing: define a stub with the same
    # name and test method so the run reports a skip naming the module.
    @unittest.skip("Cannot import %s" % e.name)
    class TestLowMemoryMonitorPortal(unittest.TestCase):
        def test_low_memory_warning_portal_signal(self):
            pass
|
||
|
||
if __name__ == '__main__':
    # When executed as a script, run the tests through taptestrunner's
    # TAPTestRunner.
    tap_runner = taptestrunner.TAPTestRunner()
    unittest.main(testRunner=tap_runner)
|