From 4e71e8981c8711905accb63f3626dd1109301ccc Mon Sep 17 00:00:00 2001 From: David Blom Date: Mon, 30 Sep 2024 08:53:19 +0200 Subject: [PATCH] Fix resubscribing to orderbooks for dYdX --- nautilus_trader/adapters/dydx/data.py | 7 +++++-- .../adapters/dydx/websocket/client.py | 16 ++++++++++------ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/nautilus_trader/adapters/dydx/data.py b/nautilus_trader/adapters/dydx/data.py index dfc74734e8a1..154294921e90 100644 --- a/nautilus_trader/adapters/dydx/data.py +++ b/nautilus_trader/adapters/dydx/data.py @@ -217,7 +217,10 @@ async def _resubscribe_orderbooks(self) -> None: async with self._resubscribe_orderbook_lock: for symbol in self._orderbook_subscriptions: await self._ws_client.unsubscribe_order_book(symbol, remove_subscription=False) - await self._ws_client.subscribe_order_book(symbol) + await self._ws_client.subscribe_order_book( + symbol, + bypass_subscription_validation=True, + ) def _send_all_instruments_to_data_engine(self) -> None: for instrument in self._instrument_provider.get_all().values(): @@ -269,7 +272,7 @@ def _handle_ws_message(self, raw: bytes) -> None: elif ws_message.type == "error": if ( ws_message.message - == "Internal error, could not fetch data for subscription: v4_orderbook" + == "Internal error, could not fetch data for subscription: v4_orderbook." ): # This error occurs when the websocket service fails to request the initial # orderbook snapshot. diff --git a/nautilus_trader/adapters/dydx/websocket/client.py b/nautilus_trader/adapters/dydx/websocket/client.py index 395e28e5f35e..d7a8fe899610 100644 --- a/nautilus_trader/adapters/dydx/websocket/client.py +++ b/nautilus_trader/adapters/dydx/websocket/client.py @@ -74,7 +74,7 @@ def __init__( # Every 30 seconds, the dYdX websocket API will send a heartbeat ping control # frame to the connected client. If a pong event is not received within 10 # seconds back, the websocket API will disconnect. - self._ping_timestamp: pd.Timestamp | None = None + self._ping_timestamp = self._clock.utc_now() self._ping_interval_secs: int = 40 self._reconnect_task: asyncio.Task | None = None @@ -133,6 +133,8 @@ async def connect(self) -> None: self._client = client self._log.info(f"Connected to {self._base_url}", LogColor.BLUE) + self._ping_timestamp = self._clock.utc_now() + if self._reconnect_task is None: self._reconnect_task = self._loop.create_task(self._reconnect_ping()) @@ -163,9 +165,7 @@ async def _reconnect_ping(self) -> None: now_timestamp = self._clock.utc_now() time_since_previous_ping = now_timestamp - self._ping_timestamp - if self._ping_timestamp is not None and time_since_previous_ping > pd.Timedelta( - seconds=self._ping_interval_secs, - ): + if time_since_previous_ping > pd.Timedelta(seconds=self._ping_interval_secs): self._log.error( f"Time since previous received ping message is {time_since_previous_ping}", ) @@ -232,7 +232,11 @@ async def subscribe_trades(self, symbol: str) -> None: self._log.debug(f"Subscribe to {symbol} trade ticks") await self._send(msg) - async def subscribe_order_book(self, symbol: str) -> None: + async def subscribe_order_book( + self, + symbol: str, + bypass_subscription_validation: bool = False, + ) -> None: """ Subscribe to trades messages. """ @@ -241,7 +245,7 @@ async def subscribe_order_book(self, symbol: str) -> None: return subscription = ("v4_orderbook", symbol) - if subscription in self._subscriptions: + if subscription in self._subscriptions and bypass_subscription_validation is False: self._log.warning(f"Cannot subscribe '{subscription}': already subscribed") return