import logging
import os
import posixpath
import sys
import time
from typing import Any
import unittest
from telemetry.util import image_util, screenshot
import gpu_path_util
from gpu_tests import common_typing as ct
from gpu_tests import gpu_integration_test
from gpu_tests import pixel_test_pages
from gpu_tests import skia_gold_heartbeat_integration_test_base as sghitb
from gpu_tests import skia_gold_integration_test_base
from gpu_tests.util import host_information
from gpu_tests.util import screenshot_utils
# Not referenced by the code visible in this part of the file.
# NOTE(review): presumably a scrollbar width in pixels - confirm it is still
# needed.
SCROLLBAR_WIDTH = 10
# Base value that _GetScreenshotTimeout() scales before it is passed to the
# screenshot capture calls.
# NOTE(review): presumably seconds - confirm against the Telemetry API.
DEFAULT_SCREENSHOT_TIMEOUT = 5
# Timeout multiplier used by _GetScreenshotTimeout() when _IsSlowTest() is
# true.
SLOW_SCREENSHOT_MULTIPLIER = 4
# Attempt bound consulted by RunActualGpuTest() when a test flagged as
# known_flaky_output_test fails its Gold comparison.
MAX_FLAKY_OUTPUT_TEST_TRIES = 3
class PixelIntegrationTest(sghitb.SkiaGoldHeartbeatIntegrationTestBase):
  """GPU pixel tests backed by Skia Gold and Telemetry.

  Each test navigates to a page, runs that page's test actions, captures a
  screenshot, crops it and uploads the result to Skia Gold for comparison.
  """

  test_base_name = 'Pixel'

  @classmethod
  def Name(cls) -> str:
    """The name by which this test is invoked on the command line."""
    return 'pixel'

  @classmethod
  def _SuiteSupportsParallelTests(cls) -> bool:
    """Reports that this suite opts in to parallel test execution."""
    return True

  def _GetSerialGlobs(self) -> set[str]:
    """Returns the host-specific glob patterns of the suite's serial tests.

    NOTE(review): presumably the base class uses these to keep matching tests
    out of parallel execution - confirm against the base class.

    Returns:
      A set of test name globs; empty on hosts that are neither Mac nor
      Windows.
    """
    serial_globs = set()
    if host_information.IsMac():
      serial_globs |= {
          'Pixel_OffscreenCanvasWebGL*',
          'Pixel_VideoStreamFrom*',
      }
    if host_information.IsWindows():
      serial_globs |= {
          'Pixel_DirectComposition_Underlay*',
          'Pixel_DirectComposition_Video*',
      }
    return serial_globs

  def _GetSerialTests(self) -> set[str]:
    """Returns the exact names of the suite's serial tests.

    A fixed set of names applies on every host; more names are added for
    specific OS/GPU/CPU combinations.

    Returns:
      A set of full test names.
    """
    serial_tests = {
        'Pixel_OffscreenCanvasIBRCWebGLHighPerfWorker',
        'Pixel_OffscreenCanvasIBRCWebGLMain',
        'Pixel_OffscreenCanvasIBRCWebGLWorker',
        'Pixel_WebGLLowToHighPower',
        'Pixel_WebGLLowToHighPowerAlphaFalse',
    }
    if host_information.IsLinux() and host_information.IsAmdGpu():
      serial_tests |= {
          'Pixel_OffscreenCanvasWebGLSoftwareCompositingWorker',
      }
    if host_information.IsMac() and host_information.Isx86Cpu():
      serial_tests |= {
          'Pixel_SVGHuge',
      }
    if host_information.IsWindows() and host_information.IsArmCpu():
      serial_tests |= {
          'Pixel_Video_Context_Loss_VP9',
          'Pixel_WebGLContextRestored',
          'Pixel_WebGLSadCanvas',
      }
    return serial_tests

  @classmethod
  def GenerateGpuTests(cls, options: ct.ParsedCmdArgs) -> ct.TestGenerator:
    """Yields one test tuple per pixel test page applicable to this host.

    Args:
      options: The parsed command line arguments. Not used by this suite.

    Yields:
      (test name, page URL joined onto the GPU data directory, [page]) tuples.
    """
    namespace = pixel_test_pages.PixelTestPages
    base_name = cls.test_base_name

    # Pages that are generated regardless of the host platform.
    pages = namespace.DefaultPages(base_name)
    pages += namespace.GpuRasterizationPages(base_name)
    pages += namespace.ExperimentalCanvasFeaturesPages(base_name)
    pages += namespace.LowLatencyPages(base_name)
    pages += namespace.WebGPUPages(base_name)
    pages += namespace.WebGPUCanvasCapturePages(base_name)
    pages += namespace.WebGPUDeviceDestroyPages(base_name)
    pages += namespace.PaintWorkletPages(base_name)
    pages += namespace.VideoFromCanvasPages(base_name)
    pages += namespace.NoGpuProcessPages(base_name)
    pages += namespace.MeetEffectsPages(base_name)

    # Platform-specific pages.
    if host_information.IsMac():
      pages += namespace.MacSpecificPages(base_name)
      pages += namespace.DualGPUMacSpecificPages(base_name)
    if host_information.IsWindows():
      pages += namespace.DirectCompositionPages(base_name)
      pages += namespace.HdrTestPages(base_name)
      pages += namespace.WARPPages(base_name)
    # SwiftShader pages are generated on Linux and on non-Arm Windows only.
    if host_information.IsLinux() or (host_information.IsWindows()
                                      and not host_information.IsArmCpu()):
      pages += namespace.SwiftShaderPages(base_name)

    for page in pages:
      page_url = posixpath.join(gpu_path_util.GPU_DATA_RELATIVE_PATH, page.url)
      yield (page.name, page_url, [page])

  def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
    """Runs one pixel test, retrying tests with known flaky output.

    The page is navigated to and its test actions are re-run on every attempt.
    A test whose test case sets known_flaky_output_test is attempted at most
    MAX_FLAKY_OUTPUT_TEST_TRIES times in total; any other test is attempted
    once.

    Args:
      test_path: The path passed to NavigateTo() for the page under test.
      args: The test arguments; args[0] is the test case.

    Raises:
      skia_gold_integration_test_base.GoldComparisonFailure: if the comparison
          fails and no further attempt is allowed.
    """
    super().RunActualGpuTest(test_path, args)
    test_case = args[0]
    self.RestartBrowserIfNecessaryWithArgs(test_case.browser_args)

    attempt = 1
    while True:
      tab_data = sghitb.TabData(self.tab,
                                self.__class__.websocket_server,
                                is_default_tab=True)
      self.NavigateTo(test_path, tab_data)
      loop_state = sghitb.LoopState()
      for action in test_case.test_actions:
        action.Run(test_case, tab_data, loop_state, self)
      try:
        self._RunSkiaGoldBasedPixelTest(test_case)
        return
      except skia_gold_integration_test_base.GoldComparisonFailure:
        # |attempt| is 1-based, so once it reaches the limit every permitted
        # try has been used and the failure must propagate.
        out_of_tries = attempt >= MAX_FLAKY_OUTPUT_TEST_TRIES
        if not test_case.known_flaky_output_test or out_of_tries:
          raise
        logging.warning(
            'Known flaky output test %s failed on attempt %d, retrying',
            test_case.name, attempt)
        attempt += 1

  def _OnAfterTest(self, args: ct.TestArgs) -> None:
    """Conditionally restarts the browser after the test is finished.

    This must be done as a post-test hook instead of at the end of the test
    method because restarting wipes crash data, but expected crash checks are
    performed after RunActualGpuTest finishes.

    Args:
      args: The same arguments that the test was run with.
    """
    test_case = args[0]
    needs_restart = (test_case.used_custom_test_actions
                     or test_case.restart_browser_after_test)
    if needs_restart:
      self._RestartBrowser(
          'Must restart after non-standard test actions or if required by test')
    if test_case.used_custom_test_actions and self.IsDualGPUMacLaptop():
      # NOTE(review): fixed pause on dual-GPU Mac laptops after custom test
      # actions; presumably lets the system settle - confirm the rationale.
      time.sleep(4)

  def GetExpectedCrashes(self, args: ct.TestArgs) -> dict[str, int]:
    """Returns which crashes, per process type, to expect for the current test.

    The entry for the current OS name is used when the test case has one;
    otherwise the entry stored under
    pixel_test_pages.EXPECTED_CRASHES_PLATFORM_DEFAULT is used, and an empty
    dictionary if that is missing as well.

    Args:
      args: The list passed to _RunGpuTest()

    Returns:
      A dictionary mapping crash types as strings to the number of expected
      crashes of that type. Examples include 'gpu' for the GPU process,
      'renderer' for the renderer process, and 'browser' for the browser
      process.
    """
    crashes_by_platform = args[0].expected_per_process_crashes
    os_name = self.platform.GetOSName()
    default_crashes = crashes_by_platform.get(
        pixel_test_pages.EXPECTED_CRASHES_PLATFORM_DEFAULT, {})
    return crashes_by_platform.get(os_name, default_crashes)

  def _RunSkiaGoldBasedPixelTest(
      self, test_case: pixel_test_pages.PixelTestPage) -> None:
    """Captures and compares a test image using Skia Gold.

    The screenshot comes from one of three sources, chosen by the test case:
    an OS-level fullscreen capture, a full tab screenshot, or a regular tab
    screenshot. It is then cropped by the test case's crop action and
    uploaded.

    Raises an Exception if the comparison fails. The test is skipped if an OS
    screenshot is required but the platform cannot take one, and failed if no
    screenshot could be captured.

    Args:
      test_case: the GPU PixelTestPage object for the test.
    """
    tab = self.tab
    platform = self.browser.platform

    if test_case.RequiresFullScreenOSScreenshot():
      if not platform.CanTakeScreenshot():
        logging.warning('Skipping the test because the platform does not '
                        'support OS screenshots')
        self.skipTest('The platform does not support fullscreen OS screenshot')
      fh = screenshot.TryCaptureScreenShot(platform, None,
                                           self._GetScreenshotTimeout())
      if fh is None:
        self.fail('Unable to get file handle of the screenshot')
      screen_shot = image_util.FromPngFile(fh.GetAbsPath())
    elif test_case.ShouldCaptureFullScreenshot(self.browser):
      # NOTE(review): fixed argument rather than _GetScreenshotTimeout();
      # presumably a timeout - confirm whether it should scale as well.
      screen_shot = tab.FullScreenshot(15)
    else:
      screen_shot = tab.Screenshot(self._GetScreenshotTimeout())

    if screen_shot is None:
      self.fail('Could not capture screenshot')

    dpr = screenshot_utils.GetEffectiveDpr(tab)
    screen_shot = test_case.crop_action.CropScreenshot(
        screen_shot, dpr, platform.GetDeviceTypeName(), platform.GetOSName())

    image_name = self._UrlToImageName(test_case.name)
    self._UploadTestResultToSkiaGold(image_name, screen_shot, test_case)

  def _GetScreenshotTimeout(self) -> float:
    """Returns the screenshot timeout scaled for parallelism and slow tests.

    The multiplier grows linearly with the number of parallel jobs (1x for one
    job, 2x for four jobs). Slow tests get at least
    SLOW_SCREENSHOT_MULTIPLIER.

    Returns:
      DEFAULT_SCREENSHOT_TIMEOUT multiplied by the effective multiplier.
    """
    multiplier = 1 + (self.child.jobs - 1) / 3.0
    if self._IsSlowTest():
      # Take the larger value instead of overwriting: with many parallel jobs
      # the job-based multiplier can exceed the slow-test multiplier, and a
      # slow test must never end up with a shorter timeout than a regular one.
      multiplier = max(multiplier, SLOW_SCREENSHOT_MULTIPLIER)
    return DEFAULT_SCREENSHOT_TIMEOUT * multiplier

  @classmethod
  def ExpectationsFiles(cls) -> list[str]:
    """Returns the path of this suite's expectation file.

    Returns:
      A one-element list with the path to
      test_expectations/pixel_expectations.txt next to this module.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    return [
        os.path.join(module_dir, 'test_expectations', 'pixel_expectations.txt')
    ]
def load_tests(loader: unittest.TestLoader, tests: Any,
               pattern: Any) -> unittest.TestSuite:
  """Implements the unittest load_tests protocol hook for this module.

  The three protocol arguments are discarded; loading is delegated to
  gpu_integration_test.LoadAllTestsInModule() for this module.

  Args:
    loader: Unused.
    tests: Unused.
    pattern: Unused.

  Returns:
    Whatever gpu_integration_test.LoadAllTestsInModule() returns for this
    module.
  """
  del loader, tests, pattern  # Unused.
  this_module = sys.modules[__name__]
  return gpu_integration_test.LoadAllTestsInModule(this_module)