import collections
from collections.abc import Generator
import dataclasses
import datetime
from enum import Enum
import gzip
import io
import logging
import os
import posixpath
import sys
import tempfile
import time
from typing import Any
import unittest
import perfetto.trace_processor as tp
from telemetry.timeline import tracing_config
from tracing.trace_data import trace_data
import gpu_path_util
from gpu_tests import common_browser_args as cba
from gpu_tests import common_typing as ct
from gpu_tests import gpu_integration_test
from gpu_tests import overlay_support
from gpu_tests import trace_test_pages
from gpu_tests.util import host_information
# Relative path of the GPU test data directory. It is joined (with posixpath)
# to page URLs when tests are generated.
gpu_data_relative_path = gpu_path_util.GPU_DATA_RELATIVE_PATH

# Directories handed to SetStaticServerDirs() in SetUpProcess(): the GPU test
# data directory and //media/test/data.
data_paths = [
    gpu_path_util.GPU_DATA_DIR,
    os.path.join(gpu_path_util.CHROMIUM_SRC_DIR, 'media', 'test', 'data')
]
# JavaScript harness passed as |script_to_evaluate_on_commit| for the default
# 'TraceTest' pages. It installs window.domAutomationController with:
#   * log(msg): appends msg to _messages (read back and logged by the Python
#     side after the test) and forwards it to the original console.log.
#   * send(msg): ignores |msg|, performs a 1x1 WebGL clear + readPixels to
#     synchronize with the GPU process, then sets _finished = true, which is
#     the condition the Python side waits on.
webgl_test_harness_script = r"""
var domAutomationController = {};
domAutomationController._finished = false;
domAutomationController._originalLog = window.console.log;
domAutomationController._messages = '';
domAutomationController.log = function(msg) {
domAutomationController._messages += msg + "\n";
domAutomationController._originalLog.apply(window.console, [msg]);
}
domAutomationController.send = function(msg) {
// Issue a read pixel to synchronize the gpu process to ensure
// the asynchronous category enabling is finished.
var temp_canvas = document.createElement("canvas")
temp_canvas.width = 1;
temp_canvas.height = 1;
var temp_gl = temp_canvas.getContext("experimental-webgl") ||
temp_canvas.getContext("webgl");
if (temp_gl) {
temp_gl.clear(temp_gl.COLOR_BUFFER_BIT);
var id = new Uint8Array(4);
temp_gl.readPixels(0, 0, 1, 1, temp_gl.RGBA, temp_gl.UNSIGNED_BYTE, id);
} else {
console.log('Failed to get WebGL context.');
}
domAutomationController._finished = true;
}
window.domAutomationController = domAutomationController;
"""
# JavaScript harness passed as |script_to_evaluate_on_commit| for every test
# other than the default 'TraceTest' pages. It installs
# window.domAutomationController with:
#   * log(msg): appends msg to _messages and forwards it to console.log.
#   * send(msg): always sets _proceed. A case-insensitive "ready" sets
#     _readyForActions; any other message sets _finished, and _succeeded is
#     true only when the message is (case-insensitively) "success".
basic_test_harness_script = r"""
var domAutomationController = {};
domAutomationController._proceed = false;
domAutomationController._readyForActions = false;
domAutomationController._succeeded = false;
domAutomationController._finished = false;
domAutomationController._originalLog = window.console.log;
domAutomationController._messages = '';
domAutomationController.log = function(msg) {
domAutomationController._messages += msg + "\n";
domAutomationController._originalLog.apply(window.console, [msg]);
}
domAutomationController.send = function(msg) {
domAutomationController._proceed = true;
let lmsg = msg.toLowerCase();
if (lmsg == "ready") {
domAutomationController._readyForActions = true;
} else {
domAutomationController._finished = true;
if (lmsg == "success") {
domAutomationController._succeeded = true;
} else {
domAutomationController._succeeded = false;
}
}
}
window.domAutomationController = domAutomationController;
"""
# Names of the trace events (slices) that the success evaluators query for.
_GET_STATISTICS_EVENT_NAME = 'GetFrameStatisticsMedia'
_SWAP_CHAIN_PRESENT_EVENT_NAME = 'SwapChain::Present'
_BEGIN_OVERLAY_ACCESS_EVENT_NAME = 'SkiaOutputDeviceDComp::BeginOverlayAccess'
_PRESENT_SWAP_CHAIN_EVENT_NAME = 'DXGISwapChainImageBacking::Present'
_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME = (
    'HTMLCanvasElement::NotifyListenersCanvasChanged')
_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME = (
    'StaticBitmapImageToVideoFrameCopier::Convert')

# Event names for the Media Foundation D3D11 video capture checks.
_MFD3D11VC_CAPTURE_EVENT_NAME = 'CopyTextureToGpuMemoryBuffer'
_MFD3D11VC_MAP_EVENT_NAME = 'GpuMemoryBufferTrackerWin::DuplicateAsUnsafeRegion'
_MFD3D11VC_ALTERNATIVE_MAP_EVENT_NAME = (
    'GpuChannelMessageFilter::CopyToGpuMemoryBufferAsync')
_MFD3D11VC_PRESENT_EVENT_NAME = 'DXGISharedHandleState::AcquireKeyedMutex'

# Event names and the handle-type value used by the WebGPU caching checks.
_GPU_HOST_STORE_BLOB_EVENT_NAME = 'GpuHostImpl::StoreBlobToDisk'
_WEBGPU_BLOB_CACHE_HIT_EVENT_NAME = 'DawnCachingInterface::CacheHit'
_WEBGPU_CACHE_HANDLE_TYPE = 1

# Dictionary keys used to pass data between test phases.
_MIN_CACHE_HIT_KEY = 'min_cache_hits'
_PROFILE_DIR_KEY = 'profile_dir'
_PROFILE_TYPE_KEY = 'profile_type'
class _TraceTestOrigin(Enum):
  """Enum type for different origin types when navigating to URLs.

  DEFAULT resolves URLs using the explicit localhost IP, i.e. 127.0.0.1.
  LOCALHOST resolves URLs using 'localhost' instead. This is useful when we
  want the navigations to hit the same resource but appear to be from a
  different origin.

  As an implementation detail, each member's value is the name of the
  SeriallyExecutedBrowserTestCase instance method used to build the URL; the
  test looks the method up with getattr() before navigating.
  """
  # URL built from the explicit 127.0.0.1 address.
  DEFAULT = 'UrlOfStaticFilePath'
  # URL built from the 'localhost' host name.
  LOCALHOST = 'LocalhostUrlOfStaticFilePath'
@dataclasses.dataclass
class _TraceTestArguments():
  """Struct-like object for passing trace test arguments instead of dicts."""
  # Browser arguments used when the browser is restarted for the test.
  browser_args: list[str]
  # Trace category to enable. A comma-separated string enables several
  # categories at once.
  category: str
  # JavaScript evaluated on navigation commit (one of the harness scripts).
  test_harness_script: str
  # JavaScript condition the test waits on before stopping the trace.
  finish_js_condition: str
  # Suffix of the `_EvaluateSuccess_<name>` method that validates the trace.
  # A falsy value skips validation.
  success_eval_func: str
  # Extra per-page arguments forwarded to the evaluator. Some evaluators guard
  # against None, so callers may pass None here despite the annotation.
  other_args: dict
  # Whether to restart the browser (with |browser_args|) before navigating.
  restart_browser: bool = True
  # Which URL-building method to use for the navigation.
  origin: _TraceTestOrigin = _TraceTestOrigin.DEFAULT
@dataclasses.dataclass
class _CacheTraceTestArguments():
  """Arguments describing one persistent-cache trace test.

  A cache trace test is a sequence of ordinary trace test runs. The first run
  loads the "first load" page, which normally populates the cache. After that:

    * If |test_renavigation| is true, the same browser is navigated to every
      page in |cache_pages| and the cache conditions are checked.
    * Regardless of |test_renavigation|, every page in |cache_pages| is then
      loaded again in a restarted browser whose fresh user data directory is
      seeded with the contents left behind by the first load. The seeding
      copies all files from the first load's user data directory; see the
      *BrowserFinder classes under
      //third_party/catapult/telemetry/telemetry/internal/backends/chrome/

  Re-navigation is appropriate for cache-hit checks because the cache pages
  should not create new entries. It is not appropriate for cache-miss checks:
  each cache page may write entries, so later cache pages could see hits where
  misses are expected. Restarted browsers do not have this problem because
  each restart is seeded only with the state after the first load.
  """
  browser_args: list[str]
  category: str
  test_harness_script: str
  finish_js_condition: str
  first_load_eval_func: str
  cache_eval_func: str
  cache_pages: list[str]
  cache_page_origin: _TraceTestOrigin = _TraceTestOrigin.DEFAULT
  test_renavigation: bool = True

  def _BuildTraceArgs(self, success_eval_func: str,
                      **overrides) -> _TraceTestArguments:
    """Builds trace test arguments sharing this object's common fields."""
    return _TraceTestArguments(browser_args=self.browser_args,
                               category=self.category,
                               test_harness_script=self.test_harness_script,
                               finish_js_condition=self.finish_js_condition,
                               success_eval_func=success_eval_func,
                               **overrides)

  def GenerateFirstLoadTest(self) -> _TraceTestArguments:
    """Returns the trace test arguments for the first load cache test."""
    return self._BuildTraceArgs(self.first_load_eval_func,
                                other_args={},
                                restart_browser=True)

  def GenerateCacheHitTests(
      self, cache_args: dict | None
  ) -> Generator[tuple[str, _TraceTestArguments], None, None]:
    """Yields (page path, arguments) pairs for all follow-up cache tests.

    When re-navigation is enabled, the first pass reuses the running browser
    for every cache page. The final pass always restarts the browser for every
    cache page (the caller supplies the seeded profile directory).

    Args:
      cache_args: Passed through as |other_args| of every generated test.
    """
    # False == re-navigate in the existing browser, True == restart it.
    restart_passes = [True]
    if self.test_renavigation:
      restart_passes.insert(0, False)

    for restart_browser in restart_passes:
      for cache_page in self.cache_pages:
        yield (posixpath.join(gpu_data_relative_path, cache_page),
               self._BuildTraceArgs(self.cache_eval_func,
                                    other_args=cache_args,
                                    restart_browser=restart_browser,
                                    origin=self.cache_page_origin))
class TraceIntegrationTest(gpu_integration_test.GpuIntegrationTest):
  """Tests GPU traces are plumbed through properly.

  Also tests that GPU Device traces show up on devices that support them.
  """
  # The same prefix strings GenerateGpuTests() passes to the page generators.
  # NOTE(review): presumably every generated test name starts with one of
  # these prefixes -- confirm against trace_test_pages.
  known_test_prefixes = frozenset([
      'OverlayModeTraceTest',
      'SwapChainTraceTest',
      'TraceTest',
      'VideoPathTraceTest',
      'WebGPUCachingTraceTest',
      'WebGLCanvasCaptureTraceTest',
      'WebGPUTraceTest',
  ])
@classmethod
def Name(cls) -> str:
  """Returns 'trace_test', the name reported for this test class."""
  return 'trace_test'
@classmethod
def _SuiteSupportsParallelTests(cls) -> bool:
  """Returns True: this suite declares that it supports parallel tests."""
  return True
def _GetSerialGlobs(self) -> set[str]:
  """Returns the test-name globs that this suite lists as serial.

  Only a Windows host contributes entries; elsewhere the set is empty.
  """
  if not host_information.IsWindows():
    return set()
  return {
      'OverlayModeTraceTest_DirectComposition_Underlay*',
      'OverlayModeTraceTest_DirectComposition_Video*',
  }
def _GetSerialTests(self) -> set[str]:
  """Returns the exact test names that this suite lists as serial.

  Only a Mac host contributes entries; elsewhere the set is empty.
  """
  if not host_information.IsMac():
    return set()
  return {
      'WebGPUTraceTest_WebGPUCanvasOneCopyCapture',
      'WebGPUTraceTest_WebGPUCanvasDisableOneCopyCapture_Accelerated',
  }
@classmethod
def GenerateGpuTests(cls, options: ct.ParsedCmdArgs) -> ct.TestGenerator:
  """Yields a (name, url, [arguments]) tuple for every test in the suite.

  Plain trace tests carry a single _TraceTestArguments; WebGPU caching tests
  carry a single _CacheTraceTestArguments. The DirectComposition, swap chain
  and Media Foundation capture tests are only generated on a Windows host.

  Args:
    options: Parsed command line arguments; not consulted here.
  """
  pages = trace_test_pages.TraceTestPages
  finished = 'domAutomationController._finished'

  def _PageTest(page, category, harness, eval_func):
    # Tuple for one ordinary trace test built from |page|.
    return (page.name, posixpath.join(gpu_data_relative_path, page.url), [
        _TraceTestArguments(browser_args=page.browser_args,
                            category=category,
                            test_harness_script=harness,
                            finish_js_condition=finished,
                            success_eval_func=eval_func,
                            other_args=page.other_args)
    ])

  def _CacheTest(test, first_load_eval_func, cache_eval_func, **overrides):
    # Tuple for one WebGPU caching test; all of them trace 'gpu'.
    return (test.name,
            posixpath.join(gpu_data_relative_path, test.first_load_page), [
                _CacheTraceTestArguments(
                    browser_args=test.browser_args,
                    category='gpu',
                    test_harness_script=basic_test_harness_script,
                    finish_js_condition=finished,
                    first_load_eval_func=first_load_eval_func,
                    cache_eval_func=cache_eval_func,
                    cache_pages=test.cache_pages,
                    **overrides)
            ])

  for p in pages.DefaultPages('TraceTest'):
    yield _PageTest(p, cls._DisabledByDefaultTraceCategory('gpu.service'),
                    webgl_test_harness_script, 'CheckGLCategory')

  for p in pages.VideoFromCanvasPages('WebGLCanvasCaptureTraceTest'):
    yield _PageTest(p, 'blink', basic_test_harness_script,
                    'CheckWebGLCanvasCapture')

  for p in pages.WebGPUCanvasCapturePages('WebGPUTraceTest'):
    yield _PageTest(p, 'blink', basic_test_harness_script,
                    'CheckWebGPUCanvasCapture')

  if host_information.IsWindows():
    for p in pages.DirectCompositionPages('VideoPathTraceTest'):
      yield _PageTest(p, cls._DisabledByDefaultTraceCategory('gpu.service'),
                      basic_test_harness_script, 'CheckVideoPath')

    for p in pages.DirectCompositionPages('OverlayModeTraceTest',
                                          swap_count=60):
      yield _PageTest(p, cls._DisabledByDefaultTraceCategory('gpu.service'),
                      basic_test_harness_script, 'CheckOverlayMode')

    for p in pages.LowLatencyPages('SwapChainTraceTest'):
      yield _PageTest(p, 'gpu', basic_test_harness_script,
                      'CheckSwapChainPath')

    for p in pages.RootSwapChainTests('SwapChainTraceTest'):
      yield _PageTest(p, 'gpu', basic_test_harness_script,
                      'CheckSwapChainHasAlpha')

    for p in pages.MediaFoundationD3D11VideoCaptureTests('TraceTest'):
      yield _PageTest(
          p, 'gpu,' +
          cls._DisabledByDefaultTraceCategory('video_and_image_capture'),
          basic_test_harness_script, 'CheckMediaFoundationD3D11VideoCapture')

  for test in pages.WebGpuLoadReloadCachingTests('WebGPUCachingTraceTest'):
    yield _CacheTest(test, 'CheckWebGPUFirstLoadCache', 'CheckWebGPUCacheHits')

  for test in pages.WebGpuIncognitoCachingTests('WebGPUCachingTraceTest'):
    yield _CacheTest(test, 'CheckWebGPUCacheHits', 'CheckNoWebGPUCacheHits')

  for test in pages.WebGpuDifferentOriginCachingTests(
      'WebGPUCachingTraceTest'):
    yield _CacheTest(test,
                     'CheckWebGPUFirstLoadCache',
                     'CheckNoWebGPUCacheHits',
                     cache_page_origin=_TraceTestOrigin.LOCALHOST,
                     test_renavigation=False)

  for test in pages.WebGpuCrossOriginCacheMissTests('WebGPUCachingTraceTest'):
    yield _CacheTest(test,
                     'CheckWebGPUFirstLoadCache',
                     'CheckNoWebGPUCacheHits',
                     test_renavigation=False)
@classmethod
def _GetLocalPerfettoTraceProcessorPath(cls) -> str | None:
  """Locates a locally built Perfetto trace_processor_shell binary.

  Returns:
    The path to the binary inside the Chromium output directory, or None
    (after logging a warning) when the browser platform is Windows, the
    output directory is not set, or the binary does not exist.
  """
  platform_name = (cls.browser.platform.GetOSName() or '').lower()
  if platform_name == 'win':
    logging.warning(
        'Falling back to cloud version of trace_processor_shell because '
        'locally built binaries do not currently work on Windows.')
    return None

  # These platforms use the binary with the host_ prefix.
  if platform_name in ('android', 'chromeos', 'fuchsia'):
    binary_name = 'host_trace_processor_shell'
  else:
    binary_name = 'trace_processor_shell'

  out_dir = cls.GetOriginalFinderOptions().chromium_output_dir
  if not out_dir:
    logging.warning(
        'Chromium output directory not set, not able to find local '
        'trace_processor_shell. Will fall back to cloud version.')
    return None

  candidate = os.path.join(out_dir, binary_name)
  if os.path.exists(candidate):
    return candidate
  logging.warning(
      'Unable to find local trace_processor_shell. Will fall back to '
      'cloud version.')
  return None
@classmethod
def _GetTraceProcessorLoadTimeout(cls) -> int:
  """Determines the load timeout to use for a TraceProcessor.

  The timeout is 2 by default and 10 for configurations singled out below:
  Mac debug browsers, Mac with the 'nvidia' platform tag, all of Linux,
  ChromeOS with the 'chromeos-board-amd64-generic' tag and Windows with the
  'arch-arm64' tag.

  Returns:
    The load timeout that should be used based on platform, etc.
  """
  default_timeout = 2
  slow_timeout = 10

  os_name = cls.browser.platform.GetOSName()
  # Platform tags are only fetched in the branches that need them.
  if os_name == 'mac':
    needs_slow_timeout = (cls.browser.browser_type == 'debug'
                          or 'nvidia' in cls.GetPlatformTags(cls.browser))
  elif os_name == 'linux':
    needs_slow_timeout = True
  elif os_name == 'chromeos':
    needs_slow_timeout = ('chromeos-board-amd64-generic'
                          in cls.GetPlatformTags(cls.browser))
  elif os_name == 'win':
    needs_slow_timeout = 'arch-arm64' in cls.GetPlatformTags(cls.browser)
  else:
    needs_slow_timeout = False

  return slow_timeout if needs_slow_timeout else default_timeout
@classmethod
def _GetTraceProcessorConfig(cls) -> tp.TraceProcessorConfig:
  """Builds the trace processor config for the current platform.

  Returns:
    A TraceProcessorConfig whose load timeout comes from
    _GetTraceProcessorLoadTimeout(). |bin_path| is set only when a locally
    built trace processor is found.
  """
  config_kwargs = {'load_timeout': cls._GetTraceProcessorLoadTimeout()}
  local_binary = cls._GetLocalPerfettoTraceProcessorPath()
  if local_binary:
    config_kwargs['bin_path'] = local_binary
  return tp.TraceProcessorConfig(**config_kwargs)
def _GetTraceProcessorForTrace(self, trace: bytes) -> tp.TraceProcessor:
  """Returns a TraceProcessor loaded with |trace|.

  Args:
    trace: Serialized trace bytes; wrapped in an in-memory stream.
  """
  config = self._GetTraceProcessorConfig()
  return tp.TraceProcessor(io.BytesIO(trace), config=config)
def _RunActualGpuTraceTest(self,
                           test_path: str,
                           args: _TraceTestArguments,
                           profile_dir: str | None = None,
                           profile_type: str | None = None) -> dict:
  """Runs one traced page load and evaluates the resulting trace.

  Args:
    test_path: Path of the page to load; converted to a URL using the method
        named by |args.origin|.
    args: The arguments describing the test.
    profile_dir: Profile directory forwarded to the browser restart.
    profile_type: Profile type forwarded to the browser restart.

  Returns:
    The dictionary produced by the success evaluator, or an empty dictionary
    if the evaluator returned nothing or no evaluator was specified.
  """
  if args.restart_browser:
    self.RestartBrowserWithArgs(args.browser_args,
                                profile_dir=profile_dir,
                                profile_type=profile_type)

  # Trace only the requested category/categories, in proto format.
  trace_config = tracing_config.TracingConfig()
  trace_config.chrome_trace_config.SetProtoTraceFormat()
  trace_config.chrome_trace_config.category_filter.AddExcludedCategory('*')
  if ',' in args.category:
    trace_config.chrome_trace_config.category_filter.AddFilterString(
        args.category)
  else:
    trace_config.chrome_trace_config.category_filter.AddFilter(args.category)
  trace_config.enable_chrome_trace = True

  page_tab = self.tab
  page_tab.browser.platform.tracing_controller.StartTracing(trace_config, 60)

  target_url = getattr(self, args.origin.value)(test_path)
  page_tab.Navigate(target_url,
                    script_to_evaluate_on_commit=args.test_harness_script)
  try:
    page_tab.action_runner.WaitForJavaScriptCondition(
        args.finish_js_condition)
  finally:
    # Surface the page's log output even if the wait failed.
    page_messages = page_tab.EvaluateJavaScript(
        'domAutomationController._messages')
    if page_messages:
      logging.info('Logging messages from the test:\n%s', page_messages)

  timeline_data = page_tab.browser.platform.tracing_controller.StopTracing()
  trace_bytes = _MergePerfettoTraces(timeline_data)
  self._MaybeSavePerfettoTraceAsArtifact(trace_bytes)

  if not args.success_eval_func:
    return {}

  evaluator_name = '_EvaluateSuccess_' + args.success_eval_func
  with self._GetTraceProcessorForTrace(trace_bytes) as trace_processor:
    evaluator = getattr(self, evaluator_name)
    results = evaluator(args.category, trace_processor, args.other_args)
  return results or {}
def RunActualGpuTest(self, test_path: str, args: ct.TestArgs) -> None:
  """Runs a generated test.

  A _TraceTestArguments payload runs a single trace test. A
  _CacheTraceTestArguments payload runs the first-load test against a
  temporary profile directory and then every generated cache test, each one
  seeded from that directory. Any other payload type is ignored.

  Args:
    test_path: Path of the (first) page to load.
    args: Test arguments; only the first element is used.
  """
  params = args[0]
  if isinstance(params, _TraceTestArguments):
    self._RunActualGpuTraceTest(test_path, params)
    return
  if not isinstance(params, _CacheTraceTestArguments):
    return

  with tempfile.TemporaryDirectory() as seed_profile_dir:
    # 'exact' for the first load, 'clean' for every follow-up page.
    first_load_results = self._RunActualGpuTraceTest(
        test_path,
        params.GenerateFirstLoadTest(),
        profile_dir=seed_profile_dir,
        profile_type='exact')
    for page_path, page_args in params.GenerateCacheHitTests(
        first_load_results):
      self._RunActualGpuTraceTest(page_path,
                                  page_args,
                                  profile_dir=seed_profile_dir,
                                  profile_type='clean')
@classmethod
def SetUpProcess(cls) -> None:
  """Starts the browser and static server, then primes the trace processor.

  When no locally built trace processor exists, worker 1 instantiates a
  TraceProcessor once and every other worker sleeps for five seconds.
  NOTE(review): presumably this lets a single worker fetch the cloud
  trace_processor_shell before the others need it -- confirm.
  """
  super().SetUpProcess()
  cls.CustomizeBrowserArgs([])
  cls.StartBrowser()
  cls.SetStaticServerDirs(data_paths)

  if cls._GetLocalPerfettoTraceProcessorPath():
    return

  if cls.child.worker_num != 1:
    time.sleep(5)
    return

  with tp.TraceProcessor(None, config=cls._GetTraceProcessorConfig()):
    pass
@classmethod
def GenerateBrowserArgs(cls, additional_args: list[str]) -> list[str]:
  """Adds this suite's default browser arguments to |additional_args|.

  See the parent class' method documentation for additional information.

  Returns:
    The parent's argument list extended with the logging, experimental web
    platform features, GPU test type and search-engine-prewarm arguments.
  """
  browser_args = super().GenerateBrowserArgs(additional_args)
  browser_args.extend((
      cba.ENABLE_LOGGING,
      cba.ENABLE_EXPERIMENTAL_WEB_PLATFORM_FEATURES,
      cba.TEST_TYPE_GPU,
      cba.DISABLE_DIRECT_SEARCH_ENGINE_PREWARM,
  ))
  return browser_args
@staticmethod
def _SwapChainPresentationModeListToStr(
    presentation_mode_list: list[int]) -> str:
  """Formats presentation mode values as '[A,B,...]' for failure messages.

  Each value is converted with overlay_support.PresentationModeEventToStr.
  """
  joined_modes = ','.join(
      map(overlay_support.PresentationModeEventToStr, presentation_mode_list))
  return '[' + joined_modes + ']'
@staticmethod
def _DisabledByDefaultTraceCategory(category: str) -> str:
  """Returns |category| prefixed with 'disabled-by-default-'."""
  return 'disabled-by-default-{}'.format(category)
def _MaybeSavePerfettoTraceAsArtifact(self, trace: bytes) -> None:
  """Stores |trace| as a test artifact if an artifact implementation is set.

  The artifact is named after the current timestamp, with ':' replaced by '_'.
  Without an artifact implementation a warning is logged instead.
  """
  if not self.artifacts:
    logging.warning(
        'Did not save trace as artifact due to not having an artifact '
        'implementation set')
    return

  timestamp = datetime.datetime.now().isoformat()
  artifact_name = f'trace-for-ui.perfetto.dev-{timestamp}.pb'.replace(
      ':', '_')
  self.artifacts.CreateArtifact(artifact_name, artifact_name, trace)
def _EvaluateSuccess_CheckGLCategory(self, category: str,
                                     trace_processor: tp.TraceProcessor,
                                     other_args: dict) -> None:
  """Fails unless the trace contains 'gpu_toplevel' GL markers.

  Counts slices in |category| that carry a debug.gl_category argument equal to
  'gpu_toplevel' and fails the test when the count is not positive.
  """
  del other_args
  marker_count_query = f"""\
SELECT
COUNT(*) as cnt
FROM
slices
JOIN
args
WHERE
slices.category = '{category}'
AND args.key = 'debug.gl_category'
AND args.string_value = 'gpu_toplevel'
AND slices.arg_set_id = args.arg_set_id
"""
  count_rows = trace_processor.query(marker_count_query)
  if any(row.cnt <= 0 for row in count_rows):
    self.fail(f'Trace markers for GPU category {category} were not found')
def _GetVideoExpectations(self, other_args: dict) -> '_VideoExpectations':
  """Helper for creating expectations for CheckVideoPath and CheckOverlayMode.

  Values given explicitly in |other_args| are used as the starting point. When
  the overlay configuration for the first GPU reports overlay support, the
  pixel format and presentation mode are derived from that configuration, and
  zero copy usage is derived as well unless it was given explicitly or no
  overlay is expected.

  Args:
    other_args: The |other_args| arg passed into the test.

  Returns:
    A _VideoExpectations instance with zero_copy, pixel_format, no_overlay,
    and presentation_mode filled in.
  """
  # Expectations are based on the first GPU reported by the browser.
  gpu = self.browser.GetSystemInfo().gpu.devices[0]
  overlay_bot_config = overlay_support.GetOverlayConfigForGpu(gpu)

  expected = _VideoExpectations()
  expected.zero_copy = other_args.get('zero_copy', None)
  expected.pixel_format = other_args.get('pixel_format', None)
  expected.no_overlay = other_args.get('no_overlay', False)
  video_rotation = other_args.get('video_rotation',
                                  overlay_support.VideoRotation.UNROTATED)
  video_is_not_scaled = other_args.get('full_size', False)
  codec = other_args.get('codec', overlay_support.ZeroCopyCodec.UNSPECIFIED)

  if overlay_bot_config.supports_overlays:
    # A pixel format forced through |other_args| is passed along as
    # |forced_pixel_format|.
    expected.pixel_format = overlay_bot_config.GetExpectedPixelFormat(
        forced_pixel_format=expected.pixel_format)
    expected.presentation_mode = (
        overlay_bot_config.GetExpectedPresentationMode(
            expected_pixel_format=expected.pixel_format,
            video_rotation=video_rotation))
    # Only compute zero copy usage when the test did not dictate it.
    if expected.zero_copy is None and not expected.no_overlay:
      expected.zero_copy = overlay_bot_config.GetExpectedZeroCopyUsage(
          expected_pixel_format=expected.pixel_format,
          video_rotation=video_rotation,
          fullsize=video_is_not_scaled,
          codec=codec)

  return expected
def _EvaluateSuccess_CheckVideoPath(self, category: str,
                                    trace_processor: tp.TraceProcessor,
                                    other_args: dict) -> None:
  """Verifies Chrome goes down the code path as expected.

  Depending on whether hardware overlays are supported or not, which formats
  are supported in overlays, whether video is downscaled or not, whether
  video is rotated or not, Chrome's video presentation code path can be
  different.

  Every SwapChain::Present event in the trace must carry exactly the
  PixelFormat and ZeroCopy arguments, and both must match the expectations.
  When no overlay is expected there must be no such events at all.
  """
  os_name = self.browser.platform.GetOSName()
  assert os_name and os_name.lower() == 'win'

  other_args = other_args or {}
  expected = self._GetVideoExpectations(other_args)

  # Maps slice ID -> {arg key -> arg value} for each present event.
  swap_events = collections.defaultdict(dict)
  pixel_format_key = 'debug.PixelFormat'
  zero_copy_key = 'debug.ZeroCopy'
  swap_event_query = f"""\
SELECT
slices.id,
key,
string_value,
int_value
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_SWAP_CHAIN_PRESENT_EVENT_NAME}'
AND args.arg_set_id = slices.arg_set_id
"""
  # One row per (event, argument) pair.
  for row in trace_processor.query(swap_event_query):
    value = None
    if row.key == pixel_format_key:
      value = row.string_value
    elif row.key == zero_copy_key:
      value = bool(row.int_value)
    else:
      self.fail(f'Found {_SWAP_CHAIN_PRESENT_EVENT_NAME} event with arg '
                f'{row.key}, which is not expected')
    swap_events[row.id][row.key] = value

  if expected.no_overlay and swap_events:
    self.fail(f'Expected no overlay but got {len(swap_events)} '
              f'{_SWAP_CHAIN_PRESENT_EVENT_NAME} events')
  if not swap_events:
    # Having no events is only acceptable when no overlay is expected.
    if expected.no_overlay:
      return
    self.fail(f'No {_SWAP_CHAIN_PRESENT_EVENT_NAME} events found')

  for event_id, event_args in swap_events.items():
    detected_pixel_format = event_args.get(pixel_format_key, None)
    if detected_pixel_format is None:
      self.fail(f'PixelFormat is missing from event '
                f'{_SWAP_CHAIN_PRESENT_EVENT_NAME} with ID {event_id}')
    if expected.pixel_format != detected_pixel_format:
      self.fail(f'SwapChain pixel format mismatch, expected '
                f'{expected.pixel_format} got {detected_pixel_format} for '
                f'event with ID {event_id}')

    detected_zero_copy = event_args.get(zero_copy_key, None)
    if detected_zero_copy is None:
      self.fail(f'ZeroCopy is missing from event '
                f'{_SWAP_CHAIN_PRESENT_EVENT_NAME} with ID {event_id}')
    if expected.zero_copy != detected_zero_copy:
      self.fail(f'ZeroCopy mismatch, expected {expected.zero_copy} got '
                f'{detected_zero_copy} for event with ID {event_id}')
def _EvaluateSuccess_CheckOverlayMode(self, category: str,
                                      trace_processor: tp.TraceProcessor,
                                      other_args: dict) -> None:
  """Verifies video frames are promoted to overlays when supported.

  Collects the CompositionMode argument of every GetFrameStatisticsMedia
  event, requires that every such event has one, and then checks the last
  three collected modes against the expected presentation mode.
  """
  os_name = self.browser.platform.GetOSName()
  assert os_name and os_name.lower() == 'win'

  other_args = other_args or {}
  expected = self._GetVideoExpectations(other_args)

  # CompositionMode values ordered by slice ID.
  presentation_mode_history = []
  composition_mode_query = f"""\
SELECT
int_value
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_GET_STATISTICS_EVENT_NAME}'
AND key = 'debug.CompositionMode'
AND args.arg_set_id = slices.arg_set_id
ORDER BY slices.id
"""
  for row in trace_processor.query(composition_mode_query):
    # Any statistics event at all contradicts a "no overlay" expectation.
    if expected.no_overlay:
      self.fail(f'Expected no overlay got {_GET_STATISTICS_EVENT_NAME}')
    presentation_mode_history.append(row.int_value)

  # Compare against the total event count to detect events that lack the
  # CompositionMode argument.
  get_statistics_count_query = f"""\
SELECT
COUNT(*) as cnt
FROM
slices
WHERE
category = '{category}'
AND name = '{_GET_STATISTICS_EVENT_NAME}'
"""
  for row in trace_processor.query(get_statistics_count_query):
    if row.cnt != len(presentation_mode_history):
      self.fail(f'CompositionMode was missing from one or more '
                f'{_GET_STATISTICS_EVENT_NAME} events. {row.cnt} total '
                f'events found, {len(presentation_mode_history)} had '
                f'CompositionMode')

  if expected.no_overlay:
    return

  valid_entry_found = False
  # Only the three most recent entries are examined.
  for index, mode in enumerate(reversed(presentation_mode_history)):
    if index >= 3:
      break
    # NONE and GET_STATISTICS_FAILED entries are skipped rather than compared.
    if mode in (overlay_support.PresentationModeEvent.NONE,
                overlay_support.PresentationModeEvent.GET_STATISTICS_FAILED):
      continue
    if (overlay_support.PresentationModeEventToStr(mode)
        != expected.presentation_mode):
      history_str = TraceIntegrationTest._SwapChainPresentationModeListToStr(
          presentation_mode_history)
      self.fail(f'SwapChain presentation mode mismatch, expected '
                f'{expected.presentation_mode} got {history_str}')
    valid_entry_found = True
  if not valid_entry_found:
    history_str = TraceIntegrationTest._SwapChainPresentationModeListToStr(
        presentation_mode_history)
    self.fail(f'No valid frame statistics being collected: {history_str}')
def _EvaluateSuccess_CheckSwapChainPath(self, category: str,
                                        trace_processor: tp.TraceProcessor,
                                        other_args: dict) -> None:
  """Verifies that swap chains are used as expected for low latency canvas.

  An overlay is expected unless |other_args| sets 'no_overlay'. The overlay
  counts as found when the trace has at least one BeginOverlayAccess event
  whose debug_label argument is 'SwapChainBuffer'.
  """
  platform_name = self.browser.platform.GetOSName()
  assert platform_name and platform_name.lower() == 'win'

  first_gpu = self.browser.GetSystemInfo().gpu.devices[0]
  overlay_config = overlay_support.GetOverlayConfigForGpu(first_gpu)
  assert overlay_config.direct_composition

  expect_no_overlay = bool(other_args
                           and other_args.get('no_overlay', False))

  overlay_query = f"""\
SELECT
*
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_BEGIN_OVERLAY_ACCESS_EVENT_NAME}'
AND key = 'debug.debug_label'
AND string_value = 'SwapChainBuffer'
AND args.arg_set_id = slices.arg_set_id
"""
  # A single matching row is enough; stop at the first one.
  found_overlay = any(True for _ in trace_processor.query(overlay_query))

  if found_overlay and expect_no_overlay:
    self.fail(f'Overlay not expected but found: matching '
              f'{_BEGIN_OVERLAY_ACCESS_EVENT_NAME} events were found')
  elif not found_overlay and not expect_no_overlay:
    self.fail(f'Overlay expected but not found: matching '
              f'{_BEGIN_OVERLAY_ACCESS_EVENT_NAME} events were not found')
def _EvaluateSuccess_CheckSwapChainHasAlpha(
    self, category: str, trace_processor: tp.TraceProcessor,
    other_args: dict) -> None:
  """Verifies all DXGI swap chains are presented with the expected alpha mode.

  Every DXGISwapChainImageBacking::Present event that carries a has_alpha
  argument must match the expectation, and at least one such event must
  exist.

  Args:
    category: The trace category the events were recorded under.
    trace_processor: Trace processor loaded with the test's trace.
    other_args: Optional test arguments. 'has_alpha' gives the expected value
        and defaults to False when the key, or |other_args| itself, is absent.
  """
  os_name = self.browser.platform.GetOSName()
  assert os_name and os_name.lower() == 'win'

  gpu = self.browser.GetSystemInfo().gpu.devices[0]
  overlay_bot_config = overlay_support.GetOverlayConfigForGpu(gpu)
  assert overlay_bot_config.direct_composition

  # Coerce to bool: a bare `other_args and ...` evaluates to None or {} when
  # |other_args| is missing or empty, which never compares equal to the
  # boolean read from the trace and would fail the test spuriously.
  expect_has_alpha = bool(other_args and other_args.get('has_alpha', False))

  has_present_swap_chain_event_with_has_alpha = False
  swap_chain_query = f"""\
SELECT
int_value
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_PRESENT_SWAP_CHAIN_EVENT_NAME}'
AND key = 'debug.has_alpha'
AND args.arg_set_id = slices.arg_set_id
"""
  for row in trace_processor.query(swap_chain_query):
    has_present_swap_chain_event_with_has_alpha = True
    got_has_alpha = bool(row.int_value)
    if expect_has_alpha != got_has_alpha:
      self.fail(f'Expected events with name {_PRESENT_SWAP_CHAIN_EVENT_NAME} '
                f'with has_alpha expected {expect_has_alpha}, got '
                f'{got_has_alpha}')

  if not has_present_swap_chain_event_with_has_alpha:
    self.fail(
        f'Expected events with name {_PRESENT_SWAP_CHAIN_EVENT_NAME} and '
        'has_alpha value, but were not found')
def _EvaluateSuccess_CheckWebGLCanvasCapture(
    self, category: str, trace_processor: tp.TraceProcessor,
    other_args: dict) -> None:
  """Checks WebGL canvas capture events; a no-op when |other_args| is None."""
  if other_args is not None:
    self._CheckCanvasCaptureImpl(category, trace_processor, other_args)
def _EvaluateSuccess_CheckWebGPUCanvasCapture(
    self, category: str, trace_processor: tp.TraceProcessor,
    other_args: dict) -> None:
  """Checks WebGPU canvas capture events via _CheckCanvasCaptureImpl.

  Unlike the WebGL variant there is no None guard here, so |other_args| must
  be a dict.
  """
  self._CheckCanvasCaptureImpl(category, trace_processor, other_args)
def _CheckCanvasCaptureImpl(self, category: str,
                            trace_processor: tp.TraceProcessor,
                            other_args: dict) -> None:
  """Checks which copy path canvas capture took.

  |other_args| may set 'one_copy' and/or 'accelerated_two_copy'; the two may
  not both be truthy. Every distinct value traced for the matching argument
  must equal the expectation, and when an expectation is given at least one
  matching event must exist.
  """
  want_one_copy = other_args.get('one_copy', None)
  want_accelerated_two_copy = other_args.get('accelerated_two_copy', None)
  if want_one_copy and want_accelerated_two_copy:
    self.fail('one_copy and accelerated_two_copy are mutually exclusive')

  one_copy_query = f"""\
SELECT
DISTINCT(int_value)
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME}'
AND key = 'debug.one_copy_canvas_capture'
AND args.arg_set_id = slices.arg_set_id
"""
  saw_one_copy_event = False
  for row in trace_processor.query(one_copy_query):
    saw_one_copy_event = True
    got_one_copy = bool(row.int_value)
    if got_one_copy != want_one_copy:
      self.fail(f'one_copy_canvas_capture mismatch, expected '
                f'{want_one_copy} got {got_one_copy}')

  two_copy_query = f"""\
SELECT
DISTINCT(int_value)
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME}'
AND key = 'debug.accelerated_frame_pool_copy'
AND args.arg_set_id = slices.arg_set_id
"""
  saw_accelerated_two_copy_event = False
  for row in trace_processor.query(two_copy_query):
    saw_accelerated_two_copy_event = True
    got_accelerated_two_copy = bool(row.int_value)
    if got_accelerated_two_copy != want_accelerated_two_copy:
      self.fail(f'accelerated_frame_pool_copy mismatch, expected '
                f'{want_accelerated_two_copy} got '
                f'{got_accelerated_two_copy}')

  # An expectation with no matching events is a failure as well.
  if want_one_copy is not None and not saw_one_copy_event:
    self.fail(f'{_HTML_CANVAS_NOTIFY_LISTENERS_CANVAS_CHANGED_EVENT_NAME} '
              f'events with one_copy_canvas_capture were not found')
  if (want_accelerated_two_copy is not None
      and not saw_accelerated_two_copy_event):
    self.fail(f'{_STATIC_BITMAP_TO_VID_FRAME_CONVERT_EVENT_NAME} events with '
              f'accelerated_frame_pool_copy were not found')
def _EvaluateSuccess_CheckWebGPUFirstLoadCache(
    self, category: str, trace_processor: tp.TraceProcessor,
    _other_args: dict) -> dict:
  """Checks that at least one WebGPU cache blob store event was traced.

  Counts slices in |category| named _GPU_HOST_STORE_BLOB_EVENT_NAME whose
  debug.handle_type argument equals _WEBGPU_CACHE_HANDLE_TYPE, and fails the
  test if there are none.

  Args:
    category: Trace category that the slices must belong to.
    trace_processor: TraceProcessor that the query is run against.
    _other_args: Unused.

  Returns:
    A dict mapping _MIN_CACHE_HIT_KEY to the number of matching events.
  """
  store_count_sql = f"""\
SELECT
COUNT(*) as cnt
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_GPU_HOST_STORE_BLOB_EVENT_NAME}'
AND key = 'debug.handle_type'
AND int_value = {_WEBGPU_CACHE_HANDLE_TYPE}
AND args.arg_set_id = slices.arg_set_id
"""
  num_stored = sum(row.cnt for row in trace_processor.query(store_count_sql))
  if not num_stored:
    self.fail('Expected at least 1 cache entry to be written.')
  return {_MIN_CACHE_HIT_KEY: num_stored}
def _EvaluateSuccess_CheckWebGPUCacheHits(self, category: str,
                                          trace_processor: tp.TraceProcessor,
                                          other_args: dict) -> None:
  """Checks that the trace has enough WebGPU cache hits and no new stores.

  Two conditions are verified, in this order:
    1. The number of _WEBGPU_BLOB_CACHE_HIT_EVENT_NAME slices in |category| is
       non-zero and at least other_args[_MIN_CACHE_HIT_KEY] (1 if the key is
       absent).
    2. No _GPU_HOST_STORE_BLOB_EVENT_NAME slice in |category| has a
       debug.handle_type argument equal to _WEBGPU_CACHE_HANDLE_TYPE.

  Args:
    category: Trace category that the slices must belong to.
    trace_processor: TraceProcessor that the queries are run against.
    other_args: Test arguments; may carry the minimum hit count under
        _MIN_CACHE_HIT_KEY.
  """
  hit_count_sql = f"""\
SELECT
COUNT(*) as cnt
FROM
slices
WHERE
category = '{category}'
AND name = '{_WEBGPU_BLOB_CACHE_HIT_EVENT_NAME}'
"""
  num_hits = sum(row.cnt for row in trace_processor.query(hit_count_sql))
  min_hits = other_args.get(_MIN_CACHE_HIT_KEY, 1)
  enough_hits = num_hits != 0 and num_hits >= min_hits
  if not enough_hits:
    self.fail(f'WebGPU cache hits ({num_hits}) is 0 or less than blobs '
              f'stored ({min_hits}).')

  stored_entries_sql = f"""\
SELECT
*
FROM
slices
JOIN
args
WHERE
category = '{category}'
AND name = '{_GPU_HOST_STORE_BLOB_EVENT_NAME}'
AND key = 'debug.handle_type'
AND int_value = {_WEBGPU_CACHE_HANDLE_TYPE}
AND args.arg_set_id = slices.arg_set_id
"""
  # Any row at all means a store event was traced, which is a failure.
  for _ in trace_processor.query(stored_entries_sql):
    self.fail('Unexpected WebGPU cache entry was stored on reloaded page')
def _EvaluateSuccess_CheckNoWebGPUCacheHits(
    self, category: str, trace_processor: tp.TraceProcessor,
    _other_args: dict) -> None:
  """Checks that the trace contains no WebGPU cache hit events.

  Counts slices in |category| named _WEBGPU_BLOB_CACHE_HIT_EVENT_NAME and
  fails the test if the count is non-zero.

  Args:
    category: Trace category that the slices must belong to.
    trace_processor: TraceProcessor that the query is run against.
    _other_args: Unused.
  """
  cache_hit_query = f"""\
SELECT
COUNT(*) as cnt
FROM
slices
WHERE
category = '{category}'
AND name = '{_WEBGPU_BLOB_CACHE_HIT_EVENT_NAME}'
"""
  # Start from 0 and accumulate, matching
  # _EvaluateSuccess_CheckWebGPUCacheHits, so |cache_hits| is always bound
  # even if the query were to yield no rows.
  cache_hits = 0
  for row in trace_processor.query(cache_hit_query):
    cache_hits += row.cnt
  if cache_hits != 0:
    self.fail(f'Expected 0 WebGPU cache hits, but got {cache_hits}.')
def _EvaluateSuccess_CheckMediaFoundationD3D11VideoCapture(
    self, category: str, trace_processor: tp.TraceProcessor,
    _other_args: dict) -> None:
  """Checks that the expected MFD3D11VC trace events were all emitted.

  The test is skipped unless the OS version name is 'win10' or 'win11'.
  Otherwise the page's domAutomationController._succeeded flag must be truthy
  and the trace must contain capture, present and map events. The map and
  alternative-map event names are interchangeable: finding either one
  satisfies both.

  Args:
    category: Unused; the slice query is not filtered by category.
    trace_processor: TraceProcessor that the query is run against.
    _other_args: Unused.
  """
  del category

  os_version = self.browser.platform.GetOSVersionName()
  assert os_version
  if os_version.lower() not in ('win10', 'win11'):
    self.skipTest(
        'MediaFoundationD3D11VideoCapture only available on win 10+')

  self.assertTrue(
      self.tab.EvaluateJavaScript('domAutomationController._succeeded'))

  distinct_names_sql = """\
SELECT
DISTINCT(name)
FROM
slices
"""
  seen_names = {row.name for row in trace_processor.query(distinct_names_sql)}

  # Seeing either map variant counts as having seen both.
  map_variants = {
      _MFD3D11VC_ALTERNATIVE_MAP_EVENT_NAME,
      _MFD3D11VC_MAP_EVENT_NAME,
  }
  if seen_names & map_variants:
    seen_names |= map_variants

  # Checked in a fixed order so the first missing event reported is stable.
  required_events = (
      _MFD3D11VC_ALTERNATIVE_MAP_EVENT_NAME,
      _MFD3D11VC_CAPTURE_EVENT_NAME,
      _MFD3D11VC_MAP_EVENT_NAME,
      _MFD3D11VC_PRESENT_EVENT_NAME,
  )
  for event_name in required_events:
    if event_name not in seen_names:
      self.fail(f'No {event_name} events found')
@classmethod
def ExpectationsFiles(cls) -> list[str]:
  """Returns the expectation file paths for this test class.

  The list has a single entry: test_expectations/trace_test_expectations.txt
  resolved against the directory that contains this module.
  """
  module_dir = os.path.dirname(os.path.abspath(__file__))
  expectations_path = os.path.join(module_dir, 'test_expectations',
                                   'trace_test_expectations.txt')
  return [expectations_path]
@dataclasses.dataclass
class _VideoExpectations():
"""Struct-like object for passing around video test expectations."""
pixel_format: str | None = None
zero_copy: bool | None = None
no_overlay: bool | None = None
presentation_mode: str | None = None
def _MergePerfettoTraces(trace_builder: trace_data.TraceDataBuilder) -> bytes:
  """Merge all Perfetto trace components into a single trace.

  Parts whose path ends in '.pb.gz' are decompressed with gzip; every other
  part is read verbatim. The contents are concatenated in the order in which
  IterTraceParts() yields them.

  Args:
    trace_builder: A Telemetry TraceDataBuilder containing the trace components
        to merge.

  Returns:
    Bytes containing the merged trace.
  """
  chunks = []
  for _, trace_filepath in trace_builder.IterTraceParts():
    opener = gzip.open if trace_filepath.endswith('.pb.gz') else open
    with opener(trace_filepath, 'rb') as infile:
      chunks.append(infile.read())
  # Join once at the end: growing a bytes object with += copies everything
  # accumulated so far on each iteration, which is quadratic in trace size.
  return b''.join(chunks)
def load_tests(loader: unittest.TestLoader, tests: Any,
               pattern: Any) -> unittest.TestSuite:
  """Builds the test suite for this module.

  All three arguments are ignored; the result is whatever
  gpu_integration_test.LoadAllTestsInModule() returns for this module.
  """
  del loader, tests, pattern  # Unused.
  this_module = sys.modules[__name__]
  return gpu_integration_test.LoadAllTestsInModule(this_module)