GGiteeUP
0e6ce162创建于 2024年7月30日历史提交
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function
from __future__ import unicode_literals

import sys
import importlib

from prompt_toolkit.shortcuts import run_application
from prompt_toolkit.application import Application
from prompt_toolkit.key_binding.manager import KeyBindingManager
from prompt_toolkit.keys import Keys
from prompt_toolkit.layout.containers import Window
from prompt_toolkit.filters import IsDone
from prompt_toolkit.layout.controls import TokenListControl
from prompt_toolkit.layout.containers import ConditionalContainer
from prompt_toolkit.layout.containers import HSplit
from prompt_toolkit.layout.dimension import LayoutDimension as D
from prompt_toolkit.token import Token
from exceptions.ohos_exception import OHOSException
from services.interface.menu_interface import MenuInterface
from helper.separator import Separator
from containers.arg import Arg, ModuleType
from resources.config import Config
from util.log_util import LogUtil
from util.product_util import ProductUtil


class Menu(MenuInterface):

    def select_compile_option(self) -> dict:
        config = Config()
        results = {}
        all_build_args = Arg.parse_all_args(ModuleType.BUILD)
        for arg in all_build_args.values():
            if isinstance(arg, Arg) and arg.arg_attribute.get("optional"):
                if arg.arg_name != 'target_cpu':
                    choices = [
                        choice if isinstance(choice, Separator) else {
                            'name': choice,
                            'value': arg.arg_name
                        } for choice in arg.arg_attribute.get("optional")
                    ]
                else:
                    if config.support_cpu is not None and isinstance(config.support_cpu, list):
                        choices = []
                        for cpu in config.support_cpu:
                            choices.append({
                                'name': cpu,
                                'value': arg.arg_name,
                            })
                    elif config.target_cpu is not None:
                        choices = [
                            {
                                'name': config.target_cpu,
                                'value': arg.arg_name,
                            }
                        ]
                    else:
                        choices = [
                            {
                                'name': all_build_args.get('target_cpu').arg_value,
                                'value': arg.arg_name,
                            }
                        ]

                result = self._list_promt(arg.arg_name, 'select {} value'.format(
                    arg.arg_name), choices).get(arg.arg_name)[0]
                results[arg.arg_name] = result
        return results

    def select_product(self) -> dict:
        product_path_dict = {}
        company_separator = None
        os_level = self._select_os_level()
        for product_info in ProductUtil.get_products():
            if product_info['os_level'] is None:
                raise OHOSException("")
            if product_info['os_level'] == os_level:
                company = product_info['company']
                product = product_info['name']
                if company_separator is None or company_separator != company:
                    company_separator = company
                    product_key = Separator(company_separator)
                    product_path_dict[product_key] = None

                product_path_dict['{}@{}'.format(product,
                                                 company)] = product_info

        if not len(product_path_dict):
            raise OHOSException('no valid product found')

        choices = [
            product if isinstance(product, Separator) else {
                'name': product.split('@')[0],
                'value': product.split('@')[1]
            } for product in product_path_dict.keys()
        ]
        product = self._list_promt('product', 'Which product do you need?',
                                   choices).get('product')
        product_key = f'{product[0]}@{product[1]}'
        return product_path_dict.get(product_key)

    def _select_os_level(self) -> str:
        choices = [
            {
                'name': 'mini',
                'value': 'os_level'
            },
            {
                'name': 'small',
                'value': 'os_level'
            },
            {
                'name': 'standard',
                'value': 'os_level'
            }
        ]
        return self._list_promt('os_level', 'Which os_level do you need?',
                                choices).get('os_level')[0]

    def _list_promt(self, name: str, message: str, choices: list, **kwargs):
        questions = self._get_questions('list', name, message, choices)

        return self._prompt(questions=questions, **kwargs)

    def _get_questions(self, promt_type: str, name: str, message: str, choices: list):
        questions = [{
            'type': promt_type,
            'qmark': 'OHOS',
            'name': name,
            'message': message,
            'choices': choices
        }]

        return questions

    def _prompt(self, questions: list, answers=None, **kwargs):
        if isinstance(questions, dict):
            questions = [questions]
        answers = answers or {}

        patch_stdout = kwargs.pop('patch_stdout', False)
        return_asyncio_coroutine = kwargs.pop(
            'return_asyncio_coroutine', False)
        true_color = kwargs.pop('true_color', False)
        refresh_interval = kwargs.pop('refresh_interval', 0)
        eventloop = kwargs.pop('eventloop', None)
        kbi_msg = kwargs.pop('keyboard_interrupt_msg', 'Cancelled by user')

        for question in questions:
            try:
                choices = question.get('choices')
                if choices is not None and callable(choices):
                    question['choices'] = choices(answers)

                _kwargs = {}
                _kwargs.update(kwargs)
                _kwargs.update(question)
                question_type = _kwargs.pop('type')
                name = _kwargs.pop('name')
                message = _kwargs.pop('message')
                when = _kwargs.pop('when', None)
                question_filter = _kwargs.pop('filter', None)

                if when:
                    # at least a little sanity check!
                    if callable(question['when']):
                        try:
                            if not question['when'](answers):
                                continue
                        except Exception as error:
                            raise ValueError(
                                'Problem in \'when\' check of %s question: %s' %
                                (name, error))
                    else:
                        raise ValueError('\'when\' needs to be function that '
                                         'accepts a dict argument')
                if question_filter:
                    # at least a little sanity check!
                    if not callable(question['filter']):
                        raise ValueError('\'filter\' needs to be function that '
                                         'accepts an argument')

                if callable(question.get('default')):
                    _kwargs['default'] = question['default'](answers)

                application = self._question(message, **_kwargs)

                answer = run_application(
                    application,
                    patch_stdout=patch_stdout,
                    return_asyncio_coroutine=return_asyncio_coroutine,
                    true_color=true_color,
                    refresh_interval=refresh_interval,
                    eventloop=eventloop)

                if answer is not None:
                    if question_filter:
                        try:
                            answer = question['filter'](answer)
                        except Exception as error:
                            raise ValueError('Problem processing \'filter\' of'
                                             '{} question: {}'.format(name, error))
                    answers[name] = answer
            except AttributeError as attr_error:
                LogUtil.hb_error(attr_error)
                raise ValueError('No question type \'%s\'' % question_type)
            except KeyboardInterrupt:
                LogUtil.hb_warning('')
                LogUtil.hb_warning(kbi_msg)
                LogUtil.hb_warning('')
                return {}
        return answers

    def _question(self, message: str, **kwargs):
        if 'choices' not in kwargs:
            raise OHOSException("You must choose one platform.")

        choices = kwargs.pop('choices', None)
        qmark = kwargs.pop('qmark', '?')
        style = kwargs.pop('style', _get_style('terminal'))

        inquirer_control = InquirerControl(choices)

        def get_prompt_tokens(cli):
            tokens = []

            tokens.append((Token.QuestionMark, qmark))
            tokens.append((Token.Question, ' %s ' % message))
            if inquirer_control.answered:
                tokens.append((Token.Answer, ' ' +
                               inquirer_control.get_selection()[0]))
            else:
                tokens.append((Token.Instruction, ' (Use arrow keys)'))
            return tokens

        # assemble layout
        layout = HSplit([
            Window(height=D.exact(1),
                   content=TokenListControl(get_prompt_tokens)),
            ConditionalContainer(
                Window(inquirer_control),
                filter=~IsDone()
            )
        ])

        # key bindings
        manager = KeyBindingManager.for_prompt()

        @manager.registry.add_binding(Keys.ControlQ, eager=True)
        @manager.registry.add_binding(Keys.ControlC, eager=True)
        def _(event):
            raise KeyboardInterrupt()

        @manager.registry.add_binding(Keys.Down, eager=True)
        def move_cursor_down(event):
            def _next():
                inquirer_control.selected_option_index = (
                    (inquirer_control.selected_option_index + 1) %
                    inquirer_control.choice_count)
            _next()
            while isinstance(inquirer_control.choices[
                inquirer_control.selected_option_index][0], Separator) \
                    or inquirer_control.choices[
                        inquirer_control.selected_option_index][2]:
                _next()

        @manager.registry.add_binding(Keys.Up, eager=True)
        def move_cursor_up(event):
            def _prev():
                inquirer_control.selected_option_index = (
                    (inquirer_control.selected_option_index - 1) %
                    inquirer_control.choice_count)
            _prev()
            while isinstance(inquirer_control.choices[
                inquirer_control.selected_option_index][0], Separator) \
                    or inquirer_control.choices[
                        inquirer_control.selected_option_index][2]:
                _prev()

        @manager.registry.add_binding(Keys.Enter, eager=True)
        def set_answer(event):
            inquirer_control.answered = True
            event.cli.set_return_value(inquirer_control.get_selection())

        return Application(
            layout=layout,
            key_bindings_registry=manager.registry,
            mouse_support=False,
            style=style
        )


class InquirerControl(TokenListControl):
    def __init__(self, choices: list, **kwargs):
        self.selected_option_index = 0
        self.answered = False
        self.choices = choices
        self._init_choices(choices)
        super(InquirerControl, self).__init__(self._get_choice_tokens,
                                              **kwargs)

    @property
    def choice_count(self):
        return len(self.choices)
    
    def get_selection(self):
        return self.choices[self.selected_option_index]

    def _init_choices(self, choices: list, default=None):
        # helper to convert from question format to internal format
        self.choices = []  # list (name, value, disabled)
        searching_first_choice = True
        for index, choice in enumerate(choices):
            if isinstance(choice, Separator):
                self.choices.append((choice, None, None))
            else:
                base_string = str if sys.version_info[0] >= 3 else None
                if isinstance(choice, base_string):
                    self.choices.append((choice, choice, None))
                else:
                    name = choice.get('name')
                    value = choice.get('value', name)
                    disabled = choice.get('disabled', None)
                    self.choices.append((name, value, disabled))
                if searching_first_choice:
                    self.selected_option_index = index
                    searching_first_choice = False

    def _get_choice_tokens(self, cli):
        tokens = []
        token = Token

        def append(index: int, choice: list):
            selected = (index == self.selected_option_index)

            @_if_mousedown
            def select_item(cli, mouse_event):
                # bind option with this index to mouse event
                self.selected_option_index = index
                self.answered = True

            tokens.append((token.Pointer if selected else token, ' \u276f '
                          if selected else '   '))
            if selected:
                tokens.append((Token.SetCursorPosition, ''))
            if choice[2]:  # disabled
                tokens.append((token.Selected if selected else token,
                               '- %s (%s)' % (choice[0], choice[2])))
            else:
                if isinstance(choice[0], Separator):
                    tokens.append((token.Separator,
                                  str(choice[0]),
                                  select_item))
                else:
                    try:
                        tokens.append((token.Selected if selected else token,
                                      str(choice[0]), select_item))
                    except Exception:
                        tokens.append((token.Selected if selected else
                                      token, choice[0], select_item))
            tokens.append((token, '\n'))

        # prepare the select choices
        for i, choice in enumerate(self.choices):
            append(i, choice)
        tokens.pop()  # Remove last newline.
        return tokens


def _get_style(style_type: str):
    style = importlib.import_module('prompt_toolkit.styles')
    token = importlib.import_module('prompt_toolkit.token')
    if style_type == 'terminal':
        return style.style_from_dict({
            token.Token.Separator: '#75c951',
            token.Token.QuestionMark: '#5F819D',
            token.Token.Selected: '',  # default
            token.Token.Pointer: '#FF9D00 bold',  # AWS orange
            token.Token.Instruction: '',  # default
            token.Token.Answer: '#FF9D00 bold',  # AWS orange
            token.Token.Question: 'bold',
        })
    if style_type == 'answer':
        return style.style_from_dict({
            token.Token.Separator: '#75c951',
            token.Token.QuestionMark: '#E91E63 bold',
            token.Token.Selected: '#cc5454',  # default
            token.Token.Pointer: '#ed9164 bold',
            token.Token.Instruction: '',  # default
            token.Token.Answer: '#f44336 bold',
            token.Token.Question: '',
        })

    return None


def _if_mousedown(handler):
    def handle_if_mouse_down(cli, mouse_event):
        mouse_events = importlib.import_module('prompt_toolkit.mouse_events')
        if mouse_event.event_type == mouse_events.MouseEventTypes.MOUSE_DOWN:
            return handler(cli, mouse_event)
        else:
            return NotImplemented

    return handle_if_mouse_down