glib/gio/tests/memory-monitor-portal.py.in

134 lines
5.0 KiB
Python
Raw Normal View History

#!/usr/bin/python3
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version. See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.
__author__ = 'Bastien Nocera'
__email__ = 'hadess@hadess.net'
__copyright__ = '(c) 2019 Red Hat Inc.'
__license__ = 'LGPL 3+'
import unittest
import sys
import subprocess
import fcntl
import os
import time
import taptestrunner
try:
# Do all non-standard imports here so we can skip the tests if any
# needed packages are not available.
import dbus
import dbus.mainloop.glib
import dbusmock
from gi.repository import GLib
from gi.repository import Gio
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal"
class TestLowMemoryMonitorPortal(dbusmock.DBusTestCase):
'''Test GMemoryMonitorPortal'''
@classmethod
def setUpClass(klass):
klass.start_system_bus()
klass.dbus_con = klass.get_dbus(True)
# Start session bus so that xdg-desktop-portal can run on it
klass.start_session_bus()
def setUp(self):
try:
Gio.MemoryMonitor
except AttributeError:
raise unittest.SkipTest('Low memory monitor not in '
'introspection data. Requires '
'GObject-Introspection ≥ 1.63.2')
try:
(self.p_mock, self.obj_lmm) = self.spawn_server_template(
'low_memory_monitor', {}, stdout=subprocess.PIPE)
except ModuleNotFoundError:
raise unittest.SkipTest("Low memory monitor dbusmock template not "
"found. Requires dbusmock ≥ 0.18.4.")
# set log to nonblocking
flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
self.last_warning = -1
self.dbusmock = dbus.Interface(self.obj_lmm, dbusmock.MOCK_IFACE)
try:
self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH])
except FileNotFoundError:
raise unittest.SkipTest("xdg-desktop-portal not available")
try:
self.wait_for_bus_object('org.freedesktop.portal.Desktop',
'/org/freedesktop/portal/desktop')
except:
raise
# subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop'])
os.environ['GTK_USE_PORTAL'] = "1"
self.memory_monitor = Gio.MemoryMonitor.dup_default()
assert("GMemoryMonitorPortal" in str(self.memory_monitor))
self.memory_monitor.connect("low-memory-warning", self.portal_memory_warning_cb)
self.mainloop = GLib.MainLoop()
self.main_context = self.mainloop.get_context()
def tearDown(self):
self.p_mock.terminate()
self.p_mock.wait()
def assertEventually(self, condition, message=None, timeout=50):
'''Assert that condition function eventually returns True.
Timeout is in deciseconds, defaulting to 50 (5 seconds). message is
printed on failure.
'''
while timeout >= 0:
context = GLib.MainContext.default()
while context.iteration(False):
pass
if condition():
break
timeout -= 1
time.sleep(0.1)
else:
self.fail(message or 'timed out waiting for ' + str(condition))
def portal_memory_warning_cb(self, monitor, level):
self.last_warning = level
self.main_context.wakeup()
def test_low_memory_warning_portal_signal(self):
'''LowMemoryWarning signal'''
# Wait 2 seconds
timeout = 2
while timeout > 0:
time.sleep(0.5)
timeout -= 0.5
self.main_context.iteration(False)
self.dbusmock.EmitWarning(100)
# Wait 2 seconds or until warning
self.assertEventually(lambda: self.last_warning == 100, "'100' low-memory warning not received", 20)
self.dbusmock.EmitWarning(255)
# Wait 2 seconds or until warning
self.assertEventually(lambda: self.last_warning == 255, "'255' low-memory warning not received", 20)
except ImportError as e:
@unittest.skip("Cannot import %s" % e.name)
class TestLowMemoryMonitorPortal(unittest.TestCase):
def test_low_memory_warning_portal_signal(self):
pass
if __name__ == '__main__':
unittest.main(testRunner=taptestrunner.TAPTestRunner())