Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify multi-freq example #626

Merged
merged 9 commits into from
Sep 30, 2021
Merged
18 changes: 18 additions & 0 deletions examples/benchmarks/LightGBM/features_resample_N.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import pandas as pd

from qlib.data.inst_processor import InstProcessor
from qlib.utils.resam import resam_calendar


class ResampleNProcessor(InstProcessor):
def __init__(self, target_frq: str, **kwargs):
self.target_frq = target_frq

def __call__(self, df: pd.DataFrame, *args, **kwargs):
df.index = pd.to_datetime(df.index)
res_index = resam_calendar(df.index, "1min", self.target_frq)
df = df.resample(self.target_frq).last().reindex(res_index)
return df
16 changes: 0 additions & 16 deletions examples/benchmarks/LightGBM/features_sample.py

This file was deleted.

135 changes: 135 additions & 0 deletions examples/benchmarks/LightGBM/multi_freq_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import pandas as pd

from qlib.data.dataset.loader import QlibDataLoader
from qlib.contrib.data.handler import DataHandlerLP, _DEFAULT_LEARN_PROCESSORS, check_transform_proc


class Avg15minLoader(QlibDataLoader):
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
df = super(Avg15minLoader, self).load(instruments, start_time, end_time)
if self.is_group:
# feature_day(day freq) and feature_15min(1min freq, Average every 15 minutes) renamed feature
df.columns = df.columns.map(lambda x: ("feature", x[1]) if x[0].startswith("feature") else x)
return df


class Avg15minHandler(DataHandlerLP):
def __init__(
self,
instruments="csi500",
start_time=None,
end_time=None,
freq="day",
infer_processors=[],
learn_processors=_DEFAULT_LEARN_PROCESSORS,
fit_start_time=None,
fit_end_time=None,
process_type=DataHandlerLP.PTYPE_A,
filter_pipe=None,
inst_processor=None,
**kwargs,
):
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
data_loader = Avg15minLoader(
config=self.loader_config(), filter_pipe=filter_pipe, freq=freq, inst_processor=inst_processor
)
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
process_type=process_type,
)

def loader_config(self):

# Results for dataset: df: pd.DataFrame
# len(df.columns) == 6 + 6 * 16, len(df.index.get_level_values(level="datetime").unique()) == T
# df.columns: close0, close1, ..., close16, open0, ..., open16, ..., vwap16
# freq == day:
# close0, open0, low0, high0, volume0, vwap0
# freq == 1min:
# close1, ..., close16, ..., vwap1, ..., vwap16
# df.index.name == ["datetime", "instrument"]: pd.MultiIndex
# Example:
# feature ... label
# close0 open0 low0 ... vwap1 vwap16 LABEL0
# datetime instrument ...
# 2020-10-09 SH600000 11.794546 11.819587 11.769505 ... NaN NaN -0.005214
# 2020-10-15 SH600000 12.044961 11.944795 11.932274 ... NaN NaN -0.007202
# ... ... ... ... ... ... ... ...
# 2021-05-28 SZ300676 6.369684 6.495406 6.306568 ... NaN NaN -0.001321
# 2021-05-31 SZ300676 6.601626 6.465643 6.465130 ... NaN NaN -0.023428

# features day: len(columns) == 6, freq = day
# $close is the closing price of the current trading day:
# if the user needs to get the `close` before the last T days, use Ref($close, T-1), for example:
# $close Ref($close, 1) Ref($close, 2) Ref($close, 3) Ref($close, 4)
# instrument datetime
# SH600519 2021-06-01 244.271530
# 2021-06-02 242.205917 244.271530
# 2021-06-03 242.229889 242.205917 244.271530
# 2021-06-04 245.421524 242.229889 242.205917 244.271530
# 2021-06-07 247.547089 245.421524 242.229889 242.205917 244.271530

# WARNING: Ref($close, N), if N == 0, Ref($close, N) ==> $close

fields = ["$close", "$open", "$low", "$high", "$volume", "$vwap"]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

# names: close0, open0, ..., vwap0
names = list(map(lambda x: x.strip("$") + "0", fields))

config = {"feature_day": (fields, names)}

# features 15min: len(columns) == 6 * 16, freq = 1min
# $close is the closing price of the current trading day:
# if the user gets 'close' for the i-th 15min of the last T days, use `Ref(Mean($close, 15), (T-1) * 240 + i * 15)`, for example:
# Ref(Mean($close, 15), 225) Ref(Mean($close, 15), 465) Ref(Mean($close, 15), 705)
# instrument datetime
# SH600519 2021-05-31 241.769897 243.077942 244.712997
# 2021-06-01 244.271530 241.769897 243.077942
# 2021-06-02 242.205917 244.271530 241.769897

# WARNING: Ref(Mean($close, 15), N), if N == 0, Ref(Mean($close, 15), N) ==> Mean($close, 15)

# Results of the current script:
# time: 09:00 --> 09:14, ..., 14:45 --> 14:59
# fields: Ref(Mean($close, 15), 225), ..., Mean($close, 15)
# name: close1, ..., close16
#

# Expression description: take close as an example
# Mean($close, 15) ==> df["$close"].rolling(15, min_periods=1).mean()
# Ref(Mean($close, 15), 15) ==> df["$close"].rolling(15, min_periods=1).mean().shift(15)

# NOTE: The last data of each trading day, which is the average of the i-th 15 minutes

# Average:
# Average of the i-th 15-minute period of each trading day: 1 <= i <= 250 // 16
# Avg(15minutes): Ref(Mean($close, 15), 240 - i * 15)
#
# Average of the first 15 minutes of each trading day; i = 1
# Avg(09:00 --> 09:14), df.index.loc["09:14"]: Ref(Mean($close, 15), 240- 1 * 15) ==> Ref(Mean($close, 15), 225)
# Average of the last 15 minutes of each trading day; i = 16
# Avg(14:45 --> 14:59), df.index.loc["14:59"]: Ref(Mean($close, 15), 240 - 16 * 15) ==> Ref(Mean($close, 15), 0) ==> Mean($close, 15)

# 15min resample to day
# df.resample("1d").last()
tmp_fields = []
tmp_names = []
for i, _f in enumerate(fields):
_fields = [f"Ref(Mean({_f}, 15), {j * 15})" for j in range(1, 240 // 15)]
_names = [f"{names[i][:-1]}{int(names[i][-1])+j}" for j in range(240 // 15 - 1, 0, -1)]
_fields.append(f"Mean({_f}, 15)")
_names.append(f"{names[i][:-1]}{int(names[i][-1])+240 // 15}")
tmp_fields += _fields
tmp_names += _names
config["feature_15min"] = (tmp_fields, tmp_names)
# label
config["label"] = (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
return config
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ qlib_init:
1min: "~/.qlib/qlib_data/cn_data_1min"
region: cn
dataset_cache: null
maxtasksperchild: 1
maxtasksperchild: null
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
Expand All @@ -16,15 +16,15 @@ data_handler_config: &data_handler_config
instruments: *market
freq:
label: day
feature: 1min
feature_15min: 1min
feature_day: day
# with label as reference
inst_processor:
feature:
- class: Resample1minProcessor
module_path: features_sample.py
feature_15min:
- class: ResampleNProcessor
module_path: features_resample_N.py
kwargs:
hour: 14
minute: 56
target_frq: 1d

port_analysis_config: &port_analysis_config
strategy:
Expand Down Expand Up @@ -62,25 +62,25 @@ task:
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
class: Avg15minHandler
module_path: multi_freq_handler.py
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config
2 changes: 1 addition & 1 deletion qlib/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ def dataset_processor(instruments_d, column_names, start_time, end_time, freq, i
inst_l.append(inst)
task_l.append(
delayed(DatasetProvider.expression_calculator)(
inst, start_time, end_time, freq, normalize_column_names, spans, C
inst, start_time, end_time, freq, normalize_column_names, spans, C, inst_processors
)
)

Expand Down
2 changes: 1 addition & 1 deletion qlib/workflow/record_temp.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def load(self, name):
obj = self.recorder.load_object(name)
return obj

def list():
def list(self):
"""
List the supported artifacts.

Expand Down