Persistence
The persistence subpackage handles data storage and retrieval, mainly to support backtesting.
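For orientation, a minimal usage sketch (assuming the import path shown in the NautilusTrader docs; the catalog path here is illustrative):
from nautilus_trader.persistence.catalog import ParquetDataCatalog

# Open a catalog rooted at an existing absolute path
catalog = ParquetDataCatalog("/tmp/catalog")

# Inspect what has been written so far
print(catalog.list_data_types())
print(catalog.instruments())
The classes below document the catalog, the data wranglers, and the streaming writer in turn.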
class BaseDataCatalog(ABC):
"""
Provides an abstract base class for a queryable data catalog.
"""
@classmethod
@abstractmethod
def from_env(cls) -> BaseDataCatalog:
...
@classmethod
@abstractmethod
def from_uri(cls, uri: str) -> BaseDataCatalog:
...
@abstractmethod
def query(
self,
data_cls: type,
instrument_ids: list[str] | None = None,
bar_types: list[str] | None = None,
**kwargs: Any
) -> list[Data]:
...
def instruments(
self,
instrument_type: type | None = None,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[Instrument]:
...
def instrument_status(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[InstrumentStatus]:
...
def instrument_closes(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[InstrumentClose]:
...
def order_book_deltas(
self,
instrument_ids: list[str] | None = None,
batched: bool = False,
**kwargs: Any
) -> list[OrderBookDelta] | list[OrderBookDeltas]:
...
def order_book_depth10(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[OrderBookDepth10]:
...
def quote_ticks(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[QuoteTick]:
...
def trade_ticks(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[TradeTick]:
...
def bars(self, bar_types: list[str] | None = None, **kwargs: Any) -> list[Bar]:
...
def custom_data(
self,
cls: type,
as_nautilus: bool = False,
metadata: dict | None = None,
**kwargs: Any
) -> list[CustomData]:
...
@abstractmethod
def list_data_types(self) -> list[str]:
...
def list_generic_data_types(self) -> list[str]:
...
@abstractmethod
def list_backtest_runs(self) -> list[str]:
...
@abstractmethod
def list_live_runs(self) -> list[str]:
...
@abstractmethod
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[str]:
...
@abstractmethod
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[str]:
...
class FeatherFile(NamedTuple):
"""
FeatherFile(path, class_name)
"""
path: str
"""
str
Alias for field number 0
"""
class_name: str
"""
str
Alias for field number 1
"""
def count(self, value, /):
"""
Return number of occurrences of value.
"""
...
def index(self, value, start=0, stop=9223372036854775807, /):
"""
Return first index of value.
Raises ValueError if the value is not present.
"""
...
class ParquetDataCatalog(BaseDataCatalog):
"""
Provides a queryable data catalog persisted to files in Parquet (Arrow) format.
Parameters:
path (PathLike[str] | str) – The root path for this data catalog. Must exist and must be an absolute path.
fs_protocol (str, default 'file') – The filesystem protocol used by fsspec to handle file operations. This determines how the data catalog interacts with storage, be it local filesystem, cloud storage, or others. Common protocols include 'file' for local storage, 's3' for Amazon S3, and 'gcs' for Google Cloud Storage. If not provided, it defaults to 'file', meaning the catalog operates on the local filesystem.
fs_storage_options (dict, optional) – The fs storage options.
min_rows_per_group (int, default 0) – The minimum number of rows per group. When the value is greater than 0, the dataset writer will batch incoming data and only write the row groups to the disk when sufficient rows have accumulated.
max_rows_per_group (int, default 5000) – The maximum number of rows per group. If the value is greater than 0, then the dataset writer may split up large incoming batches into multiple row groups. If this value is set, then min_rows_per_group should also be set, otherwise it could end up with very small row groups.
show_query_paths (bool, default False) – If globbed query paths should be printed to stdout.
WARNING
The data catalog is not thread-safe. Using it in a multithreaded environment can lead to unexpected behavior.
"""
def __init__(
self,
path,
fs_protocol: str = 'file',
fs_storage_options: dict | None = None,
min_rows_per_group: int = 0,
max_rows_per_group: int = 5000,
show_query_paths: bool = False
):
...
@classmethod
def from_env(cls) -> ParquetDataCatalog:
"""
Create a data catalog instance by accessing the 'NAUTILUS_PATH' environment variable.
Return type: ParquetDataCatalog
Raises: OSError – If the 'NAUTILUS_PATH' environment variable is not set.
"""
...
@classmethod
def from_uri(cls, uri: str) -> ParquetDataCatalog:
"""
Create a data catalog instance from the given URI.
Parameters:
uri (str) – The URI string for the backing path.
Return type: ParquetDataCatalog
"""
...
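A short sketch of the two constructors; the paths and URI below are illustrative, and from_env is assumed to resolve the catalog root from 'NAUTILUS_PATH' as documented above:
import os
from nautilus_trader.persistence.catalog import ParquetDataCatalog

# From the environment (raises OSError if 'NAUTILUS_PATH' is unset)
os.environ["NAUTILUS_PATH"] = "/tmp/nautilus"  # illustrative path
catalog = ParquetDataCatalog.from_env()

# From a URI; the fsspec protocol ('file', 's3', 'gcs', ...) is taken from the scheme
catalog = ParquetDataCatalog.from_uri("file:///tmp/nautilus/catalog")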
def write_chunk(
self,
data: list[Data],
data_cls: type[Data],
instrument_id: str | None = None,
basename_template: str = 'part-{i}',
**kwargs: Any
) -> None:
...
def write_data(
self,
data: list[Data | Event] | list[OrderBookDelta | OrderBookDepth10 | QuoteTick | TradeTick | Bar],
basename_template: str = 'part-{i}',
**kwargs: Any
) -> None:
"""
Write the given data to the catalog.
The function categorizes the data based on their class name and, when applicable, their associated instrument ID. It then delegates the actual writing process to the write_chunk method.
Parameters:
data (list[Data | Event]) – The data or event objects to be written to the catalog.
basename_template (str, default 'part-{i}') – A template string used to generate basenames of written data files. The token '{i}' will be replaced with an automatically incremented integer as files are partitioned. If not specified, it defaults to 'part-{i}' + the default extension '.parquet'.
kwargs (Any) – Additional keyword arguments to be passed to the write_chunk method.
WARNING
Any data which already exists under a given filename will be overwritten. If a basename_template is not provided, then it is very likely that existing data for the data type and instrument ID will be overwritten. To prevent data loss, ensure that the basename_template (or the default naming scheme) generates unique filenames for different data sets.
Raises:
ValueError – If data of the same type is not monotonically increasing (or non-decreasing) based on ts_init.
"""
...
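To sidestep the overwrite warning above, a minimal write sketch with a date-stamped basename_template (the catalog path and the test-kit instrument are illustrative):
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.test_kit.providers import TestInstrumentProvider

catalog = ParquetDataCatalog("/tmp/catalog")
instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")

# A unique basename_template per write avoids clobbering earlier files,
# since the default 'part-{i}' numbering restarts on each call
catalog.write_data([instrument], basename_template="2024-01-02-part-{i}")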
def query(
self,
data_cls: type,
instrument_ids: list[str] | None = None,
bar_types: list[str] | None = None,
start: int | str | float | None = None,
end: int | str | float | None = None,
where: str | None = None,
**kwargs: Any
) -> list[Data | CustomData]:
...
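A query sketch (the instrument ID and date range are illustrative; start and end accept int, str, or float timestamps per the signature above):
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.persistence.catalog import ParquetDataCatalog

catalog = ParquetDataCatalog("/tmp/catalog")
quotes = catalog.query(
    data_cls=QuoteTick,
    instrument_ids=["EUR/USD.SIM"],
    start="2024-01-02",
    end="2024-01-03",
)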
def backend_session(
self,
data_cls: type,
instrument_ids: list[str] | None = None,
bar_types: list[str] | None = None,
start: int | str | float | None = None,
end: int | str | float | None = None,
where: str | None = None,
session: DataBackendSession | None = None,
**kwargs: Any
) -> DataBackendSession:
...
def query_rust(
self,
data_cls: type,
instrument_ids: list[str] | None = None,
bar_types: list[str] | None = None,
start: int | str | float | None = None,
end: int | str | float | None = None,
where: str | None = None,
**kwargs: Any
) -> list[Data]:
...
def query_pyarrow(
self,
data_cls: type,
instrument_ids: list[str] | None = None,
bar_types: list[str] | None = None,
start: int | str | float | None = None,
end: int | str | float | None = None,
filter_expr: str | None = None,
**kwargs: Any
) -> list[Data]:
...
def instruments(
self,
instrument_type: type | None = None,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[Instrument]:
...
def list_data_types(self):
...
def list_backtest_runs(self) -> list[str]:
...
def list_live_runs(self) -> list[str]:
...
def read_live_run(self, instance_id: str, **kwargs: Any) -> list[Data]:
...
def read_backtest(self, instance_id: str, **kwargs: Any) -> list[Data]:
...
def convert_stream_to_data(
self,
instance_id: UUID4,
data_cls: type,
other_catalog: ParquetDataCatalog | None = None,
**kwargs: Any
) -> None:
...
def bars(self, bar_types: list[str] | None = None, **kwargs: Any) -> list[Bar]:
...
def custom_data(
self,
cls: type,
as_nautilus: bool = False,
metadata: dict | None = None,
**kwargs: Any
) -> list[CustomData]:
...
def instrument_closes(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[InstrumentClose]:
...
def instrument_status(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[InstrumentStatus]:
...
def list_generic_data_types(self) -> list[str]:
...
def order_book_deltas(
self,
instrument_ids: list[str] | None = None,
batched: bool = False,
**kwargs: Any
) -> list[OrderBookDelta] | list[OrderBookDeltas]:
...
def order_book_depth10(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[OrderBookDepth10]:
...
def quote_ticks(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[QuoteTick]:
...
def trade_ticks(
self,
instrument_ids: list[str] | None = None,
**kwargs: Any
) -> list[TradeTick]:
...
class BarDataWrangler:
"""
Provides a means of building lists of Nautilus Bar objects.
Parameters:
bar_type (BarType) – The bar type for the wrangler.
instrument (Instrument) – The instrument for the wrangler.
"""
def __init__(self, bar_type: BarType, instrument: Instrument):
...
@property
def bar_type(self):
...
@property
def instrument(self):
...
def process(
self,
data: pd.DataFrame,
default_volume: float = 1000000.0,
ts_init_delta: int = 0
):
"""
Process the given bar dataset into Nautilus Bar objects.
Expects columns ['open', 'high', 'low', 'close', 'volume'] with a 'timestamp' index. Note: the 'volume' column is optional; if it does not exist, the default_volume will be used.
Parameters:
data (pd.DataFrame) – The data to process.
default_volume (float) – The default volume for each bar (if not provided).
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
Return type: list[Bar]
Raises: ValueError – If data is empty.
"""
...
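A minimal sketch of bar wrangling, assuming the test-kit instrument provider and a matching bar type string (both illustrative):
import pandas as pd
from nautilus_trader.model.data import BarType
from nautilus_trader.persistence.wranglers import BarDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
bar_type = BarType.from_str("EUR/USD.SIM-1-MINUTE-BID-EXTERNAL")

# OHLC frame with a UTC 'timestamp' index; 'volume' omitted on purpose
df = pd.DataFrame(
    {
        "open": [1.1000, 1.1005],
        "high": [1.1010, 1.1012],
        "low": [1.0995, 1.1001],
        "close": [1.1005, 1.1010],
    },
    index=pd.to_datetime(["2024-01-02 09:00", "2024-01-02 09:01"], utc=True),
)
df.index.name = "timestamp"

wrangler = BarDataWrangler(bar_type=bar_type, instrument=instrument)
bars = wrangler.process(df)  # missing 'volume' falls back to default_volume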
class OrderBookDeltaDataWrangler:
"""
Provides a means of building lists of Nautilus OrderBookDelta objects.
Parameters:
instrument (Instrument) – The instrument for the data wrangler.
"""
def __init__(self, instrument: Instrument):
...
@property
def instrument(self):
...
def process(
self,
data: pd.DataFrame,
ts_init_delta: int = 0,
is_raw: bool = False
):
"""
Process the given order book dataset into Nautilus OrderBookDelta objects.
Parameters:
data (pd.DataFrame) – The data to process.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
Raises:
ValueError – If data is empty.
"""
...
class QuoteTickDataWrangler:
"""
Provides a means of building lists of Nautilus QuoteTick objects.
Parameters:
instrument (Instrument) – The instrument for the data wrangler.
"""
def __init__(self, instrument: Instrument):
...
@property
def instrument(self):
...
def process(
self,
data: pd.DataFrame,
default_volume: float = 1000000.0,
ts_init_delta: int = 0
) -> list:
"""
Process the given tick dataset into Nautilus QuoteTick objects.
Expects columns ['bid_price', 'ask_price'] with a 'timestamp' index. Note: the 'bid_size' and 'ask_size' columns are optional; if missing, the default_volume will be used.
Parameters:
data (pd.DataFrame) – The tick data to process.
default_volume (float) – The default volume for each tick (if not provided).
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system. Cannot be negative.
Return type: list[QuoteTick]
"""
...
def process_bar_data(
self,
bid_data: pd.DataFrame,
ask_data: pd.DataFrame,
default_volume: float = 1000000.0,
ts_init_delta: int = 0,
offset_interval_ms: int = 100,
timestamp_is_close: bool = True,
random_seed: int | None = None,
is_raw: bool = False,
sort_data: bool = True
) -> list:
"""
Process the given bar datasets into Nautilus QuoteTick objects.
Expects columns ['open', 'high', 'low', 'close', 'volume'] with a 'timestamp' index. Note: the 'volume' column is optional; if missing, the default_volume will be used.
Parameters:
bid_data (pd.DataFrame) – The bid bar data.
ask_data (pd.DataFrame) – The ask bar data.
default_volume (float) – The volume per tick if not available from the data.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
offset_interval_ms (int, default 100) – The number of milliseconds to offset each tick from the bar timestamps. If timestamp_is_close is True, negative offsets are used; otherwise positive offsets are used (see also timestamp_is_close).
random_seed (int, optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None, no shuffling is performed.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
timestamp_is_close (bool, default True) – If bar timestamps are at the close. If True, the open, high, and low timestamps are offset before the close timestamp. If False, the high, low, and close timestamps are offset after the open timestamp.
sort_data (bool, default True) – If the data should be sorted by timestamp.
"""
...
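A sketch of synthesizing quote ticks from separate bid and ask bar frames (instrument, prices, and the fixed spread are illustrative):
import pandas as pd
from nautilus_trader.persistence.wranglers import QuoteTickDataWrangler
from nautilus_trader.test_kit.providers import TestInstrumentProvider

instrument = TestInstrumentProvider.default_fx_ccy("EUR/USD")
wrangler = QuoteTickDataWrangler(instrument=instrument)

idx = pd.to_datetime(["2024-01-02 09:01", "2024-01-02 09:02"], utc=True)
bid_data = pd.DataFrame(
    {
        "open": [1.1000, 1.1004],
        "high": [1.1006, 1.1009],
        "low": [1.0998, 1.1002],
        "close": [1.1004, 1.1008],
    },
    index=idx,
)
ask_data = bid_data + 0.0002  # illustrative fixed spread
bid_data.index.name = "timestamp"
ask_data.index.name = "timestamp"

ticks = wrangler.process_bar_data(
    bid_data=bid_data,
    ask_data=ask_data,
    random_seed=42,  # make the high/low tick ordering reproducible
)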
@property
def processed_data(self):
...
class TradeTickDataWrangler:
"""
Provides a means of building lists of Nautilus TradeTick objects.
Parameters:
instrument (Instrument) – The instrument for the data wrangler.
"""
def __init__(self, instrument: Instrument):
...
@property
def instrument(self):
...
def process(
self,
data: pd.DataFrame,
ts_init_delta: int = 0,
is_raw: bool = False
):
"""
Process the given trade tick dataset into Nautilus TradeTick objects.
Parameters:
data (pd.DataFrame) – The data to process.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
Raises:
ValueError – If data is empty.
"""
...
def process_bar_data(
self,
data: pd.DataFrame,
ts_init_delta: int = 0,
offset_interval_ms: int = 100,
timestamp_is_close: bool = True,
random_seed: int | None = None,
is_raw: bool = False,
sort_data: bool = True
) -> list:
"""
Process the given bar dataset into Nautilus TradeTick objects.
Expects columns ['open', 'high', 'low', 'close', 'volume'] with a 'timestamp' index. Note: the 'volume' column is optional; if missing, a default volume will be used.
Parameters:
data (pd.DataFrame) – The trade bar data.
ts_init_delta (int) – The difference in nanoseconds between the data timestamps and the ts_init value. Can be used to represent/simulate latency between the data source and the Nautilus system.
offset_interval_ms (int, default 100) – The number of milliseconds to offset each tick from the bar timestamps. If timestamp_is_close is True, negative offsets are used; otherwise positive offsets are used (see also timestamp_is_close).
random_seed (int, optional) – The random seed for shuffling the order of high and low ticks from bar data. If random_seed is None, no shuffling is performed.
is_raw (bool, default False) – If the data is scaled to Nautilus fixed-point values.
timestamp_is_close (bool, default True) – If bar timestamps are at the close. If True, the open, high, and low timestamps are offset before the close timestamp. If False, the high, low, and close timestamps are offset after the open timestamp.
sort_data (bool, default True) – If the data should be sorted by timestamp.
"""
...
def align_bid_ask_bar_data(bid_data: pd.DataFrame, ask_data: pd.DataFrame):
"""
Merge bid and ask data into a single DataFrame with prefixed column names.
Parameters:
bid_data (pd.DataFrame) – The DataFrame containing bid data.
ask_data (pd.DataFrame) – The DataFrame containing ask data.
Returns: pd.DataFrame – A merged DataFrame with columns prefixed by 'bid_' for bid data and 'ask_' for ask data, joined on their indexes.
"""
...
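A tiny sketch, assuming the function is importable from nautilus_trader.persistence.wranglers, where this page documents it (values illustrative):
import pandas as pd
from nautilus_trader.persistence.wranglers import align_bid_ask_bar_data

idx = pd.to_datetime(["2024-01-02 09:01", "2024-01-02 09:02"], utc=True)
bid = pd.DataFrame({"close": [1.1004, 1.1008]}, index=idx)
ask = pd.DataFrame({"close": [1.1006, 1.1010]}, index=idx)

merged = align_bid_ask_bar_data(bid, ask)
print(list(merged.columns))  # expected: ['bid_close', 'ask_close']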
def calculate_bar_price_offsets(
num_records,
timestamp_is_close: bool,
offset_interval_ms: int,
random_seed=None
):
"""
Calculate and potentially randomize the time offsets for bar prices based on the closeness of the timestamp.
Parameters:
num_records (int) – The number of records for which offsets are to be generated.
timestamp_is_close (bool) – A flag indicating whether the timestamp is close to the trading time.
offset_interval_ms (int) – The offset interval in milliseconds to be applied.
random_seed (Optional[int]) – The seed for random number generation to ensure reproducibility.
Returns: A dictionary with arrays of offsets for open, high, low, and close prices. If random_seed is provided, the high and low offsets are randomized.
Return type: dict
"""
...
def calculate_volume_quarter(volume, precision: int):
"""
Convert raw volume data to quarter precision.
Parameters:
volume (np.ndarray) – An array of volume data to be processed.
precision (int) – The decimal precision to which the volume data is rounded, adjusted by subtracting 9.
Returns: The volume data adjusted to quarter precision.
Return type: np.ndarray
"""
...
def prepare_event_and_init_timestamps(index: pd.DatetimeIndex, ts_init_delta: int):
...
def preprocess_bar_data(data: pd.DataFrame, is_raw: bool) -> pd.DataFrame:
"""
Preprocess financial bar data to a standardized format.
Ensures the DataFrame index is labeled as "timestamp", converts the index to UTC, removes time zone awareness, drops rows with NaN values in critical columns, and optionally scales the data.
Parameters:
data (pd.DataFrame) – The input DataFrame containing financial bar data.
is_raw (bool) – A flag to determine whether the data should be scaled. If False, scales the data by 1e9.
Returns: The preprocessed DataFrame with a cleaned and standardized structure.
Return type: pd.DataFrame
"""
...
class StreamingFeatherWriter:
"""
Provides a stream writer of Nautilus objects into feather files.
Parameters:
path (str) – The path to persist the stream to.
fs_protocol (str, default 'file') – The fsspec file system protocol.
flush_interval_ms (int, optional) – The flush interval (milliseconds) for writing chunks.
replace (bool, default False) – If existing files at the given path should be replaced.
include_types (list[type], optional) – A list of Arrow serializable types to write. If this is specified then only the included types will be written.
"""
def __init__(
self,
path: str,
fs_protocol: str = 'file',
flush_interval_ms: int | None = None,
replace: bool = False,
include_types: list | None = None
):
...
@property
def is_closed(self) -> bool:
"""
Return whether all file streams are closed.
Return type: bool
"""
...
def write(self, obj) -> None:
"""
Write the object to the stream.
Parameters:
obj (object) – The object to write.
Raises:
ValueError – If obj is None.
"""
...
def check_flush(self) -> None:
"""
Flush all stream writers if the current time is greater than the next flush interval.
"""
...
def flush(self) -> None:
"""
Flush all stream writers.
"""
...
def close(self) -> None:
"""
Flush and close all stream writers.
"""
...
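A usage sketch, assuming the writer lives in nautilus_trader.persistence.writer and that instruments are Arrow-serializable (the output path is illustrative):
from nautilus_trader.persistence.writer import StreamingFeatherWriter
from nautilus_trader.test_kit.providers import TestInstrumentProvider

writer = StreamingFeatherWriter(
    path="/tmp/stream",      # illustrative output directory
    flush_interval_ms=1000,  # flush chunks at most once per second
    replace=True,            # replace any existing files at the path
)
writer.write(TestInstrumentProvider.default_fx_ccy("EUR/USD"))
writer.close()  # flushes and closes all underlying file streams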
def generate_signal_class(name: str, value_type: type) -> type:
"""
Dynamically create a Data subclass for this signal.
Parameters:
name (str) – The name of the signal data.
value_type (type) – The type for the signal data value.
Return type: SignalData
"""
...
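A sketch of generating a signal class; the import module and the generated constructor's exact parameters are assumptions here, not confirmed by this page:
from nautilus_trader.persistence.writer import generate_signal_class  # module assumed

# Dynamically create a Data subclass for a named float-valued signal
MomentumSignal = generate_signal_class(name="momentum", value_type=float)

# The generated class is assumed to carry a value plus the standard
# ts_event/ts_init nanosecond timestamps common to Nautilus Data types
signal = MomentumSignal(value=0.75, ts_event=0, ts_init=0)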