"""
NOTE:
- This script is a demo for importing example data into Qlib
- !!!!!!!!!!!!!!!TODO!!!!!!!!!!!!!!!!!!!:
- Its structure is not well designed and very ugly; contributions that make importing datasets easier are welcome
"""
from datetime import date, datetime as dt
import os
from pathlib import Path
import random
import shutil
import time
import traceback
from arctic import Arctic, chunkstore
import arctic
from arctic import Arctic, CHUNK_STORE
from arctic.chunkstore.chunkstore import CHUNK_SIZE
import fire
from joblib import Parallel, delayed, parallel
import numpy as np
import pandas as pd
from pandas import DataFrame
from pandas.core.indexes.datetimes import date_range
from pymongo.mongo_client import MongoClient
# Directory containing this script; every working directory below lives under it.
DIRNAME = Path(__file__).absolute().resolve().parent
# Worker count handed to joblib.Parallel for the per-stock imports.
N_JOBS = -1
# Per-process timestamp-error and failure logs are appended to files in this directory.
LOG_FILE_PATH = DIRNAME / "log_file"
# Scratch directory: daily archives are copied here and extracted here.
DATA_PATH = DIRNAME / "raw_data"
# Source directory holding the "<date>_<doc_type>.tar.gz" archives and the stock list.
DATABASE_PATH = DIRNAME / "orig_data"
# One "data_info_log_<doc_type>_<date>" file with the per-exchange file counts is written here.
DATA_INFO_PATH = DIRNAME / "data_info"
# One "data_info_finish_log_<doc_type>_<date>" file is written here when a day finishes.
DATA_FINISH_INFO_PATH = DIRNAME / "./data_finish_info"
# Document types accepted by add_data; each one maps to a library via get_library_name.
DOC_TYPE = ["Tick", "Order", "OrderQueue", "Transaction", "Day", "Minute"]
# Quota passed to Arctic.set_quota for every library (3000 * 1024**3; presumably bytes, i.e. 3000 GiB).
MAX_SIZE = 3000 * 1024 * 1024 * 1024
# Tab-separated text file; the first column of each line is read as a stock symbol.
ALL_STOCK_PATH = DATABASE_PATH / "all.txt"
# Host passed to both Arctic(...) and MongoClient(...).
ARCTIC_SRV = "127.0.0.1"
def get_library_name(doc_type):
    """Return the library name used for a document type.

    The name is ``doc_type`` in lower case, except that any casing of
    ``"Tick"`` maps to ``"ticks"``.
    """
    lowered = str.lower(doc_type)
    return "ticks" if lowered == "tick" else lowered
def is_stock(exchange_place, code):
    """Return whether ``code`` passes the code-prefix filter of its exchange.

    Args:
        exchange_place: exchange tag; only ``"SH"`` and ``"SZ"`` are filtered.
        code: security code as a string.

    Returns:
        ``True`` for ``"SH"`` codes starting with ``"6"``, for ``"SZ"`` codes
        starting with ``"0"`` or ``"30"``, and for any code on any other
        exchange tag; ``False`` otherwise. An empty code on ``"SH"``/``"SZ"``
        yields ``False`` instead of raising ``IndexError``.
    """
    if exchange_place == "SH":
        return code.startswith("6")
    if exchange_place == "SZ":
        return code.startswith(("0", "30"))
    return True
def add_one_stock_daily_data(filepath, type, exchange_place, arc, date):
    """Load one stock's CSV for one day and store it in the matching library.

    Args:
        filepath: path of the CSV file; the file name without ``.csv`` is the code.
        type: document type ("Tick", "Order", "OrderQueue", ...); selects the library.
        exchange_place: "SZ" or "SH".
        arc: Arctic connection created by the calling process.
        date: only used in the progress messages.

    Returns:
        The list of row positions whose date/time could not be parsed (those
        rows are dropped before storing), or ``None`` when the code does not
        pass the exchange prefix filter and nothing is read.
    """
    code = os.path.split(filepath)[-1].split(".csv")[0]
    if exchange_place == "SH" and not code.startswith("6"):
        return None
    if exchange_place == "SZ" and not code.startswith(("0", "30")):
        return None

    df = pd.read_csv(filepath, encoding="gbk", dtype={"code": str})

    def format_time(day, hms):
        """Build a "YYYY-MM-DD H:MM:SS.fff"-style string from the raw columns."""
        day = str(day)
        hms = str(hms)
        # NOTE(review): a leading "1" is treated as a two-digit hour, anything
        # else as a one-digit hour - presumably the column is an integer
        # HHMMSSmmm with the leading zero lost; confirm against the data source.
        hour_width = 2 if hms[0] == "1" else 1
        day_part = "-".join([day[0:4], day[4:6], day[6:8]])
        time_part = ":".join(
            [
                hms[:hour_width],
                hms[hour_width : hour_width + 2],
                hms[hour_width + 2 : hour_width + 4] + "." + hms[hour_width + 4 :],
            ]
        )
        return day_part + " " + time_part

    # Parse every row once: keep the good timestamps, remember the bad positions.
    parsed_timestamps = []
    error_index_list = []
    for index, (day, hms) in enumerate(zip(df["date"], df["time"])):
        try:
            parsed_timestamps.append(pd.Timestamp(format_time(day, hms)))
        except Exception:
            error_index_list.append(index)

    if error_index_list:
        print("error: {}, {}".format(filepath, len(error_index_list)))
        # read_csv produced a default 0..n-1 index, so positions are also labels.
        df = df.drop(error_index_list)

    df = df.drop(columns=["date", "time", "name", "code", "wind_code"])
    df["date"] = pd.DatetimeIndex(parsed_timestamps)
    df = df.set_index("date")

    if str.lower(type) == "orderqueue":
        # Collapse the first `ab_items` of the ab1..ab50 columns into one
        # comma-separated string column. iterrows() yields each row with a
        # single common dtype, so the count may arrive as a float; cast it
        # before handing it to range().
        df["ab"] = [
            ",".join(str(int(row["ab" + str(i + 1)])) for i in range(int(row["ab_items"])))
            for _, row in df.iterrows()
        ]
        df = df.drop(columns=["ab" + str(i) for i in range(1, 51)])

    lib = arc[get_library_name(type)]
    symbol = "".join([exchange_place, code])
    if symbol in lib.list_symbols():
        print("update {0}, date={1}".format(symbol, date))
        if df.empty:
            return error_index_list
        lib.update(symbol, df, chunk_size="D")
    else:
        print("write {0}, date={1}".format(symbol, date))
        lib.write(symbol, df, chunk_size="D")
    return error_index_list
def add_one_stock_daily_data_wrapper(filepath, type, exchange_place, index, date):
    """Import one stock file on its own Arctic connection and log problems.

    Opens a fresh ``Arctic`` connection, calls ``add_one_stock_daily_data``
    and appends to per-process log files under ``LOG_FILE_PATH``:

    * ``temp_timestamp_error_<pid>_<date>_<type>.txt`` when rows with
      unparseable timestamps were dropped;
    * ``temp_fail_<pid>_<date>_<type>.txt`` (message plus traceback) when the
      import raised.

    Exceptions from the import are logged, not re-raised. The connection is
    reset in every case.

    Args:
        filepath: path of the stock's CSV file.
        type: document type, forwarded to ``add_one_stock_daily_data``.
        exchange_place: "SZ" or "SH".
        index: position of this file in the batch; every 100th is printed.
        date: date string used in messages and log file names.
    """
    pid = os.getpid()
    code = os.path.split(filepath)[-1].split(".csv")[0]
    arc = Arctic(ARCTIC_SRV)
    try:
        if index % 100 == 0:
            print("index = {}, filepath = {}".format(index, filepath))
        error_index_list = add_one_stock_daily_data(filepath, type, exchange_place, arc, date)
        if error_index_list:
            log_path = os.path.join(LOG_FILE_PATH, "temp_timestamp_error_{0}_{1}_{2}.txt".format(pid, date, type))
            # `with` closes the handle even if the write itself fails.
            with open(log_path, "a+") as f:
                f.write("{}, {}, {}\n".format(filepath, error_index_list, exchange_place + "_" + code))
    except Exception as e:
        info = traceback.format_exc()
        print("error:" + str(e))
        log_path = os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, date, type))
        with open(log_path, "a+") as f:
            f.write("fail:" + str(filepath) + "\n" + str(e) + "\n" + str(info) + "\n")
    finally:
        arc.reset()
def _run_command(*args):
    """Run an external program with the given arguments, bypassing the shell.

    The arguments are passed as a list, so paths containing spaces or shell
    metacharacters reach the program unchanged. The exit status is ignored.
    """
    import subprocess  # local import: not among the file-level imports

    subprocess.run([str(arg) for arg in args], check=False)


def add_data(tick_date, doc_type, stock_name_dict):
    """Import every listed stock of one day and one document type.

    Steps:
        1. Copy ``<tick_date>_<doc_type>.tar.gz`` from ``DATABASE_PATH`` to
           ``DATA_PATH`` and extract its ``SH`` and ``SZ`` sub-trees.
        2. Keep the CSV files whose code passes the exchange prefix filter and
           is present in ``stock_name_dict``; write the counts to
           ``DATA_INFO_PATH``.
        3. Import the files in parallel with ``add_one_stock_daily_data_wrapper``.
        4. Remove the copied archive and the extracted tree, then write a
           finish record to ``DATA_FINISH_INFO_PATH``.

    Exit statuses of the external commands (cp, tar, chmod, rm) are ignored.
    Any exception is printed and appended to a ``temp_fail_*`` file under
    ``LOG_FILE_PATH``; it is not re-raised, and the cleanup of step 4 is then
    skipped.

    Args:
        tick_date: date string as used in the archive name, e.g. "20201231".
        doc_type: one of ``DOC_TYPE``; anything else prints a message and returns.
        stock_name_dict: mapping with keys "SH" and "SZ" to iterables of codes.
    """
    pid = os.getpid()
    if doc_type not in DOC_TYPE:
        print("doc_type not in {}".format(DOC_TYPE))
        return
    try:
        begin_time = time.time()
        archive_name = tick_date + "_{}.tar.gz".format(doc_type)
        folder_name = tick_date + "_" + doc_type
        data_dir = "{}/".format(DATA_PATH)
        archive_path = "{}/{}".format(DATA_PATH, archive_name)
        extract_root = "{}/{}".format(DATA_PATH, folder_name)

        _run_command("cp", "{}/{}".format(DATABASE_PATH, archive_name), data_dir)
        for exchange in ("SH", "SZ"):
            _run_command("tar", "-xvzf", archive_path, "-C", data_dir, "{}/{}".format(folder_name, exchange))
        for path in (
            str(DATA_PATH),
            extract_root,
            extract_root + "/SH",
            extract_root + "/SZ",
            extract_root + "/SH/" + tick_date,
            extract_root + "/SZ/" + tick_date,
        ):
            _run_command("chmod", "777", path)

        print("tick_date={}".format(tick_date))
        temp_data_path_sh = os.path.join(DATA_PATH, folder_name, "SH", tick_date)
        temp_data_path_sz = os.path.join(DATA_PATH, folder_name, "SZ", tick_date)

        # An exchange directory may be absent from the archive; treat it as empty.
        if os.path.exists(temp_data_path_sz):
            sz_files = {
                name.split(".csv")[0] for name in os.listdir(temp_data_path_sz) if name.startswith(("30", "0"))
            } & set(stock_name_dict["SZ"])
        else:
            sz_files = set()
        if os.path.exists(temp_data_path_sh):
            sh_files = {
                name.split(".csv")[0] for name in os.listdir(temp_data_path_sh) if name.startswith("6")
            } & set(stock_name_dict["SH"])
        else:
            sh_files = set()
        sz_file_nums = len(sz_files)
        sh_file_nums = len(sh_files)
        print("sz_file_nums:{}, sh_file_nums:{}".format(sz_file_nums, sh_file_nums))

        with (DATA_INFO_PATH / "data_info_log_{}_{}".format(doc_type, tick_date)).open("w+") as f:
            f.write("sz:{}, sh:{}, date:{}:".format(sz_file_nums, sh_file_nums, tick_date) + "\n")

        if sh_file_nums > 0:
            Parallel(n_jobs=N_JOBS)(
                delayed(add_one_stock_daily_data_wrapper)(
                    os.path.join(temp_data_path_sh, name + ".csv"), doc_type, "SH", index, tick_date
                )
                for index, name in enumerate(sh_files)
            )
        if sz_file_nums > 0:
            Parallel(n_jobs=N_JOBS)(
                delayed(add_one_stock_daily_data_wrapper)(
                    os.path.join(temp_data_path_sz, name + ".csv"), doc_type, "SZ", index, tick_date
                )
                for index, name in enumerate(sz_files)
            )

        _run_command("rm", "-f", archive_path)
        _run_command("rm", "-rf", extract_root)

        total_time = time.time() - begin_time
        with (DATA_FINISH_INFO_PATH / "data_info_finish_log_{}_{}".format(doc_type, tick_date)).open("w+") as f:
            f.write("finish: date:{}, consume_time:{}, end_time: {}".format(tick_date, total_time, time.time()) + "\n")
    except Exception as e:
        info = traceback.format_exc()
        print("date error:" + str(e))
        log_path = os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, tick_date, doc_type))
        with open(log_path, "a+") as f:
            f.write("fail:" + str(tick_date) + "\n" + str(e) + "\n" + str(info) + "\n")
class DSCreator:
    """Dataset creator"""

    def clear(self):
        """Drop the ``arctic`` database on the server at ``ARCTIC_SRV``."""
        client = MongoClient(ARCTIC_SRV)
        try:
            client.drop_database("arctic")
        finally:
            # Release the connection even if the drop fails.
            client.close()

    def initialize_library(self):
        """Initialize one CHUNK_STORE library per entry of ``DOC_TYPE``."""
        arc = Arctic(ARCTIC_SRV)
        for doc_type in DOC_TYPE:
            arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE)

    def _get_empty_folder(self, fp: Path):
        """Make ``fp`` an existing, empty directory, deleting any previous content."""
        fp = Path(fp)
        if fp.exists():
            shutil.rmtree(fp)
        fp.mkdir(parents=True, exist_ok=True)

    def import_data(self, doc_type_l=("Tick", "Transaction", "Order")):
        """Import the archived data of the given document types.

        Recreates the log/info/scratch directories, sets the quota of every
        library, and then, for each document type, writes a placeholder row
        (indexed at 1900-01-01) for every symbol from ``ALL_STOCK_PATH`` that
        the library does not contain yet, and finally runs ``add_data`` for
        each date to import.

        Args:
            doc_type_l: iterable of document types to import.
        """
        for fp in LOG_FILE_PATH, DATA_INFO_PATH, DATA_FINISH_INFO_PATH, DATA_PATH:
            self._get_empty_folder(fp)

        arc = Arctic(ARCTIC_SRV)
        for doc_type in DOC_TYPE:
            arc.set_quota(get_library_name(doc_type), MAX_SIZE)
        arc.reset()

        # The stock list does not depend on the document type: read it once.
        with open(ALL_STOCK_PATH, "r") as f:
            stock_name_list = [line.split("\t")[0] for line in f]
        # Symbols carry a two-character exchange prefix that is stripped here.
        stock_name_dict = {
            "SH": [stock_name[2:] for stock_name in stock_name_list if "SH" in stock_name],
            "SZ": [stock_name[2:] for stock_name in stock_name_list if "SZ" in stock_name],
        }

        for doc_type in doc_type_l:
            # Dates are the leading "<date>_" part of entries whose name mentions the doc type.
            found_dates = {int(path.split("_")[0]) for path in os.listdir(DATABASE_PATH) if doc_type in path}
            date_list = [str(day) for day in sorted(found_dates)]

            lib_name = get_library_name(doc_type)
            a = Arctic(ARCTIC_SRV)
            lib = a[lib_name]
            # A set keeps the membership test below O(1) per symbol.
            stock_name_exist = set(lib.list_symbols())
            initialize_count = 0
            for stock_name in stock_name_list:
                if stock_name not in stock_name_exist:
                    initialize_count += 1
                    pdf = pd.DataFrame(index=[pd.Timestamp("1900-01-01")])
                    pdf.index.name = "date"
                    lib.write(stock_name, pdf)
            print("initialize count: {}".format(initialize_count))
            print("tasks: {}".format(date_list))
            a.reset()

            # NOTE(review): the dates discovered (and printed) above are replaced
            # by this single hard-coded date, so only 20201231 is ever imported.
            # Kept as-is to preserve behavior - confirm whether it is intended.
            date_list = ["20201231"]
            Parallel(n_jobs=min(2, len(date_list)))(
                delayed(add_data)(day, doc_type, stock_name_dict) for day in date_list
            )
# Script entry point: hand DSCreator to python-fire, which builds the command-line interface from it.
if __name__ == "__main__":
    fire.Fire(DSCreator)