From 80a39cef91d8183f624a28ca1ad12c98f3b7224c Mon Sep 17 00:00:00 2001 From: cjdsellers Date: Fri, 19 Feb 2021 13:43:28 +1100 Subject: [PATCH 1/5] Bump version --- poetry.lock | 48 ++++++++++++++++++++++++------------------------ pyproject.toml | 6 +++--- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9a358d497dc3..7666991191f7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -109,7 +109,7 @@ pytz = ">=2015.7" [[package]] name = "ccxt" -version = "1.41.98" +version = "1.41.100" description = "A JavaScript / Python / PHP cryptocurrency trading library with support for 130+ exchanges" category = "main" optional = false @@ -756,7 +756,7 @@ socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] [[package]] name = "scipy" -version = "1.6.0" +version = "1.6.1" description = "SciPy: Scientific Library for Python" category = "main" optional = false @@ -983,7 +983,7 @@ docs = ["numpydoc"] [metadata] lock-version = "1.1" python-versions = "^3.7.9" -content-hash = "bacc2fb5ff6b8a3f52f6ace194dd8e5dcd58a6c828472b1a622c62631d2d13f6" +content-hash = "a2aa9b0ff03ada4e36d390aa7dbef3c0f4790522b3ec2f373d027fcf8204a3d2" [metadata.files] aiodns = [ @@ -1062,8 +1062,8 @@ babel = [ {file = "Babel-2.9.0.tar.gz", hash = "sha256:da031ab54472314f210b0adcff1588ee5d1d1d0ba4dbd07b94dba82bde791e05"}, ] ccxt = [ - {file = "ccxt-1.41.98-py2.py3-none-any.whl", hash = "sha256:75a55faadcee546b4042098ece06b40a776fb897dca22b35dd51f76c45ff04c4"}, - {file = "ccxt-1.41.98.tar.gz", hash = "sha256:690861ad72ba7c2a778ddd42eedc6c77f574404e251cbbbf7f6f84aba8701628"}, + {file = "ccxt-1.41.100-py2.py3-none-any.whl", hash = "sha256:ebea10e657d26f99dbd25ac26635c51ef140c980e96cd7c0a2808f622cb94ffa"}, + {file = "ccxt-1.41.100.tar.gz", hash = "sha256:4e059c2ecebb5878146ef9fc9ef43787316042e37a1eaeaf5f04a0459b37499b"}, ] certifi = [ {file = "certifi-2020.12.5-py2.py3-none-any.whl", hash = "sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830"}, @@ -1656,25 +1656,25 @@ requests = [ {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"}, ] scipy = [ - {file = "scipy-1.6.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:3d4303e3e21d07d9557b26a1707bb9fc065510ee8501c9bf22a0157249a82fd0"}, - {file = "scipy-1.6.0-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:1bc5b446600c4ff7ab36bade47180673141322f0febaa555f1c433fe04f2a0e3"}, - {file = "scipy-1.6.0-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:8840a9adb4ede3751f49761653d3ebf664f25195fdd42ada394ffea8903dd51d"}, - {file = "scipy-1.6.0-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:8629135ee00cc2182ac8be8e75643b9f02235942443732c2ed69ab48edcb6614"}, - {file = "scipy-1.6.0-cp37-cp37m-win32.whl", hash = "sha256:58731bbe0103e96b89b2f41516699db9b63066e4317e31b8402891571f6d358f"}, - {file = "scipy-1.6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:876badc33eec20709d4e042a09834f5953ebdac4088d45a4f3a1f18b56885718"}, - {file = "scipy-1.6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:c0911f3180de343643f369dc5cfedad6ba9f939c2d516bddea4a6871eb000722"}, - {file = "scipy-1.6.0-cp38-cp38-manylinux1_i686.whl", hash = "sha256:b8af26839ae343655f3ca377a5d5e5466f1d3b3ac7432a43449154fe958ae0e0"}, - {file = "scipy-1.6.0-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:4f1d9cc977ac6a4a63c124045c1e8bf67ec37098f67c699887a93736961a00ae"}, - {file = "scipy-1.6.0-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:eb7928275f3560d47e5538e15e9f32b3d64cd30ea8f85f3e82987425476f53f6"}, - {file = "scipy-1.6.0-cp38-cp38-win32.whl", hash = "sha256:31ab217b5c27ab429d07428a76002b33662f98986095bbce5d55e0788f7e8b15"}, - {file = "scipy-1.6.0-cp38-cp38-win_amd64.whl", hash = "sha256:2f1c2ebca6fd867160e70102200b1bd07b3b2d31a3e6af3c58d688c15d0d07b7"}, - {file = "scipy-1.6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:155225621df90fcd151e25d51c50217e412de717475999ebb76e17e310176981"}, - {file = "scipy-1.6.0-cp39-cp39-manylinux1_i686.whl", hash = "sha256:f68d5761a2d2376e2b194c8e9192bbf7c51306ca176f1a0889990a52ef0d551f"}, - {file = "scipy-1.6.0-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:d902d3a5ad7f28874c0a82db95246d24ca07ad932741df668595fe00a4819870"}, - {file = "scipy-1.6.0-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:aef3a2dbc436bbe8f6e0b635f0b5fe5ed024b522eee4637dbbe0b974129ca734"}, - {file = "scipy-1.6.0-cp39-cp39-win32.whl", hash = "sha256:cdbc47628184a0ebeb5c08f1892614e1bd4a51f6e0d609c6eed253823a960f5b"}, - {file = "scipy-1.6.0-cp39-cp39-win_amd64.whl", hash = "sha256:313785c4dab65060f9648112d025f6d2fec69a8a889c714328882d678a95f053"}, - {file = "scipy-1.6.0.tar.gz", hash = "sha256:cb6dc9f82dfd95f6b9032a8d7ea70efeeb15d5b5fd6ed4e8537bb3c673580566"}, + {file = "scipy-1.6.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:a15a1f3fc0abff33e792d6049161b7795909b40b97c6cc2934ed54384017ab76"}, + {file = "scipy-1.6.1-cp37-cp37m-manylinux1_i686.whl", hash = "sha256:e79570979ccdc3d165456dd62041d9556fb9733b86b4b6d818af7a0afc15f092"}, + {file = "scipy-1.6.1-cp37-cp37m-manylinux1_x86_64.whl", hash = "sha256:a423533c55fec61456dedee7b6ee7dce0bb6bfa395424ea374d25afa262be261"}, + {file = "scipy-1.6.1-cp37-cp37m-manylinux2014_aarch64.whl", hash = "sha256:33d6b7df40d197bdd3049d64e8e680227151673465e5d85723b3b8f6b15a6ced"}, + {file = "scipy-1.6.1-cp37-cp37m-win32.whl", hash = "sha256:6725e3fbb47da428794f243864f2297462e9ee448297c93ed1dcbc44335feb78"}, + {file = "scipy-1.6.1-cp37-cp37m-win_amd64.whl", hash = "sha256:5fa9c6530b1661f1370bcd332a1e62ca7881785cc0f80c0d559b636567fab63c"}, + {file = "scipy-1.6.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bd50daf727f7c195e26f27467c85ce653d41df4358a25b32434a50d8870fc519"}, + {file = "scipy-1.6.1-cp38-cp38-manylinux1_i686.whl", hash = "sha256:f46dd15335e8a320b0fb4685f58b7471702234cba8bb3442b69a3e1dc329c345"}, + {file = "scipy-1.6.1-cp38-cp38-manylinux1_x86_64.whl", hash = "sha256:0e5b0ccf63155d90da576edd2768b66fb276446c371b73841e3503be1d63fb5d"}, + {file = "scipy-1.6.1-cp38-cp38-manylinux2014_aarch64.whl", hash = "sha256:2481efbb3740977e3c831edfd0bd9867be26387cacf24eb5e366a6a374d3d00d"}, + {file = "scipy-1.6.1-cp38-cp38-win32.whl", hash = "sha256:68cb4c424112cd4be886b4d979c5497fba190714085f46b8ae67a5e4416c32b4"}, + {file = "scipy-1.6.1-cp38-cp38-win_amd64.whl", hash = "sha256:5f331eeed0297232d2e6eea51b54e8278ed8bb10b099f69c44e2558c090d06bf"}, + {file = "scipy-1.6.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:0c8a51d33556bf70367452d4d601d1742c0e806cd0194785914daf19775f0e67"}, + {file = "scipy-1.6.1-cp39-cp39-manylinux1_i686.whl", hash = "sha256:83bf7c16245c15bc58ee76c5418e46ea1811edcc2e2b03041b804e46084ab627"}, + {file = "scipy-1.6.1-cp39-cp39-manylinux1_x86_64.whl", hash = "sha256:794e768cc5f779736593046c9714e0f3a5940bc6dcc1dba885ad64cbfb28e9f0"}, + {file = "scipy-1.6.1-cp39-cp39-manylinux2014_aarch64.whl", hash = "sha256:5da5471aed911fe7e52b86bf9ea32fb55ae93e2f0fac66c32e58897cfb02fa07"}, + {file = "scipy-1.6.1-cp39-cp39-win32.whl", hash = "sha256:8e403a337749ed40af60e537cc4d4c03febddcc56cd26e774c9b1b600a70d3e4"}, + {file = "scipy-1.6.1-cp39-cp39-win_amd64.whl", hash = "sha256:a5193a098ae9f29af283dcf0041f762601faf2e595c0db1da929875b7570353f"}, + {file = "scipy-1.6.1.tar.gz", hash = "sha256:c4fceb864890b6168e79b0e714c585dbe2fd4222768ee90bc1aa0f8218691b11"}, ] six = [ {file = "six-1.15.0-py2.py3-none-any.whl", hash = "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced"}, diff --git a/pyproject.toml b/pyproject.toml index 5723602cbed9..96a66f02d7d0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "nautilus_trader" -version = "1.103.1" +version = "1.104.0" description = "A high-performance algorithmic trading platform and event-driven backtester" authors = ["Nautech Systems "] license = "LGPL-3.0-or-later" @@ -33,7 +33,7 @@ generate-setup-file = false [tool.poetry.dependencies] python = "^3.7.9" -ccxt = "^1.41.98" +ccxt = "^1.41.100" cython = "^3.0a6" empyrical = "^0.5.5" # `importlib.metadata` is in the Python stdlib from 3.8 onwards @@ -47,7 +47,7 @@ psutil = "^5.8.0" pyarrow = "^3.0.0" pytz = "^2020.5" redis = "3.5.3" -scipy = "^1.6.0" +scipy = "^1.6.1" uvloop = { version = "^0.14.0", markers = "sys_platform != 'win32'" } [tool.poetry.dev-dependencies] From d5859f3c15859bda66f1d32b51bd3e4100a098c6 Mon Sep 17 00:00:00 2001 From: cjdsellers Date: Fri, 19 Feb 2021 13:43:51 +1100 Subject: [PATCH 2/5] Fix import --- examples/backtest/fx_market_maker_gbpusd_bars.py | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/backtest/fx_market_maker_gbpusd_bars.py b/examples/backtest/fx_market_maker_gbpusd_bars.py index 783111ace591..4daae3409d90 100644 --- a/examples/backtest/fx_market_maker_gbpusd_bars.py +++ b/examples/backtest/fx_market_maker_gbpusd_bars.py @@ -14,6 +14,7 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- +from datetime import datetime from decimal import Decimal import os import pathlib From 9144315281cff87a45991dd5fa6a81daec7b1553 Mon Sep 17 00:00:00 2001 From: cjdsellers Date: Fri, 19 Feb 2021 13:58:39 +1100 Subject: [PATCH 3/5] Generalize data client - Add Data and DataType - Extract MarketDataClient - Generalize data messages --- nautilus_trader/adapters/ccxt/data.pxd | 4 +- nautilus_trader/adapters/ccxt/data.pyx | 7 +- nautilus_trader/adapters/oanda/data.pxd | 4 +- nautilus_trader/adapters/oanda/data.pyx | 7 +- nautilus_trader/backtest/data_client.pxd | 4 +- nautilus_trader/backtest/data_client.pyx | 17 +- nautilus_trader/backtest/engine.pyx | 6 +- nautilus_trader/data/base.pxd | 10 + nautilus_trader/data/base.pyx | 60 +++++ nautilus_trader/data/client.pxd | 25 +- nautilus_trader/data/client.pyx | 149 +++++++++-- nautilus_trader/data/engine.pxd | 30 ++- nautilus_trader/data/engine.pyx | 237 ++++++++++------- nautilus_trader/data/messages.pxd | 26 +- nautilus_trader/data/messages.pyx | 86 +++--- nautilus_trader/execution/engine.pyx | 8 +- nautilus_trader/live/data_client.pxd | 5 + nautilus_trader/live/data_client.pyx | 53 +++- nautilus_trader/live/execution_engine.pyx | 2 +- nautilus_trader/trading/strategy.pyx | 89 +++---- .../adapters/ccxt/test_ccxt_data.py | 15 +- .../adapters/oanda/test_oanda_data.py | 8 +- .../backtest/test_backtest_data_client.py | 6 +- tests/unit_tests/data/test_data_client.py | 8 +- tests/unit_tests/data/test_data_engine.py | 245 ++++++++---------- .../unit_tests/live/test_live_data_client.py | 8 +- .../unit_tests/live/test_live_data_engine.py | 35 ++- .../serialization/test_serialization_base.py | 7 +- .../trading/test_trading_strategy.py | 6 +- .../unit_tests/trading/test_trading_trader.py | 6 +- 30 files changed, 702 insertions(+), 471 deletions(-) diff --git a/nautilus_trader/adapters/ccxt/data.pxd b/nautilus_trader/adapters/ccxt/data.pxd index 128b7176771d..addd34cec268 100644 --- a/nautilus_trader/adapters/ccxt/data.pxd +++ b/nautilus_trader/adapters/ccxt/data.pxd @@ -14,7 +14,7 @@ # ------------------------------------------------------------------------------------------------- from nautilus_trader.adapters.ccxt.providers cimport CCXTInstrumentProvider -from nautilus_trader.live.data_client cimport LiveDataClient +from nautilus_trader.live.data_client cimport LiveMarketDataClient from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarSpecification from nautilus_trader.model.bar cimport BarType @@ -22,7 +22,7 @@ from nautilus_trader.model.identifiers cimport Symbol from nautilus_trader.model.tick cimport TradeTick -cdef class CCXTDataClient(LiveDataClient): +cdef class CCXTDataClient(LiveMarketDataClient): cdef object _client cdef CCXTInstrumentProvider _instrument_provider diff --git a/nautilus_trader/adapters/ccxt/data.pyx b/nautilus_trader/adapters/ccxt/data.pyx index ed7458a5d81a..7d513a1fddd7 100644 --- a/nautilus_trader/adapters/ccxt/data.pyx +++ b/nautilus_trader/adapters/ccxt/data.pyx @@ -28,7 +28,7 @@ from nautilus_trader.core.correctness cimport Condition from nautilus_trader.core.datetime cimport from_unix_time_ms from nautilus_trader.core.datetime cimport to_unix_time_ms from nautilus_trader.core.uuid cimport UUID -from nautilus_trader.live.data_client cimport LiveDataClient +from nautilus_trader.live.data_client cimport LiveMarketDataClient from nautilus_trader.live.data_engine cimport LiveDataEngine from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarSpecification @@ -40,7 +40,6 @@ from nautilus_trader.model.c_enums.price_type cimport PriceType from nautilus_trader.model.c_enums.price_type cimport PriceTypeParser from nautilus_trader.model.identifiers cimport Symbol from nautilus_trader.model.identifiers cimport TradeMatchId -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument from nautilus_trader.model.objects cimport Price from nautilus_trader.model.objects cimport Quantity @@ -52,7 +51,7 @@ from nautilus_trader.model.tick cimport TradeTick cdef int _SECONDS_IN_HOUR = 60 * 60 -cdef class CCXTDataClient(LiveDataClient): +cdef class CCXTDataClient(LiveMarketDataClient): """ Provides a data client for the unified CCXT Pro API. """ @@ -85,7 +84,7 @@ cdef class CCXTDataClient(LiveDataClient): """ super().__init__( - Venue(client.name.upper()), + client.name.upper(), engine, clock, logger, diff --git a/nautilus_trader/adapters/oanda/data.pxd b/nautilus_trader/adapters/oanda/data.pxd index 9d42a461f48b..ed7a21e18990 100644 --- a/nautilus_trader/adapters/oanda/data.pxd +++ b/nautilus_trader/adapters/oanda/data.pxd @@ -19,7 +19,7 @@ import threading from nautilus_trader.adapters.oanda.providers cimport OandaInstrumentProvider from nautilus_trader.core.uuid cimport UUID -from nautilus_trader.live.data_client cimport LiveDataClient +from nautilus_trader.live.data_client cimport LiveMarketDataClient from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarType from nautilus_trader.model.c_enums.price_type cimport PriceType @@ -29,7 +29,7 @@ from nautilus_trader.model.tick cimport QuoteTick from nautilus_trader.model.tick cimport TradeTick -cdef class OandaDataClient(LiveDataClient): +cdef class OandaDataClient(LiveMarketDataClient): cdef object _client cdef str _account_id cdef set _subscribed_instruments diff --git a/nautilus_trader/adapters/oanda/data.pyx b/nautilus_trader/adapters/oanda/data.pyx index a19b30c6e76b..d9ceaea2b11e 100644 --- a/nautilus_trader/adapters/oanda/data.pyx +++ b/nautilus_trader/adapters/oanda/data.pyx @@ -32,7 +32,7 @@ from nautilus_trader.core.constants cimport * # str constants only from nautilus_trader.core.correctness cimport Condition from nautilus_trader.core.datetime cimport format_iso8601 from nautilus_trader.core.uuid cimport UUID -from nautilus_trader.live.data_client cimport LiveDataClient +from nautilus_trader.live.data_client cimport LiveMarketDataClient from nautilus_trader.live.data_engine cimport LiveDataEngine from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarData @@ -42,7 +42,6 @@ from nautilus_trader.model.c_enums.bar_aggregation cimport BarAggregationParser from nautilus_trader.model.c_enums.price_type cimport PriceType from nautilus_trader.model.c_enums.price_type cimport PriceTypeParser from nautilus_trader.model.identifiers cimport Symbol -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument from nautilus_trader.model.objects cimport Price from nautilus_trader.model.objects cimport Quantity @@ -52,7 +51,7 @@ from nautilus_trader.model.tick cimport QuoteTick cdef int _SECONDS_IN_HOUR = 60 * 60 -cdef class OandaDataClient(LiveDataClient): +cdef class OandaDataClient(LiveMarketDataClient): """ Provides a data client for the `Oanda` brokerage. """ @@ -83,7 +82,7 @@ cdef class OandaDataClient(LiveDataClient): """ super().__init__( - Venue("OANDA"), + "OANDA", engine, clock, logger, diff --git a/nautilus_trader/backtest/data_client.pxd b/nautilus_trader/backtest/data_client.pxd index d343bfd0d800..c11d303a8eee 100644 --- a/nautilus_trader/backtest/data_client.pxd +++ b/nautilus_trader/backtest/data_client.pxd @@ -13,9 +13,9 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient -cdef class BacktestDataClient(DataClient): +cdef class BacktestMarketDataClient(MarketDataClient): cdef dict _instruments cdef bint _is_connected diff --git a/nautilus_trader/backtest/data_client.pyx b/nautilus_trader/backtest/data_client.pyx index 248167d08e35..7090133a8b63 100644 --- a/nautilus_trader/backtest/data_client.pyx +++ b/nautilus_trader/backtest/data_client.pyx @@ -23,15 +23,14 @@ from nautilus_trader.common.clock cimport Clock from nautilus_trader.common.logging cimport Logger from nautilus_trader.core.correctness cimport Condition from nautilus_trader.core.uuid cimport UUID -from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient from nautilus_trader.data.engine cimport DataEngine from nautilus_trader.model.bar cimport BarType from nautilus_trader.model.identifiers cimport Symbol -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument -cdef class BacktestDataClient(DataClient): +cdef class BacktestMarketDataClient(MarketDataClient): """ Provides an implementation of `DataClient` for backtesting. """ @@ -39,7 +38,7 @@ cdef class BacktestDataClient(DataClient): def __init__( self, list instruments not None, - Venue venue not None, + str name not None, DataEngine engine not None, Clock clock not None, Logger logger not None, @@ -60,7 +59,7 @@ cdef class BacktestDataClient(DataClient): """ super().__init__( - venue, + name, engine, clock, logger, @@ -68,7 +67,13 @@ cdef class BacktestDataClient(DataClient): self._instruments = {} for instrument in instruments: - Condition.equal(instrument.symbol.venue, self.venue, "instrument.symbol.venue", "self.venue") + # Check the instrument is for the correct client + Condition.equal( + instrument.symbol.venue.value, + self.name, + "instrument.symbol.venue.value", + "self.name", + ) self._instruments[instrument.symbol] = instrument self.is_connected = False diff --git a/nautilus_trader/backtest/engine.pyx b/nautilus_trader/backtest/engine.pyx index a64fe9126ab9..e6691fa9f9bd 100644 --- a/nautilus_trader/backtest/engine.pyx +++ b/nautilus_trader/backtest/engine.pyx @@ -18,7 +18,7 @@ import pytz from cpython.datetime cimport datetime from nautilus_trader.analysis.performance cimport PerformanceAnalyzer -from nautilus_trader.backtest.data_client cimport BacktestDataClient +from nautilus_trader.backtest.data_client cimport BacktestMarketDataClient from nautilus_trader.backtest.data_container cimport BacktestDataContainer from nautilus_trader.backtest.data_producer cimport BacktestDataProducer from nautilus_trader.backtest.data_producer cimport CachedProducer @@ -238,9 +238,9 @@ cdef class BacktestEngine: if instrument.symbol.venue == venue: instruments.append(instrument) - data_client = BacktestDataClient( + data_client = BacktestMarketDataClient( instruments=instruments, - venue=venue, + name=venue.value, engine=self._data_engine, clock=self._test_clock, logger=self._test_logger, diff --git a/nautilus_trader/data/base.pxd b/nautilus_trader/data/base.pxd index 73aecfc31af5..0c4689d491a6 100644 --- a/nautilus_trader/data/base.pxd +++ b/nautilus_trader/data/base.pxd @@ -27,6 +27,16 @@ from nautilus_trader.model.tick cimport QuoteTick from nautilus_trader.model.tick cimport TradeTick +cdef class Data: + cdef DataType data_type + cdef object data + + +cdef class DataType: + cdef type type + cdef dict metadata + + cdef class DataCacheFacade: # -- QUERIES --------------------------------------------------------------------------------------- # noqa diff --git a/nautilus_trader/data/base.pyx b/nautilus_trader/data/base.pyx index 5d70c61e0c48..b08f827e6259 100644 --- a/nautilus_trader/data/base.pyx +++ b/nautilus_trader/data/base.pyx @@ -25,6 +25,66 @@ from nautilus_trader.model.tick cimport QuoteTick from nautilus_trader.model.tick cimport TradeTick +cdef class Data: + """ + Represents wrapped data which includes data type information. + """ + + def __init__(self, DataType data_type not None, data not None): + """ + Initialize a new instance of the `Data` class. + + Parameters + ---------- + data_type : DataType + The data type. + data : object + The data object to wrap. + + """ + self.data_type = data_type + self.data = data + + +cdef class DataType: + """ + Represents a data type including its metadata. + """ + + def __init__(self, type data_type not None, dict metadata=None): + """ + Initialize a new instance of the `DataType` class. + + Parameters + ---------- + data_type : type + The PyObject type of the data. + metadata : dict + The data types metadata. + + """ + if metadata is None: + metadata = {} + + self.type = data_type + self.metadata = metadata + + def __eq__(self, DataType other) -> bool: + return self.type == other.type and self.metadata == other.metadata + + def __ne__(self, DataType other) -> bool: + return self.type != other.type or self.metadata != other.metadata + + def __hash__(self) -> int: + return hash((self.type, self.metadata)) + + def __str__(self) -> str: + return f"<{self.type.__name__}> {self.metadata}" + + def __repr__(self) -> str: + return f"{type(self).__name__}(type={self.type.__name__}, metadata={self.metadata})" + + cdef class DataCacheFacade: """ Provides a read-only facade for a `DataCache`. diff --git a/nautilus_trader/data/client.pxd b/nautilus_trader/data/client.pxd index aa55f3039bd4..65204d5d044f 100644 --- a/nautilus_trader/data/client.pxd +++ b/nautilus_trader/data/client.pxd @@ -19,11 +19,11 @@ from nautilus_trader.common.clock cimport Clock from nautilus_trader.common.logging cimport LoggerAdapter from nautilus_trader.common.uuid cimport UUIDFactory from nautilus_trader.core.uuid cimport UUID +from nautilus_trader.data.base cimport DataType from nautilus_trader.data.engine cimport DataEngine from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarType from nautilus_trader.model.identifiers cimport Symbol -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument from nautilus_trader.model.order_book cimport OrderBook from nautilus_trader.model.tick cimport QuoteTick @@ -37,18 +37,35 @@ cdef class DataClient: cdef DataEngine _engine cdef dict _config - cdef readonly Venue venue + cdef readonly str name """The clients venue.\n\n:returns: `Venue`""" cdef readonly bint is_connected """If the client is connected.\n\n:returns: `bool`""" - cpdef list unavailable_methods(self) - cpdef void connect(self) except * cpdef void disconnect(self) except * cpdef void reset(self) except * cpdef void dispose(self) except * +# -- SUBSCRIPTIONS --------------------------------------------------------------------------------- + + cpdef void subscribe(self, DataType data_type) except * + cpdef void unsubscribe(self, DataType data_type) except * + +# -- REQUEST HANDLERS ------------------------------------------------------------------------------ + + cpdef void request(self, DataType data_type, UUID correlation_id) except * + +# -- DATA HANDLERS --------------------------------------------------------------------------------- + + cdef void _handle_data(self, DataType data_type, data) except * + cdef void _handle_data_response(self, DataType data_type, data, UUID correlation_id) except * + + +cdef class MarketDataClient(DataClient): + + cpdef list unavailable_methods(self) + # -- SUBSCRIPTIONS --------------------------------------------------------------------------------- cpdef void subscribe_instrument(self, Symbol symbol) except * diff --git a/nautilus_trader/data/client.pyx b/nautilus_trader/data/client.pyx index d5d3b08a3c43..2e0358dfb99b 100644 --- a/nautilus_trader/data/client.pyx +++ b/nautilus_trader/data/client.pyx @@ -27,14 +27,15 @@ from nautilus_trader.common.logging cimport Logger from nautilus_trader.common.logging cimport LoggerAdapter from nautilus_trader.common.uuid cimport UUIDFactory from nautilus_trader.core.constants cimport * # str constants only +from nautilus_trader.core.correctness cimport Condition from nautilus_trader.core.uuid cimport UUID +from nautilus_trader.data.base cimport Data from nautilus_trader.data.engine cimport DataEngine from nautilus_trader.data.messages cimport DataResponse from nautilus_trader.model.bar cimport Bar from nautilus_trader.model.bar cimport BarData from nautilus_trader.model.bar cimport BarType from nautilus_trader.model.identifiers cimport Symbol -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument from nautilus_trader.model.order_book cimport OrderBook from nautilus_trader.model.tick cimport QuoteTick @@ -50,7 +51,7 @@ cdef class DataClient: def __init__( self, - Venue venue not None, + str name not None, DataEngine engine not None, Clock clock not None, Logger logger not None, @@ -59,8 +60,10 @@ cdef class DataClient: """ Initialize a new instance of the `DataClient` class. - venue : Venue - The venue the client can provide data for. + Parameters + ---------- + name : Venue + The data client name. engine : DataEngine The data engine to connect to the client. clock : Clock @@ -71,22 +74,122 @@ cdef class DataClient: The configuration options. """ + Condition.valid_string(name, "name") + if config is None: config = {} self._clock = clock self._uuid_factory = UUIDFactory() - self._log = LoggerAdapter(config.get("name", f"DataClient-{venue.value}"), logger) + self._log = LoggerAdapter(config.get("name", f"DataClient-{name}"), logger) self._engine = engine self._config = config - self.venue = venue + self.name = name self.is_connected = False self._log.info("Initialized.") def __repr__(self) -> str: - return f"{type(self).__name__}-{self.venue}" + return f"{type(self).__name__}-{self.name}" + + cpdef void connect(self) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + + cpdef void disconnect(self) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + + cpdef void reset(self) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + + cpdef void dispose(self) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + +# -- SUBSCRIPTIONS --------------------------------------------------------------------------------- + + cpdef void subscribe(self, DataType data_type) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + + cpdef void unsubscribe(self, DataType data_type) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + +# -- REQUESTS -------------------------------------------------------------------------------------- + + cpdef void request(self, DataType data_type, UUID correlation_id) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + +# -- PYTHON WRAPPERS ------------------------------------------------------------------------------- + + def _handle_data_py(self, DataType data_type, data): + self._engine.process(Data(data_type, data)) + + def _handle_data_response_py(self, DataType data_type, data, UUID correlation_id): + self._handle_data_response(data_type, data, correlation_id) + +# -- DATA HANDLERS --------------------------------------------------------------------------------- + + cdef void _handle_data(self, DataType data_type, data) except *: + self._engine.process(Data(data_type, data)) + + cdef void _handle_data_response(self, DataType data_type, data, UUID correlation_id) except *: + cdef DataResponse response = DataResponse( + provider=self.name, + data_type=data_type, + data=data, + correlation_id=correlation_id, + response_id=self._uuid_factory.generate(), + response_timestamp=self._clock.utc_now_c(), + ) + + self._engine.receive(response) + + +cdef class MarketDataClient(DataClient): + """ + The abstract base class for all market data clients. + + This class should not be used directly, but through its concrete subclasses. + """ + + def __init__( + self, + str name not None, + DataEngine engine not None, + Clock clock not None, + Logger logger not None, + dict config=None, + ): + """ + Initialize a new instance of the `MarketDataClient` class. + + Parameters + ---------- + name : str + The data client name (normally the venue). + engine : DataEngine + The data engine to connect to the client. + clock : Clock + The clock for the component. + logger : Logger + The logger for the component. + config : dict[str, object], optional + The configuration options. + + """ + super().__init__( + name=name, + engine=engine, + clock=clock, + logger=logger, + config=config, + ) cpdef list unavailable_methods(self): """ @@ -118,6 +221,14 @@ cdef class DataClient: # -- SUBSCRIPTIONS --------------------------------------------------------------------------------- + cpdef void subscribe(self, DataType data_type) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + + cpdef void unsubscribe(self, DataType data_type) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + cpdef void subscribe_instrument(self, Symbol symbol) except *: """Abstract method (implement in subclass).""" raise NotImplementedError("method must be implemented in the subclass") @@ -160,6 +271,10 @@ cdef class DataClient: # -- REQUESTS -------------------------------------------------------------------------------------- + cpdef void request(self, DataType datatype, UUID correlation_id) except *: + """Abstract method (implement in subclass).""" + raise NotImplementedError("method must be implemented in the subclass") + cpdef void request_instrument(self, Symbol symbol, UUID correlation_id) except *: """Abstract method (implement in subclass).""" raise NotImplementedError("method must be implemented in the subclass") @@ -249,9 +364,8 @@ cdef class DataClient: cdef void _handle_instruments(self, list instruments, UUID correlation_id) except *: cdef DataResponse response = DataResponse( - venue=self.venue, - data_type=Instrument, - metadata={}, + provider=self.name, + data_type=DataType(Instrument), data=instruments, correlation_id=correlation_id, response_id=self._uuid_factory.generate(), @@ -262,9 +376,8 @@ cdef class DataClient: cdef void _handle_quote_ticks(self, Symbol symbol, list ticks, UUID correlation_id) except *: cdef DataResponse response = DataResponse( - venue=self.venue, - data_type=QuoteTick, - metadata={SYMBOL: symbol}, + provider=self.name, + data_type=DataType(QuoteTick, metadata={SYMBOL: symbol}), data=ticks, correlation_id=correlation_id, response_id=self._uuid_factory.generate(), @@ -275,9 +388,8 @@ cdef class DataClient: cdef void _handle_trade_ticks(self, Symbol symbol, list ticks, UUID correlation_id) except *: cdef DataResponse response = DataResponse( - venue=self.venue, - data_type=TradeTick, - metadata={SYMBOL: symbol}, + provider=self.name, + data_type=DataType(TradeTick, metadata={SYMBOL: symbol}), data=ticks, correlation_id=correlation_id, response_id=self._uuid_factory.generate(), @@ -288,9 +400,8 @@ cdef class DataClient: cdef void _handle_bars(self, BarType bar_type, list bars, Bar partial, UUID correlation_id) except *: cdef DataResponse response = DataResponse( - venue=self.venue, - data_type=Bar, - metadata={BAR_TYPE: bar_type, "Partial": partial}, + provider=self.name, + data_type=DataType(Bar, metadata={BAR_TYPE: bar_type, "Partial": partial}), data=bars, correlation_id=correlation_id, response_id=self._uuid_factory.generate(), diff --git a/nautilus_trader/data/engine.pxd b/nautilus_trader/data/engine.pxd index 24b27ce7c4ff..4be8b90350d0 100644 --- a/nautilus_trader/data/engine.pxd +++ b/nautilus_trader/data/engine.pxd @@ -20,8 +20,10 @@ from nautilus_trader.common.timer cimport TimeEvent from nautilus_trader.core.constants cimport * # str constants only from nautilus_trader.core.uuid cimport UUID from nautilus_trader.data.aggregation cimport TimeBarAggregator +from nautilus_trader.data.base cimport DataType from nautilus_trader.data.cache cimport DataCache from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient from nautilus_trader.data.messages cimport DataCommand from nautilus_trader.data.messages cimport DataRequest from nautilus_trader.data.messages cimport DataResponse @@ -90,16 +92,18 @@ cdef class DataEngine(Component): cdef inline void _execute_command(self, DataCommand command) except * cdef inline void _handle_subscribe(self, DataClient client, Subscribe command) except * cdef inline void _handle_unsubscribe(self, DataClient client, Unsubscribe command) except * - cdef inline void _handle_subscribe_instrument(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_subscribe_order_book(self, DataClient client, Symbol symbol, dict metadata, handler: callable) except * - cdef inline void _handle_subscribe_quote_ticks(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_subscribe_trade_ticks(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_subscribe_bars(self, DataClient client, BarType bar_type, handler: callable) except * - cdef inline void _handle_unsubscribe_instrument(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_unsubscribe_order_book(self, DataClient client, Symbol symbol, dict metadata, handler: callable) except * - cdef inline void _handle_unsubscribe_quote_ticks(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_unsubscribe_trade_ticks(self, DataClient client, Symbol symbol, handler: callable) except * - cdef inline void _handle_unsubscribe_bars(self, DataClient client, BarType bar_type, handler: callable) except * + cdef inline void _handle_subscribe_instrument(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_subscribe_order_book(self, MarketDataClient client, Symbol symbol, dict metadata, handler: callable) except * + cdef inline void _handle_subscribe_quote_ticks(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_subscribe_trade_ticks(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_subscribe_bars(self, MarketDataClient client, BarType bar_type, handler: callable) except * + cdef inline void _handle_subscribe_data(self, DataClient client, DataType data_type, handler: callable) except * + cdef inline void _handle_unsubscribe_instrument(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_unsubscribe_order_book(self, MarketDataClient client, Symbol symbol, dict metadata, handler: callable) except * + cdef inline void _handle_unsubscribe_quote_ticks(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_unsubscribe_trade_ticks(self, MarketDataClient client, Symbol symbol, handler: callable) except * + cdef inline void _handle_unsubscribe_bars(self, MarketDataClient client, BarType bar_type, handler: callable) except * + cdef inline void _handle_unsubscribe_data(self, DataClient client, DataType data_type, handler: callable) except * cdef inline void _handle_request(self, DataRequest request) except * # -- DATA HANDLERS --------------------------------------------------------------------------------- @@ -124,9 +128,9 @@ cdef class DataEngine(Component): cpdef void _internal_update_instruments(self, list instruments) except * cpdef void _snapshot_order_book(self, TimeEvent snap_event) except * - cdef inline void _start_bar_aggregator(self, DataClient client, BarType bar_type) except * - cdef inline void _hydrate_aggregator(self, DataClient client, TimeBarAggregator aggregator, BarType bar_type) except * - cdef inline void _stop_bar_aggregator(self, DataClient client, BarType bar_type) except * + cdef inline void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type) except * + cdef inline void _hydrate_aggregator(self, MarketDataClient client, TimeBarAggregator aggregator, BarType bar_type) except * + cdef inline void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type) except * cdef inline void _bulk_build_tick_bars( self, BarType bar_type, diff --git a/nautilus_trader/data/engine.pyx b/nautilus_trader/data/engine.pyx index 006fd3d5cda0..150b7eda857a 100644 --- a/nautilus_trader/data/engine.pyx +++ b/nautilus_trader/data/engine.pyx @@ -51,7 +51,9 @@ from nautilus_trader.data.aggregation cimport TickBarAggregator from nautilus_trader.data.aggregation cimport TimeBarAggregator from nautilus_trader.data.aggregation cimport ValueBarAggregator from nautilus_trader.data.aggregation cimport VolumeBarAggregator +from nautilus_trader.data.base cimport DataType from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient from nautilus_trader.data.messages cimport DataCommand from nautilus_trader.data.messages cimport DataRequest from nautilus_trader.data.messages cimport DataResponse @@ -64,7 +66,6 @@ from nautilus_trader.model.c_enums.bar_aggregation cimport BarAggregation from nautilus_trader.model.c_enums.bar_aggregation cimport BarAggregationParser from nautilus_trader.model.c_enums.price_type cimport PriceType from nautilus_trader.model.identifiers cimport Symbol -from nautilus_trader.model.identifiers cimport Venue from nautilus_trader.model.instrument cimport Instrument from nautilus_trader.model.order_book cimport OrderBook from nautilus_trader.model.tick cimport QuoteTick @@ -106,7 +107,7 @@ cdef class DataEngine(Component): super().__init__(clock, logger, name="DataEngine") self._use_previous_close = config.get("use_previous_close", True) - self._clients = {} # type: dict[Venue, DataClient] + self._clients = {} # type: dict[str, DataClient] self._correlation_index = {} # type: dict[UUID, callable] # Handlers @@ -115,7 +116,7 @@ cdef class DataEngine(Component): self._quote_tick_handlers = {} # type: dict[Symbol, list[callable]] self._trade_tick_handlers = {} # type: dict[Symbol, list[callable]] self._bar_handlers = {} # type: dict[BarType, list[callable]] - self._data_handlers = {} # type: dict[type, list[callable]] + self._data_handlers = {} # type: dict[DataType, list[callable]] # Aggregators self._bar_aggregators = {} # type: dict[BarType, BarAggregator] @@ -136,13 +137,13 @@ cdef class DataEngine(Component): self._log.info(f"use_previous_close={self._use_previous_close}") @property - def registered_venues(self): + def registered_clients(self): """ - The venues registered with the data engine. + The data clients registered with the data engine. Returns ------- - list[Venue] + list[str] """ return sorted(list(self._clients.keys())) @@ -258,9 +259,9 @@ cdef class DataEngine(Component): """ Condition.not_none(client, "client") - Condition.not_in(client.venue, self._clients, "client", "self._clients") + Condition.not_in(client.name, self._clients, "client", "self._clients") - self._clients[client.venue] = client + self._clients[client.name] = client self._log.info(f"Registered {client}.") @@ -291,9 +292,9 @@ cdef class DataEngine(Component): """ Condition.not_none(client, "client") - Condition.is_in(client.venue, self._clients, "client.venue", "self._clients") + Condition.is_in(client.name, self._clients, "client.name", "self._clients") - del self._clients[client.venue] + del self._clients[client.name] self._log.info(f"Deregistered {client}.") # -- ABSTRACT METHODS ------------------------------------------------------------------------------ @@ -415,10 +416,10 @@ cdef class DataEngine(Component): self._log.debug(f"{RECV}{CMD} {command}.") self.command_count += 1 - cdef DataClient client = self._clients.get(command.venue) + cdef DataClient client = self._clients.get(command.provider) if client is None: self._log.error(f"Cannot handle command " - f"(no client registered for {command.venue}) {command}.") + f"(no client registered for {command.provider}) {command}.") return # No client to handle command if isinstance(command, Subscribe): @@ -429,78 +430,84 @@ cdef class DataEngine(Component): self._log.error(f"Cannot handle unrecognized command {command}.") cdef inline void _handle_subscribe(self, DataClient client, Subscribe command) except *: - if command.data_type == Instrument: + if command.data_type.type == Instrument: self._handle_subscribe_instrument( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == OrderBook: + elif command.data_type.type == OrderBook: self._handle_subscribe_order_book( client, - command.metadata.get(SYMBOL), - command.metadata, + command.data_type.metadata.get(SYMBOL), + command.data_type.metadata, command.handler, ) - elif command.data_type == QuoteTick: + elif command.data_type.type == QuoteTick: self._handle_subscribe_quote_ticks( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == TradeTick: + elif command.data_type.type == TradeTick: self._handle_subscribe_trade_ticks( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == Bar: + elif command.data_type.type == Bar: self._handle_subscribe_bars( client, - command.metadata.get(BAR_TYPE), + command.data_type.metadata.get(BAR_TYPE), command.handler, ) else: - self._log.error(f"Cannot subscribe to unrecognized data type {command.data_type}.") + try: + client.subscribe(command.data_type) + except NotImplementedError: + self._log.error(f"Cannot subscribe to unrecognized data type {command.data_type}.") cdef inline void _handle_unsubscribe(self, DataClient client, Unsubscribe command) except *: - if command.data_type == Instrument: + if command.data_type.type == Instrument: self._handle_unsubscribe_instrument( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == OrderBook: + elif command.data_type.type == OrderBook: self._handle_unsubscribe_order_book( client, - command.metadata.get(SYMBOL), - command.metadata, + command.data_type.metadata.get(SYMBOL), + command.data_type.metadata, command.handler, ) - elif command.data_type == QuoteTick: + elif command.data_type.type == QuoteTick: self._handle_unsubscribe_quote_ticks( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == TradeTick: + elif command.data_type.type == TradeTick: self._handle_unsubscribe_trade_ticks( client, - command.metadata.get(SYMBOL), + command.data_type.metadata.get(SYMBOL), command.handler, ) - elif command.data_type == Bar: + elif command.data_type.type == Bar: self._handle_unsubscribe_bars( client, - command.metadata.get(BAR_TYPE), + command.data_type.metadata.get(BAR_TYPE), command.handler, ) else: - self._log.error(f"Cannot unsubscribe from unrecognized data type {command.data_type}.") + try: + client.unsubscribe(command.data_type) + except NotImplementedError: + self._log.error(f"Cannot subscribe to unrecognized data type {command.data_type}.") cdef inline void _handle_subscribe_instrument( self, - DataClient client, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -522,7 +529,7 @@ cdef class DataEngine(Component): cdef inline void _handle_subscribe_order_book( self, - DataClient client, + MarketDataClient client, Symbol symbol, dict metadata, handler: callable, @@ -576,7 +583,7 @@ cdef class DataEngine(Component): cdef inline void _handle_subscribe_quote_ticks( self, - DataClient client, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -599,7 +606,7 @@ cdef class DataEngine(Component): cdef inline void _handle_subscribe_trade_ticks( self, - DataClient client, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -622,7 +629,7 @@ cdef class DataEngine(Component): cdef inline void _handle_subscribe_bars( self, - DataClient client, + MarketDataClient client, BarType bar_type, handler: callable, ) except *: @@ -649,9 +656,32 @@ cdef class DataEngine(Component): else: self._log.warning(f"Handler {handler} already subscribed to {bar_type} data.") - cdef inline void _handle_unsubscribe_instrument( + cdef inline void _handle_subscribe_data( self, DataClient client, + DataType data_type, + handler: callable, + ) except *: + Condition.not_none(client, "client") + Condition.not_none(data_type, "data_type") + Condition.callable(handler, "handler") + + if data_type not in self._data_handlers: + # Setup handlers + self._data_handlers[data_type] = [] # type: list[callable] + client.subscribe(data_type) + self._log.info(f"Subscribed to {data_type} data.") + + # Add handler for subscriber + if handler not in self._data_handlers[data_type]: + self._data_handlers[data_type].append(handler) + self._log.debug(f"Added {handler} for {data_type} data.") + else: + self._log.warning(f"Handler {handler} already subscribed to {data_type} data.") + + cdef inline void _handle_unsubscribe_instrument( + self, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -678,7 +708,7 @@ cdef class DataEngine(Component): cdef inline void _handle_unsubscribe_order_book( self, - DataClient client, + MarketDataClient client, Symbol symbol, dict metadata, handler: callable, @@ -732,7 +762,7 @@ cdef class DataEngine(Component): cdef inline void _handle_unsubscribe_quote_ticks( self, - DataClient client, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -759,7 +789,7 @@ cdef class DataEngine(Component): cdef inline void _handle_unsubscribe_trade_ticks( self, - DataClient client, + MarketDataClient client, Symbol symbol, handler: callable, ) except *: @@ -786,7 +816,7 @@ cdef class DataEngine(Component): cdef inline void _handle_unsubscribe_bars( self, - DataClient client, + MarketDataClient client, BarType bar_type, handler: callable, ) except *: @@ -814,16 +844,43 @@ cdef class DataEngine(Component): client.unsubscribe_bars(bar_type) self._log.info(f"Unsubscribed from {bar_type} data.") + cdef inline void _handle_unsubscribe_data( + self, + DataClient client, + DataType data_type, + handler: callable, + ) except *: + Condition.not_none(client, "client") + Condition.not_none(data_type, "data_type") + Condition.callable(handler, "handler") + + if data_type not in self._data_handlers: + self._log.warning(f"Handler {handler} not subscribed to {data_type} data.") + return + + # Remove subscribers handler + if handler in self._data_handlers[data_type]: + self._data_handlers[data_type].remove(handler) + self._log.debug(f"Removed handler {handler} for {data_type} data.") + else: + self._log.warning(f"Handler {handler} not subscribed to {data_type} data.") + + if not self._data_handlers[data_type]: + # No more handlers for data type + del self._data_handlers[data_type] + client.unsubscribe(data_type) + self._log.info(f"Unsubscribed from {data_type} data.") + # -- REQUEST HANDLERS ------------------------------------------------------------------------------ cdef inline void _handle_request(self, DataRequest request) except *: self._log.debug(f"{RECV}{REQ} {request}.") self.request_count += 1 - cdef DataClient client = self._clients.get(request.venue) + cdef DataClient client = self._clients.get(request.provider) if client is None: self._log.error(f"Cannot handle request " - f"(no client registered for {request.venue}) {request}.") + f"(no client registered for {request.provider}) {request}.") return # No client to handle request if request.id in self._correlation_index: @@ -833,44 +890,49 @@ cdef class DataEngine(Component): self._correlation_index[request.id] = request.callback - if request.data_type == Instrument: - symbol = request.metadata.get(SYMBOL) + if request.data_type.type == Instrument: + Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") + symbol = request.data_type.metadata.get(SYMBOL) if symbol: client.request_instrument(symbol, request.id) else: client.request_instruments(request.id) - elif request.data_type == QuoteTick: + elif request.data_type.type == QuoteTick: + Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") client.request_quote_ticks( - request.metadata.get(SYMBOL), - request.metadata.get(FROM_DATETIME), - request.metadata.get(TO_DATETIME), - request.metadata.get(LIMIT, 0), + request.data_type.metadata.get(SYMBOL), + request.data_type.metadata.get(FROM_DATETIME), + request.data_type.metadata.get(TO_DATETIME), + request.data_type.metadata.get(LIMIT, 0), request.id, ) - elif request.data_type == TradeTick: + elif request.data_type.type == TradeTick: + Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") client.request_trade_ticks( - request.metadata.get(SYMBOL), - request.metadata.get(FROM_DATETIME), - request.metadata.get(TO_DATETIME), - request.metadata.get(LIMIT, 0), + request.data_type.metadata.get(SYMBOL), + request.data_type.metadata.get(FROM_DATETIME), + request.data_type.metadata.get(TO_DATETIME), + request.data_type.metadata.get(LIMIT, 0), request.id, ) - elif request.data_type == Bar: + elif request.data_type.type == Bar: + Condition.true(isinstance(client, MarketDataClient), "client was not a MarketDataClient") client.request_bars( - request.metadata.get(BAR_TYPE), - request.metadata.get(FROM_DATETIME), - request.metadata.get(TO_DATETIME), - request.metadata.get(LIMIT, 0), + request.data_type.metadata.get(BAR_TYPE), + request.data_type.metadata.get(FROM_DATETIME), + request.data_type.metadata.get(TO_DATETIME), + request.data_type.metadata.get(LIMIT, 0), request.id, ) else: - self._log.error(f"Cannot handle request " - f"(data type {request.data_type} is unrecognized).") + try: + client.request(request.data_type, request.id) + except NotImplementedError: + self._log.error(f"Cannot handle request ({request.data_type} is unrecognized).") # -- DATA HANDLERS --------------------------------------------------------------------------------- cdef inline void _handle_data(self, data) except *: - # Not logging every data item received self.data_count += 1 if isinstance(data, OrderBook): @@ -948,17 +1010,17 @@ cdef class DataEngine(Component): self._log.debug(f"{RECV}{RES} {response}.") self.response_count += 1 - if response.data_type == Instrument: + if response.data_type.type == Instrument: self._handle_instruments(response.data, response.correlation_id) - elif response.data_type == QuoteTick: + elif response.data_type.type == QuoteTick: self._handle_quote_ticks(response.data, response.correlation_id) - elif response.data_type == TradeTick: + elif response.data_type.type == TradeTick: self._handle_trade_ticks(response.data, response.correlation_id) - elif response.data_type == Bar: + elif response.data_type.type == Bar: self._handle_bars( - response.metadata.get(BAR_TYPE), + response.data_type.metadata.get(BAR_TYPE), response.data, - response.metadata.get("Partial"), + response.data_type.metadata.get("Partial"), response.correlation_id, ) else: @@ -1051,7 +1113,7 @@ cdef class DataEngine(Component): for handler in handlers: handler(order_book) - cdef inline void _start_bar_aggregator(self, DataClient client, BarType bar_type) except *: + cdef inline void _start_bar_aggregator(self, MarketDataClient client, BarType bar_type) except *: if bar_type.spec.is_time_aggregated(): # Create aggregator aggregator = TimeBarAggregator( @@ -1098,30 +1160,31 @@ cdef class DataEngine(Component): cdef inline void _hydrate_aggregator( self, - DataClient client, + MarketDataClient client, TimeBarAggregator aggregator, BarType bar_type, ) except *: - data_type = TradeTick if bar_type.spec.price_type == PriceType.LAST else QuoteTick + data_type = type(TradeTick) if bar_type.spec.price_type == PriceType.LAST else QuoteTick - if data_type == TradeTick and "request_trade_ticks" in client.unavailable_methods(): + if data_type == type(TradeTick) and "request_trade_ticks" in client.unavailable_methods(): return - elif data_type == QuoteTick and "request_quote_ticks" in client.unavailable_methods(): + elif data_type == type(QuoteTick) and "request_quote_ticks" in client.unavailable_methods(): return # Update aggregator with latest data bulk_updater = BulkTimeBarUpdater(aggregator) + metadata = { + SYMBOL: bar_type.symbol, + FROM_DATETIME: aggregator.get_start_time(), + TO_DATETIME: None, + } + # noinspection bulk_updater.receive # noinspection PyUnresolvedReferences request = DataRequest( - venue=bar_type.symbol.venue, - data_type=data_type, - metadata={ - SYMBOL: bar_type.symbol, - FROM_DATETIME: aggregator.get_start_time(), - TO_DATETIME: None, - }, + provider=bar_type.symbol.venue.value, + data_type=DataType(data_type, metadata), callback=bulk_updater.receive, request_id=self._uuid_factory.generate(), request_timestamp=self._clock.utc_now_c(), @@ -1130,7 +1193,7 @@ cdef class DataEngine(Component): # Send request directly to handler as we're already inside engine self._handle_request(request) - cdef inline void _stop_bar_aggregator(self, DataClient client, BarType bar_type) except *: + cdef inline void _stop_bar_aggregator(self, MarketDataClient client, BarType bar_type) except *: cdef aggregator = self._bar_aggregators.get(bar_type) if aggregator is None: self._log.warning(f"No bar aggregator to stop for {bar_type}") diff --git a/nautilus_trader/data/messages.pxd b/nautilus_trader/data/messages.pxd index 5f0edfb0d1b9..1f72b1a97891 100644 --- a/nautilus_trader/data/messages.pxd +++ b/nautilus_trader/data/messages.pxd @@ -16,16 +16,14 @@ from nautilus_trader.core.message cimport Command from nautilus_trader.core.message cimport Request from nautilus_trader.core.message cimport Response -from nautilus_trader.model.identifiers cimport Venue +from nautilus_trader.data.base cimport DataType cdef class DataCommand(Command): - cdef readonly Venue venue - """The venue for the command.\n\n:returns: `Venue`""" - cdef readonly type data_type + cdef readonly str provider + """The data client name for the command.\n\n:returns: `str`""" + cdef readonly DataType data_type """The command data type.\n\n:returns: `type`""" - cdef readonly dict metadata - """The command metadata.\n\n:returns: `dict`""" cdef readonly object handler """The handler for the command.\n\n:returns: `callable`""" @@ -39,22 +37,18 @@ cdef class Unsubscribe(DataCommand): cdef class DataRequest(Request): - cdef readonly Venue venue - """The venue for the request.\n\n:returns: `Venue`""" - cdef readonly type data_type + cdef readonly str provider + """The data client name for the request.\n\n:returns: `str`""" + cdef readonly DataType data_type """The request data type.\n\n:returns: `type`""" - cdef readonly dict metadata - """The request metadata.\n\n:returns: `dict`""" cdef readonly object callback """The callback to receive the data.\n\n:returns: `callable`""" cdef class DataResponse(Response): - cdef readonly Venue venue - """The venue of the response.\n\n:returns: `Venue`""" - cdef readonly type data_type + cdef readonly str provider + """The data client name for the response.\n\n:returns: `str`""" + cdef readonly DataType data_type """The response data type.\n\n:returns: `type`""" - cdef readonly dict metadata - """The response metadata.\n\n:returns: `dict`""" cdef readonly list data """The response data.\n\n:returns: `list`""" diff --git a/nautilus_trader/data/messages.pyx b/nautilus_trader/data/messages.pyx index d29795f257bb..4e59bcb6a0fc 100644 --- a/nautilus_trader/data/messages.pyx +++ b/nautilus_trader/data/messages.pyx @@ -16,6 +16,7 @@ from cpython.datetime cimport datetime from nautilus_trader.core.uuid cimport UUID +from nautilus_trader.data.base cimport DataType cdef class DataCommand(Command): @@ -27,9 +28,8 @@ cdef class DataCommand(Command): def __init__( self, - Venue venue not None, - type data_type not None, - dict metadata not None, + str provider not None, + DataType data_type not None, handler not None: callable, UUID command_id not None, datetime command_timestamp not None, @@ -38,12 +38,10 @@ cdef class DataCommand(Command): Parameters ---------- - venue : Venue - The venue for the command. + provider : str + The data client name for the command. data_type : type The data type for the command. - metadata : type - The metadata for the command. handler : callable The handler for the command. command_id : UUID @@ -54,16 +52,14 @@ cdef class DataCommand(Command): """ super().__init__(command_id, command_timestamp) - self.venue = venue + self.provider = provider self.data_type = data_type - self.metadata = metadata self.handler = handler def __repr__(self) -> str: return (f"{type(self).__name__}(" - f"venue={self.venue}, " - f"data_type={self.data_type.__name__}, " - f"metadata={self.metadata}, " + f"provider={self.provider}, " + f"data_type={self.data_type}, " f"handler={self.handler}, " f"id={self.id}, " f"timestamp={self.timestamp})") @@ -76,9 +72,8 @@ cdef class Subscribe(DataCommand): def __init__( self, - Venue venue not None, - type data_type not None, - dict metadata not None, + str provider not None, + DataType data_type not None, handler not None: callable, UUID command_id not None, datetime command_timestamp not None, @@ -88,12 +83,10 @@ cdef class Subscribe(DataCommand): Parameters ---------- - venue : Venue - The venue for the command. + provider : str + The data client name for the command. data_type : type The data type for the subscription. - metadata : type - The metadata for the subscription. handler : callable The handler for the subscription. command_id : UUID @@ -103,9 +96,8 @@ cdef class Subscribe(DataCommand): """ super().__init__( - venue, + provider, data_type, - metadata, handler, command_id, command_timestamp, @@ -119,9 +111,8 @@ cdef class Unsubscribe(DataCommand): def __init__( self, - Venue venue not None, - type data_type not None, - dict metadata not None, + str provider not None, + DataType data_type not None, handler not None: callable, UUID command_id not None, datetime command_timestamp not None, @@ -131,12 +122,10 @@ cdef class Unsubscribe(DataCommand): Parameters ---------- - venue : Venue - The venue for the command. + provider : str + The data client name for the command. data_type : type The data type to unsubscribe from. - metadata : type - The metadata of the subscription. handler : callable The handler for the subscription. command_id : UUID @@ -146,9 +135,8 @@ cdef class Unsubscribe(DataCommand): """ super().__init__( - venue, + provider, data_type, - metadata, handler, command_id, command_timestamp, @@ -162,9 +150,8 @@ cdef class DataRequest(Request): def __init__( self, - Venue venue not None, - type data_type not None, - dict metadata not None, + str provider not None, + DataType data_type not None, callback not None: callable, UUID request_id not None, datetime request_timestamp not None, @@ -174,12 +161,10 @@ cdef class DataRequest(Request): Parameters ---------- - venue : Venue - The venue for the request. + provider : str + The data client name for the request. data_type : type The data type for the request. - metadata : type - The metadata for the request. callback : callable The callback to receive the data. request_id : UUID @@ -193,16 +178,14 @@ cdef class DataRequest(Request): request_timestamp, ) - self.venue = venue + self.provider = provider self.data_type = data_type - self.metadata = metadata self.callback = callback def __repr__(self) -> str: return (f"{type(self).__name__}(" - f"venue={self.venue}, " - f"data_type={self.data_type.__name__}, " - f"metadata={self.metadata}, " + f"provider={self.provider}, " + f"data_type={self.data_type}, " f"callback={self.callback}, " f"id={self.id}, " f"timestamp={self.timestamp})") @@ -215,9 +198,8 @@ cdef class DataResponse(Response): def __init__( self, - Venue venue not None, - type data_type not None, - dict metadata not None, + str provider not None, + DataType data_type not None, list data not None, UUID correlation_id not None, UUID response_id not None, @@ -228,12 +210,10 @@ cdef class DataResponse(Response): Parameters ---------- - venue : Venue - The venue of the response. + provider : str + The data provider name of the response. data_type : type The data type of the response. - metadata : dict - The metadata of the response. data : list The data of the response. correlation_id : UUID @@ -250,16 +230,14 @@ cdef class DataResponse(Response): response_timestamp, ) - self.venue = venue + self.provider = provider self.data_type = data_type - self.metadata = metadata self.data = data def __repr__(self) -> str: return (f"{type(self).__name__}(" - f"venue={self.venue}, " - f"data_type={self.data_type.__name__}, " - f"metadata={self.metadata}, " + f"provider={self.provider}, " + f"data_type={self.data_type}, " f"len_data={len(self.data)}, " f"correlation_id={self.correlation_id}, " f"id={self.id}, " diff --git a/nautilus_trader/execution/engine.pyx b/nautilus_trader/execution/engine.pyx index 71a83e0455d4..b9c2748caf95 100644 --- a/nautilus_trader/execution/engine.pyx +++ b/nautilus_trader/execution/engine.pyx @@ -35,6 +35,7 @@ from nautilus_trader.common.component cimport Component from nautilus_trader.common.generators cimport PositionIdGenerator from nautilus_trader.common.logging cimport CMD from nautilus_trader.common.logging cimport EVT +from nautilus_trader.common.logging cimport LogColor from nautilus_trader.common.logging cimport Logger from nautilus_trader.common.logging cimport RECV from nautilus_trader.core.correctness cimport Condition @@ -382,7 +383,8 @@ cdef class ExecutionEngine(Component): self.cache.check_integrity() self._set_position_id_counts() - self._log.info(f"Loaded cache in {self._clock.unix_time() - ts:.3f}s.") + cdef long total_ns = round((self._clock.unix_time() - ts) * 1000000) + self._log.info(f"Loaded cache in {total_ns}μs.") # Update portfolio for account in self.cache.accounts(): @@ -623,8 +625,8 @@ cdef class ExecutionEngine(Component): # Set the correct ClientOrderId for the event event.cl_ord_id = cl_ord_id - self._log.warning(f"{repr(cl_ord_id)} was found in cache and " - f"applying event to order with {repr(order.id)}.") + self._log.info(f"{repr(cl_ord_id)} was found in cache and " + f"applying event to order with {repr(order.id)}.", LogColor.GREEN) if isinstance(event, OrderFilled): self._confirm_strategy_id(event) diff --git a/nautilus_trader/live/data_client.pxd b/nautilus_trader/live/data_client.pxd index 512698cdee83..0187fc69a2b6 100644 --- a/nautilus_trader/live/data_client.pxd +++ b/nautilus_trader/live/data_client.pxd @@ -14,7 +14,12 @@ # ------------------------------------------------------------------------------------------------- from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient cdef class LiveDataClient(DataClient): cdef object _loop + + +cdef class LiveMarketDataClient(MarketDataClient): + cdef object _loop diff --git a/nautilus_trader/live/data_client.pyx b/nautilus_trader/live/data_client.pyx index 2e23ca65db05..b9e4c09c4757 100644 --- a/nautilus_trader/live/data_client.pyx +++ b/nautilus_trader/live/data_client.pyx @@ -19,8 +19,8 @@ from nautilus_trader.common.clock cimport LiveClock from nautilus_trader.common.logging cimport Logger from nautilus_trader.core.constants cimport * # str constants only from nautilus_trader.data.client cimport DataClient +from nautilus_trader.data.client cimport MarketDataClient from nautilus_trader.live.data_engine cimport LiveDataEngine -from nautilus_trader.model.identifiers cimport Venue cdef class LiveDataClient(DataClient): @@ -32,7 +32,7 @@ cdef class LiveDataClient(DataClient): def __init__( self, - Venue venue not None, + str name not None, LiveDataEngine engine not None, LiveClock clock not None, Logger logger not None, @@ -43,8 +43,8 @@ cdef class LiveDataClient(DataClient): Parameters ---------- - venue : Venue - The venue for the client. + name : str + The data client name. engine : LiveDataEngine The data engine for the client. clock : LiveClock @@ -56,7 +56,50 @@ cdef class LiveDataClient(DataClient): """ super().__init__( - venue, + name, + engine, + clock, + logger, + config, + ) + + self._loop: asyncio.AbstractEventLoop = engine.get_event_loop() + + +cdef class LiveMarketDataClient(MarketDataClient): + """ + The abstract base class for all live data clients. + + This class should not be used directly, but through its concrete subclasses. + """ + + def __init__( + self, + str name not None, + LiveDataEngine engine not None, + LiveClock clock not None, + Logger logger not None, + dict config=None, + ): + """ + Initialize a new instance of the `LiveMarketDataClient` class. + + Parameters + ---------- + name : str + The data client name. + engine : LiveDataEngine + The data engine for the client. + clock : LiveClock + The clock for the client. + logger : Logger + The logger for the client. + config : dict[str, object], optional + The configuration options. + + """ + super().__init__( + name, engine, clock, logger, diff --git a/nautilus_trader/live/execution_engine.pyx b/nautilus_trader/live/execution_engine.pyx index aca1adb08821..9751d5ff4969 100644 --- a/nautilus_trader/live/execution_engine.pyx +++ b/nautilus_trader/live/execution_engine.pyx @@ -194,7 +194,7 @@ cdef class LiveExecutionEngine(ExecutionEngine): break await asyncio.sleep(0.001) # One millisecond sleep - self._log.info(f"State resolved.", LogColor.GREEN) + self._log.info(f"State reconciled.", LogColor.GREEN) return True # Execution states resolved cpdef void kill(self) except *: diff --git a/nautilus_trader/trading/strategy.pyx b/nautilus_trader/trading/strategy.pyx index 6f97df7a9b68..329799e96118 100644 --- a/nautilus_trader/trading/strategy.pyx +++ b/nautilus_trader/trading/strategy.pyx @@ -44,6 +44,7 @@ from nautilus_trader.common.logging cimport RECV from nautilus_trader.common.logging cimport SENT from nautilus_trader.core.constants cimport * # str constants only from nautilus_trader.core.correctness cimport Condition +from nautilus_trader.data.base cimport DataType from nautilus_trader.data.engine cimport DataEngine from nautilus_trader.data.messages cimport DataRequest from nautilus_trader.data.messages cimport Subscribe @@ -78,6 +79,7 @@ from nautilus_trader.model.tick cimport QuoteTick from nautilus_trader.model.tick cimport TradeTick +# Events for WRN log level cdef tuple _WARNING_EVENTS = ( OrderInvalid, OrderDenied, @@ -731,9 +733,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_engine") cdef Subscribe subscribe = Subscribe( - venue=symbol.venue, - data_type=Instrument, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(Instrument, metadata={SYMBOL: symbol}), handler=self.handle_instrument, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -792,15 +793,14 @@ cdef class TradingStrategy(Component): Condition.not_negative(interval, "interval") cdef Subscribe subscribe = Subscribe( - venue=symbol.venue, - data_type=OrderBook, - metadata={ + provider=symbol.venue.value, + data_type=DataType(OrderBook, metadata={ SYMBOL: symbol, LEVEL: level, DEPTH: depth, INTERVAL: interval, KWARGS: kwargs, - }, + }), handler=self.handle_order_book, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -824,9 +824,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_client") cdef Subscribe subscribe = Subscribe( - venue=symbol.venue, - data_type=QuoteTick, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(QuoteTick, metadata={SYMBOL: symbol}), handler=self.handle_quote_tick, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -850,9 +849,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_engine") cdef Subscribe subscribe = Subscribe( - venue=symbol.venue, - data_type=TradeTick, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(TradeTick, metadata={SYMBOL: symbol}), handler=self.handle_trade_tick, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -876,9 +874,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_client") cdef Subscribe subscribe = Subscribe( - venue=bar_type.symbol.venue, - data_type=Bar, - metadata={BAR_TYPE: bar_type}, + provider=bar_type.symbol.venue.value, + data_type=DataType(Bar, metadata={BAR_TYPE: bar_type}), handler=self.handle_bar, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -902,9 +899,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_client") cdef Unsubscribe unsubscribe = Unsubscribe( - venue=symbol.venue, - data_type=Instrument, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(Instrument, metadata={SYMBOL: symbol}), handler=self.handle_instrument, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -933,12 +929,11 @@ cdef class TradingStrategy(Component): Condition.not_none(symbol, "symbol") cdef Unsubscribe unsubscribe = Unsubscribe( - venue=symbol.venue, - data_type=OrderBook, - metadata={ + provider=symbol.venue.value, + data_type=DataType(OrderBook, metadata={ SYMBOL: symbol, INTERVAL: interval, - }, + }), handler=self.handle_order_book, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -962,9 +957,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_client") cdef Unsubscribe unsubscribe = Unsubscribe( - venue=symbol.venue, - data_type=QuoteTick, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(QuoteTick, metadata={SYMBOL: symbol}), handler=self.handle_quote_tick, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -988,9 +982,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_engine") cdef Unsubscribe unsubscribe = Unsubscribe( - venue=symbol.venue, - data_type=TradeTick, - metadata={SYMBOL: symbol}, + provider=symbol.venue.value, + data_type=DataType(TradeTick, metadata={SYMBOL: symbol}), handler=self.handle_trade_tick, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -1014,9 +1007,8 @@ cdef class TradingStrategy(Component): Condition.not_none(self._data_engine, "data_engine") cdef Unsubscribe unsubscribe = Unsubscribe( - venue=bar_type.symbol.venue, - data_type=Bar, - metadata={BAR_TYPE: bar_type}, + provider=bar_type.symbol.venue.value, + data_type=DataType(Bar, metadata={BAR_TYPE: bar_type}), handler=self.handle_bar, command_id=self.uuid_factory.generate_c(), command_timestamp=self.clock.utc_now_c(), @@ -1060,14 +1052,13 @@ cdef class TradingStrategy(Component): Condition.true(from_datetime < to_datetime, "from_datetime was >= to_datetime") cdef DataRequest request = DataRequest( - venue=symbol.venue, - data_type=QuoteTick, - metadata={ + provider=symbol.venue.value, + data_type=DataType(QuoteTick, metadata={ SYMBOL: symbol, FROM_DATETIME: from_datetime, TO_DATETIME: to_datetime, LIMIT: self._data_engine.cache.tick_capacity, - }, + }), callback=self.handle_quote_ticks, request_id=self.uuid_factory.generate_c(), request_timestamp=self.clock.utc_now_c(), @@ -1107,14 +1098,13 @@ cdef class TradingStrategy(Component): Condition.true(from_datetime < to_datetime, "from_datetime was >= to_datetime") cdef DataRequest request = DataRequest( - venue=symbol.venue, - data_type=TradeTick, - metadata={ + provider=symbol.venue.value, + data_type=DataType(TradeTick, metadata={ SYMBOL: symbol, FROM_DATETIME: from_datetime, TO_DATETIME: to_datetime, LIMIT: self._data_engine.cache.tick_capacity, - }, + }), callback=self.handle_trade_ticks, request_id=self.uuid_factory.generate_c(), request_timestamp=self.clock.utc_now_c(), @@ -1154,14 +1144,13 @@ cdef class TradingStrategy(Component): Condition.true(from_datetime < to_datetime, "from_datetime was >= to_datetime") cdef DataRequest request = DataRequest( - venue=bar_type.symbol.venue, - data_type=Bar, - metadata={ + provider=bar_type.symbol.venue.value, + data_type=DataType(Bar, metadata={ BAR_TYPE: bar_type, FROM_DATETIME: from_datetime, TO_DATETIME: to_datetime, LIMIT: self._data_engine.cache.bar_capacity, - }, + }), callback=self.handle_bars, request_id=self.uuid_factory.generate_c(), request_timestamp=self.clock.utc_now_c(), @@ -1197,10 +1186,8 @@ cdef class TradingStrategy(Component): cdef AccountId account_id = self.execution.account_id(order.symbol.venue) if account_id is None: - self.log.error( - f"Cannot submit {order} " - f"(no account registered for {order.symbol.venue})." - ) + self.log.error(f"Cannot submit order: " + f"no account registered for {order.symbol.venue}, {order}.") return # Cannot send command cdef SubmitOrder command = SubmitOrder( @@ -1236,10 +1223,8 @@ cdef class TradingStrategy(Component): cdef AccountId account_id = self.execution.account_id(bracket_order.entry.symbol.venue) if account_id is None: - self.log.error( - f"Cannot submit {bracket_order} " - f"(no account registered for {bracket_order.entry.symbol.venue})." - ) + self.log.error(f"Cannot submit bracket order: " + f"no account registered for {bracket_order.entry.symbol.venue}, {bracket_order}.") return # Cannot send command cdef SubmitBracketOrder command = SubmitBracketOrder( diff --git a/tests/integration_tests/adapters/ccxt/test_ccxt_data.py b/tests/integration_tests/adapters/ccxt/test_ccxt_data.py index 07eac10eadb9..a9b6cb4edd57 100644 --- a/tests/integration_tests/adapters/ccxt/test_ccxt_data.py +++ b/tests/integration_tests/adapters/ccxt/test_ccxt_data.py @@ -24,6 +24,7 @@ from nautilus_trader.common.logging import LogLevel from nautilus_trader.common.uuid import UUIDFactory from nautilus_trader.core.uuid import uuid4 +from nautilus_trader.data.base import DataType from nautilus_trader.data.messages import DataRequest from nautilus_trader.live.data_engine import LiveDataEngine from nautilus_trader.model.bar import Bar @@ -445,14 +446,13 @@ async def run_test(): handler = ObjectStorer() request = DataRequest( - venue=BINANCE, - data_type=TradeTick, - metadata={ + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={ "Symbol": ETHUSDT, "FromDateTime": None, "ToDateTime": None, "Limit": 100, - }, + }), callback=handler.store, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), @@ -490,14 +490,13 @@ async def run_test(): bar_type = BarType(symbol=ETHUSDT, bar_spec=bar_spec) request = DataRequest( - venue=BINANCE, - data_type=Bar, - metadata={ + provider=BINANCE.value, + data_type=DataType(Bar, metadata={ "BarType": bar_type, "FromDateTime": None, "ToDateTime": None, "Limit": 100, - }, + }), callback=handler.store_2, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), diff --git a/tests/integration_tests/adapters/oanda/test_oanda_data.py b/tests/integration_tests/adapters/oanda/test_oanda_data.py index 6d4b40b366ab..2d5e7f6267d7 100644 --- a/tests/integration_tests/adapters/oanda/test_oanda_data.py +++ b/tests/integration_tests/adapters/oanda/test_oanda_data.py @@ -25,6 +25,7 @@ from nautilus_trader.common.logging import LogLevel from nautilus_trader.common.uuid import UUIDFactory from nautilus_trader.core.uuid import uuid4 +from nautilus_trader.data.base import DataType from nautilus_trader.data.messages import DataRequest from nautilus_trader.live.data_engine import LiveDataEngine from nautilus_trader.model.bar import Bar @@ -289,14 +290,13 @@ async def run_test(): bar_type = BarType(symbol=AUDUSD, bar_spec=bar_spec) request = DataRequest( - venue=OANDA, - data_type=Bar, - metadata={ + provider=OANDA.value, + data_type=DataType(Bar, metadata={ "BarType": bar_type, "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.store_2, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), diff --git a/tests/unit_tests/backtest/test_backtest_data_client.py b/tests/unit_tests/backtest/test_backtest_data_client.py index 5257b12e049a..6715b876c4fe 100644 --- a/tests/unit_tests/backtest/test_backtest_data_client.py +++ b/tests/unit_tests/backtest/test_backtest_data_client.py @@ -15,7 +15,7 @@ import unittest -from nautilus_trader.backtest.data_client import BacktestDataClient +from nautilus_trader.backtest.data_client import BacktestMarketDataClient from nautilus_trader.common.clock import TestClock from nautilus_trader.common.logging import TestLogger from nautilus_trader.common.uuid import UUIDFactory @@ -48,9 +48,9 @@ def setUp(self): logger=self.logger, ) - self.client = BacktestDataClient( + self.client = BacktestMarketDataClient( instruments=[USDJPY_SIM], - venue=Venue("SIM"), + name="SIM", engine=self.data_engine, clock=TestClock(), logger=self.logger, diff --git a/tests/unit_tests/data/test_data_client.py b/tests/unit_tests/data/test_data_client.py index 325e8bcd1fa0..8082c2973395 100644 --- a/tests/unit_tests/data/test_data_client.py +++ b/tests/unit_tests/data/test_data_client.py @@ -18,7 +18,7 @@ from nautilus_trader.common.clock import TestClock from nautilus_trader.common.logging import TestLogger from nautilus_trader.common.uuid import UUIDFactory -from nautilus_trader.data.client import DataClient +from nautilus_trader.data.client import MarketDataClient from nautilus_trader.data.engine import DataEngine from nautilus_trader.model.bar import Bar from nautilus_trader.model.enums import OrderSide @@ -38,7 +38,7 @@ AUDUSD_SIM = TestInstrumentProvider.default_fx_ccy(Symbol("AUD/USD", SIM)) -class DataClientTests(unittest.TestCase): +class MarketDataClientTests(unittest.TestCase): def setUp(self): # Fixture Setup @@ -59,8 +59,8 @@ def setUp(self): self.venue = Venue("SIM") - self.client = DataClient( - venue=self.venue, + self.client = MarketDataClient( + name=self.venue.value, engine=self.data_engine, clock=self.clock, logger=self.logger, diff --git a/tests/unit_tests/data/test_data_engine.py b/tests/unit_tests/data/test_data_engine.py index 8fc935129f2e..c4fe777e7c94 100644 --- a/tests/unit_tests/data/test_data_engine.py +++ b/tests/unit_tests/data/test_data_engine.py @@ -15,11 +15,12 @@ import unittest -from nautilus_trader.backtest.data_client import BacktestDataClient +from nautilus_trader.backtest.data_client import BacktestMarketDataClient from nautilus_trader.common.clock import TestClock from nautilus_trader.common.logging import TestLogger from nautilus_trader.common.uuid import UUIDFactory from nautilus_trader.core.fsm import InvalidStateTrigger +from nautilus_trader.data.base import DataType from nautilus_trader.data.engine import DataEngine from nautilus_trader.data.messages import DataCommand from nautilus_trader.data.messages import DataRequest @@ -77,17 +78,17 @@ def setUp(self): self.portfolio.register_cache(self.data_engine.cache) - self.binance_client = BacktestDataClient( + self.binance_client = BacktestMarketDataClient( instruments=[BTCUSDT_BINANCE, ETHUSDT_BINANCE], - venue=BINANCE, + name=BINANCE.value, engine=self.data_engine, clock=self.clock, logger=self.logger, ) - self.bitmex_client = BacktestDataClient( + self.bitmex_client = BacktestMarketDataClient( instruments=[XBTUSD_BITMEX], - venue=BITMEX, + name=BITMEX.value, engine=self.data_engine, clock=self.clock, logger=self.logger, @@ -97,7 +98,7 @@ def test_registered_venues(self): # Arrange # Act # Assert - self.assertEqual([], self.data_engine.registered_venues) + self.assertEqual([], self.data_engine.registered_clients) def test_subscribed_instruments_when_nothing_subscribed_returns_empty_list(self): # Arrange @@ -129,7 +130,7 @@ def test_register_client_successfully_adds_client(self): self.data_engine.register_client(self.binance_client) # Assert - self.assertIn(BINANCE, self.data_engine.registered_venues) + self.assertIn(BINANCE.value, self.data_engine.registered_clients) def test_deregister_client_successfully_removes_client(self): # Arrange @@ -139,7 +140,7 @@ def test_deregister_client_successfully_removes_client(self): self.data_engine.deregister_client(self.binance_client) # Assert - self.assertNotIn(BINANCE, self.data_engine.registered_venues) + self.assertNotIn(BINANCE.value, self.data_engine.registered_clients) def test_register_strategy_successfully_registered_with_strategy(self): # Arrange @@ -266,9 +267,8 @@ def test_execute_unrecognized_message_logs_and_does_nothing(self): # Bogus message command = DataCommand( - venue=BINANCE, - data_type=QuoteTick, - metadata={}, + provider=BINANCE.value, + data_type=DataType(QuoteTick), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -284,14 +284,13 @@ def test_send_request_when_no_data_clients_registered_does_nothing(self): # Arrange handler = [] request = DataRequest( - venue=Venue("RANDOM"), - data_type=QuoteTick, - metadata={ + provider="RANDOM", + data_type=DataType(QuoteTick, metadata={ "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), @@ -309,14 +308,13 @@ def test_send_data_request_when_data_type_unrecognized_logs_and_does_nothing(sel handler = [] request = DataRequest( - venue=BINANCE, - data_type=str, # str data type is invalid - metadata={ + provider=BINANCE.value, + data_type=DataType(str, metadata={ # str data type is invalid "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), @@ -336,28 +334,26 @@ def test_send_data_request_with_duplicate_ids_logs_and_does_not_handle_second(se uuid = self.uuid_factory.generate() # We'll use this as a duplicate request1 = DataRequest( - venue=BINANCE, - data_type=QuoteTick, # str data type is invalid - metadata={ + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={ # str data type is invalid "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=uuid, # Duplicate request_timestamp=self.clock.utc_now(), ) request2 = DataRequest( - venue=BINANCE, - data_type=QuoteTick, # str data type is invalid - metadata={ + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={ # str data type is invalid "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=uuid, # Duplicate request_timestamp=self.clock.utc_now(), @@ -375,9 +371,8 @@ def test_execute_subscribe_when_data_type_unrecognized_logs_and_does_nothing(sel self.data_engine.register_client(self.binance_client) subscribe = Subscribe( - venue=BINANCE, - data_type=str, # str data type is invalid - metadata={}, # Invalid anyway + provider=BINANCE.value, + data_type=DataType(str), # str data type is invalid handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -395,9 +390,8 @@ def test_execute_subscribe_when_already_subscribed_does_not_add_and_logs(self): self.binance_client.connect() subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -416,9 +410,8 @@ def test_execute_unsubscribe_when_data_type_unrecognized_logs_and_does_nothing(s handler = [] unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=str, # str data type is invalid - metadata={}, # Invalid anyway + provider=BINANCE.value, + data_type=DataType(type(str)), # str data type is invalid handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -437,9 +430,8 @@ def test_execute_unsubscribe_when_not_subscribed_logs_and_does_nothing(self): handler = [] unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(type(QuoteTick), metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -454,9 +446,8 @@ def test_execute_unsubscribe_when_not_subscribed_logs_and_does_nothing(self): def test_receive_response_when_no_data_clients_registered_does_nothing(self): # Arrange response = DataResponse( - venue=BINANCE, - data_type=QuoteTick, - metadata={}, + provider=BINANCE.value, + data_type=DataType(QuoteTick), data=[], correlation_id=self.uuid_factory.generate(), response_id=self.uuid_factory.generate(), @@ -493,9 +484,8 @@ def test_execute_subscribe_instrument_then_adds_handler(self): self.binance_client.connect() subscribe = Subscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -514,9 +504,8 @@ def test_execute_unsubscribe_instrument_then_removes_handler(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -525,9 +514,8 @@ def test_execute_unsubscribe_instrument_then_removes_handler(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -546,9 +534,8 @@ def test_process_instrument_when_subscriber_then_sends_to_registered_handler(sel handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -569,9 +556,8 @@ def test_process_instrument_when_subscribers_then_sends_to_registered_handlers(s handler1 = [] subscribe1 = Subscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler1.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -579,9 +565,8 @@ def test_process_instrument_when_subscribers_then_sends_to_registered_handlers(s handler2 = [] subscribe2 = Subscribe( - venue=BINANCE, - data_type=Instrument, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(Instrument, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler2.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -604,14 +589,13 @@ def test_execute_subscribe_order_book_stream_then_adds_handler(self): self.binance_client.connect() subscribe = Subscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Level": 2, "Depth": 10, "Interval": 0, - }, + }), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -629,14 +613,13 @@ def test_execute_subscribe_order_book_intervals_then_adds_handler(self): self.binance_client.connect() subscribe = Subscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Level": 2, "Depth": 25, "Interval": 10, - }, + }), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -655,14 +638,13 @@ def test_execute_unsubscribe_order_book_stream_then_removes_handler(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Level": 2, "Depth": 25, "Interval": 0, - }, + }), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -671,12 +653,11 @@ def test_execute_unsubscribe_order_book_stream_then_removes_handler(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Interval": 0, - }, + }), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -695,14 +676,13 @@ def test_execute_unsubscribe_order_book_interval_then_removes_handler(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Level": 2, "Depth": 25, "Interval": 10, - }, + }), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -711,12 +691,11 @@ def test_execute_unsubscribe_order_book_interval_then_removes_handler(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=OrderBook, - metadata={ + provider=BINANCE.value, + data_type=DataType(OrderBook, metadata={ "Symbol": ETHUSDT_BINANCE.symbol, "Interval": 10, - }, + }), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -730,13 +709,14 @@ def test_execute_unsubscribe_order_book_interval_then_removes_handler(self): def test_process_order_book_when_subscriber_then_sends_to_registered_handler(self): pass + # TODO: WIP # # Arrange # self.data_engine.register_client(self.binance_client) # self.binance_client.connect() # # handler = [] # subscribe = Subscribe( - # venue=BINANCE, + # provider=BINANCE.value, # data_type=Instrument, # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, # handler=handler.append, @@ -754,13 +734,14 @@ def test_process_order_book_when_subscriber_then_sends_to_registered_handler(sel def test_process_order_book_when_subscribers_then_sends_to_registered_handlers(self): pass + # TODO: WIP # # Arrange # self.data_engine.register_client(self.binance_client) # self.binance_client.connect() # # handler1 = [] # subscribe1 = Subscribe( - # venue=BINANCE, + # provider=BINANCE.value, # data_type=Instrument, # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, # handler=handler1.append, @@ -770,7 +751,7 @@ def test_process_order_book_when_subscribers_then_sends_to_registered_handlers(s # # handler2 = [] # subscribe2 = Subscribe( - # venue=BINANCE, + # provider=BINANCE.value, # data_type=Instrument, # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, # handler=handler2.append, @@ -796,9 +777,8 @@ def test_execute_subscribe_for_quote_ticks(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -816,9 +796,8 @@ def test_execute_unsubscribe_for_quote_ticks(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -827,9 +806,8 @@ def test_execute_unsubscribe_for_quote_ticks(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -848,9 +826,8 @@ def test_process_quote_tick_when_subscriber_then_sends_to_registered_handler(sel handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -881,9 +858,8 @@ def test_process_quote_tick_when_subscribers_then_sends_to_registered_handlers(s handler1 = [] subscribe1 = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler1.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -891,9 +867,8 @@ def test_process_quote_tick_when_subscribers_then_sends_to_registered_handlers(s handler2 = [] subscribe2 = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(QuoteTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler2.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -926,9 +901,8 @@ def test_subscribe_trade_tick_then_subscribes(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -947,9 +921,8 @@ def test_unsubscribe_trade_tick_then_unsubscribes(self): handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -958,9 +931,8 @@ def test_unsubscribe_trade_tick_then_unsubscribes(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -979,9 +951,8 @@ def test_process_trade_tick_when_subscriber_then_sends_to_registered_handler(sel handler = [] subscribe = Subscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1011,9 +982,8 @@ def test_process_trade_tick_when_subscribers_then_sends_to_registered_handlers(s handler1 = [] subscribe1 = Subscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler1.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1021,9 +991,8 @@ def test_process_trade_tick_when_subscribers_then_sends_to_registered_handlers(s handler2 = [] subscribe2 = Subscribe( - venue=BINANCE, - data_type=TradeTick, - metadata={"Symbol": ETHUSDT_BINANCE.symbol}, + provider=BINANCE.value, + data_type=DataType(TradeTick, metadata={"Symbol": ETHUSDT_BINANCE.symbol}), handler=handler2.append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1058,9 +1027,8 @@ def test_subscribe_bar_type_then_subscribes(self): handler = ObjectStorer() subscribe = Subscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1082,9 +1050,8 @@ def test_unsubscribe_bar_type_then_unsubscribes(self): handler = ObjectStorer() subscribe = Subscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1093,9 +1060,8 @@ def test_unsubscribe_bar_type_then_unsubscribes(self): self.data_engine.execute(subscribe) unsubscribe = Unsubscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1117,9 +1083,8 @@ def test_process_bar_when_subscriber_then_sends_to_registered_handler(self): handler = ObjectStorer() subscribe = Subscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1154,9 +1119,8 @@ def test_process_bar_when_subscribers_then_sends_to_registered_handlers(self): handler1 = ObjectStorer() subscribe1 = Subscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler1.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -1164,9 +1128,8 @@ def test_process_bar_when_subscribers_then_sends_to_registered_handlers(self): handler2 = ObjectStorer() subscribe2 = Subscribe( - venue=BINANCE, - data_type=Bar, - metadata={"BarType": bar_type}, + provider=BINANCE.value, + data_type=DataType(Bar, metadata={"BarType": bar_type}), handler=handler2.store_2, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), diff --git a/tests/unit_tests/live/test_live_data_client.py b/tests/unit_tests/live/test_live_data_client.py index 7a6d56c4e516..47b1ee14bebf 100644 --- a/tests/unit_tests/live/test_live_data_client.py +++ b/tests/unit_tests/live/test_live_data_client.py @@ -20,7 +20,7 @@ from nautilus_trader.common.logging import LogLevel from nautilus_trader.common.logging import TestLogger from nautilus_trader.common.uuid import UUIDFactory -from nautilus_trader.live.data_client import LiveDataClient +from nautilus_trader.live.data_client import LiveMarketDataClient from nautilus_trader.live.data_engine import LiveDataEngine from nautilus_trader.model.identifiers import Venue from nautilus_trader.trading.portfolio import Portfolio @@ -33,7 +33,7 @@ ETHUSDT_BINANCE = TestInstrumentProvider.ethusdt_binance() -class LiveDataClientTests(unittest.TestCase): +class LiveMarketDataClientTests(unittest.TestCase): def setUp(self): # Fixture Setup @@ -57,8 +57,8 @@ def setUp(self): logger=self.logger, ) - self.client = LiveDataClient( - venue=BINANCE, + self.client = LiveMarketDataClient( + name=BINANCE.value, engine=self.engine, clock=self.clock, logger=self.logger, diff --git a/tests/unit_tests/live/test_live_data_engine.py b/tests/unit_tests/live/test_live_data_engine.py index 782aef531c3f..11debcad5f3a 100644 --- a/tests/unit_tests/live/test_live_data_engine.py +++ b/tests/unit_tests/live/test_live_data_engine.py @@ -21,6 +21,7 @@ from nautilus_trader.common.logging import LogLevel from nautilus_trader.common.logging import TestLogger from nautilus_trader.common.uuid import UUIDFactory +from nautilus_trader.data.base import DataType from nautilus_trader.data.messages import DataRequest from nautilus_trader.data.messages import DataResponse from nautilus_trader.data.messages import Subscribe @@ -88,9 +89,8 @@ def test_message_qsize_at_max_blocks_on_put_data_command(self): ) subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={}, + provider=BINANCE.value, + data_type=DataType(QuoteTick), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -116,14 +116,13 @@ def test_message_qsize_at_max_blocks_on_send_request(self): handler = [] request = DataRequest( - venue=Venue("RANDOM"), - data_type=QuoteTick, - metadata={ + provider="RANDOM", + data_type=DataType(QuoteTick, metadata={ "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), @@ -148,9 +147,8 @@ def test_message_qsize_at_max_blocks_on_receive_response(self): ) response = DataResponse( - venue=Venue("BINANCE"), - data_type=QuoteTick, - metadata={}, + provider="BINANCE", + data_type=DataType(QuoteTick), data=[], correlation_id=self.uuid_factory.generate(), response_id=self.uuid_factory.generate(), @@ -236,9 +234,8 @@ async def run_test(): self.data_engine.start() subscribe = Subscribe( - venue=BINANCE, - data_type=QuoteTick, - metadata={}, + provider=BINANCE.value, + data_type=DataType(QuoteTick), handler=[].append, command_id=self.uuid_factory.generate(), command_timestamp=self.clock.utc_now(), @@ -264,14 +261,13 @@ async def run_test(): handler = [] request = DataRequest( - venue=Venue("RANDOM"), - data_type=QuoteTick, - metadata={ + provider="RANDOM", + data_type=DataType(QuoteTick, metadata={ "Symbol": Symbol("SOMETHING", Venue("RANDOM")), "FromDateTime": None, "ToDateTime": None, "Limit": 1000, - }, + }), callback=handler.append, request_id=self.uuid_factory.generate(), request_timestamp=self.clock.utc_now(), @@ -296,9 +292,8 @@ async def run_test(): self.data_engine.start() response = DataResponse( - venue=Venue("BINANCE"), - data_type=QuoteTick, - metadata={}, + provider="BINANCE", + data_type=DataType(QuoteTick), data=[], correlation_id=self.uuid_factory.generate(), response_id=self.uuid_factory.generate(), diff --git a/tests/unit_tests/serialization/test_serialization_base.py b/tests/unit_tests/serialization/test_serialization_base.py index 08470f7be8af..d59f3a281f5c 100644 --- a/tests/unit_tests/serialization/test_serialization_base.py +++ b/tests/unit_tests/serialization/test_serialization_base.py @@ -18,11 +18,11 @@ from nautilus_trader.common.clock import TestClock from nautilus_trader.common.factories import OrderFactory from nautilus_trader.core.uuid import uuid4 +from nautilus_trader.data.base import DataType from nautilus_trader.data.messages import Subscribe from nautilus_trader.model.enums import OrderSide from nautilus_trader.model.identifiers import StrategyId from nautilus_trader.model.identifiers import TraderId -from nautilus_trader.model.identifiers import Venue from nautilus_trader.model.objects import Quantity from nautilus_trader.model.tick import QuoteTick from nautilus_trader.serialization.base import CommandSerializer @@ -71,9 +71,8 @@ def test_order_serializer_methods_raise_not_implemented_error(self): def test_command_serializer_methods_raise_not_implemented_error(self): # Arrange command = Subscribe( - venue=Venue("SIM"), - data_type=QuoteTick, - metadata={}, + provider="SIM", + data_type=DataType(QuoteTick), handler=[].append, command_id=uuid4(), command_timestamp=UNIX_EPOCH, diff --git a/tests/unit_tests/trading/test_trading_strategy.py b/tests/unit_tests/trading/test_trading_strategy.py index bdcc8bc042af..f1a2b9909a2c 100644 --- a/tests/unit_tests/trading/test_trading_strategy.py +++ b/tests/unit_tests/trading/test_trading_strategy.py @@ -21,7 +21,7 @@ import pytz from nautilus_trader.analysis.performance import PerformanceAnalyzer -from nautilus_trader.backtest.data_client import BacktestDataClient +from nautilus_trader.backtest.data_client import BacktestMarketDataClient from nautilus_trader.backtest.exchange import SimulatedExchange from nautilus_trader.backtest.execution import BacktestExecClient from nautilus_trader.backtest.models import FillModel @@ -114,9 +114,9 @@ def setUp(self): logger=self.logger, ) - self.data_client = BacktestDataClient( + self.data_client = BacktestMarketDataClient( instruments=[AUDUSD_SIM, GBPUSD_SIM, USDJPY_SIM], - venue=Venue("SIM"), + name="SIM", engine=self.data_engine, clock=self.clock, logger=self.logger, diff --git a/tests/unit_tests/trading/test_trading_trader.py b/tests/unit_tests/trading/test_trading_trader.py index 85c1f331a1a5..101db9abd64c 100644 --- a/tests/unit_tests/trading/test_trading_trader.py +++ b/tests/unit_tests/trading/test_trading_trader.py @@ -16,7 +16,7 @@ import unittest from nautilus_trader.analysis.performance import PerformanceAnalyzer -from nautilus_trader.backtest.data_client import BacktestDataClient +from nautilus_trader.backtest.data_client import BacktestMarketDataClient from nautilus_trader.backtest.exchange import SimulatedExchange from nautilus_trader.backtest.execution import BacktestExecClient from nautilus_trader.backtest.models import FillModel @@ -92,9 +92,9 @@ def setUp(self): logger=logger, ) - self.data_client = BacktestDataClient( + self.data_client = BacktestMarketDataClient( instruments=[USDJPY_SIM], - venue=Venue("SIM"), + name="SIM", engine=self.data_engine, clock=clock, logger=logger, From 403a5f0d93eca2e04d02abe1d64be62d58595845 Mon Sep 17 00:00:00 2001 From: cjdsellers Date: Fri, 19 Feb 2021 14:21:32 +1100 Subject: [PATCH 4/5] Complete headers --- nautilus_trader/data/base.pxd | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/nautilus_trader/data/base.pxd b/nautilus_trader/data/base.pxd index 0c4689d491a6..9dd232e05bb5 100644 --- a/nautilus_trader/data/base.pxd +++ b/nautilus_trader/data/base.pxd @@ -28,13 +28,17 @@ from nautilus_trader.model.tick cimport TradeTick cdef class Data: - cdef DataType data_type - cdef object data + cdef readonly DataType data_type + """The data type for the data.\n\n:returns: `DataType`""" + cdef readonly object data + """The data.\n\n:returns: `object`""" cdef class DataType: - cdef type type - cdef dict metadata + cdef readonly type type + """The type of the data.\n\n:returns: `type`""" + cdef readonly dict metadata + """The data types metadata.\n\n:returns: `dict[str, object]`""" cdef class DataCacheFacade: From 2f5b7b2f947417fdd5a5640190c11c53d15f5133 Mon Sep 17 00:00:00 2001 From: cjdsellers Date: Fri, 19 Feb 2021 14:24:49 +1100 Subject: [PATCH 5/5] Add tests --- tests/unit_tests/data/test_data_base.py | 26 +++ tests/unit_tests/data/test_data_engine.py | 156 +++++++++++------- .../live/test_live_execution_engine.py | 4 +- 3 files changed, 122 insertions(+), 64 deletions(-) diff --git a/tests/unit_tests/data/test_data_base.py b/tests/unit_tests/data/test_data_base.py index 8bf003e85065..f8baaa306832 100644 --- a/tests/unit_tests/data/test_data_base.py +++ b/tests/unit_tests/data/test_data_base.py @@ -15,7 +15,9 @@ import unittest +from nautilus_trader.data.base import Data from nautilus_trader.data.base import DataCacheFacade +from nautilus_trader.data.base import DataType from nautilus_trader.model.enums import PriceType from nautilus_trader.model.identifiers import Symbol from nautilus_trader.model.identifiers import Venue @@ -27,6 +29,30 @@ AUDUSD_SIM = TestInstrumentProvider.default_fx_ccy(Symbol("AUD/USD", SIM)) +class DataTypeTests(unittest.TestCase): + + def test_data_type_instantiation(self): + # Arrange + # Act + data_type = DataType(str, {"Type": "NEWS_FLASH"}) + + # Assert + self.assertEqual(str, data_type.type) + self.assertEqual({"Type": "NEWS_FLASH"}, data_type.metadata) + self.assertEqual(" {'Type': 'NEWS_FLASH'}", str(data_type)) + self.assertEqual("DataType(type=str, metadata={'Type': 'NEWS_FLASH'})", repr(data_type)) + + def test_data_instantiation(self): + # Arrange + # Act + data_type = DataType(str, {"Type": "NEWS_FLASH"}) + data = Data(data_type, "SOME_NEWS_HEADLINE") + + # Assert + self.assertEqual(data_type, data.data_type) + self.assertEqual("SOME_NEWS_HEADLINE", data.data) + + class DataCacheFacadeTests(unittest.TestCase): def setUp(self): diff --git a/tests/unit_tests/data/test_data_engine.py b/tests/unit_tests/data/test_data_engine.py index c4fe777e7c94..81bea238177a 100644 --- a/tests/unit_tests/data/test_data_engine.py +++ b/tests/unit_tests/data/test_data_engine.py @@ -707,68 +707,100 @@ def test_execute_unsubscribe_order_book_interval_then_removes_handler(self): # Assert self.assertEqual([], self.data_engine.subscribed_order_books) - def test_process_order_book_when_subscriber_then_sends_to_registered_handler(self): - pass - # TODO: WIP - # # Arrange - # self.data_engine.register_client(self.binance_client) - # self.binance_client.connect() - # - # handler = [] - # subscribe = Subscribe( - # provider=BINANCE.value, - # data_type=Instrument, - # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, - # handler=handler.append, - # command_id=self.uuid_factory.generate(), - # command_timestamp=self.clock.utc_now(), - # ) - # - # self.data_engine.execute(subscribe) - # - # # Act - # self.data_engine.process(ETHUSDT_BINANCE) - # - # # Assert - # self.assertEqual([ETHUSDT_BINANCE], handler) - - def test_process_order_book_when_subscribers_then_sends_to_registered_handlers(self): - pass - # TODO: WIP - # # Arrange - # self.data_engine.register_client(self.binance_client) - # self.binance_client.connect() - # - # handler1 = [] - # subscribe1 = Subscribe( - # provider=BINANCE.value, - # data_type=Instrument, - # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, - # handler=handler1.append, - # command_id=self.uuid_factory.generate(), - # command_timestamp=self.clock.utc_now(), - # ) - # - # handler2 = [] - # subscribe2 = Subscribe( - # provider=BINANCE.value, - # data_type=Instrument, - # metadata={"Symbol": ETHUSDT_BINANCE.symbol}, - # handler=handler2.append, - # command_id=self.uuid_factory.generate(), - # command_timestamp=self.clock.utc_now(), - # ) - # - # self.data_engine.execute(subscribe1) - # self.data_engine.execute(subscribe2) - # - # # Act - # self.data_engine.process(ETHUSDT_BINANCE) - # - # # Assert - # self.assertEqual([ETHUSDT_BINANCE.symbol], self.data_engine.subscribed_instruments) - # self.assertEqual([ETHUSDT_BINANCE], handler1) - # self.assertEqual([ETHUSDT_BINANCE], handler2) + def test_process_order_book_when_one_subscriber_then_sends_to_registered_handler(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.connect() + + order_book = OrderBook( + symbol=ETHUSDT_BINANCE.symbol, + level=2, + depth=25, + price_precision=2, + size_precision=5, + bids=[], + asks=[], + update_id=0, + timestamp=0, + ) + + handler = [] + subscribe = Subscribe( + provider=BINANCE.value, + data_type=DataType(OrderBook, { + "Symbol": ETHUSDT_BINANCE.symbol, + "Level": 2, + "Depth": 25, + "Interval": 0, # Streaming + }), + handler=handler.append, + command_id=self.uuid_factory.generate(), + command_timestamp=self.clock.utc_now(), + ) + + self.data_engine.execute(subscribe) + + # Act + self.data_engine.process(order_book) + + # Assert + self.assertEqual(order_book, handler[0]) + + def test_process_order_book_when_multiple_subscribers_then_sends_to_registered_handlers(self): + # Arrange + self.data_engine.register_client(self.binance_client) + self.binance_client.connect() + + order_book = OrderBook( + symbol=ETHUSDT_BINANCE.symbol, + level=2, + depth=25, + price_precision=2, + size_precision=5, + bids=[], + asks=[], + update_id=0, + timestamp=0, + ) + + handler1 = [] + subscribe1 = Subscribe( + provider=BINANCE.value, + data_type=DataType(OrderBook, { + "Symbol": ETHUSDT_BINANCE.symbol, + "Level": 2, + "Depth": 25, + "Interval": 0, # Streaming + }), + handler=handler1.append, + command_id=self.uuid_factory.generate(), + command_timestamp=self.clock.utc_now(), + ) + + handler2 = [] + subscribe2 = Subscribe( + provider=BINANCE.value, + data_type=DataType(OrderBook, { + "Symbol": ETHUSDT_BINANCE.symbol, + "Level": 2, + "Depth": 25, + "Interval": 0, # Streaming + }), + handler=handler2.append, + command_id=self.uuid_factory.generate(), + command_timestamp=self.clock.utc_now(), + ) + + self.data_engine.execute(subscribe1) + self.data_engine.execute(subscribe2) + + # Act + self.data_engine.process(order_book) + + # Assert + self.assertEqual([ETHUSDT_BINANCE.symbol], self.data_engine.subscribed_order_books) + self.assertEqual(order_book, handler1[0]) + self.assertEqual(order_book, handler2[0]) def test_execute_subscribe_for_quote_ticks(self): # Arrange diff --git a/tests/unit_tests/live/test_live_execution_engine.py b/tests/unit_tests/live/test_live_execution_engine.py index a5feba7702f2..70544797723b 100644 --- a/tests/unit_tests/live/test_live_execution_engine.py +++ b/tests/unit_tests/live/test_live_execution_engine.py @@ -331,7 +331,7 @@ async def run_test(): self.loop.run_until_complete(run_test()) # TODO: WIP - # def test_resolve_state_with_multiple_active_orders_resolved_correctly1(self): + # def test_reconcile_state_with_multiple_active_orders_resolved_correctly1(self): # async def run_test(): # # Arrange # self.exec_engine.start() @@ -393,7 +393,7 @@ async def run_test(): # self.exec_engine.process(TestStubs.event_order_submitted(order2)) # # # Act - # await self.exec_engine.resolve_state() + # await self.exec_engine.reconcile_state() # self.exec_engine.stop() # # self.loop.run_until_complete(run_test())