6a946761创建于 2022年2月6日历史提交
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from arctic.arctic import Arctic
import qlib
from qlib.data import D
import unittest


class TestClass(unittest.TestCase):
    """
    Useful commands
    - run all tests: pytest examples/orderbook_data/example.py
    - run a single test:  pytest -s --pdb --disable-warnings examples/orderbook_data/example.py::TestClass::test_basic01
    """

    def setUp(self):
        """
        Configure for arctic
        """
        provider_uri = "~/.qlib/qlib_data/yahoo_cn_1min"
        qlib.init(
            provider_uri=provider_uri,
            mem_cache_size_limit=1024**3 * 2,
            mem_cache_type="sizeof",
            kernels=1,
            expression_provider={"class": "LocalExpressionProvider", "kwargs": {"time2idx": False}},
            feature_provider={
                "class": "ArcticFeatureProvider",
                "module_path": "qlib.contrib.data.data",
                "kwargs": {"uri": "127.0.0.1"},
            },
            dataset_provider={
                "class": "LocalDatasetProvider",
                "kwargs": {
                    "align_time": False,  # Order book is not fixed, so it can't be align to a shared fixed frequency calendar
                },
            },
        )
        # self.stocks_list = ["SH600519"]
        self.stocks_list = ["SZ000725"]

    def test_basic(self):
        # NOTE: this data contains a lot of zeros in $askX and $bidX
        df = D.features(
            self.stocks_list,
            fields=["$ask1", "$ask2", "$bid1", "$bid2"],
            freq="ticks",
            start_time="20201230",
            end_time="20210101",
        )
        print(df)

    def test_basic_without_time(self):
        df = D.features(self.stocks_list, fields=["$ask1"], freq="ticks")
        print(df)

    def test_basic01(self):
        df = D.features(
            self.stocks_list,
            fields=["TResample($ask1, '1min', 'last')"],
            freq="ticks",
            start_time="20201230",
            end_time="20210101",
        )
        print(df)

    def test_basic02(self):
        df = D.features(
            self.stocks_list,
            fields=["$function_code"],
            freq="transaction",
            start_time="20201230",
            end_time="20210101",
        )
        print(df)

    def test_basic03(self):
        df = D.features(
            self.stocks_list,
            fields=["$function_code"],
            freq="order",
            start_time="20201230",
            end_time="20210101",
        )
        print(df)

    # Here are some popular expressions for high-frequency
    # 1) some shared expression
    expr_sum_buy_ask_1 = "(TResample($ask1, '1min', 'last') + TResample($bid1, '1min', 'last'))"
    total_volume = (
        "TResample("
        + "+".join([f"${name}{i}" for i in range(1, 11) for name in ["asize", "bsize"]])
        + ", '1min', 'sum')"
    )

    @staticmethod
    def total_func(name, method):
        return "TResample(" + "+".join([f"${name}{i}" for i in range(1, 11)]) + ",'1min', '{}')".format(method)

    def test_exp_01(self):
        exprs = []
        names = []
        for name in ["asize", "bsize"]:
            for i in range(1, 11):
                exprs.append(f"TResample(${name}{i}, '1min', 'mean') / ({self.total_volume})")
                names.append(f"v_{name}_{i}")
        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    # 2) some often used papers;
    def test_exp_02(self):
        spread_func = (
            lambda index: f"2 * TResample($ask{index} - $bid{index}, '1min', 'last') / {self.expr_sum_buy_ask_1}"
        )
        mid_func = (
            lambda index: f"2 * TResample(($ask{index} + $bid{index})/2, '1min', 'last') / {self.expr_sum_buy_ask_1}"
        )

        exprs = []
        names = []
        for i in range(1, 11):
            exprs.extend([spread_func(i), mid_func(i)])
            names.extend([f"p_spread_{i}", f"p_mid_{i}"])
        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    def test_exp_03(self):
        expr3_func1 = (
            lambda name, index_left, index_right: f"2 * TResample(Abs(${name}{index_left} - ${name}{index_right}), '1min', 'last') / {self.expr_sum_buy_ask_1}"
        )
        for name in ["ask", "bid"]:
            for i in range(1, 10):
                exprs = [expr3_func1(name, i + 1, i)]
                names = [f"p_diff_{name}_{i}_{i+1}"]
        exprs.extend([expr3_func1("ask", 10, 1), expr3_func1("bid", 1, 10)])
        names.extend(["p_diff_ask_10_1", "p_diff_bid_1_10"])
        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    def test_exp_04(self):
        exprs = []
        names = []
        for name in ["asize", "bsize"]:
            exprs.append(f"(({ self.total_func(name, 'mean')}) / 10) / {self.total_volume}")
            names.append(f"v_avg_{name}")

        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    def test_exp_05(self):
        exprs = [
            f"2 * Sub({ self.total_func('ask', 'last')}, {self.total_func('bid', 'last')})/{self.expr_sum_buy_ask_1}",
            f"Sub({ self.total_func('asize', 'mean')}, {self.total_func('bsize', 'mean')})/{self.total_volume}",
        ]
        names = ["p_accspread", "v_accspread"]

        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    #  (p|v)_diff_(ask|bid|asize|bsize)_(time_interval)
    def test_exp_06(self):
        t = 3
        expr6_price_func = (
            lambda name, index, method: f'2 * (TResample(${name}{index}, "{t}s", "{method}") - Ref(TResample(${name}{index}, "{t}s", "{method}"), 1)) / {t}'
        )
        exprs = []
        names = []
        for i in range(1, 11):
            for name in ["bid", "ask"]:
                exprs.append(
                    f"TResample({expr6_price_func(name, i, 'last')}, '1min', 'mean') / {self.expr_sum_buy_ask_1}"
                )
                names.append(f"p_diff_{name}{i}_{t}s")

        for i in range(1, 11):
            for name in ["asize", "bsize"]:
                exprs.append(f"TResample({expr6_price_func(name, i, 'mean')}, '1min', 'mean') / {self.total_volume}")
                names.append(f"v_diff_{name}{i}_{t}s")

        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)

    # TODOs:
    # Following expressions may be implemented in the future
    # expr7_2 = lambda funccode, bsflag, time_interval: \
    #     "TResample(TRolling(TEq(@transaction.function_code,  {}) & TEq(@transaction.bs_flag ,{}), '{}s', 'sum') / \
    #     TRolling(@transaction.function_code, '{}s', 'count') , '1min', 'mean')".format(ord(funccode), bsflag,time_interval,time_interval)
    # create_dataset(7, "SH600000", [expr7_2("C")] + [expr7(funccode, ordercode) for funccode in ['B','S'] for ordercode in ['0','1']])
    # create_dataset(7,  ["SH600000"], [expr7_2("C", 48)] )

    @staticmethod
    def expr7_init(funccode, ordercode, time_interval):
        # NOTE: based on on order frequency (i.e. freq="order")
        return f"Rolling(Eq($function_code,  {ord(funccode)}) & Eq($order_kind ,{ord(ordercode)}), '{time_interval}s', 'sum') / Rolling($function_code, '{time_interval}s', 'count')"

    # (la|lb|ma|mb|ca|cb)_intensity_(time_interval)
    def test_exp_07_1(self):
        # NOTE: based on transaction frequency (i.e. freq="transaction")
        expr7_3 = (
            lambda funccode, code, time_interval: f"TResample(Rolling(Eq($function_code,  {ord(funccode)}) & {code}($ask_order, $bid_order) , '{time_interval}s', 'sum')   / Rolling($function_code, '{time_interval}s', 'count') , '1min', 'mean')"
        )

        exprs = [expr7_3("C", "Gt", "3"), expr7_3("C", "Lt", "3")]
        names = ["ca_intensity_3s", "cb_intensity_3s"]

        df = D.features(self.stocks_list, fields=exprs, freq="transaction")
        df.columns = names
        print(df)

    trans_dict = {"B": "a", "S": "b", "0": "l", "1": "m"}

    def test_exp_07_2(self):
        # NOTE: based on on order frequency
        expr7 = (
            lambda funccode, ordercode, time_interval: f"TResample({self.expr7_init(funccode, ordercode, time_interval)}, '1min', 'mean')"
        )

        exprs = []
        names = []
        for funccode in ["B", "S"]:
            for ordercode in ["0", "1"]:
                exprs.append(expr7(funccode, ordercode, "3"))
                names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_intensity_3s")
        df = D.features(self.stocks_list, fields=exprs, freq="transaction")
        df.columns = names
        print(df)

    @staticmethod
    def expr7_3_init(funccode, code, time_interval):
        # NOTE: It depends on transaction frequency
        return f"Rolling(Eq($function_code,  {ord(funccode)}) & {code}($ask_order, $bid_order) , '{time_interval}s', 'sum') / Rolling($function_code, '{time_interval}s', 'count')"

    # (la|lb|ma|mb|ca|cb)_relative_intensity_(time_interval_small)_(time_interval_big)
    def test_exp_08_1(self):
        expr8_1 = (
            lambda funccode, ordercode, time_interval_short, time_interval_long: f"TResample(Gt({self.expr7_init(funccode, ordercode, time_interval_short)},{self.expr7_init(funccode, ordercode, time_interval_long)}), '1min', 'mean')"
        )

        exprs = []
        names = []
        for funccode in ["B", "S"]:
            for ordercode in ["0", "1"]:
                exprs.append(expr8_1(funccode, ordercode, "10", "900"))
                names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_relative_intensity_10s_900s")

        df = D.features(self.stocks_list, fields=exprs, freq="order")
        df.columns = names
        print(df)

    def test_exp_08_2(self):
        # NOTE: It depends on transaction frequency
        expr8_2 = (
            lambda funccode, ordercode, time_interval_short, time_interval_long: f"TResample(Gt({self.expr7_3_init(funccode, ordercode, time_interval_short)},{self.expr7_3_init(funccode, ordercode, time_interval_long)}), '1min', 'mean')"
        )

        exprs = [expr8_2("C", "Gt", "10", "900"), expr8_2("C", "Lt", "10", "900")]
        names = ["ca_relative_intensity_10s_900s", "cb_relative_intensity_10s_900s"]

        df = D.features(self.stocks_list, fields=exprs, freq="transaction")
        df.columns = names
        print(df)

    ## v9(la|lb|ma|mb|ca|cb)_diff_intensity_(time_interval1)_(time_interval2)
    # 1) calculating the original data
    # 2) Resample data to 3s and calculate the changing rate
    # 3) Resample data to 1min

    def test_exp_09_trans(self):
        exprs = [
            f'TResample(Div(Sub(TResample({self.expr7_3_init("C", "Gt", "3")}, "3s", "last"), Ref(TResample({self.expr7_3_init("C", "Gt", "3")}, "3s","last"), 1)), 3), "1min", "mean")',
            f'TResample(Div(Sub(TResample({self.expr7_3_init("C", "Lt", "3")}, "3s", "last"), Ref(TResample({self.expr7_3_init("C", "Lt", "3")}, "3s","last"), 1)), 3), "1min", "mean")',
        ]
        names = ["ca_diff_intensity_3s_3s", "cb_diff_intensity_3s_3s"]
        df = D.features(self.stocks_list, fields=exprs, freq="transaction")
        df.columns = names
        print(df)

    def test_exp_09_order(self):
        exprs = []
        names = []
        for funccode in ["B", "S"]:
            for ordercode in ["0", "1"]:
                exprs.append(
                    f'TResample(Div(Sub(TResample({self.expr7_init(funccode, ordercode, "3")}, "3s", "last"), Ref(TResample({self.expr7_init(funccode, ordercode, "3")},"3s", "last"), 1)), 3) ,"1min", "mean")'
                )
                names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_diff_intensity_3s_3s")
        df = D.features(self.stocks_list, fields=exprs, freq="order")
        df.columns = names
        print(df)

    def test_exp_10(self):
        exprs = []
        names = []
        for i in [5, 10, 30, 60]:
            exprs.append(
                f'TResample(Ref(TResample($ask1 + $bid1, "1s", "ffill"), {-i}) / TResample($ask1 + $bid1, "1s", "ffill") - 1, "1min", "mean" )'
            )
            names.append(f"lag_{i}_change_rate" for i in [5, 10, 30, 60])
        df = D.features(self.stocks_list, fields=exprs, freq="ticks")
        df.columns = names
        print(df)


if __name__ == "__main__":
    unittest.main()