5b2cd287创建于 2024年7月29日历史提交
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import codecs
import sys
import time
import os
import json
import logging.config
import stat
from logging import FileHandler


class SafeFileHandler(FileHandler):
    def __init__(self, filename, mode="a", encoding="utf-8", delay=0, suffix="%Y-%m-%d_%H"):
        if codecs is None:
            encoding = None
        current_time = time.strftime(suffix, time.localtime())
        FileHandler.__init__(self, filename + "." + current_time, mode, encoding, delay)

        self.filename = os.fspath(filename)

        self.mode = mode
        self.encoding = encoding
        self.suffix = suffix
        self.suftime = current_time

    def emit(self, record):
        try:
            if self.parse_file_name():
                self.gen_file_name()
            FileHandler.emit(self, record)
        except Exception as e:
            print(e)
            self.handleError(record)

    def parse_file_name(self):
        time_tuple = time.localtime()

        if self.suftime != time.strftime(self.suffix, time_tuple) or not os.path.exists(
                os.path.abspath(self.filename) + '.' + self.suftime):
            return 1
        else:
            return 0

    def gen_file_name(self):
        if self.stream:
            self.stream.close()
            self.stream = None

        if self.suftime != "":
            index = self.baseFilename.find("." + self.suftime)
            if index == -1:
                index = self.baseFilename.rfind(".")
            self.baseFilename = self.baseFilename[:index]

        cur_time = time.localtime()
        self.suftime = time.strftime(self.suffix, cur_time)
        self.baseFilename = os.path.abspath(self.filename) + "." + self.suftime

        if not self.delay:
            with os.fdopen(os.open(self.baseFilename, os.O_WRONLT | os.O_CREAT | os.O_EXCL,
                           stat.S_IWUSR | stat.S_IRUSR), self.mode, encoding=self.encoding) as f:
                self.stream = f


def get_logger(class_name, level="info"):
    formate = "%(asctime)s -%(levelname)s - %(name)s - %(message)s"
    formatter = logging.Formatter(formate)
    log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "log")
    if not os.path.exists(log_path):
        os.makedirs(log_path)
    tfr_handler = SafeFileHandler(os.path.join(log_path, class_name + ".log"))
    tfr_handler.setFormatter(formatter)

    sh = logging.StreamHandler()
    sh.setFormatter(formatter)

    logger = logging.getLogger(class_name)
    logger.addHandler(tfr_handler)
    logger.addHandler(sh)

    if level == 'info':
        logger.setLevel(logging.INFO)
    elif level == 'error':
        logger.setLevel(logging.ERROR)
    return logger


def parse_json():
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "build_example.json")
    try:
        with os.fdopen(os.open(config_path, os.O_WRONLT | os.O_CREAT | os.O_EXCL,
                       stat.S_IWUSR | stat.S_IRUSR), "r", encoding="utf-8") as json_file:
            data = json.load(json_file)
            return data
    except Exception as e:
        print(e)
    return None