"""Adapter for printing legacy recipe output
TODO(https://crbug.com/326904531): This file is intended to be a temporary
workaround and should be replaced once this bug is resolved"""
import json
import logging
import os
import re
import sys
class StoringLogger(logging.Logger):
  """A logger that stores messages that are not logged.

  Calls to `log()` whose level is not enabled for this logger are rendered to
  text and remembered in a class-wide buffer instead of being dropped. The
  buffer can later be replayed at INFO level with `log_last_step()` or reset
  with `clear_step()`.

  Only `log()` is overridden; the convenience methods inherited from
  `logging.Logger` (`debug()`, `info()`, ...) are not redefined here.
  """

  # Shared by every StoringLogger: (logger, rendered message) pairs for the
  # messages that were suppressed since the last clear_step().
  _storage = []

  @classmethod
  def clear_step(cls):
    """Forgets every message stored so far."""
    cls._storage = []

  @classmethod
  def log_last_step(cls):
    """Replays every stored message at INFO level on its original logger."""
    for logger, line in cls._storage:
      # Call the base implementation directly so that a message which is still
      # suppressed at INFO is not appended to the list being iterated.
      logging.Logger.log(logger, logging.INFO, line)

  @staticmethod
  def _render(msg, args):
    """Returns `msg` with `args` merged in, without ever raising.

    Args:
      msg: The message (normally a %-style format string).
      args: Tuple of positional arguments given to `log()`.

    Returns:
      `msg` itself when there are no args, the formatted string when
      formatting succeeds, otherwise `msg` followed by the repr of the args.
    """
    if not args:
      return msg
    # Same convention as logging.LogRecord: a single non-empty mapping
    # argument feeds '%(name)s' style placeholders.
    if len(args) == 1 and isinstance(args[0], dict) and args[0]:
      args = args[0]
    try:
      return msg % args
    except (TypeError, ValueError, KeyError):
      # A suppressed message must not crash the caller just because its
      # format string and arguments do not agree.
      return f'{msg} {args!r}'

  def log(self, level, msg, *args, **kwargs):
    """Logs the message if `level` is enabled, otherwise stores it.

    Args:
      level: Numeric logging level of the message.
      msg: The message (normally a %-style format string).
      *args: Arguments merged into `msg`.
      **kwargs: Passed through to `logging.Logger.log` when the message is
        emitted; ignored when the message is stored.
    """
    if self.isEnabledFor(level):
      super().log(level, msg, *args, **kwargs)
      return
    storage = self.__class__._storage
    if storage is not None:
      storage.append((self, self._render(msg, args)))
# Register StoringLogger as the class the logging module instantiates for
# loggers created after this point, so that their suppressed messages are
# kept for later replay.
logging.setLoggerClass(StoringLogger)
# Logger used for plain, unformatted output: it writes straight to stdout
# through its own handler and does not propagate to ancestor loggers.
basic_logger = logging.getLogger('basic_logger')
basic_logger.addHandler(logging.StreamHandler(sys.stdout))
basic_logger.propagate = False
class PassthroughAdapter:
  """Adapter that applies no filtering to the recipe run output.

  Every line handed to `ProcessLine` is forwarded to `basic_logger` at DEBUG
  level, untouched.
  """

  def EnsureFailurePrinted(self):
    """Does nothing; this adapter keeps no state of its own to print."""
    return None

  def ProcessLine(self, line):
    """Forwards one line of recipe output to `basic_logger` at DEBUG level.

    Args:
      line: The line of recipe output.
    """
    level = logging.DEBUG
    basic_logger.log(level, line)
class LegacyOutputAdapter:
"""Interprets the legacy recipe run mode output to logging
This will filter, route and in some cases reformat the output to trace levels
of logging. This will cause specific output (e.g. unfiltered step names) to
always print to std out or when -v is passed, the stdout will additionally
be passed to the logging stdout. Note -vv will cause PassthroughAdapter to
interpret results"""
SEED_STEP_TEXT = '@@@SEED_STEP@'
STEP_CLOSED_TEXT = '@@@STEP_CLOSED@@@'
ANNOTATOR_PREFIX_SUFIX = '@@@'
TRIGGER_STEP_PREFIX = 'test_pre_run.[trigger] '
TRIGGER_LINK_TEXT = '@@@STEP_LINK@task UI:'
UTR_LOG_NAME = 'utr_log'
def __init__(self):
self._trigger_link_re = re.compile(r'.+@(https://.+)@@@$')
self._ninja_status_re = re.compile(r'\[(\d+)\/(\d+)\]')
self._collect_wait_re = re.compile(
r'.+prpc call (.+) swarming.v2.Tasks.ListTaskStates, stdin: '
r'(\{"task_id": .+\})$'
)
self._result_links_re = re.compile(
r'@@@STEP_LINK@shard (#\d+) test results@(https://[^@]+)@@@')
self._current_proccess_fn = self._StepNameProcessLine
self._step_to_processors = {
'compile': self._ProcessCompileLine,
'reclient compile': self._ProcessCompileLine,
'test_pre_run.[trigger] ': self._ProcessTriggerLine,
'collect tasks.wait for tasks': self._ProcessCollectLine,
'download compilation outputs': self._PrintOnlyStepName,
}
self._step_to_log_level = {
'lookup_builder_gn_args': logging.DEBUG,
'git rev-parse': logging.DEBUG,
'git diff to instrument': logging.DEBUG,
'save paths of affected files': logging.DEBUG,
'preprocess for reclient.start reproxy via bootstrap': logging.INFO,
'preprocess for reclient': logging.DEBUG,
'process clang crashes': logging.DEBUG,
'compile confirm no-op': logging.DEBUG,
'postprocess for reclient': logging.DEBUG,
'setup_build': logging.DEBUG,
'get compile targets for scripts': logging.DEBUG,
'lookup GN args': logging.DEBUG,
'install infra/tools/luci/isolate': logging.DEBUG,
'find command lines': logging.DEBUG,
'test_pre_run.install infra/tools/luci/swarming': logging.DEBUG,
'isolate tests': logging.DEBUG,
'read GN args': logging.DEBUG,
'test_pre_run.[trigger] ': logging.INFO,
'test_pre_run.': logging.DEBUG,
'collect tasks.wait for tasks': logging.INFO,
'collect tasks': logging.DEBUG,
'$debug - all results': logging.DEBUG,
'Test statistics': logging.DEBUG,
'read gclient': logging.DEBUG,
'write output_properties_file': logging.DEBUG,
'prepare skylab tests.': logging.DEBUG,
'update invocation instructions': logging.DEBUG,
}
self._single_line_logger = None
self._terminal_columns = -1
if sys.stdout.isatty() and sys.stderr.isatty():
logger = logging.getLogger('single_line_logger')
handler = logging.StreamHandler(sys.stdout)
handler.terminator = ''
logger.addHandler(handler)
logger.propagate = False
self._single_line_logger = logger
self._terminal_columns, _ = os.get_terminal_size()
self._last_line = ''
self._last_line_teriminal_lines = 0
self._current_log_level = logging.DEBUG
self._current_step_name = ''
self._dot_count = 0
self._last_step_lines = []
self._last_log_level = self._current_log_level
root_logger = logging.getLogger()
self._default_logger = logging.getLogger('default_logger')
self._default_logger.setLevel(root_logger.level)
for handler in root_logger.handlers:
self._default_logger.addHandler(handler)
self._default_logger.propagate = False
def _PrintCurrentStepName(self, log_level):
self._default_logger.log(log_level, '\n[cyan]Running: %s[/]',
self._current_step_name)
def _StdoutProcessLine(self, line):
if line.startswith(f'@@@STEP_LOG_LINE@{self.UTR_LOG_NAME}@'):
line = line[len(f'@@@STEP_LOG_LINE@{self.UTR_LOG_NAME}@'):-3]
if line.startswith(self.ANNOTATOR_PREFIX_SUFIX):
return
is_urlish = re.match(r'^http[s]?://\S+$', line)
if is_urlish:
self._default_logger.log(self._current_log_level, line)
else:
basic_logger.log(self._current_log_level, line)
def _StepNameProcessLine(self, line):
if line.startswith(self.SEED_STEP_TEXT):
self._PrintCurrentStepName(self._current_log_level)
return
self._StdoutProcessLine(line)
def _PrintOnlyStepName(self, line):
if line.startswith(self.SEED_STEP_TEXT):
self._PrintCurrentStepName(self._current_log_level)
if line.startswith(f'@@@STEP_LOG_LINE@{self.UTR_LOG_NAME}@'):
line = line[len(f'@@@STEP_LOG_LINE@{self.UTR_LOG_NAME}@'):-3]
if not line.startswith(self.ANNOTATOR_PREFIX_SUFIX):
self._last_step_lines.append(line)
def _ProcessTriggerLine(self, line):
if line.startswith(self.SEED_STEP_TEXT + self.TRIGGER_STEP_PREFIX):
test_name = line[len(self.SEED_STEP_TEXT +
self.TRIGGER_STEP_PREFIX):line.index(' (') if ' (' in
line else -len(self.ANNOTATOR_PREFIX_SUFIX)]
self._step_to_processors[test_name] = self._ProcessResult
self._step_to_log_level[test_name] = logging.DEBUG
elif line.startswith(self.TRIGGER_LINK_TEXT):
matches = self._trigger_link_re.match(line)
if matches:
task_name = self._current_step_name[len(self.TRIGGER_STEP_PREFIX):]
basic_logger.log(self._current_log_level,
f'Triggered {task_name}: ' + matches[1])
else:
self._StdoutProcessLine(line)
def _ProcessCompileLine(self, line):
if line.startswith(self.SEED_STEP_TEXT):
self._PrintCurrentStepName(logging.INFO)
return
matches = self._ninja_status_re.match(line)
if self._single_line_logger and matches:
self._single_line_logger.log(self._current_log_level, '\33[2K')
if self._last_line_teriminal_lines > 1:
for _ in range(self._last_line_teriminal_lines - 1):
self._single_line_logger.log(self._current_log_level, '\33[A\33[2K')
self._single_line_logger.log(self._current_log_level, '\r' + line)
self._single_line_logger.handlers[0].flush()
return
if self._single_line_logger and self._last_line.startswith('['):
basic_logger.log(self._current_log_level, '')
self._StdoutProcessLine(line)
def _ProcessCollectLine(self, line):
if line.startswith(self.SEED_STEP_TEXT):
self._PrintCurrentStepName(logging.INFO)
matches = self._collect_wait_re.match(line)
if matches:
if not self._single_line_logger:
basic_logger.log(self._current_log_level, line)
return
task_ids = json.loads(matches[2])['task_id']
self._dot_count = (self._dot_count % 5) + 1
self._single_line_logger.log(
self._current_log_level,
f'\33[2K\rStill waiting on: {len(task_ids)} shard(s)' +
'.' * self._dot_count)
return
if line == self.STEP_CLOSED_TEXT and self._single_line_logger:
self._single_line_logger.log(self._current_log_level,
'\33[2K\rStill waiting on: 0 shard(s)...')
basic_logger.log(self._current_log_level, '')
def _ProcessResult(self, line):
matches = self._result_links_re.match(line)
if matches:
basic_logger.log(self._current_log_level,
'Test results for %s shard %s: %s',
self._current_step_name, matches[1], matches[2])
def ProcessLine(self, line):
if line.startswith(self.SEED_STEP_TEXT):
StoringLogger.clear_step()
self._current_step_name = line[len(self.SEED_STEP_TEXT
):-len(self.ANNOTATOR_PREFIX_SUFIX)]
self._current_proccess_fn = self._get_processor(self._current_step_name)
self._current_log_level = self._get_log_level(self._current_step_name)
self._last_log_level = self._current_log_level
self._last_step_lines = []
if self._current_proccess_fn:
self._current_proccess_fn(line)
self._last_line = line
self._last_line_teriminal_lines = int(
(len(line) - 1) / self._terminal_columns) + 1
if line.startswith(self.STEP_CLOSED_TEXT):
self._current_log_level = logging.DEBUG
self._current_proccess_fn = None
def EnsureFailurePrinted(self):
if self._last_step_lines:
basic_logger.log(self._last_log_level, '\n'.join(self._last_step_lines))
StoringLogger.log_last_step()
def _get_processor(self, step_name):
if step_name in self._step_to_processors:
return self._step_to_processors[step_name]
for match_name in self._step_to_processors:
if step_name.startswith(match_name):
return self._step_to_processors[match_name]
return self._StepNameProcessLine
def _get_log_level(self, step_name):
if step_name in self._step_to_log_level:
return self._step_to_log_level[step_name]
for match_name in self._step_to_log_level:
if step_name.startswith(match_name):
return self._step_to_log_level[match_name]
return logging.INFO