'''
公共工具类,函数
'''
import hashlib
import os.path
import re
import time
import json
import requests
from dateutil.relativedelta import relativedelta
from dateutil import parser
import datetime
from pypinyin import lazy_pinyin
import functools
import io
import uuid
import pandas as pd
import importlib
class Singleton(type):
'''
单例模式
'''
def __init__(cls, *args, **kwargs):
cls.__instance = None
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs):
if cls.__instance is None:
cls.__instance = super().__call__(*args, **kwargs)
return cls.__instance
def import_class(class_path):
'''
导入模块
:param class_path:
:return:
'''
module_name, class_name = class_path.rsplit('.', 1)
module = importlib.import_module(module_name)
return getattr(module, class_name)
def gen_json_response(data=None, code=200, msg='ok', msg_key='msg', code_key='code', data_key='data', extends={}):
'''
生成web接口返回json格式数据
:param data:
:param code:
:param msg:
:return:
'''
res_data = {
code_key: code,
msg_key: msg,
**extends
}
if data is not None:
res_data[data_key] = data
return res_data
def gen_uuid(res_type='source'):
'''
生成uuid唯一号
:return:
'''
uid = str(uuid.uuid4())
if res_type == 'base':
return uid.replace('-', '')
return uid
def gen_dict_hash_key(dic):
'''
对字典生成 hash key
'''
sorted_tag_keys = sorted(list(dic.keys()))
new_dic = {k: dic[k] for k in sorted_tag_keys}
return md5(str(new_dic))
def get_now_time(res_type='int'):
'''
返回当前时间
'''
t = time.time()
if res_type == 'int':
return int(t)
if res_type == 'ms':
return int(t * 1000)
if res_type == 'ns':
return int(t * 1000000000)
if res_type == 'datetime':
return timestamp_to_date(t)
return t
def trans_value_type(value, trans_type='str'):
'''
转换值类型
:param value:
:param trans_type:
:return:
'''
if trans_type == 'str':
value = str(value)
elif trans_type == 'int':
try:
value = int(value)
except Exception as e:
print(e)
elif trans_type == 'float':
try:
value = float(value)
except Exception as e:
print(e)
elif trans_type in ['date', 'datetime', 'timestamp']:
try:
value = format_date(value, res_type=trans_type)
except Exception as e:
print(e)
return value
def print_run_time(func):
'''
打印函数运行时间装饰器
:param func:
:return:
'''
@functools.wraps(func)
def wrapper(*args, **kwargs):
start = time.time()
r = func(*args, **kwargs)
cost = time.time() - start
cost = round(cost, 3)
print(f"{func.__name__}函数执行了{cost}s")
return r
return wrapper
def parse_json(source, default=None):
'''
转为python对象
:param source:
:return:
'''
if isinstance(source, bytes):
source = source.decode()
if isinstance(source, str):
try:
source = json.loads(source)
except Exception as e:
print(e)
source = None
if source is None:
return default
return source
def gen_json_to_dict_code(dic, indent=2):
'''
将json字符串转为字典代码字符串
:param dic:
:param indent:
:return:
'''
dict_code = json.dumps(dic, indent=indent, ensure_ascii=False)
dict_code = '\n'.join(['{'] + [' ' * indent + i for i in dict_code.split('\n')[1:-1]] + [' ' * indent + '}'])
dict_code = dict_code.replace(': true', ': True').replace(': false', ': False').replace(': null', ': None')
return dict_code
def flatten_dict(dic, key, if_exist='replace'):
'''
对字典嵌套字典字段摊平到上级字典,如{'field': {'a': 1, 'b': 'hehe'}, 'c': 2020} -> {'a': 1, 'b': 'hehe', 'c': 2020}
'''
dic2 = dic.get(key, {})
for k in dic2:
if k not in dic or (k in dic and if_exist == 'replace'):
dic[k] = dic2[k]
dic.pop(key)
return dic
def get_json_value(val):
'''
将python变量转为json变量
:param val:
:return:
'''
value = f"'{val}'" if isinstance(val, str) else val
if value == True:
value = 'true'
elif value == False:
value = 'false'
elif value is None:
value = 'null'
return value
def parse_data_to_excel(data):
'''
将数据转为excel文件
:param data:
:return:
'''
df = pd.DataFrame(data)
print(df)
output = io.BytesIO()
df.to_excel(output, index=False)
output.seek(0)
return output
def request_url(url, method='get', params={}, headers={}, data={}, json={}, timeout=30, retry=3, proxy=None):
'''
请求url
:param url:
:param retry:
:return:
'''
req_times = 0
while req_times < retry:
try:
res = requests.request(method, url, headers=headers, params=params, data=data, json=json, stream=True, timeout=timeout, proxies=proxy)
return res
except Exception as e:
print(e)
req_times += 1
res = requests.request(method, url, headers=headers, params=params, data=data, stream=True, timeout=timeout)
return res
def trans_rule_value(value):
'''
转换条件筛选时的值
:param value:
:return:
'''
if not isinstance(value, str):
return value
print(value)
time_dict = {
's': 1,
'm': 60,
'h': 3600,
'd': 86400,
'w': 86400 * 7,
'M': 86400 * 30,
'Y': 86400 * 365
}
if value.startswith('timestamp:'):
t = value[-1]
now = get_now_time()
try:
value = int(value[10:-1].strip())
value = now + value * time_dict[t]
return value
except Exception as e:
print(e)
elif value.startswith('timestamp_ms:'):
t = value[-1]
now = get_now_time()
try:
value = int(value[13:-1].strip())
value = (now + value * time_dict[t]) * 1000
return value
except Exception as e:
print(e)
elif value.startswith('time:'):
t = value[-1]
now = get_now_time()
try:
value = int(value[5:-1].strip())
value = now + value * time_dict[t]
value = timestamp_to_date(value)
except Exception as e:
print(e)
elif value.startswith('date:'):
now = datetime.datetime.now()
try:
value = format_date(now, value[5:].strip())
value = format_date(value)
except Exception as e:
print(e)
elif value.startswith('int:'):
try:
value = int(value[4:])
except Exception as e:
print(e)
elif value.startswith('int:'):
try:
value = int(value[6:])
except Exception as e:
print(e)
return value
def get_now_date():
'''
获取当前时间
:return:
'''
return str(datetime.datetime.now)[:19]
def format_date(date, format='%Y-%m-%d %H:%M:%S', res_type='str', default=None):
'''
格式化日期
:param date:
:return:
'''
try:
if isinstance(date, int):
date = timestamp_to_date(date)
date = parser.parse(date)
if isinstance(date, str):
date = parser.parse(date)
elif not isinstance(date, datetime.datetime):
date = str(date)
date = parser.parse(date)
date = date.strftime(format)
if res_type == 'date':
return date
if res_type == 'datetime':
date = datetime.datetime.strptime(date, format)
return date
if res_type == 'timestamp':
return date_to_timestamp(date)
date = str(date)
if len(date) > 19:
date = date[:19]
if '.000Z' in date:
date = date.replace('.000Z', '')
if 'T' in date:
date = date.replace('T', ' ')
return date
except Exception as e:
print(e)
return default
def date_to_timestamp(date, default=None):
'''
日期转时间戳
:param date:
:return:
'''
try:
if not isinstance(date, str):
date = str(date)
if len(date) > 19:
date = date[:19]
if 'T' in date:
date = date.replace('T', ' ').replace('.000Z', '')
if len(date) == 10:
timeArray = time.strptime(date, "%Y-%m-%d")
else:
timeArray = time.strptime(date, "%Y-%m-%d %H:%M:%S")
timeStamp = int(time.mktime(timeArray))
return timeStamp
except Exception as e:
print(e)
return default
def timestamp_to_date(timestamp, defalut=None):
'''
时间戳转日期
:param date:
:return:
'''
try:
if timestamp > 1000000000 * 1000 * 1000:
timestamp = timestamp / 1000 / 1000
if timestamp > 1000000000 * 1000:
timestamp = timestamp / 1000
if timestamp != '' and timestamp is not None:
timeArray = time.localtime(timestamp)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
return otherStyleTime
else:
return defalut
except Exception as e:
print(e)
return defalut
def trans_time_length(time_length):
'''
转换时间长度,转为秒数
:param time_length:
:return:
'''
time_symbol_dict = {
'Y': 86400 * 365,
'M': 86400 * 30,
'W': 86400 * 7,
'd': 86400,
'h': 3600,
'm': 60,
's': 1
}
if time_length == 'forever':
return 86400 * 365 * 100
else:
symbol = time_length[-1]
length = time_length[:-1]
symbol_len = time_symbol_dict.get(symbol)
return int(length) * symbol_len
def get_date_list(start_date, end_date):
'''
获取日期列表
'''
start_time = start_date
end_time = end_date
datestart = datetime.datetime.strptime(start_time, '%Y-%m-%d')
dateend = datetime.datetime.strptime(end_time, '%Y-%m-%d')
date_list = []
date_list.append(datestart.strftime('%Y-%m-%d'))
while datestart < dateend:
datestart += relativedelta(days=+1)
date_list.append(datestart.strftime('%Y-%m-%d'))
query_list = []
for i in range(len(date_list)):
s = date_list[i]
query_list.append(s)
return query_list
def md5(content):
'''
将内容转为MD5值
:return:
'''
md = hashlib.md5()
if isinstance(content, bytes):
md.update(content)
else:
md.update(content.encode("utf8"))
return md.hexdigest()
def sha256(content):
'''
将内容转为sha256
:param content:
:return:
'''
s = hashlib.sha256()
if isinstance(content, bytes):
s.update(content)
else:
s.update(content.encode("utf8"))
return s.hexdigest()
def parse_to_string(val):
'''
将字段或列表转换为json字符串
:param val:
:return:
'''
if isinstance(val, dict) or isinstance(val, list):
return json.dumps(val)
return val
def parse_to_int(val, default=0):
'''
将字段或列表转换为数字
:param val:
:return:
'''
try:
val = int(val)
except Exception as e:
print(e)
val = default
return val
def format_str(str1='', type='lower'):
'''
去除特殊字符
:param str1:
:param type:
:return:
'''
if str1:
try:
temp = str1.decode('utf-8', 'ignore')
except BaseException:
temp = str1
xx = u"([\u4e00-\u9fa5a-zA-Z0-9]+)"
pattern = re.compile(xx)
results = pattern.findall(str(temp))
if results:
long_str = ''.join(results)
if type == 'lower':
return long_str.lower()
elif type == 'upper':
return long_str.upper()
else:
return long_str
else:
return ''
else:
return ''
def _lazy_pinyin(*args):
'''
中文转拼音
:param args:
:return:
'''
return lazy_pinyin(args[0])
def _change_to_lower(*args):
'''
转小写
:param args:
:return:
'''
return str(args[0]).lower()
def _change_to_upper(*args):
'''
转大写
:param args:
:return:
'''
return str(args[0]).upper()
def read_file(file):
'''
读取文件转为文件对象
'''
file_obj = None
if isinstance(file, str):
if file.startswith('http:'):
try:
res = request_url(file)
file_obj = io.BytesIO(res.content)
except Exception as e:
print(e)
else:
try:
file_obj = open(file, 'rb')
except Exception as e:
print(e)
return file_obj
def read_file_path(file):
'''
读取文件转为文件路径,网络文件生成临时文件
'''
if not isinstance(file, str):
file = str(file)
if file.startswith('http:'):
try:
suffix = file.split('.')[-1]
res = request_url(file)
if res.status_code == 200:
tmp_path = f'{gen_uuid()}.{suffix}'
tmp_file = open(tmp_path, 'wb')
tmp_file.write(res.content)
return True, True, tmp_path
else:
return False, False, f'请求网络文件错误:{res.text}'
except Exception as e:
return False, False, f'请求网络文件错误:{e}'
else:
if os.path.exists(file):
return True, False, file
else:
return False, False, '文件未找到'
def trans_dict_to_rules(api_form):
'''
对dict类型数据转换为筛选规则
'''
params_list = []
for k in api_form:
params_list.append((k, [api_form[k]]))
extract_rules = []
for k, values in params_list:
field = ''
fields = re.findall('\[(.*?)\]', k)
if fields != ['']:
field = '.'.join(fields)
if '[' in k:
rule = re.findall('(.*?)\[', k)[0]
else:
rule = k
for value in values:
extract_rules.append({'field': field, 'rule': rule, 'value': value})
print(extract_rules)
return extract_rules
def get_res_fields(res_data):
'''
获取返回字段列表
'''
res_fields = []
if isinstance(res_data, list) and res_data != []:
dic = res_data[0]
if isinstance(dic, dict):
res_fields = list(dic.keys())
if isinstance(res_data, dict):
if 'records' in res_data:
if res_data['records'] != []:
res_fields = list(res_data['records'][0].keys())
else:
res_fields = []
else:
if 'code' in res_data and res_data['code'] != 200:
return []
res_fields = list(res_data.keys())
return list(set(res_fields))
def parse_to_list(s, split_text=','):
'''
将字符串数据转换为列表
:param s:
:param split_text:
:return:
'''
if s is None:
return []
if isinstance(s, str):
if s == '':
return []
else:
s = s.split(split_text)
return s
def convert_to_json_serializable(value):
"""
将值转换为 JSON 可序列化的格式。
"""
if isinstance(value, (pd.Timestamp, pd.Period)):
return str(value)
elif isinstance(value, (list, dict)):
try:
json.dumps(value)
return value
except TypeError:
return str(value)
elif pd.isna(value):
return ""
else:
return value
def df_to_list(df):
if isinstance(df, pd.DataFrame):
for col in df.select_dtypes(include=['datetime']).columns:
df[col] = df[col].astype(str)
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
df[col].fillna(0, inplace=True)
else:
df[col].fillna("", inplace=True)
data_li = df.to_dict(orient='records')
elif isinstance(df, list):
data_li = df
else:
data_li = [df]
for record in data_li:
for key, value in record.items():
record[key] = convert_to_json_serializable(value)
return data_li
if __name__ == '__main__':
print(md5('faad49866e9498fc1719f5289e7a0269'))