glib/gio/tests/memory-monitor-portal.py.in
Philip Withnall b2144afe28 tests: Fix typo in memory-monitor-portal.py.in
This was my mistake in commit 67a9fbf1fa.

Signed-off-by: Philip Withnall <pwithnall@gnome.org>

Helps: #3237
2024-01-30 07:38:13 +00:00

144 lines
5.5 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.
__author__ = 'Bastien Nocera'
__email__ = 'hadess@hadess.net'
__copyright__ = '(c) 2019 Red Hat Inc.'
__license__ = 'LGPL 3+'
import unittest
import sys
import subprocess
import fcntl
import os
import taptestrunner
try:
# Do all non-standard imports here so we can skip the tests if any
# needed packages are not available.
import dbus
import dbus.mainloop.glib
import dbusmock
from gi.repository import GLib
from gi.repository import Gio
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal"
class TestLowMemoryMonitorPortal(dbusmock.DBusTestCase):
'''Test GMemoryMonitorPortal'''
@classmethod
def setUpClass(klass):
klass.start_system_bus()
klass.dbus_con = klass.get_dbus(True)
# Start session bus so that xdg-desktop-portal can run on it
klass.start_session_bus()
def setUp(self):
try:
Gio.MemoryMonitor
except AttributeError:
raise unittest.SkipTest('Low memory monitor not in '
'introspection data. Requires '
'GObject-Introspection ≥ 1.63.2')
try:
(self.p_mock, self.obj_lmm) = self.spawn_server_template(
'low_memory_monitor', {}, stdout=subprocess.PIPE)
except ModuleNotFoundError:
raise unittest.SkipTest("Low memory monitor dbusmock template not "
"found. Requires dbusmock ≥ 0.18.4.")
# set log to nonblocking
flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self.last_warning = -1
self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE)
try:
self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH])
except FileNotFoundError:
raise unittest.SkipTest("xdg-desktop-portal not available")
try:
self.wait_for_bus_object('org.freedesktop.portal.Desktop',
'/org/freedesktop/portal/desktop')
except:
raise
# subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop'])
os.environ['GIO_USE_PORTALS'] = "1"
self.memory_monitor = Gio.MemoryMonitor.dup_default()
assert("GMemoryMonitorPortal" in str(self.memory_monitor))
self.memory_monitor.connect("low-memory-warning", self.portal_memory_warning_cb)
self.mainloop = GLib.MainLoop()
self.main_context = self.mainloop.get_context()
# The LowMemoryMonitor API is stateless: it doesnt expose any
# properties, just a warning signal. Emit the signal in a loop until
# the GMemoryMonitor instance has initialised and synchronised to
# the right state.
def emit_warning(level):
self.dbusmock.EmitWarning(level)
return GLib.SOURCE_CONTINUE
idle_id = GLib.idle_add(emit_warning, 0)
while self.last_warning != 0:
self.main_context.iteration(True)
GLib.source_remove(idle_id)
def tearDown(self):
self.p_mock.terminate()
self.p_mock.wait()
def assertEventually(self, condition, message=None, timeout=5):
'''Assert that condition function eventually returns True.
Timeout is in seconds, defaulting to 5 seconds. message is
printed on failure.
'''
if not message:
message = 'timed out waiting for ' + str(condition)
def timed_out_cb(message):
self.fail(message)
return GLib.SOURCE_REMOVE
timeout_source = GLib.timeout_source_new_seconds(timeout)
timeout_source.set_callback(timed_out_cb, message)
timeout_source.attach(self.main_context)
while not condition():
self.main_context.iteration(True)
timeout_source.destroy()
def portal_memory_warning_cb(self, monitor, level):
self.last_warning = level
self.main_context.wakeup()
def test_low_memory_warning_portal_signal(self):
'''LowMemoryWarning signal'''
self.dbusmock.EmitWarning(100)
# Wait 2 seconds or until warning
self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 2)
self.dbusmock.EmitWarning(255)
# Wait 2 seconds or until warning
self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 2)
except ImportError as e:
@unittest.skip("Cannot import %s" % e.name)
class TestLowMemoryMonitorPortal(unittest.TestCase):
def test_low_memory_warning_portal_signal(self):
pass
if __name__ == '__main__':
unittest.main(testRunner=taptestrunner.TAPTestRunner())