Ttwx1284527fix code linter
1ca9fd8f创建于 2024年2月27日历史提交
#!/usr/bin/env python3
# coding: utf-8

"""
Copyright (c) 2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: utils for test suite
"""
import argparse
import json
import os
import stat
import time as times
from datetime import datetime, timedelta, time

import requests
import yaml
from lxml import etree

from result import get_result


def parse_config():
    config_file_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), '../config.yaml')
    with open(config_file_path, 'r', encoding='utf-8') as config_file:
        configs = yaml.safe_load(config_file)
    return configs


def get_url(name, page):
    url_prefix = 'https://gitee.com/openharmony/'
    url_suffix = f'/pulls?assignee_id=&author_id=&label_ids=&label_text=&milestone_id=&page={page}' \
                 f'&priority=&project_type=&scope=&search=&single_label_id=&single_label_text=&' \
                 f'sort=created_at+desc&status=merged&target_project=&tester_id='
    url = url_prefix + name + url_suffix

    return url


def get_html(url):
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
                      ' (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
    }
    try:
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.text
    except Exception:
        print("Failed to request the page")
        return ''
    return ''


def write_data(repo_name, data_file, title, committer, commit_time_str, pr_link):
    data = {
        'repo_name': repo_name,
        'title': title,
        'committer': committer,
        'commit_time_str': commit_time_str,
        'pr_link': pr_link
    }
    flags = os.O_WRONLY | os.O_CREAT
    mode = stat.S_IWUSR | stat.S_IRUSR
    with os.fdopen(os.open(data_file, flags, mode), 'a', encoding='utf-8') as file:
        json.dump(data, file, ensure_ascii=False)
        file.write('\n')


def get_commit_records(repo_name, commit_start_time, commit_end_time):
    data_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.txt')
    current_data_count = 0
    page = 1
    is_continue = True
    while is_continue:
        url = get_url(repo_name, str(page))
        html = get_html(url)
        tree = etree.HTML(html)
        commit_list = tree.xpath('/html/body/div[2]/div[2]/div[2]/div[2]/div')
        if not commit_list:
            break
        for commit_task in commit_list:
            title = commit_task.xpath('.//div[1]/a/text()')[0]
            committer = commit_task.xpath('.//div[3]/span[2]/a/span/text()')[0]
            commit_time_str = commit_task.xpath('.//div[3]/span[4]/span/text()')[0].strip()
            pr_link = commit_task.xpath('.//div[1]/a/@href')[0]
            commit_time = datetime.strptime(commit_time_str, '%Y-%m-%d %H:%M')
            if commit_start_time <= commit_time <= commit_end_time:
                current_data_count = current_data_count + 1
                write_data(repo_name, data_file, title, committer, commit_time_str, pr_link)
            if commit_time < commit_start_time:
                is_continue = False
        page += 1

    if current_data_count == 0:
        print(f"repo {repo_name} no commit records were found within the specified time range")
        failed_message = (f'this repo has no commit record from {commit_start_time}'
                          f' to {commit_end_time.strftime("%Y-%m-%d %H:%M:%S")}')
        write_data(repo_name, data_file, failed_message, None, None, None)

    return current_data_count


def retry_after_crawl_failed(repo_list, commit_start_time, commit_end_time):
    max_retries = 5
    try_in = 2 * 60
    data_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'data.txt')
    if os.path.exists(data_file):
        os.remove(data_file)
    for i in range(max_retries):
        try:
            data_count = 0
            for repo_name in repo_list:
                current_data_count = get_commit_records(repo_name, commit_start_time, commit_end_time)
                data_count = data_count + current_data_count
            print(f'The data was successfully obtained, a total of {data_count} commit records were retrieved')
            print(f'Data statistics from {commit_start_time} to {commit_end_time.strftime("%Y-%m-%d %H:%M:%S")}'
                  f' were successfully retrieved')
            return True
        except Exception:
            print(f"get data failed! retrying... ({i + 1}/{max_retries})")
            times.sleep(try_in)

    return False


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--startTime', type=str, dest='start_time', default=None,
                        help='specify crawl start time')
    parser.add_argument('--commitRepo', type=str, dest='commit_repo', default=None,
                        nargs='+',
                        help='get commit message in those repos')
    return parser.parse_args()


def clean_log():
    commit_log_html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                    'commit_log.html')
    if os.path.exists(commit_log_html_path):
        os.remove(commit_log_html_path)


def run():
    clean_log()
    repo_list_configs = parse_config()
    end_time = datetime.now()
    yesterday = datetime.now() - timedelta(days=1)
    start_time = datetime(yesterday.year, yesterday.month, yesterday.day, 0, 0, 0)
    repo_list = repo_list_configs.get('repo_list')

    arguments = parse_args()
    commit_start_time = repo_list_configs.get('commit_start_time') if arguments.start_time is None \
        else arguments.start_time
    if commit_start_time is not None:
        time_str = datetime.strptime(commit_start_time, '%Y-%m-%d')
        start_time = datetime.combine(time_str, time.min)
        end_time = start_time + timedelta(days=1)

    success = retry_after_crawl_failed(repo_list, start_time, end_time)
    if not success:
        print("Maximum retries reached, failed to crawl the data")
    else:
        get_result()


if __name__ == '__main__':
    run()