diff --git a/examples/live/bybit/bybit_ema_cross.py b/examples/live/bybit/bybit_ema_cross.py index ece439534b88..bb21ba46ed61 100644 --- a/examples/live/bybit/bybit_ema_cross.py +++ b/examples/live/bybit/bybit_ema_cross.py @@ -80,7 +80,7 @@ api_key=None, # 'BYBIT_API_KEY' env var api_secret=None, # 'BYBIT_API_SECRET' env var base_url_http=None, # Override with custom endpoint - base_url_ws=None, # Override with custom endpoint + base_url_ws_private=None, # Override with custom endpoint instrument_provider=InstrumentProviderConfig(load_all=True), product_types=[product_type], testnet=False, # If client uses the testnet diff --git a/examples/live/bybit/bybit_ema_cross_bracket_algo.py b/examples/live/bybit/bybit_ema_cross_bracket_algo.py index 202b3735f1bd..176a71e122dd 100644 --- a/examples/live/bybit/bybit_ema_cross_bracket_algo.py +++ b/examples/live/bybit/bybit_ema_cross_bracket_algo.py @@ -83,7 +83,7 @@ api_key=None, # 'BYBIT_API_KEY' env var api_secret=None, # 'BYBIT_API_SECRET' env var base_url_http=None, # Override with custom endpoint - base_url_ws=None, # Override with custom endpoint + base_url_ws_private=None, # Override with custom endpoint instrument_provider=InstrumentProviderConfig(load_all=True), product_types=[product_type], testnet=False, # If client uses the testnet diff --git a/examples/live/bybit/bybit_ema_cross_stop_entry.py b/examples/live/bybit/bybit_ema_cross_stop_entry.py index c837bb9a5513..8d476d0fef0b 100644 --- a/examples/live/bybit/bybit_ema_cross_stop_entry.py +++ b/examples/live/bybit/bybit_ema_cross_stop_entry.py @@ -69,7 +69,7 @@ api_key=None, # 'BYBIT_API_KEY' env var api_secret=None, # 'BYBIT_API_SECRET' env var base_url_http=None, # Override with custom endpoint - base_url_ws=None, # Override with custom endpoint + base_url_ws_private=None, # Override with custom endpoint instrument_provider=InstrumentProviderConfig(load_all=True), product_types=[product_type], testnet=False, # If client uses the testnet diff --git a/examples/live/bybit/bybit_ema_cross_with_trailing_stop.py b/examples/live/bybit/bybit_ema_cross_with_trailing_stop.py index 8068d7436a43..0c5266f2ed36 100644 --- a/examples/live/bybit/bybit_ema_cross_with_trailing_stop.py +++ b/examples/live/bybit/bybit_ema_cross_with_trailing_stop.py @@ -69,7 +69,7 @@ api_key=None, # 'BYBIT_API_KEY' env var api_secret=None, # 'BYBIT_API_SECRET' env var base_url_http=None, # Override with custom endpoint - base_url_ws=None, # Override with custom endpoint + base_url_ws_private=None, # Override with custom endpoint instrument_provider=InstrumentProviderConfig(load_all=True), product_types=[product_type], testnet=False, # If client uses the testnet diff --git a/examples/live/bybit/bybit_market_maker.py b/examples/live/bybit/bybit_market_maker.py index 8a2b928eb308..363105b4afd7 100644 --- a/examples/live/bybit/bybit_market_maker.py +++ b/examples/live/bybit/bybit_market_maker.py @@ -95,7 +95,7 @@ api_key=None, # 'BYBIT_API_KEY' env var api_secret=None, # 'BYBIT_API_SECRET' env var base_url_http=None, # Override with custom endpoint - base_url_ws=None, # Override with custom endpoint + base_url_ws_private=None, # Override with custom endpoint instrument_provider=InstrumentProviderConfig(load_all=True), product_types=[product_type], demo=False, # If client uses the demo API diff --git a/nautilus_trader/adapters/bybit/common/constants.py b/nautilus_trader/adapters/bybit/common/constants.py index 80a284b27c19..1a9a41019ad7 100644 --- a/nautilus_trader/adapters/bybit/common/constants.py +++ b/nautilus_trader/adapters/bybit/common/constants.py @@ -32,6 +32,12 @@ # Set of Bybit error codes for which Nautilus will attempt retries, # potentially temporary conditions where a retry might make sense. BYBIT_RETRY_ERRORS_UTA: Final[set[int]] = { + # > ------------------------------------------------------------ + # > Self defined error codes + -10_408, # Client request timed out + # > ------------------------------------------------------------ + # > Bybit defined error codes + # > https://bybit-exchange.github.io/docs/v5/error 10_000, # Server Timeout 10_002, # The request time exceeds the time window range 10_006, # Too many visits. Exceeded the API Rate Limit diff --git a/nautilus_trader/adapters/bybit/common/enums.py b/nautilus_trader/adapters/bybit/common/enums.py index 55fbf40a5b81..a97622697071 100644 --- a/nautilus_trader/adapters/bybit/common/enums.py +++ b/nautilus_trader/adapters/bybit/common/enums.py @@ -116,6 +116,13 @@ def parse_to_position_side(self) -> PositionSide: raise RuntimeError(f"invalid position side, was {self}") +@unique +class BybitWsOrderRequestMsgOP(Enum): + CREATE = "order.create" + AMEND = "order.amend" + CANCEL = "order.cancel" + + @unique class BybitKlineInterval(Enum): MINUTE_1 = "1" @@ -149,6 +156,7 @@ class BybitOrderStatus(Enum): @unique class BybitOrderSide(Enum): + UNKNOWN = "" # It will be an empty string in some cases BUY = "Buy" SELL = "Sell" diff --git a/nautilus_trader/adapters/bybit/common/urls.py b/nautilus_trader/adapters/bybit/common/urls.py index 2715f5f2f627..6b9e612cb668 100644 --- a/nautilus_trader/adapters/bybit/common/urls.py +++ b/nautilus_trader/adapters/bybit/common/urls.py @@ -62,3 +62,10 @@ def get_ws_base_url_private(is_testnet: bool) -> str: return "wss://stream-testnet.bybit.com/v5/private" else: return "wss://stream.bybit.com/v5/private" + + +def get_ws_base_url_trade(is_testnet: bool) -> str: + if is_testnet: + return "wss://stream-testnet.bybit.com/v5/trade" + else: + return "wss://stream.bybit.com/v5/trade" diff --git a/nautilus_trader/adapters/bybit/config.py b/nautilus_trader/adapters/bybit/config.py index 67a408403914..ebe493edaa03 100644 --- a/nautilus_trader/adapters/bybit/config.py +++ b/nautilus_trader/adapters/bybit/config.py @@ -78,6 +78,10 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True): If None then will default to 'SPOT', you also cannot mix 'SPOT' with any other product type for execution, and it will use a `CASH` account type, vs `MARGIN` for the other derivative products. + base_url_ws_private : str, optional + The base URL for the `private` WebSocket client. + base_url_ws_trade : str, optional + The base URL for the `trade` WebSocket client. demo : bool, default False If the client is connecting to the Bybit demo API. testnet : bool, default False @@ -85,6 +89,11 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True): use_gtd : bool, default False If False, then GTD time in force will be remapped to GTC (this is useful if managing GTD orders locally). + use_ws_trade_api : bool, default False + If the client is using websocket to send order requests. + use_http_batch_api : bool, default False + If the client is using http api to send batch order requests. + Effective only when `use_ws_trade_api` is set to `True`. max_retries : PositiveInt, optional The maximum number of times a submit, cancel or modify order request will be retried. retry_delay : PositiveFloat, optional @@ -92,6 +101,8 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True): max_ws_reconnection_tries: int, default 3 The number of retries to reconnect the websocket connection if the connection is broken. + ws_trade_timeout_secs : float, default 5.0 + The timeout for trade websocket messages. Warnings -------- @@ -103,10 +114,14 @@ class BybitExecClientConfig(LiveExecClientConfig, frozen=True): api_secret: str | None = None product_types: list[BybitProductType] | None = None base_url_http: str | None = None - base_url_ws: str | None = None + base_url_ws_private: str | None = None + base_url_ws_trade: str | None = None demo: bool = False testnet: bool = False use_gtd: bool = False # Not supported on Bybit + use_ws_trade_api: bool = False + use_http_batch_api: bool = False max_retries: PositiveInt | None = None retry_delay: PositiveFloat | None = None max_ws_reconnection_tries: int | None = 3 + ws_trade_timeout_secs: float | None = 5.0 diff --git a/nautilus_trader/adapters/bybit/execution.py b/nautilus_trader/adapters/bybit/execution.py index 9d55f13d7026..7bb41812da7d 100644 --- a/nautilus_trader/adapters/bybit/execution.py +++ b/nautilus_trader/adapters/bybit/execution.py @@ -13,11 +13,12 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -import asyncio +from __future__ import annotations + from decimal import Decimal +from typing import TYPE_CHECKING import msgspec -import pandas as pd from nautilus_trader.adapters.bybit.common.constants import BYBIT_VENUE from nautilus_trader.adapters.bybit.common.credentials import get_api_key @@ -31,39 +32,22 @@ from nautilus_trader.adapters.bybit.common.enums import BybitTpSlMode from nautilus_trader.adapters.bybit.common.enums import BybitTriggerDirection from nautilus_trader.adapters.bybit.common.symbol import BybitSymbol -from nautilus_trader.adapters.bybit.config import BybitExecClientConfig from nautilus_trader.adapters.bybit.endpoints.trade.batch_cancel_order import BybitBatchCancelOrder from nautilus_trader.adapters.bybit.endpoints.trade.batch_place_order import BybitBatchPlaceOrder from nautilus_trader.adapters.bybit.http.account import BybitAccountHttpAPI -from nautilus_trader.adapters.bybit.http.client import BybitHttpClient from nautilus_trader.adapters.bybit.http.errors import BybitError from nautilus_trader.adapters.bybit.http.errors import should_retry -from nautilus_trader.adapters.bybit.providers import BybitInstrumentProvider from nautilus_trader.adapters.bybit.schemas.common import BYBIT_PONG from nautilus_trader.adapters.bybit.schemas.ws import BybitWsAccountExecution from nautilus_trader.adapters.bybit.schemas.ws import BybitWsAccountExecutionMsg from nautilus_trader.adapters.bybit.schemas.ws import BybitWsAccountOrderMsg -from nautilus_trader.adapters.bybit.schemas.ws import BybitWsAccountPositionMsg from nautilus_trader.adapters.bybit.schemas.ws import BybitWsAccountWalletMsg from nautilus_trader.adapters.bybit.schemas.ws import BybitWsMessageGeneral -from nautilus_trader.adapters.bybit.schemas.ws import BybitWsSubscriptionMsg from nautilus_trader.adapters.bybit.websocket.client import BybitWebSocketClient -from nautilus_trader.cache.cache import Cache -from nautilus_trader.common.component import LiveClock -from nautilus_trader.common.component import MessageBus from nautilus_trader.common.enums import LogColor from nautilus_trader.core.correctness import PyCondition from nautilus_trader.core.datetime import millis_to_nanos from nautilus_trader.core.uuid import UUID4 -from nautilus_trader.execution.messages import BatchCancelOrders -from nautilus_trader.execution.messages import CancelAllOrders -from nautilus_trader.execution.messages import CancelOrder -from nautilus_trader.execution.messages import ModifyOrder -from nautilus_trader.execution.messages import SubmitOrder -from nautilus_trader.execution.messages import SubmitOrderList -from nautilus_trader.execution.reports import FillReport -from nautilus_trader.execution.reports import OrderStatusReport -from nautilus_trader.execution.reports import PositionStatusReport from nautilus_trader.live.execution_client import LiveExecutionClient from nautilus_trader.live.retry import RetryManagerPool from nautilus_trader.model.enums import AccountType @@ -92,7 +76,29 @@ from nautilus_trader.model.orders import StopMarketOrder from nautilus_trader.model.orders import TrailingStopLimitOrder from nautilus_trader.model.orders import TrailingStopMarketOrder -from nautilus_trader.model.position import Position + + +if TYPE_CHECKING: + import asyncio + + import pandas as pd + + from nautilus_trader.adapters.bybit.config import BybitExecClientConfig + from nautilus_trader.adapters.bybit.http.client import BybitHttpClient + from nautilus_trader.adapters.bybit.providers import BybitInstrumentProvider + from nautilus_trader.cache.cache import Cache + from nautilus_trader.common.component import LiveClock + from nautilus_trader.common.component import MessageBus + from nautilus_trader.execution.messages import BatchCancelOrders + from nautilus_trader.execution.messages import CancelAllOrders + from nautilus_trader.execution.messages import CancelOrder + from nautilus_trader.execution.messages import ModifyOrder + from nautilus_trader.execution.messages import SubmitOrder + from nautilus_trader.execution.messages import SubmitOrderList + from nautilus_trader.execution.reports import FillReport + from nautilus_trader.execution.reports import OrderStatusReport + from nautilus_trader.execution.reports import PositionStatusReport + from nautilus_trader.model.position import Position class BybitExecutionClient(LiveExecutionClient): @@ -115,8 +121,10 @@ class BybitExecutionClient(LiveExecutionClient): The instrument provider. product_types : list[BybitProductType] The product types for the client. - base_url_ws : str - The base URL for the WebSocket client. + base_url_ws_private : str + The base URL for the `private` WebSocket client. + base_url_ws_trade : str + The base URL for the `trade` WebSocket client. config : BybitExecClientConfig The configuration for the client. name : str, optional @@ -133,7 +141,8 @@ def __init__( clock: LiveClock, instrument_provider: BybitInstrumentProvider, product_types: list[BybitProductType], - base_url_ws: str, + base_url_ws_private: str, + base_url_ws_trade: str, config: BybitExecClientConfig, name: str | None, ) -> None: @@ -160,9 +169,16 @@ def __init__( # Configuration self._product_types = product_types self._use_gtd = config.use_gtd + self._use_ws_trade_api = config.use_ws_trade_api + self._use_http_batch_api = config.use_http_batch_api + self._log.info(f"Account type: {account_type_to_str(account_type)}", LogColor.BLUE) self._log.info(f"Product types: {[p.value for p in product_types]}", LogColor.BLUE) self._log.info(f"{config.use_gtd=}", LogColor.BLUE) + self._log.info(f"{config.use_ws_trade_api=}", LogColor.BLUE) + self._log.info(f"{config.use_http_batch_api=}", LogColor.BLUE) + self._log.info(f"{config.ws_trade_timeout_secs=}", LogColor.BLUE) + self._log.info(f"{config.max_ws_reconnection_tries=}", LogColor.BLUE) self._log.info(f"{config.max_retries=}", LogColor.BLUE) self._log.info(f"{config.retry_delay=}", LogColor.BLUE) @@ -171,12 +187,18 @@ def __init__( account_id = AccountId(f"{name or BYBIT_VENUE.value}-UNIFIED") self._set_account_id(account_id) - # WebSocket API + # HTTP API + self._http_account = BybitAccountHttpAPI( + client=client, + clock=clock, + ) + + # WebSocket private client self._ws_client = BybitWebSocketClient( clock=clock, - handler=self._handle_ws_message, + handler=self._handle_ws_message_private, handler_reconnect=None, - base_url=base_url_ws, + base_url=base_url_ws_private, is_private=True, api_key=config.api_key or get_api_key(config.demo, config.testnet), api_secret=config.api_secret or get_api_secret(config.demo, config.testnet), @@ -184,11 +206,30 @@ def __init__( max_reconnection_tries=config.max_ws_reconnection_tries, ) - # HTTP API - self._http_account = BybitAccountHttpAPI( - client=client, - clock=clock, - ) + # WebSocket trade client + self._order_single_client: BybitWebSocketClient | BybitAccountHttpAPI + self._order_batch_client: BybitWebSocketClient | BybitAccountHttpAPI + if self._use_ws_trade_api: + self._ws_order_client = BybitWebSocketClient( + clock=clock, + handler=self._handle_ws_message_trade, + handler_reconnect=None, + base_url=base_url_ws_trade, + is_trade=True, + api_key=config.api_key or get_api_key(config.demo, config.testnet), + api_secret=config.api_secret or get_api_secret(config.demo, config.testnet), + loop=loop, + max_reconnection_tries=config.max_ws_reconnection_tries, + ws_trade_timeout_secs=config.ws_trade_timeout_secs, + ) + self._order_single_client = self._ws_order_client + if config.use_http_batch_api: + self._order_batch_client = self._http_account + else: + self._order_batch_client = self._ws_order_client + else: + self._order_single_client = self._http_account + self._order_batch_client = self._http_account # Order submission self._submit_order_methods = { @@ -203,11 +244,11 @@ def __init__( # Decoders self._decoder_ws_msg_general = msgspec.json.Decoder(BybitWsMessageGeneral) - self._decoder_ws_subscription = msgspec.json.Decoder(BybitWsSubscriptionMsg) + # self._decoder_ws_subscription = msgspec.json.Decoder(BybitWsSubscriptionMsg) self._decoder_ws_account_order_update = msgspec.json.Decoder(BybitWsAccountOrderMsg) self._decoder_ws_account_execution_update = msgspec.json.Decoder(BybitWsAccountExecutionMsg) - self._decoder_ws_account_position_update = msgspec.json.Decoder(BybitWsAccountPositionMsg) + # self._decoder_ws_account_position_update = msgspec.json.Decoder(BybitWsAccountPositionMsg) self._decoder_ws_account_wallet_update = msgspec.json.Decoder(BybitWsAccountWalletMsg) # Hot caches @@ -232,9 +273,15 @@ async def _connect(self) -> None: await self._ws_client.subscribe_orders_update() await self._ws_client.subscribe_wallet_update() + if self._use_ws_trade_api: + await self._ws_order_client.connect() + async def _disconnect(self) -> None: await self._ws_client.disconnect() + if self._use_ws_trade_api: + await self._ws_order_client.disconnect() + def _stop(self) -> None: self._retry_manager_pool.shutdown() @@ -539,7 +586,7 @@ async def _cancel_order(self, command: CancelOrder) -> None: await retry_manager.run( "cancel_order", [client_order_id, venue_order_id], - self._http_account.cancel_order, + self._order_single_client.cancel_order, bybit_symbol.product_type, bybit_symbol.raw_symbol, client_order_id=client_order_id, @@ -593,7 +640,7 @@ async def _batch_cancel_orders(self, command: BatchCancelOrders) -> None: await retry_manager.run( "batch_cancel_orders", None, - self._http_account.batch_cancel_orders, + self._order_batch_client.batch_cancel_orders, product_type=product_type, cancel_orders=cancel_orders, ) @@ -660,7 +707,7 @@ async def _modify_order(self, command: ModifyOrder) -> None: await retry_manager.run( "modify_order", [client_order_id, venue_order_id], - self._http_account.amend_order, + self._order_single_client.amend_order, bybit_symbol.product_type, bybit_symbol.raw_symbol, client_order_id=client_order_id, @@ -758,7 +805,7 @@ async def _submit_order_list(self, command: SubmitOrderList) -> None: await retry_manager.run( "submit_order_list", None, - self._http_account.batch_place_orders, + self._order_batch_client.batch_place_orders, product_type=product_type, submit_orders=submit_orders, ) @@ -784,7 +831,7 @@ async def _submit_market_order(self, order: MarketOrder) -> None: bybit_symbol = BybitSymbol(order.instrument_id.symbol.value) time_in_force = self._determine_time_in_force(order) order_side = self._enum_parser.parse_nautilus_order_side(order.side) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=bybit_symbol.product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -800,7 +847,7 @@ async def _submit_limit_order(self, order: LimitOrder) -> None: bybit_symbol = BybitSymbol(order.instrument_id.symbol.value) time_in_force = self._determine_time_in_force(order) order_side = self._enum_parser.parse_nautilus_order_side(order.side) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=bybit_symbol.product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -820,7 +867,7 @@ async def _submit_stop_market_order(self, order: StopMarketOrder) -> None: order_side = self._enum_parser.parse_nautilus_order_side(order.side) trigger_direction = self._enum_parser.parse_trigger_direction(order.order_type, order.side) trigger_type = self._enum_parser.parse_nautilus_trigger_type(order.trigger_type) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -845,7 +892,7 @@ async def _submit_stop_limit_order(self, order: StopLimitOrder) -> None: order_side = self._enum_parser.parse_nautilus_order_side(order.side) trigger_direction = self._enum_parser.parse_trigger_direction(order.order_type, order.side) trigger_type = self._enum_parser.parse_nautilus_trigger_type(order.trigger_type) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -870,7 +917,7 @@ async def _submit_market_if_touched_order(self, order: MarketIfTouchedOrder) -> order_type = BybitOrderType.MARKET trigger_direction = self._enum_parser.parse_trigger_direction(order.order_type, order.side) trigger_type = self._enum_parser.parse_nautilus_trigger_type(order.trigger_type) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -893,7 +940,7 @@ async def _submit_limit_if_touched_order(self, order: LimitIfTouchedOrder) -> No order_side = self._enum_parser.parse_nautilus_order_side(order.side) trigger_direction = self._enum_parser.parse_trigger_direction(order.order_type, order.side) trigger_type = self._enum_parser.parse_nautilus_trigger_type(order.trigger_type) - await self._http_account.place_order( + await self._order_single_client.place_order( product_type=product_type, symbol=bybit_symbol.raw_symbol, side=order_side, @@ -926,7 +973,10 @@ async def _submit_trailing_stop_market(self, order: TrailingStopMarketOrder) -> trailing_offset=str(order.trailing_offset), ) - def _handle_ws_message(self, raw: bytes) -> None: + def _handle_ws_message_trade(self, raw: bytes) -> None: + pass + + def _handle_ws_message_private(self, raw: bytes) -> None: try: ws_message = self._decoder_ws_msg_general.decode(raw) if ws_message.op == BYBIT_PONG: @@ -1119,9 +1169,9 @@ def _handle_account_order_update(self, raw: bytes) -> None: # noqa: C901 (too c venue_order_id=report.venue_order_id, ts_event=report.ts_last, ) - elif ( - bybit_order.orderStatus == BybitOrderStatus.CANCELED - or bybit_order.orderStatus == BybitOrderStatus.DEACTIVATED + elif bybit_order.orderStatus in ( + BybitOrderStatus.CANCELED, + BybitOrderStatus.DEACTIVATED, ): self.generate_order_canceled( strategy_id=strategy_id, diff --git a/nautilus_trader/adapters/bybit/factories.py b/nautilus_trader/adapters/bybit/factories.py index 49d463cf8dc2..3fe420815db9 100644 --- a/nautilus_trader/adapters/bybit/factories.py +++ b/nautilus_trader/adapters/bybit/factories.py @@ -23,6 +23,7 @@ from nautilus_trader.adapters.bybit.common.urls import get_http_base_url from nautilus_trader.adapters.bybit.common.urls import get_ws_base_url_private from nautilus_trader.adapters.bybit.common.urls import get_ws_base_url_public +from nautilus_trader.adapters.bybit.common.urls import get_ws_base_url_trade from nautilus_trader.adapters.bybit.config import BybitDataClientConfig from nautilus_trader.adapters.bybit.config import BybitExecClientConfig from nautilus_trader.adapters.bybit.data import BybitDataClient @@ -255,7 +256,10 @@ def create( # type: ignore product_types=frozenset(config.product_types or BYBIT_ALL_PRODUCTS), config=config.instrument_provider, ) - base_url_ws: str = get_ws_base_url_private(config.testnet) + + base_url_ws_private: str = get_ws_base_url_private(config.testnet) + base_url_ws_trade: str = get_ws_base_url_trade(config.testnet) + return BybitExecutionClient( loop=loop, client=client, @@ -264,7 +268,8 @@ def create( # type: ignore clock=clock, instrument_provider=provider, product_types=config.product_types or [BybitProductType.SPOT], - base_url_ws=config.base_url_ws or base_url_ws, + base_url_ws_private=config.base_url_ws_private or base_url_ws_private, + base_url_ws_trade=config.base_url_ws_trade or base_url_ws_trade, config=config, name=name, ) diff --git a/nautilus_trader/adapters/bybit/http/account.py b/nautilus_trader/adapters/bybit/http/account.py index 7f7e2a61cb9b..698dee43dbe8 100644 --- a/nautilus_trader/adapters/bybit/http/account.py +++ b/nautilus_trader/adapters/bybit/http/account.py @@ -79,7 +79,6 @@ if TYPE_CHECKING: from nautilus_trader.adapters.bybit.common.enums import BybitMarginMode - from nautilus_trader.adapters.bybit.common.enums import BybitProductType from nautilus_trader.adapters.bybit.http.client import BybitHttpClient from nautilus_trader.adapters.bybit.schemas.account.info import BybitAccountInfo from nautilus_trader.adapters.bybit.schemas.account.set_leverage import BybitSetLeverageResponse diff --git a/nautilus_trader/adapters/bybit/schemas/ws.py b/nautilus_trader/adapters/bybit/schemas/ws.py index f5ffd7b43e98..7207c18276c6 100644 --- a/nautilus_trader/adapters/bybit/schemas/ws.py +++ b/nautilus_trader/adapters/bybit/schemas/ws.py @@ -32,7 +32,14 @@ from nautilus_trader.adapters.bybit.common.enums import BybitTimeInForce from nautilus_trader.adapters.bybit.common.enums import BybitTriggerDirection from nautilus_trader.adapters.bybit.common.enums import BybitTriggerType +from nautilus_trader.adapters.bybit.common.enums import BybitWsOrderRequestMsgOP from nautilus_trader.adapters.bybit.common.parsing import parse_bybit_delta +from nautilus_trader.adapters.bybit.endpoints.trade.amend_order import BybitAmendOrderPostParams +from nautilus_trader.adapters.bybit.endpoints.trade.cancel_order import BybitCancelOrderPostParams +from nautilus_trader.adapters.bybit.endpoints.trade.place_order import BybitPlaceOrderPostParams +from nautilus_trader.adapters.bybit.schemas.order import BybitAmendOrder +from nautilus_trader.adapters.bybit.schemas.order import BybitCancelOrder +from nautilus_trader.adapters.bybit.schemas.order import BybitPlaceOrder from nautilus_trader.core.datetime import millis_to_nanos from nautilus_trader.core.uuid import UUID4 from nautilus_trader.execution.reports import OrderStatusReport @@ -79,6 +86,27 @@ class BybitWsSubscriptionMsg(msgspec.Struct): req_id: str | None = None +class BybitWsPrivateChannelAuthMsg(msgspec.Struct, kw_only=True): + success: bool + ret_msg: str | None = None + op: str + conn_id: str + + def is_auth_success(self) -> bool: + return (self.op == "auth") and (self.success is True) + + +class BybitWsTradeAuthMsg(msgspec.Struct, kw_only=True): + reqId: str | None = None + retCode: int + retMsg: str + op: str + connId: str + + def is_auth_success(self) -> bool: + return (self.op == "auth") and (self.retCode == 0) + + ################################################################################ # Public - Kline ################################################################################ @@ -514,16 +542,21 @@ class BybitWsAccountPosition(msgspec.Struct): takeProfit: str stopLoss: str trailingStop: str + sessionAvgPrice: str unrealisedPnl: str cumRealisedPnl: str createdTime: str updatedTime: str liqPrice: str bustPrice: str - category: str + category: BybitProductType positionStatus: str adlRankIndicator: int seq: int + autoAddMargin: int + leverageSysUpdatedTime: str + mmrSysUpdatedTime: str + isReduceOnly: bool tpslMode: str | None = None @@ -765,3 +798,38 @@ def handle_account_wallet_update(self, exec_client: BybitExecutionClient): reported=True, ts_event=millis_to_nanos(self.creationTime), ) + + +################################################################################ +# Trade +################################################################################ + + +class BybitWsOrderRequestMsg(msgspec.Struct, kw_only=True): + reqId: str | None = None + header: dict[str, str] + op: BybitWsOrderRequestMsgOP + args: list[ + BybitPlaceOrderPostParams | BybitAmendOrderPostParams | BybitCancelOrderPostParams + ] = [] + + +class BybitWsOrderResponseMsg(msgspec.Struct, kw_only=True): + reqId: str | None = None + retCode: int + retMsg: str + op: str + header: dict[str, str] | None = None + connId: str + + +class BybitWsPlaceOrderResponseMsg(BybitWsOrderResponseMsg): + data: BybitPlaceOrder + + +class BybitWsAmendOrderResponseMsg(BybitWsOrderResponseMsg): + data: BybitAmendOrder + + +class BybitWsCancelOrderResponseMsg(BybitWsOrderResponseMsg): + data: BybitCancelOrder diff --git a/nautilus_trader/adapters/bybit/websocket/client.py b/nautilus_trader/adapters/bybit/websocket/client.py index e215b4baae20..2c662f2610c2 100644 --- a/nautilus_trader/adapters/bybit/websocket/client.py +++ b/nautilus_trader/adapters/bybit/websocket/client.py @@ -13,14 +13,32 @@ # limitations under the License. # ------------------------------------------------------------------------------------------------- -import asyncio -from collections.abc import Awaitable -from collections.abc import Callable -from typing import Any - -import msgspec +from __future__ import annotations -from nautilus_trader.adapters.bybit.schemas.ws import BybitWsSubscriptionMsg +import asyncio +from typing import TYPE_CHECKING + +from msgspec import json as msgspec_json + +from nautilus_trader.adapters.bybit.common.enums import BybitOrderSide +from nautilus_trader.adapters.bybit.common.enums import BybitOrderType +from nautilus_trader.adapters.bybit.common.enums import BybitProductType +from nautilus_trader.adapters.bybit.common.enums import BybitTimeInForce +from nautilus_trader.adapters.bybit.common.enums import BybitTpSlMode +from nautilus_trader.adapters.bybit.common.enums import BybitTriggerDirection +from nautilus_trader.adapters.bybit.common.enums import BybitTriggerType +from nautilus_trader.adapters.bybit.common.enums import BybitWsOrderRequestMsgOP + +# fmt: off +from nautilus_trader.adapters.bybit.endpoints.trade.amend_order import BybitAmendOrderPostParams +from nautilus_trader.adapters.bybit.endpoints.trade.cancel_order import BybitCancelOrderPostParams +from nautilus_trader.adapters.bybit.endpoints.trade.place_order import BybitPlaceOrderPostParams +from nautilus_trader.adapters.bybit.http.errors import BybitError +from nautilus_trader.adapters.bybit.schemas.ws import BybitWsMessageGeneral +from nautilus_trader.adapters.bybit.schemas.ws import BybitWsOrderRequestMsg +from nautilus_trader.adapters.bybit.schemas.ws import BybitWsOrderResponseMsg +from nautilus_trader.adapters.bybit.schemas.ws import BybitWsPrivateChannelAuthMsg +from nautilus_trader.adapters.bybit.schemas.ws import BybitWsTradeAuthMsg from nautilus_trader.common.component import LiveClock from nautilus_trader.common.component import Logger from nautilus_trader.common.enums import LogColor @@ -28,10 +46,24 @@ from nautilus_trader.core.nautilus_pyo3 import WebSocketClientError from nautilus_trader.core.nautilus_pyo3 import WebSocketConfig from nautilus_trader.core.nautilus_pyo3 import hmac_signature +from nautilus_trader.core.uuid import UUID4 + +if TYPE_CHECKING: + from collections.abc import Awaitable + from collections.abc import Callable + from collections.abc import Coroutine + from typing import Any + + from nautilus_trader.adapters.bybit.endpoints.trade.batch_cancel_order import BybitBatchCancelOrder + from nautilus_trader.adapters.bybit.endpoints.trade.batch_place_order import BybitBatchPlaceOrder + +# fmt: on MAX_ARGS_PER_SUBSCRIPTION_REQUEST = 10 +WsOrderResponseFuture = asyncio.Future[BybitWsOrderResponseMsg] + class BybitWebSocketClient: """ @@ -47,9 +79,15 @@ class BybitWebSocketClient: The callback handler for message events. handler_reconnect : Callable[..., Awaitable[None]], optional The callback handler to be called on reconnect. + is_private : bool, optional + Whether the client is a private channel. + is_trade : bool, optional + Whether the client is a trade channel. max_reconnection_tries: int, default 3 The number of retries to reconnect the websocket connection if the connection is broken. + ws_trade_timeout_secs: float, default 5.0 + The timeout for trade websocket messages. """ @@ -63,7 +101,9 @@ def __init__( api_secret: str, loop: asyncio.AbstractEventLoop, is_private: bool | None = False, + is_trade: bool | None = False, max_reconnection_tries: int | None = 3, + ws_trade_timeout_secs: float | None = 5.0, ) -> None: self._clock = clock self._log: Logger = Logger(name=type(self).__name__) @@ -73,17 +113,30 @@ def __init__( self._handler_reconnect: Callable[..., Awaitable[None]] | None = handler_reconnect self._loop = loop self._max_reconnection_tries = max_reconnection_tries + self._ws_trade_timeout_secs = ws_trade_timeout_secs self._client: WebSocketClient | None = None self._api_key = api_key self._api_secret = api_secret - self._is_private = is_private self._is_running = False + self._reconnecting = False self._subscriptions: list[str] = [] + self._is_private = is_private + self._is_trade = is_trade + self._auth_required = is_private or is_trade self._is_authenticated = False - self._decoder_ws_subscription = msgspec.json.Decoder(BybitWsSubscriptionMsg) + + self._decoder_ws_message_general = msgspec_json.Decoder(BybitWsMessageGeneral) + self._decoder_ws_private_channel_auth = msgspec_json.Decoder(BybitWsPrivateChannelAuthMsg) + self._decoder_ws_trade_auth = msgspec_json.Decoder(BybitWsTradeAuthMsg) + self._decoder_ws_order_response = msgspec_json.Decoder(BybitWsOrderResponseMsg) + + self._pending_order_requests: dict[str, WsOrderResponseFuture] = {} + + if self._is_private and self._is_trade: + raise ValueError("`is_private` and `is_trade` cannot both be True") @property def subscriptions(self) -> list[str]: @@ -92,6 +145,15 @@ def subscriptions(self) -> list[str]: def has_subscription(self, item: str) -> bool: return item in self._subscriptions + @property + def channel_type(self) -> str: + if self._is_private: + return "Private" + elif self._is_trade: + return "Trade" + else: + return "Public" + async def connect(self) -> None: self._is_running = True self._log.debug(f"Connecting to {self._base_url} websocket stream") @@ -99,7 +161,7 @@ async def connect(self) -> None: url=self._base_url, handler=self._msg_handler, heartbeat=20, - heartbeat_msg=msgspec.json.encode({"op": "ping"}).decode(), + heartbeat_msg=msgspec_json.encode({"op": "ping"}).decode(), headers=[], max_reconnection_tries=self._max_reconnection_tries, ) @@ -111,34 +173,42 @@ async def connect(self) -> None: self._log.info(f"Connected to {self._base_url}", LogColor.BLUE) # Authenticate - if self._is_private: + if self._auth_required: await self._authenticate() def reconnect(self) -> None: """ Reconnect the client to the server and resubscribe to all streams. """ - if not self._is_running: + if not self._is_running or self._reconnecting: return self._log.warning(f"Trying to reconnect to {self._base_url}") + self._reconnecting = True self._loop.create_task(self._reconnect_wrapper()) async def _reconnect_wrapper(self) -> None: - # Authenticate - if self._is_private: - await self._authenticate() + try: + # Authenticate + if self._auth_required: + await self._authenticate() - # Re-subscribe to all streams - await self._subscribe_all() + if self._is_private: + # Re-subscribe to all streams + await self._subscribe_all() - if self._handler_reconnect: - await self._handler_reconnect() + if self._handler_reconnect: + await self._handler_reconnect() - self._log.warning(f"Reconnected to {self._base_url}") + self._log.warning(f"Reconnected to {self._base_url}") + except Exception as e: + self._log.error(f"Reconnection failed: {e}") + finally: + self._reconnecting = False async def disconnect(self) -> None: self._is_running = False + self._reconnecting = False if self._client is None: self._log.warning("Cannot disconnect: not connected.") @@ -163,27 +233,47 @@ def _msg_handler(self, raw: bytes) -> None: The received message in bytes. """ - if self._is_private and not self._is_authenticated: - msg = self._decoder_ws_subscription.decode(raw) - if msg.op == "auth": - if msg.success is True: - self._is_authenticated = True - self._log.info("Private channel authenticated") - else: - raise RuntimeError(f"Private channel authentication failed: {msg}") + # TODO: better way to improve performance with high message volume? + + msg = self._decoder_ws_message_general.decode(raw) + op = msg.op + + if self._auth_required and not self._is_authenticated and op == "auth": + self._check_auth_success(raw) + return + + if self._is_trade and op and "order." in op: + self._handle_order_ack(raw) self._handler(raw) + def _check_auth_success(self, raw: bytes) -> None: + msg: BybitWsPrivateChannelAuthMsg | BybitWsTradeAuthMsg + + if self._is_private: + msg = self._decoder_ws_private_channel_auth.decode(raw) + elif self._is_trade: + msg = self._decoder_ws_trade_auth.decode(raw) + else: + raise RuntimeError("Invalid channel type") + + if msg.is_auth_success(): + self._is_authenticated = True + self._log.info(f"{self.channel_type} channel authenticated", LogColor.GREEN) + else: + raise RuntimeError(f"{self.channel_type} channel authentication failed: {msg}") + async def _authenticate(self) -> None: self._is_authenticated = False signature = self._get_signature() await self._send(signature) while not self._is_authenticated: - self._log.debug("Waiting for private channel authentication") + self._log.debug(f"Waiting for {self.channel_type} channel authentication") await asyncio.sleep(0.1) async def _subscribe(self, subscription: str) -> None: + self._log.debug(f"Subscribing to {subscription}") if subscription in self._subscriptions: self._log.warning(f"Cannot subscribe '{subscription}': already subscribed") return @@ -201,6 +291,38 @@ async def _unsubscribe(self, subscription: str) -> None: msg = {"op": "unsubscribe", "args": [subscription]} await self._send(msg) + async def _subscribe_all(self) -> None: + if self._client is None: + self._log.error("Cannot subscribe all: not connected") + return + + self._log.info("Resubscribe to all data streams") + + # You can input up to 10 args for each subscription request sent to one connection + subscription_lists = [ + self._subscriptions[i : i + MAX_ARGS_PER_SUBSCRIPTION_REQUEST] + for i in range(0, len(self._subscriptions), MAX_ARGS_PER_SUBSCRIPTION_REQUEST) + ] + + for subscriptions in subscription_lists: + msg = {"op": "subscribe", "args": subscriptions} + await self._send(msg) + + async def _send(self, msg: dict[str, Any]) -> None: + await self._send_text(msgspec_json.encode(msg)) + + async def _send_text(self, msg: bytes) -> None: + if self._client is None: + self._log.error(f"Cannot send message {msg!r}: not connected") + return + + self._log.debug(f"SENDING: {msg!r}") + + try: + await self._client.send_text(msg) + except WebSocketClientError as e: + self._log.error(str(e)) + ################################################################################ # Public ################################################################################ @@ -270,31 +392,235 @@ def _get_signature(self): "args": [self._api_key, expires, signature], } - async def _subscribe_all(self) -> None: - if self._client is None: - self._log.error("Cannot subscribe all: not connected") - return - - self._log.info("Resubscribe to all data streams") + ################################################################################ + # Trade + ################################################################################ - # You can input up to 10 args for each subscription request sent to one connection - subscription_lists = [ - self._subscriptions[i : i + MAX_ARGS_PER_SUBSCRIPTION_REQUEST] - for i in range(0, len(self._subscriptions), MAX_ARGS_PER_SUBSCRIPTION_REQUEST) - ] + def _handle_order_ack(self, raw: bytes) -> None: + try: + msg = self._decoder_ws_order_response.decode(raw) + except Exception as e: + self._log.error(f"Failed to decode order ack response {raw!r}: {e}") + return - for subscriptions in subscription_lists: - msg = {"op": "subscribe", "args": subscriptions} - await self._send(msg) + req_id = msg.reqId + if not req_id: + self._log.debug(f"No `reqId` in order ack response: {msg}") + return - async def _send(self, msg: dict[str, Any]) -> None: - if self._client is None: - self._log.error(f"Cannot send message {msg}: not connected") + ret_code = msg.retCode + future = self._pending_order_requests.pop(req_id, None) + if future is not None: + if ret_code == 0: + future.set_result(msg) + else: + future.set_exception(BybitError(code=ret_code, message=msg.retMsg)) + else: + self._log.warning(f"Received ack for `unknown/timeout` reqId={req_id}, msg={msg}") return - self._log.debug(f"SENDING: {msg}") + async def _order( + self, + op: BybitWsOrderRequestMsgOP, + args: list[ + BybitPlaceOrderPostParams | BybitAmendOrderPostParams | BybitCancelOrderPostParams + ], + timeout_secs: float | None, + ) -> BybitWsOrderResponseMsg: + req_id = UUID4().value + + future: WsOrderResponseFuture = self._loop.create_future() + self._pending_order_requests[req_id] = future + + # Build request + request = BybitWsOrderRequestMsg( + reqId=req_id, + header={ + "X-BAPI-TIMESTAMP": str(self._clock.timestamp_ms()), + }, + op=op, + args=args, # Args array, support one item only for now + ) + + # Send request + await self._send_text(msgspec_json.encode(request)) + # Wait for response or timeout try: - await self._client.send_text(msgspec.json.encode(msg)) - except WebSocketClientError as e: - self._log.error(str(e)) + ack_resp = await asyncio.wait_for(future, timeout_secs) + except TimeoutError as e: + self._log.error(f"Order request `{req_id}` timed out. op={op}, args={args}") + future.cancel() + self._pending_order_requests.pop(req_id, None) + raise BybitError(code=-10_000, message="Request timed out") from e + + return ack_resp + + async def _batch_orders( + self, + tasks: list[Coroutine[Any, Any, BybitWsOrderResponseMsg]], + ) -> list[BybitWsOrderResponseMsg]: + futures = await asyncio.gather(*tasks, return_exceptions=True) + + results: list[BybitWsOrderResponseMsg] = [] + for result in futures: + if isinstance(result, BybitWsOrderResponseMsg): + results.append(result) + else: + self._log.error(f"Batch orders error: {result}") + return results + + async def place_order( + self, + product_type: BybitProductType, + symbol: str, + side: BybitOrderSide, + quantity: str, + quote_quantity: bool, + order_type: BybitOrderType, + price: str | None = None, + time_in_force: BybitTimeInForce | None = None, + client_order_id: str | None = None, + reduce_only: bool | None = None, + tpsl_mode: BybitTpSlMode | None = None, + close_on_trigger: bool | None = None, + tp_order_type: BybitOrderType | None = None, + sl_order_type: BybitOrderType | None = None, + trigger_direction: BybitTriggerDirection | None = None, + trigger_type: BybitTriggerType | None = None, + trigger_price: str | None = None, + sl_trigger_price: str | None = None, + tp_trigger_price: str | None = None, + tp_limit_price: str | None = None, + sl_limit_price: str | None = None, + ) -> BybitWsOrderResponseMsg: + return await self._order( + timeout_secs=self._ws_trade_timeout_secs, + op=BybitWsOrderRequestMsgOP.CREATE, + args=[ + BybitPlaceOrderPostParams( + category=product_type, + symbol=symbol, + side=side, + orderType=order_type, + qty=quantity, + marketUnit="baseCoin" if not quote_quantity else "quoteCoin", + price=price, + timeInForce=time_in_force, + orderLinkId=client_order_id, + reduceOnly=reduce_only, + closeOnTrigger=close_on_trigger, + tpslMode=tpsl_mode if product_type != BybitProductType.SPOT else None, + triggerPrice=trigger_price, + triggerDirection=trigger_direction, + triggerBy=trigger_type, + takeProfit=tp_trigger_price if product_type == BybitProductType.SPOT else None, + stopLoss=sl_trigger_price if product_type == BybitProductType.SPOT else None, + slTriggerBy=trigger_type if product_type != BybitProductType.SPOT else None, + tpTriggerBy=trigger_type if product_type != BybitProductType.SPOT else None, + tpLimitPrice=tp_limit_price if product_type != BybitProductType.SPOT else None, + slLimitPrice=sl_limit_price if product_type != BybitProductType.SPOT else None, + tpOrderType=tp_order_type, + slOrderType=sl_order_type, + ), + ], + ) + + async def amend_order( + self, + product_type: BybitProductType, + symbol: str, + client_order_id: str | None = None, + venue_order_id: str | None = None, + trigger_price: str | None = None, + quantity: str | None = None, + price: str | None = None, + ) -> BybitWsOrderResponseMsg: + return await self._order( + timeout_secs=self._ws_trade_timeout_secs, + op=BybitWsOrderRequestMsgOP.AMEND, + args=[ + BybitAmendOrderPostParams( + category=product_type, + symbol=symbol, + orderId=venue_order_id, + orderLinkId=client_order_id, + triggerPrice=trigger_price, + qty=quantity, + price=price, + ), + ], + ) + + async def cancel_order( + self, + product_type: BybitProductType, + symbol: str, + client_order_id: str | None = None, + venue_order_id: str | None = None, + order_filter: str | None = None, + ) -> BybitWsOrderResponseMsg: + return await self._order( + timeout_secs=self._ws_trade_timeout_secs, + op=BybitWsOrderRequestMsgOP.CANCEL, + args=[ + BybitCancelOrderPostParams( + category=product_type, + symbol=symbol, + orderId=venue_order_id, + orderLinkId=client_order_id, + orderFilter=order_filter, + ), + ], + ) + + async def batch_place_orders( + self, + product_type: BybitProductType, + submit_orders: list[BybitBatchPlaceOrder], + ) -> list[BybitWsOrderResponseMsg]: + tasks = [ + self.place_order( + product_type=product_type, + symbol=order.symbol, + side=order.side, + order_type=order.orderType, + quantity=order.qty, + quote_quantity=order.marketUnit == "quoteCoin", + price=order.price, + time_in_force=order.timeInForce, + client_order_id=order.orderLinkId, + reduce_only=order.reduceOnly, + close_on_trigger=order.closeOnTrigger, + trigger_price=order.triggerPrice, + trigger_direction=order.triggerDirection, + tp_order_type=order.tpOrderType, + sl_order_type=order.slOrderType, + tpsl_mode=order.tpslMode, + tp_trigger_price=order.takeProfit, + sl_trigger_price=order.stopLoss, + trigger_type=order.triggerBy, + tp_limit_price=order.tpLimitPrice, + sl_limit_price=order.slLimitPrice, + ) + for order in submit_orders + ] + + return await self._batch_orders(tasks) + + async def batch_cancel_orders( + self, + product_type: BybitProductType, + cancel_orders: list[BybitBatchCancelOrder], + ) -> list[BybitWsOrderResponseMsg]: + tasks = [ + self.cancel_order( + product_type=product_type, + symbol=order.symbol, + client_order_id=order.orderLinkId, + venue_order_id=order.orderId, + ) + for order in cancel_orders + ] + + return await self._batch_orders(tasks) diff --git a/tests/integration_tests/adapters/bybit/resources/ws_messages/private/ws_position.json b/tests/integration_tests/adapters/bybit/resources/ws_messages/private/ws_position.json index b9cfaa3aab5c..c52b0507cc65 100644 --- a/tests/integration_tests/adapters/bybit/resources/ws_messages/private/ws_position.json +++ b/tests/integration_tests/adapters/bybit/resources/ws_messages/private/ws_position.json @@ -21,6 +21,7 @@ "takeProfit": "0", "stopLoss": "0", "trailingStop": "0", + "sessionAvgPrice": "0", "unrealisedPnl": "-1.8075", "cumRealisedPnl": "0.64782276", "createdTime": "1672121182216", @@ -31,7 +32,11 @@ "category": "linear", "positionStatus": "Normal", "adlRankIndicator": 2, + "autoAddMargin": 0, + "leverageSysUpdatedTime": "", + "mmrSysUpdatedTime": "", + "isReduceOnly": false, "seq": 4688002127 } ] -} \ No newline at end of file +} diff --git a/tests/integration_tests/adapters/bybit/test_ws_decoders.py b/tests/integration_tests/adapters/bybit/test_ws_decoders.py index 019ea3917f14..1dd2052aff36 100644 --- a/tests/integration_tests/adapters/bybit/test_ws_decoders.py +++ b/tests/integration_tests/adapters/bybit/test_ws_decoders.py @@ -575,6 +575,7 @@ def test_ws_private_position(self): takeProfit="0", stopLoss="0", trailingStop="0", + sessionAvgPrice="0", unrealisedPnl="-1.8075", cumRealisedPnl="0.64782276", createdTime="1672121182216", @@ -582,9 +583,13 @@ def test_ws_private_position(self): tpslMode="Full", liqPrice="", bustPrice="", - category="linear", + category=BybitProductType.LINEAR, positionStatus="Normal", adlRankIndicator=2, + autoAddMargin=0, + leverageSysUpdatedTime="", + mmrSysUpdatedTime="", + isReduceOnly=False, seq=4688002127, ) assert result.data == [target_data]