import functools
import os
import socket
import subprocess
import sys
import threading
import time
from contextlib import closing
from contextlib import contextmanager
from typing import List, Optional
import attr
from chrome.test.variations.drivers import DriverFactory
from chrome.test.variations.test_utils import SRC_DIR
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
# Extend the import search path with two directories under SRC_DIR before the
# imports that follow.
# NOTE(review): presumably this is what makes the `chromite` and `telemetry`
# packages imported below resolvable -- confirm against the checkout layout.
sys.path.append(os.path.join(SRC_DIR, 'third_party'))
sys.path.append(os.path.join(SRC_DIR, 'third_party', 'catapult', 'telemetry'))
from chromite.lib import device, vm
from chromite.lib import remote_access
from telemetry.core.exceptions import BrowserConnectionGoneException
from telemetry.internal.browser import browser_options
from telemetry.internal.platform import cros_device
from telemetry.internal.platform.cros_platform_backend import CrosPlatformBackend
from telemetry.internal.backends.chrome import cros_browser_finder
# Directory passed to the VM argument parser as `--cache-dir`.
CACHE_DIR = os.path.join(SRC_DIR, "build", "cros_cache")
# Seconds slept after the SSH tunnel is up and before the WebDriver session is
# created.
INITIAL_WAIT_TIME_SECONDS = 10
class _PossibleCrOSBrowser(cros_browser_finder.PossibleCrOSBrowser):
  """PossibleCrOSBrowser variant that strips selected start-up arguments."""

  def GetBrowserStartupArgs(self, browser_options):
    """Returns the parent's start-up args minus the blocked ones.

    Args:
      browser_options: Forwarded unchanged to the parent implementation.

    Returns:
      A new list holding every argument produced by the parent class, in the
      same order, except those listed in `blocked`.
    """
    blocked = ('--enable-gpu-benchmarking',)
    kept = []
    for candidate in super().GetBrowserStartupArgs(browser_options):
      if candidate in blocked:
        continue
      kept.append(candidate)
    return kept
def _launch_browser(browser_args: List[str]) -> 'Browser':
  """Creates a browser on the CrOS device through telemetry.

  The device is addressed as `localhost` on SSH port 9222 and is treated as
  non-local (`is_local=False`).

  Args:
    browser_args: Extra command line arguments appended to the browser
      options before the browser is created.

  Returns:
    The object returned by `Create()` on the possible-browser wrapper.

  Raises:
    WebDriverException: chained from BrowserConnectionGoneException when the
      browser cannot be created.
  """
  finder_opts = browser_options.BrowserFinderOptions()
  finder_opts.browser_type = 'cros-browser'
  finder_opts.verbosity = 2
  # Parse an empty argument list on the freshly created parser; this happens
  # before `browser_options` and `ssh_identity` are read from finder_opts.
  finder_opts.CreateParser().parse_args(args=[])

  chrome_opts = finder_opts.browser_options
  chrome_opts.browser_startup_timeout = 15
  chrome_opts.AppendExtraBrowserArgs(browser_args)

  cros_dev = cros_device.CrOSDevice(
      host_name='localhost',
      ssh_port=9222,
      ssh_identity=finder_opts.ssh_identity,
      is_local=False)
  cros_platform = CrosPlatformBackend.CreatePlatformForDevice(cros_dev, None)

  candidate = _PossibleCrOSBrowser(
      'cros-chrome', finder_opts, cros_platform, is_guest=False)
  candidate.SetUpEnvironment(chrome_opts)
  try:
    return candidate.Create()
  except BrowserConnectionGoneException as e:
    raise WebDriverException from e
def _wait_for_port(
    port: int, host: str = 'localhost', timeout: float = 5) -> bool:
  """Polls until a TCP connection to (host, port) succeeds or time runs out.

  Args:
    port: TCP port to connect to.
    host: Host name or IPv4 address to connect to.
    timeout: Maximum number of seconds to keep retrying.

  Returns:
    True as soon as one connection attempt succeeds, False if no attempt
    succeeded before the deadline. At least one attempt is always made.
  """
  deadline = time.perf_counter() + timeout
  while True:
    # A socket whose connect() failed is not portably reusable, so every
    # attempt gets a fresh one that is closed again right away.
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
      # Bound each attempt by the remaining budget so that a connect which
      # neither succeeds nor fails quickly cannot overrun `timeout`.
      sock.settimeout(max(deadline - time.perf_counter(), 0.01))
      if sock.connect_ex((host, port)) == 0:
        return True
    if time.perf_counter() >= deadline:
      return False
    time.sleep(0.01)
@attr.attrs()
class CrOSDriverFactory(DriverFactory):
  """Driver factory that attaches a Selenium WebDriver to Chrome on a CrOS VM.

  The factory boots (or reuses) a VM for `board`, launches the browser on it,
  forwards the browser's DevTools port over an SSH tunnel and connects
  ChromeDriver to that forwarded port.
  """
  # Value passed to the browser as `--fake-variations-channel` when a seed
  # file is used.
  channel: str = attr.attrib()
  # Board name passed to the VM argument parser as `--board`.
  board: str = attr.attrib()
  # Port handed to the SSH tunnel's `to_remote` forwarding spec.
  server_port: int = attr.attrib()

  def __attrs_post_init__(self):
    """Runs the parent post-init hook and marks the VM as not yet started."""
    super().__attrs_post_init__()
    self._vm_started = False

  def _launch_vm(self) -> vm.VM:
    """Creates the device for `self.board` and starts it if it is not running.

    Returns:
      The device object created from the parsed VM options.
    """
    parser = vm.VM.GetParser()
    opts = parser.parse_args([
        f'--board={self.board}',
        f'--cache-dir={CACHE_DIR}',
    ])
    _device = device.Device.Create(opts)
    if not _device.IsRunning():
      _device.Start()
    return _device

  def _ssh_forward(self, port: int,
                   server_port: int) -> Optional[subprocess.Popen]:
    """Opens an SSH tunnel to the device and waits for `port` to accept.

    Args:
      port: Port placed in the tunnel's `to_local` forwarding spec; this is
        also the local port probed for readiness.
      server_port: Port placed in the tunnel's `to_remote` forwarding spec.

    Returns:
      The tunnel process, or None when `port` did not become connectable in
      time. In the None case the tunnel process has already been terminated.
    """
    local_pfs = remote_access.PortForwardSpec(local_port=port)
    remote_pfs = remote_access.PortForwardSpec(local_port=server_port)
    tunnel = self.device.remote.agent.CreateTunnel(
        to_local=[local_pfs], to_remote=[remote_pfs])
    if _wait_for_port(port):
      return tunnel
    # The caller only ever sees None on failure and therefore cannot clean up
    # the process itself; stop it here so it is not leaked.
    tunnel.terminate()
    return None

  def _copy_seed_file(self, seed_file: str) -> str:
    """Copies a local seed file into /tmp on the device.

    Args:
      seed_file: Path of an existing local seed file.

    Returns:
      The path of the copied file on the device.

    Raises:
      AssertionError: if the local file is missing, the remote /tmp/ is not
        writable, or the file is not present on the device after the copy.
    """
    assert os.path.exists(seed_file)
    remote_seed_path = f'/tmp/{os.path.basename(seed_file)}'
    remote_device = self.device.remote
    assert remote_device.IsDirWritable('/tmp/'), 'tmp dir not writable'
    remote_device.CopyToDevice(src=seed_file,
                               dest=remote_seed_path,
                               mode='scp',
                               verbose=True)
    assert remote_device.IfFileExists(remote_seed_path), (
        'file not pushed to device'
    )
    # Make the pushed file readable and writable for every user on the device.
    remote_device.run(
        ['chmod', 'a+rw', remote_seed_path], remote_sudo=True, print_cmd=True)
    return remote_seed_path

  @property
  def supports_startup_timeout(self) -> bool:
    """Always False for this factory."""
    return False

  # NOTE: from here on the name `device` inside the class body refers to this
  # property rather than to the imported `device` module, so annotations added
  # below this point must not reference `device.<something>`.
  @functools.cached_property
  def device(self) -> device.Device:
    """The device for this factory; created and started on first access."""
    device_ = self._launch_vm()
    self._vm_started = True
    return device_

  @property
  def vm_started(self) -> bool:
    """True once the `device` property has been evaluated successfully."""
    return self._vm_started

  @contextmanager
  def tunnel_context(self, debugging_port: int, server_port: int):
    """Keeps an SSH tunnel open for the duration of the `with` block.

    Args:
      debugging_port: Port forwarded to the local side and probed for
        readiness.
      server_port: Port forwarded to the remote side.

    Raises:
      WebDriverException: if the tunnel could not be established.
    """
    tunnel = self._ssh_forward(debugging_port, server_port)
    if tunnel is None:
      raise WebDriverException(f'Unable to forward port: {debugging_port}')

    # Collect the tunnel's exit status in the background. Blocking in wait()
    # replaces a poll() spin loop that kept a CPU core busy for the whole
    # session; the thread is a daemon so it can never hold up interpreter exit.
    threading.Thread(target=tunnel.wait, daemon=True).start()
    try:
      yield
    finally:
      tunnel.terminate()

  @contextmanager
  def create_driver(
      self,
      seed_file: Optional[str] = None,
      options: Optional[webdriver.ChromeOptions] = None
  ):
    """Launches the browser on the device and yields a WebDriver attached to it.

    Args:
      seed_file: Optional local seed file. When given it is copied to the
        device and the browser is started with the seed-related switches.
      options: Optional ChromeOptions; `self.default_options` is used when
        omitted. Its `debugger_address` is overwritten with the forwarded
        DevTools address.

    Yields:
      The connected `webdriver.Chrome` instance. It is quit when the context
      exits, including when waiting for the first window fails.
    """
    assert self.device, "VM fails to boot."
    browser_args = [
        '--remote-allow-origins=*',
    ]
    if seed_file:
      remote_seed_path = self._copy_seed_file(seed_file)
      browser_args.extend([
          f'--variations-test-seed-path="{remote_seed_path}"',
          f'--fake-variations-channel={self.channel}',
          '--disable-variations-safe-mode',
          '--disable-field-trial-config',
          '--force-fieldtrials=SeedFileTrial/Default'
      ])

    browser = _launch_browser(browser_args)
    debugging_port, _ = browser._browser_backend._FindDevToolsPortAndTarget()
    options = options or self.default_options
    options.debugger_address = f'localhost:{debugging_port}'

    with self.tunnel_context(debugging_port, self.server_port):
      # NOTE(review): fixed delay before attaching; presumably gives the
      # browser/tunnel time to settle -- confirm whether it is still needed.
      time.sleep(INITIAL_WAIT_TIME_SECONDS)
      driver = webdriver.Chrome(service=self.get_driver_service(),
                                options=options)
      try:
        # Inside the try block so that the session is also quit when no
        # window shows up.
        self.wait_for_window(driver)
        yield driver
      finally:
        driver.quit()

  def close(self):
    """Stops the device if it was accessed via this factory and is running."""
    if self.vm_started and self.device.IsRunning():
      self.device.Stop()