"""Test analysis functions for the testing framework.
"""
from collections import defaultdict
import datetime
import logging
import re
import os
from typing import List, Set
from models import Action
from models import CoverageTest
from models import CoverageTestsByPlatform
from models import ActionType
from models import CoverageTestsByPlatformSet
from models import TestId
from models import TestIdsTestNamesByPlatformSet
from models import TestIdTestNameTuple
from models import TestPartitionDescription
from models import TestPlatform
def filter_tests_for_partition(tests: List[CoverageTest],
partition: TestPartitionDescription
) -> List[CoverageTest]:
def DoesTestHaveActionWithPrefixes(test: CoverageTest):
"""Returns if the given tests has any actions with the given prefixes"""
nonlocal partition
for action in test.actions:
for prefix in partition.action_name_prefixes:
if action.name.startswith(prefix):
return True
return list(filter(DoesTestHaveActionWithPrefixes, tests))
def compare_and_print_tests_to_remove_and_add(
existing_tests: TestIdsTestNamesByPlatformSet,
required_tests: CoverageTestsByPlatformSet,
test_partitions: List[TestPartitionDescription],
default_partition: TestPartitionDescription, add_to_file: bool):
"""
Given the existing tests on disk and the required tests, print out the
changes that need to happen to make them match. This also takes into account
test partitioning, so tests are asked to be written to the appropriate test
partition file.
Note: This does NOT support moving tests between partition files. If a test
was found in any partition file, then it is ignored.
"""
def print_tests(filename: str, tests: List[CoverageTest],
partition: TestPartitionDescription, add_to_file: bool):
new_test_str: str = ""
for test in tests:
new_test_str += ("\n" + test.generate_browsertest(partition) +
"\n")
if add_to_file:
if os.path.exists(filename):
with open(filename, "r") as f:
test_file = f.read()
if re.search(r"IN_PROC_BROWSER_TEST_F(.|\n)*?}\n", test_file):
res = re.finditer(r"IN_PROC_BROWSER_TEST_F(.|\n)*?}\n",
test_file)
last_test_end_index = list(res)[-1].end()
elif "}" in test_file:
last_test_end_index = test_file.find("}") - 1
else:
last_test_end_index = len(test_file)
new_content = (test_file[:last_test_end_index] + new_test_str +
test_file[last_test_end_index:])
with open(filename, "w") as f:
f.write(new_content)
else:
print(f"\n\nCreate a new test file: {filename}\n"
"Remember to add the new test file to the BUILD file.\n"
"Add the following tests to the new test file:\n"
f"{new_test_str}")
else:
print(f"\n\nAdd the following tests to {filename}:\n"
f"{new_test_str}")
test_ids_to_keep: TestIdsByPlatformSet = defaultdict(lambda: set())
for platforms, tests in required_tests.items():
tests_to_add: List[CoverageTest] = []
for test in tests:
if platforms in existing_tests:
existing_test_set = set(
[test_id for (test_id, _) in existing_tests[platforms]])
if test.id not in existing_test_set:
tests_to_add.append(test)
else:
test_ids_to_keep[platforms].add(test.id)
else:
tests_to_add.append(test)
tests_added_to_partition: Set[TestId] = set()
for partition in test_partitions:
tests_to_add_partition = filter_tests_for_partition(
tests_to_add, partition)
if not tests_to_add_partition:
continue
for test in tests_to_add_partition:
if test.id in tests_added_to_partition:
raise ValueError(
"Cannot have a test written to multiple test files.")
tests_added_to_partition.add(test.id)
filename = partition.generate_browsertest_filepath(platforms)
print_tests(filename, tests_to_add_partition, partition,
add_to_file)
default_tests: List[CoverageTest] = [
test for test in tests_to_add
if test.id not in tests_added_to_partition
]
if not default_tests:
continue
filename = default_partition.generate_browsertest_filepath(platforms)
print_tests(filename, default_tests, default_partition, add_to_file)
for platforms, test_ids_names in existing_tests.items():
tests_to_remove = []
prompt_str = ""
nice_platform_str = ", ".join(
[f"{platform}" for platform in platforms])
if platforms not in test_ids_to_keep:
prompt_str = (f"\n\nRemove ALL tests from the file for the "
f"platforms [{nice_platform_str}]:\n")
tests_to_remove = [test_name for (_, test_name) in test_ids_names]
else:
prompt_str = (f"\n\nRemove these tests from the file for the "
f"platforms [{nice_platform_str}]:\n")
tests_to_remove = [
test_name for (test_id, test_name) in test_ids_names
if test_id not in test_ids_to_keep[platforms]
]
if not tests_to_remove:
continue
print(f"{prompt_str}{', '.join(tests_to_remove)}")
def expand_parameterized_tests(coverage_tests: List[CoverageTest]
) -> List[CoverageTest]:
"""
Takes a list of coverage tests that contain parameterized actions, and
expands all of the tests with those actions to result in a list of tests
without parameterized actions.
"""
def get_all_parameterized_tests(test_actions: List[Action]
) -> List[List[Action]]:
"""
Takes a list of actions with possible parameterized actions, and outputs
a list of resulting tests with all parameterized actions expanded.
"""
if not test_actions:
return [[]]
for i, action in enumerate(test_actions):
if action.type is not ActionType.PARAMETERIZED:
continue
actions_before_parameterized = test_actions[:i]
actions_after_parameterized = test_actions[i + 1:]
resulting_tests = []
for output_action in action.output_actions:
remaining_expanded_tests = get_all_parameterized_tests(
actions_after_parameterized)
for remaining_test in remaining_expanded_tests:
test = (actions_before_parameterized + [output_action] +
remaining_test)
resulting_tests.append(test)
return resulting_tests
return [test_actions]
result_tests = []
for test in coverage_tests:
expanded_tests = get_all_parameterized_tests(test.actions)
logging.info(f"Generated {len(expanded_tests)} test/s from {test.id}")
for resulting_test in get_all_parameterized_tests(test.actions):
result_tests.append(CoverageTest(resulting_test, test.platforms))
return result_tests
def filter_coverage_tests_for_platform(tests: List[CoverageTest],
platform: TestPlatform
) -> List[CoverageTest]:
def IsSupportedOnPlatform(test: CoverageTest):
return platform in test.platforms
return list(filter(IsSupportedOnPlatform, tests))
def partition_framework_tests_per_platform_combination(
generated_tests_per_platform: CoverageTestsByPlatform
) -> CoverageTestsByPlatformSet:
test_id_to_platforms = defaultdict(lambda: set())
test_id_to_test = {}
platform_set_to_tests = defaultdict(lambda: list())
for platform, tests in generated_tests_per_platform.items():
for test in tests:
test_id_to_platforms[test.id].add(platform)
if test.id not in test_id_to_test:
test_id_to_test[test.id] = CoverageTest(test.actions, set())
test_id_to_test[test.id].platforms.add(platform)
for test_id, platforms in test_id_to_platforms.items():
platforms = frozenset(platforms)
platform_set_to_tests[platforms].append(test_id_to_test[test_id])
return platform_set_to_tests