"""Runs tests with Xvfb/Xorg and Openbox/Weston/Mutter on Linux and normally on
other platforms."""
from __future__ import print_function
import copy
import os
import os.path
import random
import re
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import uuid
from contextlib import contextmanager
import psutil
import test_env
# Screen geometry used for the X11 server when the caller supplies none,
# formatted as WIDTHxHEIGHTxDEPTH.
DEFAULT_XVFB_WHD = '1280x800x24'
# Virtual monitor resolution handed to mutter when no --mutter-display= flag is
# given, formatted as WIDTHxHEIGHT.
DEFAULT_MUTTER_DISPLAY = '1920x1200'
class _ProcessError(Exception):
  """Signals that a required display server or compositor failed to come up.

  Also raised by the SIGTERM/SIGINT handler installed by this script so that
  an interrupted run unwinds through the normal cleanup paths.
  """
def kill(proc, name, timeout_in_seconds=10):
  """Tries to kill |proc| gracefully with a timeout for each signal.

  Sends SIGTERM first and, if the process is still running after
  |timeout_in_seconds|, follows up with SIGKILL and waits once more. Errors
  raised while signalling are reported on stderr and never propagated.

  Args:
    proc: A process object exposing terminate(), kill() and wait() (for
      example subprocess.Popen), or a falsy value, in which case nothing is
      done.
    name: Human readable process name used in diagnostics.
    timeout_in_seconds: How long to wait after each signal.
  """
  if not proc:
    return

  # Start waiting *before* sending any signal. If terminate() raises, the
  # join() calls below would otherwise run on a thread that was never started,
  # which raises RuntimeError and hides the original problem.
  thread = threading.Thread(target=proc.wait)
  thread.start()
  try:
    proc.terminate()

    thread.join(timeout_in_seconds)
    if thread.is_alive():
      print('%s running after SIGTERM, trying SIGKILL.\n' % name,
            file=sys.stderr)
      proc.kill()
  except OSError as e:
    # terminate()/kill() can fail, e.g. when the process is already gone.
    print('Exception while killing process %s: %s' % (name, e), file=sys.stderr)

  thread.join(timeout_in_seconds)
  if thread.is_alive():
    print('%s running after SIGTERM and SIGKILL; good luck!\n' % name,
          file=sys.stderr)
@contextmanager
def dbus_session(env):
  """Context manager that runs its body with a DBus session available.

  Works around a bug in GLib where it performs operations which aren't
  async-signal-safe (in particular, memory allocations) between fork and exec
  when it spawns subprocesses. This causes threads inside Chrome's browser and
  utility processes to get stuck, and this harness to hang waiting for those
  processes, which will never terminate. This doesn't happen on users'
  machines, because they have an active desktop session and the
  DBUS_SESSION_BUS_ADDRESS environment variable set, but it can happen on
  headless environments. This is fixed by glib commit [1], but this workaround
  will be necessary until the fix rolls into Chromium's CI.

  [1] f2917459f745bebf931bccd5cc2c33aa81ef4d12

  When DBUS_SESSION_BUS_ADDRESS is missing from this process's environment,
  `dbus-launch` is run and every NAME=VALUE line it prints is copied into
  |env| (DBUS_SESSION_BUS_PID is required among them). The daemon started
  this way is killed when the context exits. A failure to launch is printed
  and otherwise ignored, so the body always runs.

  Args:
    env: Mutable mapping of environment variables; updated in place.

  Yields:
    None.
  """
  dbus_pid = None
  try:
    if 'DBUS_SESSION_BUS_ADDRESS' not in os.environ:
      dbus_output = subprocess.check_output(['dbus-launch'],
                                            env=env).decode('utf-8')
      for line in dbus_output.split('\n'):
        m = re.match(r'([^=]+)\=(.+)', line)
        if m:
          env[m.group(1)] = m.group(2)
      dbus_pid = int(env['DBUS_SESSION_BUS_PID'])
  except (subprocess.CalledProcessError, OSError, KeyError, ValueError) as e:
    print('Exception while running dbus_launch: %s' % e)

  try:
    yield
  finally:
    if dbus_pid:
      try:
        os.kill(dbus_pid, signal.SIGKILL)
      except ProcessLookupError:
        # The daemon already exited on its own. Letting this escape from a
        # finally block would replace the result of the wrapped test run.
        pass
def run_executable(cmd,
                   env,
                   stdoutfile=None,
                   use_openbox=True,
                   use_xcompmgr=True,
                   xvfb_whd=None,
                   cwd=None):
  """Runs |cmd| under the display server chosen by flags found inside |cmd|.

  Recognised flags are stripped from |cmd| (the list is modified in place):
    --no-xvfb          disables every X11 backing (Xvfb and Xorg).
    --use-xvfb         selects Xvfb instead of the default Xorg.
    --use-weston       selects Weston; requires the X11 backings to be off.
    --use-mutter       selects Mutter; requires X11 backings and Weston off.
    --mutter-display=  resolution for Mutter, read only with --use-mutter.

  The X11 path is only taken on Linux. When no backing applies the command
  runs directly through test_env.run_executable.

  The X11 path installs a SIGUSR1 handler so that the server can report when
  it is ready for connections, see
  https://www.x.org/archive/X11R7.5/doc/man/man1/Xserver.1.html under Signals.

  Args:
    cmd: Command to be executed, as a list of arguments.
    env: A copy of environment variables. "DISPLAY" will be set if an X11
      server is used. "WAYLAND_DISPLAY" will be set if Weston is used.
    stdoutfile: If provided, symbolization via script is disabled and stdout
      is written to this file as well as to stdout.
    use_openbox: A flag to use openbox process.
      Some ChromeOS tests need a window manager.
    use_xcompmgr: A flag to use xcompmgr process.
      Some tests need a compositing wm to make use of transparent visuals.
    xvfb_whd: WxHxD to pass to the X11 server or DEFAULT_XVFB_WHD if None.
    cwd: Current working directory.

  Returns:
    the exit code of the specified commandline, or 1 on failure.
  """
  # At most one X11 backing is active: 'Xorg' (the default), 'Xvfb', or None
  # once --no-xvfb has switched both off.
  x11_backing = 'Xorg'

  if '--no-xvfb' in cmd:
    x11_backing = None
    cmd.remove('--no-xvfb')

  if '--use-xvfb' in cmd:
    if x11_backing is None:
      print('Conflicting flags --use-xvfb and --no-xvfb\n', file=sys.stderr)
      return 1
    x11_backing = 'Xvfb'
    cmd.remove('--use-xvfb')

  want_weston = '--use-weston' in cmd
  if want_weston:
    if x11_backing is not None:
      print('Unable to use Weston with xvfb or Xorg.\n', file=sys.stderr)
      return 1
    cmd.remove('--use-weston')

  mutter_display = DEFAULT_MUTTER_DISPLAY
  want_mutter = '--use-mutter' in cmd
  if want_mutter:
    if x11_backing is not None or want_weston:
      print('Unable to use mutter with xvfb or Xorg or weston.\n',
            file=sys.stderr)
      return 1
    cmd.remove('--use-mutter')
    # Only the first --mutter-display= argument is consumed.
    display_arg = next(
        (arg for arg in cmd if arg.startswith('--mutter-display=')), None)
    if display_arg is not None:
      mutter_display = display_arg.split('=')[1]
      cmd.remove(display_arg)

  if x11_backing is not None and sys.platform.startswith('linux'):
    return _run_with_x11(cmd, env, stdoutfile, use_openbox, use_xcompmgr,
                         x11_backing == 'Xorg', xvfb_whd or DEFAULT_XVFB_WHD,
                         cwd)
  if want_weston:
    return _run_with_weston(cmd, env, stdoutfile, cwd)
  if want_mutter:
    return _run_with_mutter(cmd, env, stdoutfile, cwd, mutter_display)
  return test_env.run_executable(cmd, env, stdoutfile, cwd)
def _re_search_command(regex, args, **kwargs):
  """Runs `args` as a subprocess and searches its output for `regex`.

  stderr is merged into stdout and the output is decoded as text. The search
  is case-insensitive.

  Args:
    regex: Pattern to look for.
    args: Command line to execute.
    **kwargs: Extra keyword arguments forwarded to subprocess.check_output.

  Returns:
    The match object of the first match, or None when nothing matches.
  """
  command_output = subprocess.check_output(args,
                                           stderr=subprocess.STDOUT,
                                           text=True,
                                           **kwargs)
  return re.search(regex, command_output, re.IGNORECASE)
def _make_xorg_modeline(width, height, refresh):
  """Asks `cvt` for the modeline matching a width, height and refresh rate.

  See: https://www.x.org/archive/X11R7.0/doc/html/chips4.html

  Returns:
    A (label, parameters) tuple, where label is the quoted mode name printed
    by `cvt` and parameters is the list of non-empty, space separated tokens
    following it.
  """
  cvt_cmd = ['cvt'] + [str(value) for value in (width, height, refresh)]
  match = _re_search_command(r'Modeline "(.*)"\s+(.*)', cvt_cmd)
  label, raw_parameters = match.group(1), match.group(2)
  parameters = [token for token in raw_parameters.split(' ') if token != '']
  return (label, parameters)
def _get_supported_virtual_sizes(default_whd):
  """Lists the monitor resolutions to offer, as sorted (width, height) tuples.

  The size encoded in `default_whd` (WxHxD) is always part of the result and
  is not duplicated when it matches one of the built-in sizes.
  """
  width_text, height_text, _ = default_whd.split('x')
  sizes = {(800, 600), (1024, 768), (1920, 1080), (1600, 1200)}
  sizes.add((int(width_text), int(height_text)))
  return sorted(sizes)
def _make_xorg_config(default_whd):
  """Writes an Xorg config for the dummy driver and returns its path.

  One modeline is generated (at 60 Hz) for every size reported by
  _get_supported_virtual_sizes, and the colour depth is taken from the last
  component of `default_whd` (WxHxD). The file is created in the system
  temporary directory under a unique name; the caller is responsible for
  deleting it. See:
  https://www.x.org/releases/current/doc/man/man5/xorg.conf.5.xhtml
  """
  _, _, depth = default_whd.split('x')
  labelled_modes = [
      _make_xorg_modeline(width, height, 60)
      for width, height in _get_supported_virtual_sizes(default_whd)
  ]
  modelines = [
      'Modeline "%s" %s' % (label, ' '.join(parameters))
      for label, parameters in labelled_modes
  ]
  mode_labels = ['"%s"' % label for label, _ in labelled_modes]
  template = """
Section "Monitor"
Identifier "Monitor0"
HorizSync 5.0 - 1000.0
VertRefresh 5.0 - 200.0
%s
EndSection
Section "Device"
Identifier "Device0"
# Dummy driver requires package `xserver-xorg-video-dummy`.
Driver "dummy"
VideoRam 256000
EndSection
Section "Screen"
Identifier "Screen0"
Device "Device0"
Monitor "Monitor0"
SubSection "Display"
Depth %s
Modes %s
EndSubSection
EndSection
"""
  config = template % ('\n'.join(modelines), depth, ' '.join(mode_labels))
  file_name = 'xorg-%s.config' % uuid.uuid4().hex
  config_file = os.path.join(tempfile.gettempdir(), file_name)
  with open(config_file, 'w') as f:
    f.write(config)
  return config_file
def _setup_xrandr(env, default_whd):
  """Configures the xrandr outputs of the running X server.

  When `xrandr -q` lists DUMMY outputs, every supported mode is added to the
  five outputs DUMMY0..DUMMY4 and each output is switched to the default mode,
  laid out left to right. The screen size and a DPI of 96 are set in any case.

  Args:
    env: Environment used for every xrandr invocation.
    default_whd: Default screen geometry as WxHxD.
  """

  def run_xrandr(*xrandr_args):
    # Output is discarded; a non-zero exit raises CalledProcessError.
    subprocess.check_call(['xrandr'] + list(xrandr_args),
                          env=env,
                          stdout=subprocess.DEVNULL,
                          stderr=subprocess.STDOUT)

  width_text, height_text, _ = default_whd.split('x')
  default_size = (int(width_text), int(height_text))

  if _re_search_command('DUMMY[0-9]', ['xrandr', '-q'], env=env):
    supported_sizes = _get_supported_virtual_sizes(default_whd)
    outputs = ['DUMMY0', 'DUMMY1', 'DUMMY2', 'DUMMY3', 'DUMMY4']
    refresh_rate = 60

    for width, height in supported_sizes:
      label, _ = _make_xorg_modeline(width, height, refresh_rate)
      for output in outputs:
        run_xrandr('--addmode', output, label)

    default_label, _ = _make_xorg_modeline(default_size[0], default_size[1],
                                           refresh_rate)
    left_neighbour = None
    for output in outputs:
      mode_args = ['--output', output, '--mode', default_label]
      if left_neighbour is not None:
        mode_args += ['--right-of', left_neighbour]
      run_xrandr(*mode_args)
      left_neighbour = output

  run_xrandr('-s', '%dx%d' % default_size)
  run_xrandr('--dpi', '96')
def _setup_signals():
  """Makes SIGTERM and SIGINT raise _ProcessError in the main thread."""
  for signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(signum, raise_process_terminated)
def _run_with_x11(cmd, env, stdoutfile, use_openbox, use_xcompmgr, use_xorg,
                  xvfb_whd, cwd):
  """Runs with an X11 server. Uses Xvfb by default and Xorg when use_xorg is
  True.

  Starts the X server on a free display, then optionally openbox and
  xcompmgr, configures xrandr when Xorg is used, and finally runs |cmd|
  through test_env.run_executable. Every helper process that was started is
  killed and the generated Xorg config file is removed before returning.

  Args:
    cmd: Command to be executed.
    env: Environment for the helper processes and |cmd|. Modified in place:
      XDG_CURRENT_DESKTOP is removed, DISPLAY and XVFB_DISPLAY are set, and
      dbus_session may add its own variables.
    stdoutfile: Forwarded to test_env.run_executable.
    use_openbox: Whether to start openbox.
    use_xcompmgr: Whether to start xcompmgr.
    use_xorg: Start Xorg when True, Xvfb otherwise.
    xvfb_whd: Screen geometry as WxHxD.
    cwd: Forwarded to test_env.run_executable.

  Returns:
    The value returned by test_env.run_executable, or 1 when an OSError or
    _ProcessError is raised inside the session.
  """
  openbox_proc = None
  openbox_ready = MutableBoolean()

  # SIGUSR1 handler installed while waiting for openbox.
  def set_openbox_ready(*_):
    openbox_ready.setvalue(True)

  xcompmgr_proc = None
  x11_proc = None
  x11_ready = MutableBoolean()

  # SIGUSR1 handler installed while waiting for the X server.
  def set_x11_ready(*_):
    x11_ready.setvalue(True)

  x11_binary = 'Xorg' if use_xorg else 'Xvfb'
  # NOTE(review): this runs outside the try block below, so an exception raised
  # while generating the config is not turned into a return value of 1.
  xorg_config_file = _make_xorg_config(xvfb_whd) if use_xorg else None

  # Do not hand an inherited XDG_CURRENT_DESKTOP to the servers or the test.
  if 'XDG_CURRENT_DESKTOP' in env:
    del env['XDG_CURRENT_DESKTOP']

  with dbus_session(env):
    try:
      _setup_signals()

      # Try up to 10 times, each time on a freshly picked display number.
      for _ in range(10):
        x11_ready.setvalue(False)
        display = find_display()

        x11_cmd = None
        if use_xorg:
          x11_cmd = ['Xorg', display, '-noreset', '-config', xorg_config_file]
        else:
          x11_cmd = [
              'Xvfb', display, '-screen', '0', xvfb_whd, '-ac', '-nolisten',
              'tcp', '-dpi', '96', '+extension', 'RANDR', '-maxclients', '512'
          ]

        # SIGUSR1 is ignored while the server is spawned and handled right
        # afterwards; the server reports readiness through that signal (see
        # the Xserver man page referenced in run_executable's docstring).
        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        x11_proc = subprocess.Popen(x11_cmd, stderr=subprocess.STDOUT, env=env)
        signal.signal(signal.SIGUSR1, set_x11_ready)
        # Poll for up to ~3 seconds until the server is ready or has exited.
        for _ in range(30):
          time.sleep(.1)
          if x11_ready.getvalue() or x11_proc.poll() is not None:
            break

        if x11_proc.poll() is None:
          if x11_ready.getvalue():
            break
          # Still running but never became ready: stop it and retry.
          kill(x11_proc, x11_binary)

      # A server that exited, or was killed above, means every try failed.
      if x11_proc.poll() is not None:
        raise _ProcessError('Failed to start after 10 tries')

      env['DISPLAY'] = display
      env['XVFB_DISPLAY'] = display

      if use_openbox:
        # openbox runs this command once it has started, which delivers
        # SIGUSR1 to this script.
        current_proc_id = os.getpid()
        openbox_startup_cmd = 'kill --signal SIGUSR1 %s' % str(current_proc_id)
        # NOTE(review): the SIG_IGN registration is replaced immediately by
        # the next line.
        signal.signal(signal.SIGUSR1, signal.SIG_IGN)
        signal.signal(signal.SIGUSR1, set_openbox_ready)
        # Same retry scheme as for the X server.
        for _ in range(10):
          openbox_ready.setvalue(False)
          openbox_proc = subprocess.Popen(
              ['openbox', '--sm-disable', '--startup', openbox_startup_cmd],
              stderr=subprocess.STDOUT,
              env=env)
          for _ in range(30):
            time.sleep(.1)
            if openbox_ready.getvalue() or openbox_proc.poll() is not None:
              break

          if openbox_proc.poll() is None:
            if openbox_ready.getvalue():
              break
            kill(openbox_proc, 'openbox')
            print('Openbox failed to start. Retrying.', file=sys.stderr)

        if openbox_proc.poll() is not None:
          raise _ProcessError('Failed to start openbox after 10 tries')

      if use_xcompmgr:
        xcompmgr_proc = subprocess.Popen('xcompmgr',
                                         stderr=subprocess.STDOUT,
                                         env=env)

      if use_xorg:
        _setup_xrandr(env, xvfb_whd)

      return test_env.run_executable(cmd, env, stdoutfile, cwd)
    except OSError as e:
      print('Failed to start %s or Openbox: %s\n' % (x11_binary, str(e)),
            file=sys.stderr)
      return 1
    except _ProcessError as e:
      print('%s fail: %s\n' % (x11_binary, str(e)), file=sys.stderr)
      return 1
    finally:
      # kill() ignores processes that were never started (None).
      kill(openbox_proc, 'openbox')
      kill(xcompmgr_proc, 'xcompmgr')
      kill(x11_proc, x11_binary)

      if xorg_config_file is not None:
        os.remove(xorg_config_file)
def _run_with_wayland_common(compositor_executable, cmd, env):
  """Performs the setup shared by the Wayland compositors.

  Installs the termination signal handlers and looks for the compositor
  binary. If it is not found relative to the current directory, the directory
  of cmd[0] is tried: the process changes into it and cmd[0] is rewritten to
  be relative to it. XDG_RUNTIME_DIR is ensured in |env| once the compositor
  has been located.

  Returns:
    A (found, cmd) tuple; found is False when the compositor binary is
    missing, in which case a notice is printed.
  """
  _setup_signals()

  def compositor_present():
    return os.path.isfile(compositor_executable)

  if not compositor_present():
    candidate_dir = os.path.dirname(cmd[0])
    if os.path.isdir(candidate_dir):
      os.chdir(candidate_dir)
      cmd[0] = os.path.join('.', os.path.basename(cmd[0]))

  if compositor_present():
    _set_xdg_runtime_dir(env)
    return (True, cmd)

  print('Compositor is not available. Starting without Wayland compositor')
  return (False, cmd)
def _run_with_weston(cmd, env, stdoutfile, cwd):
  """Runs |cmd| against a headless Weston compositor.

  Falls back to running |cmd| directly when the ./weston binary cannot be
  found. Otherwise Weston is started (up to 10 attempts), WAYLAND_DISPLAY is
  taken from one of its child processes, and |cmd| is run through
  test_env.run_executable. Weston, the notify socket and the generated config
  file are cleaned up before returning.

  Flags consumed from |cmd|: --weston-use-gl, --weston-debug-logging and
  --chrome-wayland-debugging.

  Returns:
    The value returned by test_env.run_executable, or 1 when an OSError or
    _ProcessError is raised.
  """
  with dbus_session(env):
    weston_proc = None

    try:
      weston_executable = './weston'
      compositor_found, cmd = _run_with_wayland_common(weston_executable, cmd,
                                                       env)
      if not compositor_found:
        return test_env.run_executable(cmd, env, stdoutfile, cwd)

      # Minimal config: a shell without a panel.
      with open(_weston_config_file_path(), 'w') as weston_config_file:
        weston_config_file.write('[shell]\npanel-position=none')

      weston_cmd = [
          weston_executable, '--backend=headless-backend.so', '--idle-time=0',
          '--modules=ui-controls.so,systemd-notify.so', '--width=1280',
          '--height=800', '--config=' + _weston_config_file_path()
      ]

      if '--weston-use-gl' in cmd:
        weston_cmd.append('--use-gl')
        cmd.remove('--weston-use-gl')

      if '--weston-debug-logging' in cmd:
        cmd.remove('--weston-debug-logging')
        # From here on a private copy of the environment is used, so the
        # caller's mapping is no longer modified.
        env = copy.deepcopy(env)
        env['WAYLAND_DEBUG'] = '1'

      # Non-blocking datagram socket on which Weston's systemd-notify module
      # is expected to announce readiness.
      with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM
                         | socket.SOCK_NONBLOCK) as notify_socket:
        notify_socket.bind(_weston_notify_socket_address())
        env['NOTIFY_SOCKET'] = _weston_notify_socket_address()

        weston_proc_display = None
        # NOTE(review): a Weston process from a failed attempt is not killed
        # before the next Popen; only the last one is killed in `finally`.
        for _ in range(10):
          weston_proc = subprocess.Popen(weston_cmd,
                                         stderr=subprocess.STDOUT,
                                         env=env)

          # Wait up to ~2.5 seconds for the READY=1 notification.
          for _ in range(25):
            time.sleep(0.1)
            try:
              if notify_socket.recv(512) == b'READY=1':
                break
            except BlockingIOError:
              continue

          # Then wait up to ~2.5 seconds for a child exposing WAYLAND_DISPLAY.
          for _ in range(25):
            time.sleep(0.1)
            weston_proc_display = _get_display_from_weston(weston_proc.pid)
            if weston_proc_display is not None:
              break

          # Leave the retry loop as well once the display is known.
          if weston_proc_display is not None:
            break

      if weston_proc_display is None:
        raise _ProcessError('Failed to start Weston.')

      # The notify socket is only meant for Weston, not for the test.
      env.pop('NOTIFY_SOCKET')

      env['WAYLAND_DISPLAY'] = weston_proc_display
      # WAYLAND_DEBUG for the test itself is controlled separately from the
      # value Weston was started with.
      if '--chrome-wayland-debugging' in cmd:
        cmd.remove('--chrome-wayland-debugging')
        env['WAYLAND_DEBUG'] = '1'
      else:
        env['WAYLAND_DEBUG'] = '0'

      return test_env.run_executable(cmd, env, stdoutfile, cwd)
    except OSError as e:
      print('Failed to start Weston: %s\n' % str(e), file=sys.stderr)
      return 1
    except _ProcessError as e:
      print('Weston fail: %s\n' % str(e), file=sys.stderr)
      return 1
    finally:
      kill(weston_proc, 'weston')

      if os.path.exists(_weston_notify_socket_address()):
        os.remove(_weston_notify_socket_address())

      if os.path.exists(_weston_config_file_path()):
        os.remove(_weston_config_file_path())
def _weston_notify_socket_address():
  """Returns the path of the notify socket inside the temporary directory."""
  socket_name = '.xvfb.py-weston-notify.sock'
  return os.path.join(tempfile.gettempdir(), socket_name)
def _weston_config_file_path():
  """Returns the path of the Weston config file inside the temporary
  directory."""
  config_name = '.xvfb.py-weston.ini'
  return os.path.join(tempfile.gettempdir(), config_name)
def _run_with_mutter(cmd, env, stdoutfile, cwd, mutter_display):
  """Runs |cmd| wrapped by a headless Mutter compositor.

  When ./mutter is available, |cmd| is appended to the mutter command line
  (after '--') so that mutter launches it on a virtual monitor of size
  |mutter_display|. When it is missing, |cmd| runs directly if the mutter
  sources are checked out; otherwise instructions are printed and 1 is
  returned. The --mutter-debug-logging flag enables mutter debug output.

  Returns:
    The value returned by test_env.run_executable, or 1 on failure.
  """
  with dbus_session(env):
    mutter_proc = None

    try:
      mutter_executable = './mutter'
      compositor_found, cmd = _run_with_wayland_common(mutter_executable, cmd,
                                                       env)

      if compositor_found:
        cmd = [
            mutter_executable, '--headless',
            f'--virtual-monitor={mutter_display}', '--'
        ] + cmd

        if '--mutter-debug-logging' in cmd:
          cmd.remove('--mutter-debug-logging')
          # Work on a private copy so the caller's mapping stays untouched.
          env = copy.deepcopy(env)
          env['G_MESSAGES_DEBUG'] = 'libmutter'
          env['MUTTER_DEBUG'] = 'input'

        return test_env.run_executable(cmd, env, stdoutfile, cwd)

      mutter_sources = os.path.join(os.path.dirname(__file__), '..',
                                    'third_party', 'mutter', 'src')
      if os.path.isdir(mutter_sources):
        return test_env.run_executable(cmd, env, stdoutfile, cwd)

      print(
          'In order to run tests using mutter, its sources need to be '
          'checked outexplicitly and built.\n'
          'Add \'"checkout_mutter": True\' in the "custom_vars" section '
          'of your .gclient file, and run gclient sync.\n'
          'Then build the test executable or mutter and run this script '
          'again.',
          file=sys.stderr)
      return 1
    except _ProcessError as e:
      print('mutter fail: %s\n' % str(e), file=sys.stderr)
      return 1
    finally:
      kill(mutter_proc, 'mutter')
def _get_display_from_weston(weston_proc_pid):
  """Retrieves $WAYLAND_DISPLAY set by Weston.

  Returns the $WAYLAND_DISPLAY variable from one of weston's subprocesses.
  Weston updates this variable early in its startup in the main process, but we
  can only read the environment variables as they were when the process was
  created. Therefore we must use one of weston's subprocesses, which are all
  spawned with the new value for $WAYLAND_DISPLAY. Any of them will do, as they
  all have the same value set.

  Args:
    weston_proc_pid: The process id of the main Weston process.

  Returns:
    the display set by Wayland, which clients can use to connect to, or None
    when it cannot be determined (yet), including when Weston or its children
    have exited or cannot be inspected.
  """
  # psutil reports vanished, zombie or inaccessible processes by raising
  # subclasses of psutil.Error rather than by returning None. Those are not
  # OSError, so without handling them here they would bypass the caller's
  # retry loop and error handling.
  try:
    children = psutil.Process(weston_proc_pid).children(recursive=True)
  except psutil.Error:
    return None

  for process in children:
    try:
      weston_proc_display = process.environ().get('WAYLAND_DISPLAY')
    except psutil.Error:
      # This child went away or is not readable; try the next one.
      continue
    if weston_proc_display is not None:
      return weston_proc_display
  return None
class MutableBoolean(object):
  """Boolean holder whose value can be changed from within a signal handler.

  Starts out as False.
  """

  def __init__(self):
    self._flag = False

  def setvalue(self, val):
    """Stores |val|, which must be a bool."""
    assert isinstance(val, bool)
    self._flag = val

  def getvalue(self):
    """Returns the most recently stored value."""
    return self._flag
def raise_process_terminated(*_signal_args):
  """Signal handler that aborts the run by raising _ProcessError.

  The arguments passed by the signal machinery are ignored.
  """
  raise _ProcessError('Terminated')
def find_display():
  """Picks a random X display number that has no lock file.

  Displays 99 (the xvfb-run default) through 119 are considered; a display
  counts as taken when /tmp/.X<number>-lock is an existing file.

  Returns:
    A display string for the X server, ':{99-119}'.

  Raises:
    _ProcessError: Raised when displays 99 through 119 are unavailable.
  """
  free_displays = []
  for number in range(99, 120):
    lock_file = '/tmp/.X{}-lock'.format(number)
    if not os.path.isfile(lock_file):
      free_displays.append(number)

  if not free_displays:
    raise _ProcessError('Failed to find display number')
  return ':{}'.format(random.choice(free_displays))
def _set_xdg_runtime_dir(env):
  """Sets the $XDG_RUNTIME_DIR variable if it hasn't been set before.

  When |env| has no (or an empty) XDG_RUNTIME_DIR, '/tmp/xdg-tmp-dir/' is
  created if necessary with mode 0o700 and stored in |env|.

  Args:
    env: Mutable mapping of environment variables; updated in place.
  """
  if env.get('XDG_RUNTIME_DIR'):
    return

  runtime_dir = '/tmp/xdg-tmp-dir/'
  # exist_ok avoids the race between checking for the directory and creating
  # it when several harness instances start at the same time.
  os.makedirs(runtime_dir, 0o700, exist_ok=True)
  env['XDG_RUNTIME_DIR'] = runtime_dir
def main():
  """Command line entry point.

  Returns:
    2 when no command is given, 3 when the command is a directory, otherwise
    the result of run_executable for the given command line and a copy of the
    current environment.
  """
  usage = ('[command [--no-xvfb or --use-xvfb or --use-weston] args...]\n'
           '\t --no-xvfb\t\tTurns off all X11 backings (Xvfb and Xorg).\n'
           '\t --use-xvfb\t\tForces legacy Xvfb backing instead of Xorg.\n'
           '\t --use-weston\t\tEnable Weston Wayland server.\n'
           '\t --use-mutter\t\tEnable Mutter Wayland server.\n'
           '\t --mutter-display\tSpecify Mutter Display Resolution as WxH.')

  command_line = sys.argv[1:]
  if not command_line:
    print(usage + '\n', file=sys.stderr)
    return 2

  command = command_line[0]
  if os.path.isdir(command):
    print('Invalid command: \"%s\" is a directory\n' % command,
          file=sys.stderr)
    print(usage + '\n', file=sys.stderr)
    return 3

  return run_executable(command_line, os.environ.copy())
# When executed as a script, exit with the status code returned by main().
if __name__ == '__main__':
  sys.exit(main())