"""Runs tests with Xvfb and Openbox or Weston on Linux and normally on other
platforms."""
from __future__ import print_function
import copy
import os
import os.path
import random
import re
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
import psutil
import test_env
# Screen geometry (WxHxD) handed to Xvfb when the caller of run_executable()
# does not supply |xvfb_whd|.
DEFAULT_XVFB_WHD = '1280x800x24'
class _XvfbProcessError(Exception):
  """Signals that Xvfb (or a helper started alongside it) failed to start.

  Also raised by the signal handler installed while Xvfb is running, so that
  termination goes through the same cleanup path as a startup failure.
  """
  pass
class _WestonProcessError(Exception):
  """Signals that Weston failed to start.

  Also raised by the signal handler installed while Weston is running, so that
  termination goes through the same cleanup path as a startup failure.
  """
  pass
def kill(proc, name, timeout_in_seconds=10):
  """Tries to kill |proc| gracefully with a timeout for each signal.

  Calls proc.terminate() and waits up to |timeout_in_seconds| for the process
  to exit. If it is still running, calls proc.kill() and waits once more.
  Failures are reported on stderr; nothing is raised for them.

  Args:
    proc: An object exposing terminate(), kill() and wait() (for example a
      subprocess.Popen), or a falsy value, in which case nothing is done.
    name: Human-readable process name used in the diagnostic messages.
    timeout_in_seconds: How long to wait for the process after each signal.
  """
  if not proc:
    return

  # wait() runs on a helper thread so that it can be bounded by a timeout.
  waiter = threading.Thread(target=proc.wait)
  try:
    proc.terminate()
    waiter.start()

    waiter.join(timeout_in_seconds)
    if waiter.is_alive():
      print('%s running after SIGTERM, trying SIGKILL.\n' % name,
            file=sys.stderr)
      proc.kill()
  except OSError as e:
    print('Exception while killing process %s: %s' % (name, e), file=sys.stderr)

  # terminate() may have raised before the waiter was started, and joining a
  # thread that was never started raises RuntimeError. Start it now so the
  # final wait below is always valid and the process still gets reaped.
  if waiter.ident is None:
    waiter.start()

  waiter.join(timeout_in_seconds)
  if waiter.is_alive():
    print('%s running after SIGTERM and SIGKILL; good luck!\n' % name,
          file=sys.stderr)
def launch_dbus(env):
  """Starts a DBus session unless this process already has one.

  This is a workaround for a GLib bug: between fork and exec, GLib performs
  operations that are not async-signal-safe (notably memory allocations) when
  spawning subprocesses. Threads in Chrome's browser and utility processes can
  get stuck as a result, and this harness then hangs waiting for processes
  that never terminate. Users' machines are unaffected because an active
  desktop session sets DBUS_SESSION_BUS_ADDRESS, but headless environments can
  hit it. GLib commit [1] fixes the bug; the workaround stays until that fix
  rolls into Chromium's CI.

  [1] f2917459f745bebf931bccd5cc2c33aa81ef4d12

  Every KEY=VALUE line printed by dbus-launch is copied into |env|, which
  therefore ends up with at least DBUS_SESSION_BUS_ADDRESS and
  DBUS_SESSION_BUS_PID set.

  Args:
    env: Environment mapping used to run dbus-launch and updated in place.

  Returns:
    The pid of the dbus-daemon if one was started, or None otherwise.
  """
  if 'DBUS_SESSION_BUS_ADDRESS' in os.environ:
    return None

  try:
    raw_output = subprocess.check_output(['dbus-launch'], env=env)
    for entry in raw_output.decode('utf-8').split('\n'):
      parsed = re.match(r'([^=]+)\=(.+)', entry)
      if parsed is None:
        continue
      key, value = parsed.groups()
      env[key] = value
    return int(env['DBUS_SESSION_BUS_PID'])
  except (subprocess.CalledProcessError, OSError, KeyError, ValueError) as e:
    print('Exception while running dbus_launch: %s' % e)
  return None
def run_executable(
    cmd, env, stdoutfile=None, use_openbox=True, use_xcompmgr=True,
    xvfb_whd=None, cwd=None):
  """Runs an executable within Weston or Xvfb on Linux or normally elsewhere.

  When Xvfb is used, a SIGUSR1 handler is installed so that Xvfb can signal
  this process once it is ready for connections; see the "Signals" section of
  https://www.x.org/archive/X11R7.5/doc/man/man1/Xserver.1.html.

  The '--no-xvfb' and '--use-weston' switches are looked up in |cmd| and, when
  acted upon, removed from it in place.

  Args:
    cmd: Command to be executed.
    env: A copy of environment variables. "DISPLAY" will be set if Xvfb is
      used. "WAYLAND_DISPLAY" will be set if Weston is used.
    stdoutfile: If provided, symbolization via script is disabled and stdout
      is written to this file as well as to stdout.
    use_openbox: A flag to use openbox process.
      Some ChromeOS tests need a window manager.
    use_xcompmgr: A flag to use xcompmgr process.
      Some tests need a compositing wm to make use of transparent visuals.
    xvfb_whd: WxHxD to pass to xvfb or DEFAULT_XVFB_WHD if None
    cwd: Current working directory.

  Returns:
    the exit code of the specified commandline, or 1 on failure.
  """
  xvfb_disabled = '--no-xvfb' in cmd
  if xvfb_disabled:
    cmd.remove('--no-xvfb')

  weston_requested = '--use-weston' in cmd
  if weston_requested:
    # Weston is only allowed when Xvfb has been switched off explicitly.
    if not xvfb_disabled:
      print('Unable to use Weston with xvfb.\n', file=sys.stderr)
      return 1
    cmd.remove('--use-weston')

  on_linux = sys.platform.startswith('linux')
  if on_linux and not xvfb_disabled:
    whd = xvfb_whd or DEFAULT_XVFB_WHD
    return _run_with_xvfb(cmd, env, stdoutfile, use_openbox, use_xcompmgr,
                          whd, cwd)
  if weston_requested:
    return _run_with_weston(cmd, env, stdoutfile, cwd)
  return test_env.run_executable(cmd, env, stdoutfile, cwd)
def _run_with_xvfb(cmd, env, stdoutfile, use_openbox,
                   use_xcompmgr, xvfb_whd, cwd):
  """Runs |cmd| against a private Xvfb server, then tears everything down.

  Starts Xvfb on a free display (retrying up to 10 times), a DBus session and,
  optionally, openbox and xcompmgr, before handing over to
  test_env.run_executable(). While this runs, SIGTERM and SIGINT are routed to
  raise_xvfb_error, so an interruption goes through the same cleanup.

  Args:
    cmd: Command to be executed.
    env: Environment mapping; DISPLAY and XVFB_DISPLAY are set on it, and it
      is passed to every process started here.
    stdoutfile: Forwarded to test_env.run_executable().
    use_openbox: Whether to start openbox and wait for it to report ready.
    use_xcompmgr: Whether to start xcompmgr.
    xvfb_whd: WxHxD screen geometry passed to Xvfb.
    cwd: Forwarded to test_env.run_executable().

  Returns:
    The value returned by test_env.run_executable(), or 1 if Xvfb or openbox
    could not be started.
  """
  openbox_proc = None
  openbox_ready = MutableBoolean()
  def set_openbox_ready(*_):
    openbox_ready.setvalue(True)

  xcompmgr_proc = None
  xvfb_proc = None
  xvfb_ready = MutableBoolean()
  def set_xvfb_ready(*_):
    xvfb_ready.setvalue(True)

  dbus_pid = None
  try:
    signal.signal(signal.SIGTERM, raise_xvfb_error)
    signal.signal(signal.SIGINT, raise_xvfb_error)

    # Only pass -maxclients when this Xvfb build advertises it in its help.
    xvfb_help = subprocess.check_output(
        ['Xvfb', '-help'], stderr=subprocess.STDOUT).decode('utf8')

    # Another process may grab the chosen display number first, so Xvfb can
    # fail to start; pick a new display and try again, up to 10 times.
    for _ in range(10):
      xvfb_ready.setvalue(False)
      display = find_display()

      xvfb_cmd = ['Xvfb', display, '-screen', '0', xvfb_whd, '-ac',
                  '-nolisten', 'tcp', '-dpi', '96', '+extension', 'RANDR']
      if '-maxclients' in xvfb_help:
        xvfb_cmd += ['-maxclients', '512']

      # The X server signals its parent with SIGUSR1 once ready only if it
      # inherited SIGUSR1 as ignored (Xserver man page, "Signals"). Ignore it
      # across the spawn and install the real handler right afterwards.
      signal.signal(signal.SIGUSR1, signal.SIG_IGN)
      xvfb_proc = subprocess.Popen(xvfb_cmd, stderr=subprocess.STDOUT, env=env)
      signal.signal(signal.SIGUSR1, set_xvfb_ready)
      for _ in range(30):
        time.sleep(.1)  # Gives Xvfb time to start or fail.
        if xvfb_ready.getvalue() or xvfb_proc.poll() is not None:
          break  # Xvfb sent the ready signal, or already stopped.

      if xvfb_proc.poll() is None:
        if xvfb_ready.getvalue():
          break  # Xvfb is up and ready.
        kill(xvfb_proc, 'Xvfb')  # Still not ready; give up on this attempt.
    else:
      # No attempt reported readiness. Fail here rather than relying on
      # poll(), which would treat an Xvfb that survived kill() as started.
      raise _XvfbProcessError('Failed to start after 10 tries')

    # Xvfb may have exited between reporting ready and this point.
    if xvfb_proc.poll() is not None:
      raise _XvfbProcessError('Failed to start after 10 tries')

    env['DISPLAY'] = display
    env['XVFB_DISPLAY'] = display

    dbus_pid = launch_dbus(env)

    if use_openbox:
      # openbox is asked, through --startup, to run a command that sends
      # SIGUSR1 to this process; the handler below records that as "ready".
      current_proc_id = os.getpid()
      openbox_startup_cmd = 'kill --signal SIGUSR1 %s' % str(current_proc_id)

      # The handler must be in place before openbox is started.
      signal.signal(signal.SIGUSR1, signal.SIG_IGN)
      signal.signal(signal.SIGUSR1, set_openbox_ready)
      openbox_proc = subprocess.Popen(
          ['openbox', '--sm-disable', '--startup',
           openbox_startup_cmd], stderr=subprocess.STDOUT, env=env)

      for _ in range(30):
        time.sleep(.1)  # Gives openbox time to start or fail.
        if openbox_ready.getvalue() or openbox_proc.poll() is not None:
          break  # openbox sent the ready signal, or already stopped.

      if openbox_proc.poll() is not None or not openbox_ready.getvalue():
        raise _XvfbProcessError('Failed to start OpenBox.')

    if use_xcompmgr:
      xcompmgr_proc = subprocess.Popen(
          'xcompmgr', stderr=subprocess.STDOUT, env=env)

    return test_env.run_executable(cmd, env, stdoutfile, cwd)
  except OSError as e:
    print('Failed to start Xvfb or Openbox: %s\n' % str(e), file=sys.stderr)
    return 1
  except _XvfbProcessError as e:
    print('Xvfb fail: %s\n' % str(e), file=sys.stderr)
    return 1
  finally:
    kill(openbox_proc, 'openbox')
    kill(xcompmgr_proc, 'xcompmgr')
    kill(xvfb_proc, 'Xvfb')

    if dbus_pid:
      # Best effort: an exception raised here would replace the value being
      # returned above, so a daemon that is already gone is only reported.
      try:
        os.kill(dbus_pid, signal.SIGKILL)
      except OSError as e:
        print('Exception while killing dbus-daemon: %s' % e, file=sys.stderr)
def _run_with_weston(cmd, env, stdoutfile, cwd):
  """Runs |cmd| against a headless Weston compositor, then cleans up.

  Starts a DBus session and, if a ./weston binary is present, launches it with
  the headless backend (retrying up to 10 times), waits until its
  WAYLAND_DISPLAY can be determined and then hands over to
  test_env.run_executable(). Without ./weston the command is run directly.
  While this runs, SIGTERM and SIGINT are routed to raise_weston_error, so an
  interruption goes through the same cleanup.

  The switches '--weston-use-gl', '--weston-debug-logging' and
  '--chrome-wayland-debugging' are consumed from |cmd| in place.

  Args:
    cmd: Command to be executed.
    env: Environment mapping passed to Weston and to the command.
    stdoutfile: Forwarded to test_env.run_executable().
    cwd: Forwarded to test_env.run_executable().

  Returns:
    The value returned by test_env.run_executable(), or 1 if Weston could not
    be started.
  """
  weston_proc = None
  # Bound before the try block so the finally clause can always read it, even
  # if an exception is raised before launch_dbus() has run.
  dbus_pid = None
  try:
    signal.signal(signal.SIGTERM, raise_weston_error)
    signal.signal(signal.SIGINT, raise_weston_error)

    dbus_pid = launch_dbus(env)

    if not os.path.isfile("./weston"):
      print('Weston is not available. Starting without Wayland compositor')
      return test_env.run_executable(cmd, env, stdoutfile, cwd)

    _set_xdg_runtime_dir(env)

    # Minimal configuration file handed to Weston through --config.
    with open(_weston_config_file_path(), 'w') as weston_config_file:
      weston_config_file.write('[shell]\npanel-position=none')

    weston_cmd = ['./weston', '--backend=headless-backend.so', '--idle-time=0',
                  '--modules=ui-controls.so,systemd-notify.so', '--width=1280',
                  '--height=800', '--config=' + _weston_config_file_path()]

    if '--weston-use-gl' in cmd:
      weston_cmd.append('--use-gl')
      cmd.remove('--weston-use-gl')

    if '--weston-debug-logging' in cmd:
      cmd.remove('--weston-debug-logging')
      # From here on a copy is used, so the caller's mapping is left alone.
      env = copy.deepcopy(env)
      env['WAYLAND_DEBUG'] = '1'

    # Weston is started with the systemd-notify module and NOTIFY_SOCKET
    # pointing at this datagram socket, on which 'READY=1' is expected.
    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM
                       | socket.SOCK_NONBLOCK) as notify_socket:
      notify_socket.bind(_weston_notify_socket_address())
      env['NOTIFY_SOCKET'] = _weston_notify_socket_address()

      weston_proc_display = None
      for _ in range(10):
        weston_proc = subprocess.Popen(
            weston_cmd,
            stderr=subprocess.STDOUT, env=env)

        for _ in range(25):
          time.sleep(0.1)  # Gives Weston some time to start.
          try:
            if notify_socket.recv(512) == b'READY=1':
              break
          except BlockingIOError:
            continue

        # Whether or not the notification arrived, poll for the display name.
        for _ in range(25):
          time.sleep(0.1)
          weston_proc_display = _get_display_from_weston(weston_proc.pid)
          if weston_proc_display is not None:
            break  # Weston is running and its display is known.

        # Also leave the outer retry loop.
        if weston_proc_display is not None:
          break

        # This attempt failed. Stop the instance before starting another one;
        # otherwise only the last process would be cleaned up at the end.
        kill(weston_proc, 'weston')
        weston_proc = None

    if weston_proc_display is None:
      raise _WestonProcessError('Failed to start Weston.')

    env.pop('NOTIFY_SOCKET')

    env['WAYLAND_DISPLAY'] = weston_proc_display
    if '--chrome-wayland-debugging' in cmd:
      cmd.remove('--chrome-wayland-debugging')
      env['WAYLAND_DEBUG'] = '1'
    else:
      env['WAYLAND_DEBUG'] = '0'

    return test_env.run_executable(cmd, env, stdoutfile, cwd)
  except OSError as e:
    print('Failed to start Weston: %s\n' % str(e), file=sys.stderr)
    return 1
  except _WestonProcessError as e:
    print('Weston fail: %s\n' % str(e), file=sys.stderr)
    return 1
  finally:
    kill(weston_proc, 'weston')

    if os.path.exists(_weston_notify_socket_address()):
      os.remove(_weston_notify_socket_address())

    if os.path.exists(_weston_config_file_path()):
      os.remove(_weston_config_file_path())

    if dbus_pid:
      # Best effort: an exception raised here would replace the value being
      # returned above, so a daemon that is already gone is only reported.
      try:
        os.kill(dbus_pid, signal.SIGKILL)
      except OSError as e:
        print('Exception while killing dbus-daemon: %s' % e, file=sys.stderr)
def _weston_notify_socket_address():
  """Returns the temp-dir path of the socket used for Weston notifications."""
  socket_name = '.xvfb.py-weston-notify.sock'
  return os.path.join(tempfile.gettempdir(), socket_name)
def _weston_config_file_path():
  """Returns the temp-dir path of the Weston configuration file."""
  config_name = '.xvfb.py-weston.ini'
  return os.path.join(tempfile.gettempdir(), config_name)
def _get_display_from_weston(weston_proc_pid):
  """Retrieves $WAYLAND_DISPLAY set by Weston.

  Returns the $WAYLAND_DISPLAY variable from one of weston's subprocesses.

  Weston updates this variable early in its startup in the main process, but we
  can only read the environment variables as they were when the process was
  created. Therefore we must use one of weston's subprocesses, which are all
  spawned with the new value for $WAYLAND_DISPLAY. Any of them will do, as they
  all have the same value set.

  Args:
    weston_proc_pid: The process id of the main Weston process.

  Returns:
    the display set by Wayland, which clients can use to connect to, or None
    if it cannot be determined (yet), including when Weston or its children
    have exited or cannot be inspected.
  """
  try:
    # psutil.Process() raises for a pid that no longer exists; it never
    # returns None.
    children = psutil.Process(weston_proc_pid).children(recursive=True)
  except psutil.Error:
    return None

  for process in children:
    try:
      weston_proc_display = process.environ().get('WAYLAND_DISPLAY')
    except psutil.Error:
      # This child vanished or is not readable; another one may still do.
      continue
    if weston_proc_display is not None:
      return weston_proc_display
  return None
class MutableBoolean(object):
  """Holds one boolean that can be changed in place.

  Lets a nested function, such as a signal handler, flip a flag that is owned
  by the enclosing scope. The stored value starts out as False.
  """

  def __init__(self):
    self._val = False

  def getvalue(self):
    """Returns the boolean currently stored."""
    return self._val

  def setvalue(self, val):
    """Stores |val|, which is asserted to be a bool."""
    assert isinstance(val, bool)
    self._val = val
def raise_xvfb_error(*_signal_args):
  """Signal handler that aborts the Xvfb run by raising _XvfbProcessError.

  Accepts and ignores whatever positional arguments it is called with.
  """
  raise _XvfbProcessError('Terminated')
def raise_weston_error(*_signal_args):
  """Signal handler that aborts the Weston run by raising _WestonProcessError.

  Accepts and ignores whatever positional arguments it is called with.
  """
  raise _WestonProcessError('Terminated')
def find_display():
  """Picks a random X display number that has no lock file yet.

  Candidates run from 99, the lower bound used by xvfb-run, up to and
  including 119. A display counts as taken when its /tmp/.X<N>-lock file
  exists.

  Returns:
    A string of a random available display number for Xvfb ':{99-119}'.

  Raises:
    _XvfbProcessError: Raised when displays 99 through 119 are unavailable.
  """
  free_displays = []
  for number in range(99, 120):
    lock_file = '/tmp/.X{}-lock'.format(number)
    if os.path.isfile(lock_file):
      continue
    free_displays.append(number)

  if not free_displays:
    raise _XvfbProcessError('Failed to find display number')
  return ':{}'.format(random.choice(free_displays))
def _set_xdg_runtime_dir(env):
  """Sets the $XDG_RUNTIME_DIR variable if it hasn't been set before.

  When |env| has no (or an empty) XDG_RUNTIME_DIR, '/tmp/xdg-tmp-dir/' is
  created with mode 0o700 if needed and stored in |env|.

  Args:
    env: Environment mapping, updated in place.

  Raises:
    OSError: If the directory does not exist and cannot be created.
  """
  runtime_dir = env.get('XDG_RUNTIME_DIR')
  if runtime_dir:
    return

  runtime_dir = '/tmp/xdg-tmp-dir/'
  # Create first and check afterwards. Testing for existence beforehand would
  # race with another run creating the same directory at the same moment.
  try:
    os.makedirs(runtime_dir, 0o700)
  except OSError:
    if not os.path.exists(runtime_dir):
      raise
  env['XDG_RUNTIME_DIR'] = runtime_dir
def main():
  """Command-line entry point: runs the command given in sys.argv.

  Returns:
    2 if no command was given, 3 if the command is a directory, otherwise the
    value returned by run_executable().
  """
  usage = 'Usage: xvfb.py [command [--no-xvfb or --use-weston] args...]'

  command_line = sys.argv[1:]
  if not command_line:
    print(usage + '\n', file=sys.stderr)
    return 2

  command = command_line[0]
  if os.path.isdir(command):
    print('Invalid command: \"%s\" is a directory\n' % command,
          file=sys.stderr)
    print(usage + '\n', file=sys.stderr)
    return 3

  return run_executable(command_line, os.environ.copy())
# When executed as a script, exit with the status returned by main().
if __name__ == '__main__':
  sys.exit(main())