import argparse
import logging
import re
import shutil
import subprocess
import sys
import util
# jj template snippet, completed with str.format(): the "{}" placeholder takes
# a lower-case trailer key (this file passes 'bug' and 'fixed'). It selects the
# trailers whose lower-cased key equals that name and joins their values with
# "," into one string.
_FIND_TRAILER = '''self.trailers()
.filter(|trailer| trailer.key().lower() == "{}")
.map(|trailer| trailer.value())
.join(",")'''
# Message printed to stderr when an issue-tracker search is needed but the
# "bugged" executable cannot be found on PATH.
_BUGGED_MISSING_ERR = '''\
Searching for bugs requires the CLI tool "bugged" to be installed.
This tool is only available to Googlers (see go/bugged for install instructions)
'''
def _calculate_bugs(bugs: str) -> set[str]:
bugs = set(bugs.split(','))
bugs.discard('')
return bugs
def _search_bugs(query: str | None) -> str:
    """Interactively pick an issue from the output of ``bugged search``.

    Runs ``bugged search`` (with ``query`` appended when it is non-empty),
    prints the result rows as a numbered list and prompts on stdin until the
    user selects a row. The user can instead start a different search with
    ``q <query>``, or re-run the search without a query by entering an empty
    line.

    Args:
        query: Search string passed to ``bugged search``. ``None`` or an empty
            string runs the command without a query argument.

    Returns:
        The first whitespace-separated column of the selected row (the
        ``issue`` column requested via ``--columns``).

    Raises:
        SystemExit: With status 1 when ``bugged`` exits with a status other
            than 0 or 5.
    """
    args = [
        'bugged', 'search', '--error-if-not-found=true',
        '--columns=issue,reporter,assignee,status,summary'
    ]
    if query:
        args.append(query)
    ps = util.run_command(
        args,
        stdout=subprocess.PIPE,
        check=False,
        text=True,
    )
    if ps.returncode == 5:
        # Status 5 is handled as "nothing matched" (presumably what
        # --error-if-not-found=true produces); ask for another query.
        print('No results found. Try another query?')
        return _search_bugs(input('>>> '))
    if ps.returncode != 0:
        print('Issue tracker search failed', file=sys.stderr)
        # sys.exit() instead of the builtin exit(): the latter is injected by
        # the `site` module for interactive use and is not always available.
        sys.exit(1)

    # Split every output line into [first column, remainder]. A line holding a
    # single token has no remainder, so pad it to keep the unpacking below (and
    # the header lookup) from raising.
    rows = []
    for raw in ps.stdout.rstrip().split('\n'):
        parts = re.split(r'\s+', raw, maxsplit=1)
        if len(parts) == 1:
            parts.append('')
        rows.append(parts)

    # Row 0 is the column header; print it without its first column so it
    # lines up with the numbered rows below.
    print(f'# {rows[0][1]}')
    issues = {}
    # Walk the result rows bottom-up so row 1 ends up closest to the prompt.
    for i in range(len(rows) - 1, 0, -1):
        issue, rest = rows[i]
        if not issue:
            # Blank or indented line: there is no issue ID to offer.
            continue
        issues[str(i)] = issue
        fmt = f'{i:>2} {rest}'
        # Colour odd-numbered rows cyan to make the list easier to scan.
        print(f'\033[36m{fmt}\033[0m' if i % 2 else fmt)
    print('Type the number in the left column to select an issue')
    print('Type "q [query]" to search for something different')
    while True:
        answer = input('>>> ')
        if answer.startswith('q '):
            return _search_bugs(answer.removeprefix('q '))
        if answer == '':
            return _search_bugs(None)
        if answer in issues:
            return issues[answer]
        print('Invalid response')
def _add_handler(args, revs: str):
    """Handle the ``add`` subcommand: attach bug trailers to revisions.

    For every revision matched by ``revs``, works out which bug IDs are not
    yet recorded in its ``Bug``/``Fixed`` trailers and adds them through
    ``util.add_trailers`` under ``Fixed`` (when ``args.fixed``) or ``Bug``.

    The bug IDs come from ``args.bug`` and, with ``args.inherit``, from the
    ``Bug`` trailers of each revision's parents. When ``args.query`` is given,
    or when there are neither explicit bugs nor ``args.inherit``, one more bug
    is chosen interactively via ``_search_bugs``.

    Args:
        args: Parsed command-line namespace; reads ``bug``, ``query``,
            ``inherit`` and ``fixed``.
        revs: jj revset selecting the revisions to modify.

    Raises:
        SystemExit: With status 1 when a search is needed but the ``bugged``
            tool is not installed.
    """
    # Change IDs of the revisions the user explicitly asked to modify.
    direct = {
        x['change_id']
        for x in util.jj_log(
            revisions=f'{revs}',
            templates={
                'change_id': 'change_id',
            },
            ignore_working_copy=True,
        )
    }
    # The selected revisions plus their mutable ancestors, with the bug
    # trailers each one already carries.
    ancestors = util.jj_log(
        revisions=f'mutable()::({revs})',
        templates={
            'change_id': 'change_id',
            'parents': util.MUTABLE_PARENTS,
            'trailers': 'trailers',
            'desc': 'description',
            'bugs': _FIND_TRAILER.format('bug'),
            'fixed': _FIND_TRAILER.format('fixed'),
        },
        ignore_working_copy=True,
    )
    # Separate name instead of rebinding the `revs` parameter to a dict.
    by_change_id = {rev['change_id']: rev for rev in ancestors}
    for rev in ancestors:
        rev['fixed'] = _calculate_bugs(rev['fixed'])
        rev['bugs'] = _calculate_bugs(rev['bugs'])

    bugs = set(args.bug)
    if args.query or (not bugs and not args.inherit):
        if args.query:
            print('Searching for bugs on issue tracker...')
        else:
            print('No bugs provided. Searching for bugs on issue tracker...')
        if shutil.which('bugged') is None:
            print(_BUGGED_MISSING_ERR, file=sys.stderr)
            # sys.exit() instead of the builtin exit(), which the `site` module
            # only provides for interactive use.
            sys.exit(1)
        bugs.add(_search_bugs(args.query))

    # Both are loop-invariant: the trailer key to write and the matching
    # in-memory set to keep up to date.
    tag = 'Fixed' if args.fixed else 'Bug'
    recorded_key = 'fixed' if args.fixed else 'bugs'

    # Iterate in reverse log order, presumably so parents are handled before
    # their children; a child then also inherits bugs just added to a parent
    # that is itself part of the selection.
    for rev in reversed(ancestors):
        if rev['change_id'] not in direct:
            continue
        bugs_to_add = set(bugs)
        if args.inherit and rev['parents']:
            for change_id in rev['parents'].split(','):
                bugs_to_add.update(by_change_id[change_id]['bugs'])
        # Never repeat a bug the revision already lists under either trailer.
        bugs_to_add -= rev['bugs']
        bugs_to_add -= rev['fixed']
        rev[recorded_key].update(bugs_to_add)
        if bugs_to_add:
            util.add_trailers(
                rev=rev,
                # Sorted so the trailer text is the same on every run; plain
                # set iteration order of strings varies between processes.
                trailers={tag: [','.join(sorted(bugs_to_add))]},
                commit=True,
            )
def main(args):
    """Configure logging and run the handler of the selected subcommand.

    Args:
        args: Parsed command-line namespace. ``verbosity`` must be a level
            name known to ``logging``; ``revision`` is passed to
            ``util.join_revsets`` and the result, together with ``args``, to
            ``args.func``.
    """
    log_level = logging.getLevelNamesMapping()[args.verbosity]
    logging.basicConfig(level=log_level)
    handler = args.func
    handler(args, util.join_revsets(args.revision))
if __name__ == '__main__':
    # Command-line entry point: build the parser, then hand the parsed
    # arguments to main().
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--verbosity',
        help='Verbosity of logging',
        default='INFO',
        choices=logging.getLevelNamesMapping().keys(),
        # Upper-case the value before it is checked against `choices`, so
        # e.g. "debug" is accepted.
        type=lambda x: x.upper(),
    )
    # A subcommand is mandatory: main() reads args.func and args.revision,
    # which only exist after a subparser has run. Without `required=True`,
    # running with no subcommand ended in an AttributeError instead of a usage
    # message. `dest` gives the argument a name for that error message.
    subparsers = parser.add_subparsers(dest='command', required=True)

    parser_add = subparsers.add_parser('add')
    parser_add.set_defaults(func=_add_handler)
    parser_add.add_argument(
        '-r',
        '--revision',
        help='The revisions to add the bug to',
        action='append',
        required=True,
    )
    parser_add.add_argument(
        '-f',
        '--fixed',
        help='Mark the revision as fixing the bug',
        action='store_true',
    )
    parser_add.add_argument(
        '-i',
        '--inherit',
        help='Inherit the bug from parent commits. Bugs are not transitively ' +
        'inherited unless the parent is also part of -r',
        action='store_true',
    )
    parser_add.add_argument(
        '-b',
        '--bug',
        help='The bug to add to the revisions',
        action='append',
        default=[],
    )
    parser_add.add_argument(
        '-q',
        '--query',
        help='Query to use when searching for a bug in issue tracker',
        default=None,
    )

    main(parser.parse_args())