diff --git a/hummingbot/connector/exchange/gate_io/gate_io_auth.py b/hummingbot/connector/exchange/gate_io/gate_io_auth.py index 9c7403b06e..312b19667b 100644 --- a/hummingbot/connector/exchange/gate_io/gate_io_auth.py +++ b/hummingbot/connector/exchange/gate_io/gate_io_auth.py @@ -17,10 +17,13 @@ class GateIoAuth(AuthBase): Auth Gate.io API https://www.gate.io/docs/apiv4/en/#authentication """ - def __init__(self, api_key: str, secret_key: str, time_provider: TimeSynchronizer): + def __init__(self, + gate_io_api_key: str = "", + gate_io_secret_key: str = "", + time_provider: TimeSynchronizer = None): self.nonce = None - self.api_key = api_key - self.secret_key = secret_key + self.api_key = gate_io_api_key + self.secret_key = gate_io_secret_key self.time_provider = time_provider async def rest_authenticate(self, request: RESTRequest) -> RESTRequest: diff --git a/hummingbot/connector/exchange/gate_io/gate_io_exchange.py b/hummingbot/connector/exchange/gate_io/gate_io_exchange.py index 8e43f70887..8e011a2fc0 100644 --- a/hummingbot/connector/exchange/gate_io/gate_io_exchange.py +++ b/hummingbot/connector/exchange/gate_io/gate_io_exchange.py @@ -1,47 +1,22 @@ -import time -import asyncio -from decimal import Decimal -from typing import Any, Dict, List, Optional -import json +from typing import List, Optional -from hummingbot.connector.exchange_base_v2 import ExchangeBaseV2 from hummingbot.connector.exchange.gate_io import ( gate_io_constants as CONSTANTS, - gate_io_web_utils as web_utils ) -from hummingbot.connector.exchange.gate_io.gate_io_auth import GateIoAuth -from hummingbot.connector.exchange.gate_io.gate_io_api_order_book_data_source import GateIoAPIOrderBookDataSource -from hummingbot.connector.exchange.gate_io.gate_io_api_user_stream_data_source import GateIoAPIUserStreamDataSource -from hummingbot.connector.trading_rule import TradingRule -from hummingbot.connector.constants import s_decimal_NaN -from hummingbot.core.data_type.common import OrderType, TradeType -from hummingbot.core.data_type.trade_fee import AddedToCostTradeFee, TradeFeeBase, TokenAmount -from hummingbot.core.data_type.in_flight_order import TradeUpdate, OrderUpdate, OrderState -from hummingbot.core.utils.async_utils import safe_gather +from hummingbot.connector.exchange_base_v2 import ExchangeBaseV2 +from hummingbot.connector.exchange.gate_io.gate_io_exchange_api import GateIoExchangeApi +from hummingbot.core.data_type.common import OrderType class GateIoExchange(ExchangeBaseV2): DEFAULT_DOMAIN = "" - RATE_LIMITS = CONSTANTS.RATE_LIMITS SUPPORTED_ORDER_TYPES = [ OrderType.LIMIT ] - HBOT_ORDER_ID_PREFIX = CONSTANTS.HBOT_ORDER_ID MAX_ORDER_ID_LEN = CONSTANTS.MAX_ID_LEN - ORDERBOOK_DS_CLASS = GateIoAPIOrderBookDataSource - USERSTREAM_DS_CLASS = GateIoAPIUserStreamDataSource - - CHECK_NETWORK_URL = CONSTANTS.NETWORK_CHECK_PATH_URL - SYMBOLS_PATH_URL = CONSTANTS.SYMBOL_PATH_URL - FEE_PATH_URL = SYMBOLS_PATH_URL - - INTERVAL_TRADING_RULES = CONSTANTS.INTERVAL_TRADING_RULES - # Using 120 seconds here as Gate.io websocket is quiet - TICK_INTERVAL_LIMIT = 10.0 # 120.0 - - web_utils = web_utils + EXCHANGE_API_CLASS = GateIoExchangeApi def __init__(self, gate_io_api_key: str, @@ -55,373 +30,15 @@ def __init__(self, :param trading_pairs: The market trading pairs which to track order book data. :param trading_required: Whether actual trading is needed. """ - self._gate_io_api_key = gate_io_api_key - self._gate_io_secret_key = gate_io_secret_key + self._auth_credentials = { + "gate_io_api_key": gate_io_api_key, + "gate_io_secret_key": gate_io_secret_key, + } self._domain = domain self._trading_required = trading_required self._trading_pairs = trading_pairs - super().__init__() - def init_auth(self): - return GateIoAuth( - api_key=self._gate_io_api_key, - secret_key=self._gate_io_secret_key, - time_provider=self._time_synchronizer) - @property def name(self) -> str: return "gate_io" - - async def _format_trading_rules(self, raw_trading_pair_info: Dict[str, Any]) -> Dict[str, TradingRule]: - """ - Converts json API response into a dictionary of trading rules. - - :param raw_trading_pair_info: The json API response - :return A dictionary of trading rules. - - Example raw_trading_pair_info: - https://www.gate.io/docs/apiv4/en/#list-all-currency-pairs-supported - """ - result = [] - for rule in raw_trading_pair_info: - try: - trading_pair = web_utils.convert_from_exchange_trading_pair(rule["id"]) - - min_amount_inc = Decimal(f"1e-{rule['amount_precision']}") - min_price_inc = Decimal(f"1e-{rule['precision']}") - min_amount = Decimal(str(rule.get("min_base_amount", min_amount_inc))) - min_notional = Decimal(str(rule.get("min_quote_amount", min_price_inc))) - result.append( - TradingRule( - trading_pair, - min_order_size=min_amount, - min_price_increment=min_price_inc, - min_base_amount_increment=min_amount_inc, - min_notional_size=min_notional, - min_order_value=min_notional, - ) - ) - except Exception: - self.logger().error( - f"Error parsing the trading pair rule {rule}. Skipping.", exc_info=True) - return result - - async def _place_order(self, - order_id: str, - trading_pair: str, - amount: Decimal, - trade_type: TradeType, - order_type: OrderType, - price: Decimal) -> str: - - order_type_str = order_type.name.lower().split("_")[0] - symbol = web_utils.convert_to_exchange_trading_pair(trading_pair) - data = { - "text": order_id, - "currency_pair": symbol, - "side": trade_type.name.lower(), - "type": order_type_str, - "price": f"{price:f}", - "amount": f"{amount:f}", - } - # RESTRequest does not support json, and if we pass a dict - # the underlying aiohttp will encode it to params - data = json.dumps(data) - endpoint = CONSTANTS.ORDER_CREATE_PATH_URL - order_result = await self._api_post( - path_url=endpoint, - data=data, - is_auth_required=True, - limit_id=endpoint, - ) - if order_result.get('status') in {"cancelled", "expired", "failed"}: - raise web_utils.APIError({'label': 'ORDER_REJECTED', 'message': 'Order rejected.'}) - exchange_order_id = str(order_result["id"]) - return exchange_order_id, time.time() - - async def _place_cancel(self, order_id, tracked_order): - """ - This implementation-specific method is called by _cancel - returns True if successful - """ - cancelled = False - exchange_order_id = await tracked_order.get_exchange_order_id() - try: - params = { - 'currency_pair': web_utils.convert_to_exchange_trading_pair(tracked_order.trading_pair) - } - resp = await self._api_delete( - path_url=CONSTANTS.ORDER_DELETE_PATH_URL.format(order_id=exchange_order_id), - params=params, - is_auth_required=True, - limit_id=CONSTANTS.ORDER_DELETE_LIMIT_ID, - ) - if resp["status"] == "cancelled": - cancelled = True - except (asyncio.TimeoutError, web_utils.APIError) as e: - self.logger().debug(f"Canceling order {order_id} failed: {e}") - return cancelled - - async def _status_polling_loop_fetch_updates(self): - """ - Called by _status_polling_loop, which executes after each tick() is executed - """ - self.logger().debug(f"Running _status_polling_loop_fetch_updates() at {time.time()}") - return await safe_gather( - self._update_balances(), - self._update_order_status(), - ) - - async def _update_balances(self): - """ - Calls REST API to update total and available balances. - """ - account_info = "" - try: - account_info = await self._api_get( - path_url=CONSTANTS.USER_BALANCES_PATH_URL, - is_auth_required=True, - limit_id=CONSTANTS.USER_BALANCES_PATH_URL - ) - self._process_balance_message(account_info) - except Exception as e: - self.logger().network( - f"Unexpected error while fetching balance update - {str(e)}", exc_info=True, - app_warning_msg=(f"Could not fetch balance update from {self.name_cap}")) - return account_info - - async def _update_order_status(self): - """ - Calls REST API to get status update for each in-flight order. - """ - orders_tasks = [] - trades_tasks = [] - reviewed_orders = [] - tracked_orders = list(self.in_flight_orders.values()) - if len(tracked_orders) <= 0: - return - - # Prepare requests to update trades and orders - for tracked_order in tracked_orders: - try: - exchange_order_id = await tracked_order.get_exchange_order_id() - reviewed_orders.append(tracked_order) - except asyncio.TimeoutError: - self.logger().network( - f"Skipped order status update for {tracked_order.client_order_id} " - "- waiting for exchange order id.") - await self._order_tracker.process_order_not_found(tracked_order.client_order_id) - continue - - trading_pair = web_utils.convert_to_exchange_trading_pair(tracked_order.trading_pair) - trades_tasks.append(self._api_get( - path_url=CONSTANTS.MY_TRADES_PATH_URL, - params={ - "currency_pair": trading_pair, - "order_id": exchange_order_id - }, - is_auth_required=True, - limit_id=CONSTANTS.MY_TRADES_PATH_URL, - )) - orders_tasks.append(self._api_get( - path_url=CONSTANTS.ORDER_STATUS_PATH_URL.format(order_id=exchange_order_id), - params={ - "currency_pair": trading_pair - }, - is_auth_required=True, - limit_id=CONSTANTS.ORDER_STATUS_LIMIT_ID, - )) - - # Process order trades first before processing order statuses - responses = await safe_gather(*trades_tasks, return_exceptions=True) - self.logger().debug(f"Polled trade updates for {len(tracked_orders)} orders: {len(responses)}.") - for response, tracked_order in zip(responses, reviewed_orders): - if not isinstance(response, Exception): - if len(response) > 0: - for trade_fills in response: - self._process_trade_message(trade_fills, tracked_order.client_order_id) - else: - self.logger().warning( - f"Failed to fetch trade updates for order {tracked_order.client_order_id}. " - f"Response: {response}") - if 'ORDER_NOT_FOUND' in str(response): - self._order_tracker.stop_tracking_order(client_order_id=tracked_order.client_order_id) - - # Process order statuses - responses = await safe_gather(*orders_tasks, return_exceptions=True) - self.logger().debug(f"Polled order updates for {len(tracked_orders)} orders: {len(responses)}.") - for response, tracked_order in zip(responses, tracked_orders): - if not isinstance(response, Exception): - self._process_order_message(response) - else: - self.logger().warning( - f"Failed to fetch order status updates for order {tracked_order.client_order_id}. " - f"Response: {response}") - if 'ORDER_NOT_FOUND' in str(response): - self._order_tracker.stop_tracking_order(client_order_id=tracked_order.client_order_id) - - def _get_fee(self, - base_currency: str, - quote_currency: str, - order_type: OrderType, - order_side: TradeType, - amount: Decimal, - price: Decimal = s_decimal_NaN, - is_maker: Optional[bool] = None) -> AddedToCostTradeFee: - is_maker = order_type is OrderType.LIMIT_MAKER - return AddedToCostTradeFee(percent=self.estimate_fee_pct(is_maker)) - - async def _update_trading_fees(self): - """ - Initialize mapping of trade symbols in exchange notation to trade symbols in client notation - """ - pass - - async def _user_stream_event_listener(self): - """ - Listens to messages from _user_stream_tracker.user_stream queue. - Traders, Orders, and Balance updates from the WS. - """ - user_channels = [ - CONSTANTS.USER_TRADES_ENDPOINT_NAME, - CONSTANTS.USER_ORDERS_ENDPOINT_NAME, - CONSTANTS.USER_BALANCE_ENDPOINT_NAME, - ] - async for event_message in self._iter_user_event_queue(): - channel: str = event_message.get("channel", None) - results: str = event_message.get("result", None) - try: - if channel not in user_channels: - self.logger().error( - f"Unexpected message in user stream: {event_message}.", exc_info=True) - continue - - if channel == CONSTANTS.USER_TRADES_ENDPOINT_NAME: - for trade_msg in results: - self._process_trade_message(trade_msg) - elif channel == CONSTANTS.USER_ORDERS_ENDPOINT_NAME: - for order_msg in results: - self._process_order_message(order_msg) - elif channel == CONSTANTS.USER_BALANCE_ENDPOINT_NAME: - self._process_balance_message_ws(results) - - except asyncio.CancelledError: - raise - except Exception: - self.logger().error( - "Unexpected error in user stream listener loop.", exc_info=True) - await self._sleep(5.0) - - def _normalise_order_message_state(self, order_msg: Dict[str, Any], tracked_order): - state = None - # we do not handle: - # "failed" because it is handled by create order - # "put" as the exchange order id is returned in the create order response - # "open" for same reason - - # same field for both WS and REST - amount_left = Decimal(order_msg.get("left")) - - # WS - if "event" in order_msg: - event_type = order_msg.get("event") - if event_type == "update": - state = OrderState.FILLED - if amount_left > 0: - state = OrderState.PARTIALLY_FILLED - if event_type == "finish": - state = OrderState.FILLED - if amount_left > 0: - state = OrderState.CANCELLED - else: - status = order_msg.get("status") - if status == "closed": - state = OrderState.FILLED - if amount_left > 0: - state = OrderState.PARTIALLY_FILLED - if status == "cancelled": - state = OrderState.CANCELLED - return state - - def _process_order_message(self, order_msg: Dict[str, Any]): - """ - Updates in-flight order and triggers cancellation or failure event if needed. - - :param order_msg: The order response from either REST or web socket API (they are of the same format) - - Example Order: - https://www.gate.io/docs/apiv4/en/#list-orders - """ - state = None - client_order_id = str(order_msg.get("text", "")) - tracked_order = self.in_flight_orders.get(client_order_id, None) - if not tracked_order: - self.logger().debug(f"Ignoring order message with id {client_order_id}: not in in_flight_orders.") - return - - state = self._normalise_order_message_state(order_msg, tracked_order) - if state: - order_update = OrderUpdate( - trading_pair=tracked_order.trading_pair, - update_timestamp=order_msg["update_time"], - new_state=state, - client_order_id=client_order_id, - exchange_order_id=str(order_msg["id"]), - ) - self._order_tracker.process_order_update(order_update=order_update) - self.logger().info(f"Successfully updated order {tracked_order.client_order_id}.") - - def _process_trade_message(self, trade: Dict[str, Any], client_order_id: Optional[str] = None): - """ - Updates in-flight order and trigger order filled event for trade message received. Triggers order completed - event if the total executed amount equals to the specified order amount. - Example Trade: - https://www.gate.io/docs/apiv4/en/#retrieve-market-trades - """ - client_order_id = client_order_id or str(trade["text"]) - tracked_order = self.in_flight_orders.get(client_order_id, None) - if not tracked_order: - self.logger().debug(f"Ignoring trade message with id {client_order_id}: not in in_flight_orders.") - return - - fee = TradeFeeBase.new_spot_fee( - fee_schema=self.trade_fee_schema(), - trade_type=tracked_order.trade_type, - percent_token=trade["fee_currency"], - flat_fees=[TokenAmount( - amount=Decimal(trade["fee"]), - token=trade["fee_currency"] - )] - ) - trade_update = TradeUpdate( - trade_id=str(trade["id"]), - client_order_id=tracked_order.client_order_id, - exchange_order_id=tracked_order.exchange_order_id, - trading_pair=tracked_order.trading_pair, - fee=fee, - fill_base_amount=Decimal(trade["amount"]), - fill_quote_amount=Decimal(trade["amount"]) * Decimal(trade["price"]), - fill_price=Decimal(trade["price"]), - fill_timestamp=trade["create_time"], - ) - self._order_tracker.process_trade_update(trade_update) - - def _process_balance_message(self, balance_update): - local_asset_names = set(self._account_balances.keys()) - remote_asset_names = set() - for account in balance_update: - asset_name = account["currency"] - self._account_available_balances[asset_name] = Decimal(str(account["available"])) - self._account_balances[asset_name] = Decimal(str(account["locked"])) + Decimal(str(account["available"])) - remote_asset_names.add(asset_name) - asset_names_to_remove = local_asset_names.difference(remote_asset_names) - for asset_name in asset_names_to_remove: - del self._account_available_balances[asset_name] - del self._account_balances[asset_name] - - def _process_balance_message_ws(self, balance_update): - for account in balance_update: - asset_name = account["currency"] - self._account_available_balances[asset_name] = Decimal(str(account["available"])) - self._account_balances[asset_name] = Decimal(str(account["total"])) diff --git a/hummingbot/connector/exchange/gate_io/gate_io_exchange_api.py b/hummingbot/connector/exchange/gate_io/gate_io_exchange_api.py new file mode 100644 index 0000000000..b660cc5c59 --- /dev/null +++ b/hummingbot/connector/exchange/gate_io/gate_io_exchange_api.py @@ -0,0 +1,359 @@ +import time +import asyncio +from decimal import Decimal +from typing import Any, Dict, List, Optional +import json + +from hummingbot.connector.exchange_api_base import ExchangeApiBase +from hummingbot.connector.exchange.gate_io import ( + gate_io_constants as CONSTANTS, + gate_io_web_utils as web_utils +) +from hummingbot.connector.exchange.gate_io.gate_io_auth import GateIoAuth +from hummingbot.connector.exchange.gate_io.gate_io_api_order_book_data_source import GateIoAPIOrderBookDataSource +from hummingbot.connector.exchange.gate_io.gate_io_api_user_stream_data_source import GateIoAPIUserStreamDataSource +from hummingbot.connector.trading_rule import TradingRule +from hummingbot.core.data_type.common import OrderType, TradeType +from hummingbot.core.data_type.in_flight_order import OrderState +from hummingbot.core.utils.async_utils import safe_gather + + +class GateIoExchangeApi(ExchangeApiBase): + DEFAULT_DOMAIN = "" + RATE_LIMITS = CONSTANTS.RATE_LIMITS + + ORDERBOOK_DS_CLASS = GateIoAPIOrderBookDataSource + USERSTREAM_DS_CLASS = GateIoAPIUserStreamDataSource + + CHECK_NETWORK_URL = CONSTANTS.NETWORK_CHECK_PATH_URL + SYMBOLS_PATH_URL = CONSTANTS.SYMBOL_PATH_URL + FEE_PATH_URL = SYMBOLS_PATH_URL + + INTERVAL_TRADING_RULES = CONSTANTS.INTERVAL_TRADING_RULES + # Using 120 seconds here as Gate.io websocket is quiet + TICK_INTERVAL_LIMIT = 10.0 # 120.0 + + web_utils = web_utils + + # Defined in __init__ + # TODO + USER_CHANNELS = {} + + def __init__(self, + exchange, + auth_credentials: {}, + trading_pairs: Optional[List[str]] = None, + trading_required: bool = True, + domain: str = DEFAULT_DOMAIN + ): + """ + :param gate_io_api_key: The API key to connect to private Gate.io APIs. + :param gate_io_secret_key: The API secret. + :param trading_pairs: The market trading pairs which to track order book data. + :param trading_required: Whether actual trading is needed. + """ + # TODO this is bad, cross references mean wrong decoupling + # acceptable given it's very limited and the benefits + # but better to solve, see below order_books() property comments + self.exchange = exchange + + self._auth_credentials = auth_credentials + self._trading_required = trading_required + self._trading_pairs = trading_pairs + self._domain = domain + self.USER_CHANNELS = ( + (CONSTANTS.USER_TRADES_ENDPOINT_NAME, self.enqueue_trade), + (CONSTANTS.USER_ORDERS_ENDPOINT_NAME, self.enqueue_order), + (CONSTANTS.USER_BALANCE_ENDPOINT_NAME, self.enqueue_balance), + ) + super().__init__() + + # TODO understand how to cleanly decouple Exchange and Api + # regarding order books, balances, in flight orders + # Probably the best thing is to have an ExchangeBooks object + # that contains the shared state(s) to limit cross references + # to what is really needed + # + # OrderBook is a hbot class and should be in the Exchange / ExchangeBooks + @property + def order_books(self) -> Dict: + return self._api.order_book_tracker.order_books + + def init_auth(self): + # TODO improve + self._auth_credentials["time_provider"] = self._time_synchronizer + return GateIoAuth(**self._auth_credentials) + + @property + def name(self) -> str: + return "gate_io" + + async def _polling_status_fetch_updates(self): + """ + Called by _polling_status_loop, which executes after each tick() is executed + """ + self.logger().debug(f"Running _status_polling_loop_fetch_updates() at {time.time()}") + return await safe_gather( + self._polling_balances(), + self._polling_orders(), + ) + + async def _polling_balances(self): + """ + Calls REST API to update total and available balances. + """ + account_info = "" + try: + account_info = await self._api_get( + path_url=CONSTANTS.USER_BALANCES_PATH_URL, + is_auth_required=True, + limit_id=CONSTANTS.USER_BALANCES_PATH_URL + ) + await self.enqueue_balance(account_info) + except Exception as e: + self.logger().network( + f"Unexpected error while fetching balance update - {str(e)}", exc_info=True, + app_warning_msg=(f"Could not fetch balance update from {self.name_cap}")) + return account_info + + async def _polling_orders(self): + """ + Calls REST API to get status update for each in-flight order. + """ + orders_tasks = [] + trades_tasks = [] + reviewed_orders = [] + tracked_orders = list(self.exchange.in_flight_orders.values()) + if len(tracked_orders) <= 0: + return + + # Prepare requests to update trades and orders + for tracked_order in tracked_orders: + try: + exchange_order_id = await tracked_order.get_exchange_order_id() + reviewed_orders.append(tracked_order) + except asyncio.TimeoutError: + self.logger().network( + f"Skipped order status update for {tracked_order.client_order_id} " + "- waiting for exchange order id.") + await self._order_tracker.process_order_not_found(tracked_order.client_order_id) + continue + + trading_pair = self.web_utils.convert_to_exchange_trading_pair(tracked_order.trading_pair) + trades_tasks.append(self._api_get( + path_url=CONSTANTS.MY_TRADES_PATH_URL, + params={ + "currency_pair": trading_pair, + "order_id": exchange_order_id + }, + is_auth_required=True, + limit_id=CONSTANTS.MY_TRADES_PATH_URL, + )) + orders_tasks.append(self._api_get( + path_url=CONSTANTS.ORDER_STATUS_PATH_URL.format(order_id=exchange_order_id), + params={ + "currency_pair": trading_pair + }, + is_auth_required=True, + limit_id=CONSTANTS.ORDER_STATUS_LIMIT_ID, + )) + + # Process order trades first before processing order statuses + responses = await safe_gather(*trades_tasks, return_exceptions=True) + self.logger().debug(f"Polled trade updates for {len(tracked_orders)} orders: {len(responses)}.") + for response, tracked_order in zip(responses, reviewed_orders): + if not isinstance(response, Exception): + if len(response) > 0: + for trade_fills in response: + await self.enqueue_trade(trade_fills, tracked_order.client_order_id) + else: + self.logger().warning( + f"Failed to fetch trade updates for order {tracked_order.client_order_id}. " + f"Response: {response}") + if 'ORDER_NOT_FOUND' in str(response): + self._order_tracker.stop_tracking_order(client_order_id=tracked_order.client_order_id) + + # Process order statuses + responses = await safe_gather(*orders_tasks, return_exceptions=True) + self.logger().debug(f"Polled order updates for {len(tracked_orders)} orders: {len(responses)}.") + for response, tracked_order in zip(responses, tracked_orders): + if not isinstance(response, Exception): + await self.enqueue_order(response) + else: + self.logger().warning( + f"Failed to fetch order status updates for order {tracked_order.client_order_id}. " + f"Response: {response}") + if 'ORDER_NOT_FOUND' in str(response): + self._order_tracker.stop_tracking_order(client_order_id=tracked_order.client_order_id) + + async def enqueue_trade(self, trades, client_order_id: Optional[str] = None): + if type(trades) != list: + trades = list(trades) + for trade in trades: + print("TRADE ", trade) + e = dict( + type="trade", + trade_id=trade["id"], + fee_currency=trade["fee_currency"], + fee_amount=trade["fee"], + fill_base_amount=trade["amount"], + fill_quote_amount=Decimal(trade["amount"]) * Decimal(trade["price"]), + fill_price=Decimal(trade["price"]), + fill_timestamp=trade["create_time"] + ) + e["client_order_id"] = client_order_id + await self.queue.put(e) + + async def enqueue_order(self, orders): + if type(orders) != list: + orders = list(orders) + for order in orders: + print("ORDER ", order) + await self.queue.put(dict( + type="order", + order_id=order["id"], + state=self._normalise_order_message_state(order), + )) + + async def enqueue_balance(self, balances): + if type(balances) != list: + balances = list(balances) + print("BALANCE", balances) + await self.queue.put(dict( + type="balance", + accounts=balances)) + + def _normalise_order_message_state(self, order_msg: Dict[str, Any]): + state = None + # we do not handle: + # "failed" because it is handled by create order + # "put" as the exchange order id is returned in the create order response + # "open" for same reason + + # same field for both WS and REST + amount_left = Decimal(order_msg.get("left")) + + # WS + if "event" in order_msg: + event_type = order_msg.get("event") + if event_type == "update": + state = OrderState.FILLED + if amount_left > 0: + state = OrderState.PARTIALLY_FILLED + if event_type == "finish": + state = OrderState.FILLED + if amount_left > 0: + state = OrderState.CANCELLED + else: + status = order_msg.get("status") + if status == "closed": + state = OrderState.FILLED + if amount_left > 0: + state = OrderState.PARTIALLY_FILLED + if status == "cancelled": + state = OrderState.CANCELLED + return state + + async def _polling_trading_rules(self): + exchange_info = await self._api_get(path_url=self.SYMBOLS_PATH_URL) + trading_rules_list = await self._format_trading_rules(exchange_info) + self._trading_rules.clear() + for trading_rule in trading_rules_list: + self._trading_rules[trading_rule.trading_pair] = trading_rule + + async def _format_trading_rules(self, raw_trading_pair_info: Dict[str, Any]) -> Dict[str, TradingRule]: + """ + Converts json API response into a dictionary of trading rules. + + :param raw_trading_pair_info: The json API response + :return A dictionary of trading rules. + + Example raw_trading_pair_info: + https://www.gate.io/docs/apiv4/en/#list-all-currency-pairs-supported + """ + result = [] + for rule in raw_trading_pair_info: + try: + trading_pair = self.web_utils.convert_from_exchange_trading_pair(rule["id"]) + + min_amount_inc = Decimal(f"1e-{rule['amount_precision']}") + min_price_inc = Decimal(f"1e-{rule['precision']}") + min_amount = Decimal(str(rule.get("min_base_amount", min_amount_inc))) + min_notional = Decimal(str(rule.get("min_quote_amount", min_price_inc))) + result.append( + TradingRule( + trading_pair, + min_order_size=min_amount, + min_price_increment=min_price_inc, + min_base_amount_increment=min_amount_inc, + min_notional_size=min_notional, + min_order_value=min_notional, + ) + ) + except Exception: + self.logger().error( + f"Error parsing the trading pair rule {rule}. Skipping.", exc_info=True) + return result + + # Exchange API actions + # + # TODO do not pass higher level object here + async def cancel_order(self, order_id, tracked_order): + """ + Cancels an order via the API + + returns (client_order_id, True) if successful + """ + cancelled = False + exchange_order_id = await tracked_order.get_exchange_order_id() + try: + params = { + 'currency_pair': self.web_utils.convert_to_exchange_trading_pair(tracked_order.trading_pair) + } + resp = await self._api_delete( + path_url=CONSTANTS.ORDER_DELETE_PATH_URL.format(order_id=exchange_order_id), + params=params, + is_auth_required=True, + limit_id=CONSTANTS.ORDER_DELETE_LIMIT_ID, + ) + if resp["status"] == "cancelled": + cancelled = True + except (asyncio.TimeoutError, self.web_utils.APIError) as e: + self.logger().debug(f"Canceling order {order_id} failed: {e}") + return (order_id, cancelled) + + async def create_order(self, + order_id: str, + trading_pair: str, + amount: Decimal, + trade_type: TradeType, + order_type: OrderType, + price: Decimal) -> str: + + order_type_str = order_type.name.lower().split("_")[0] + symbol = self.web_utils.convert_to_exchange_trading_pair(trading_pair) + data = { + "text": order_id, + "currency_pair": symbol, + "side": trade_type.name.lower(), + "type": order_type_str, + "price": f"{price:f}", + "amount": f"{amount:f}", + } + # RESTRequest does not support json, and if we pass a dict + # the underlying aiohttp will encode it to params + data = json.dumps(data) + endpoint = CONSTANTS.ORDER_CREATE_PATH_URL + order_result = await self._api_post( + path_url=endpoint, + data=data, + is_auth_required=True, + limit_id=endpoint, + ) + if order_result.get('status') in {"cancelled", "expired", "failed"}: + raise self.web_utils.APIError({'label': 'ORDER_REJECTED', 'message': 'Order rejected.'}) + exchange_order_id = str(order_result["id"]) + return exchange_order_id, time.time() + + async def get_api_message(self): + return await self._api.queue.get() diff --git a/hummingbot/connector/exchange_api_base.py b/hummingbot/connector/exchange_api_base.py new file mode 100644 index 0000000000..50c82b17de --- /dev/null +++ b/hummingbot/connector/exchange_api_base.py @@ -0,0 +1,408 @@ +import logging +import asyncio +from typing import Any, AsyncIterable, Dict, Optional + +from hummingbot.connector.constants import MINUTE, TWELVE_HOURS +from hummingbot.connector.time_synchronizer import TimeSynchronizer +from hummingbot.core.data_type.order_book_tracker import OrderBookTracker +from hummingbot.core.data_type.user_stream_tracker import UserStreamTracker +from hummingbot.core.network_iterator import NetworkStatus +from hummingbot.core.web_assistant.connections.data_types import RESTMethod +from hummingbot.core.api_throttler.async_throttler import AsyncThrottler +from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather +from hummingbot.logger import HummingbotLogger + + +class ExchangeApiBase(object): + _logger = None + + # The following class vars MUST be redefined in the child class + # because they cannot have defaults, so we keep them commented + # to cause a NameError if not defined + # + + # RATE_LIMITS = None + # SYMBOLS_PATH_URL = "" + # FEE_PATH_URL = "" + # CHECK_NETWORK_URL = "" + # ORDERBOOK_DS_CLASS = None + # USERSTREAM_DS_CLASS = None + + SHORT_POLL_INTERVAL = 5.0 + LONG_POLL_INTERVAL = 120.0 + TRADING_RULES_INTERVAL = 30 * MINUTE + TRADING_FEES_INTERVAL = TWELVE_HOURS + UPDATE_ORDERS_INTERVAL = 10.0 + TICK_INTERVAL_LIMIT = 60.0 + + web_utils = None + + def __init__(self): + self._trading_rules = {} + + self._last_poll_timestamp = 0 + self._last_timestamp = 0 + + self._polling_task = None + self._trading_rules_polling_task = None + self._trading_fees_polling_task = None + + self._user_stream_event_listener_task = None + self._user_stream_tracker_task = None + + self._time_synchronizer = TimeSynchronizer() + self._throttler = AsyncThrottler(self.RATE_LIMITS) + self._poll_notifier = asyncio.Event() + + # all events (WS and polling) coming from API to Exchange + self.queue: asyncio.Queue = asyncio.Queue() + + # init Auth and Api factory + self._auth = self.init_auth() + self._api_factory = self.web_utils.build_api_factory( + throttler=self._throttler, + time_synchronizer=self._time_synchronizer, + domain=self._domain, + auth=self._auth) + + # init OrderBook Data Source and Tracker + self._orderbook_ds = self.ORDERBOOK_DS_CLASS( + trading_pairs=self._trading_pairs, + domain=self._domain, + api_factory=self._api_factory, + throttler=self._throttler, + time_synchronizer=self._time_synchronizer) + self._order_book_tracker = OrderBookTracker( + data_source=self._orderbook_ds, + trading_pairs=self._trading_pairs, + domain=self._domain) + + # init UserStream Data Source and Tracker + self._userstream_ds = self.USERSTREAM_DS_CLASS( + trading_pairs=self._trading_pairs, + auth=self._auth, + domain=self._domain, + api_factory=self._api_factory, + throttler=self._throttler) + self._user_stream_tracker = UserStreamTracker( + data_source=self._userstream_ds) + + @classmethod + def logger(cls) -> HummingbotLogger: + if cls._logger is None: + cls._logger = logging.getLogger(__name__) + return cls._logger + + @property + def name_cap(self) -> str: + return self.name.capitalize() + + async def _sleep(self, delay: float): + await asyncio.sleep(delay) + + # TODO move this to Exchange? + @property + def status_dict(self) -> Dict[str, bool]: + return { + "symbols_mapping_initialized": + self._orderbook_ds.trading_pair_symbol_map_ready(domain=self._domain), + "user_stream_initialized": + self._user_stream_tracker.data_source.last_recv_time > 0 if self._trading_required else True, + "order_books_initialized": + self._order_book_tracker.ready, + "account_balance": + not self._trading_required or len(self._account_balances) > 0, + "trading_rule_initialized": + len(self._trading_rules) > 0 if self._trading_required else True, + } + + def tick(self, timestamp: float): + """ + Includes the logic that has to be processed every time a new tick happens in the bot. Particularly it enables + the execution of the status update polling loop using an event. + """ + last_recv_diff = timestamp - self._user_stream_tracker.last_recv_time + poll_interval = (self.SHORT_POLL_INTERVAL + if last_recv_diff > self.TICK_INTERVAL_LIMIT + else self.LONG_POLL_INTERVAL) + last_tick = int(self._last_timestamp / poll_interval) + current_tick = int(timestamp / poll_interval) + if current_tick > last_tick: + if not self._poll_notifier.is_set(): + self._poll_notifier.set() + self._last_timestamp = timestamp + + async def start_network(self): + """ + Start all required tasks to update the status of the connector. Those tasks include: + - The order book tracker + - The polling loops to update the trading rules and trading fees + - The polling loop to update order status and balance status using REST API (backup for main update process) + - The background task to process the events received through the user stream tracker (websocket connection) + """ + self._stop_network() + self._order_book_tracker.start() + self._trading_rules_polling_task = safe_ensure_future(self._polling_trading_rules_loop()) + self._trading_fees_polling_task = safe_ensure_future(self._polling_trading_fees_loop()) + if self._trading_required: + self._polling_task = safe_ensure_future(self._polling_status_loop()) + self._user_stream_tracker_task = safe_ensure_future(self._user_stream_tracker.start()) + self._user_stream_event_listener_task = safe_ensure_future(self._user_stream_event_listener()) + + def _stop_network(self): + # Resets timestamps and events for status_polling_loop + self._last_poll_timestamp = 0 + self._last_timestamp = 0 + self._poll_notifier = asyncio.Event() + + self._order_book_tracker.stop() + if self._polling_task is not None: + self._status_polling_task.cancel() + self._status_polling_task = None + if self._trading_rules_polling_task is not None: + self._trading_rules_polling_task.cancel() + self._trading_rules_polling_task = None + if self._trading_fees_polling_task is not None: + self._trading_fees_polling_task.cancel() + self._trading_fees_polling_task = None + if self._user_stream_tracker_task is not None: + self._user_stream_tracker_task.cancel() + self._user_stream_tracker_task = None + if self._user_stream_event_listener_task is not None: + self._user_stream_event_listener_task.cancel() + self._user_stream_event_listener_task = None + + async def stop_network(self): + """ + This function is executed when the connector is stopped. It perform a general cleanup and stops all background + tasks that require the connection with the exchange to work. + """ + self._stop_network() + + async def check_network(self) -> NetworkStatus: + """ + Checks connectivity with the exchange using the API + """ + try: + await self._api_get(path_url=self.CHECK_NETWORK_URL) + except asyncio.CancelledError: + raise + except Exception: + return NetworkStatus.NOT_CONNECTED + return NetworkStatus.CONNECTED + + # loops and sync related methods + # + async def _polling_trading_rules_loop(self): + """ + Updates the trading rules by requesting the latest definitions from the exchange. + Executes regularly every 30 minutes + """ + while True: + try: + await safe_gather(self._polling_trading_rules()) + await self._sleep(self.TRADING_RULES_INTERVAL) + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error while fetching trading rules.", exc_info=True, + app_warning_msg=f"Could not fetch new trading rules from {self.name_cap}" + " Check network connection.") + await self._sleep(0.5) + + async def _polling_trading_fees_loop(self): + """ + Only some exchanges provide a fee endpoint. + If _update_trading_fees() is not defined, we just exit the loop + """ + while True: + try: + await safe_gather(self._polling_trading_fees()) + await self._sleep(self.TRADING_FEES_INTERVAL) + except NotImplementedError: + return + except asyncio.CancelledError: + raise + except Exception: + self.logger().network( + "Unexpected error while fetching trading fees.", exc_info=True, + app_warning_msg=f"Could not fetch new trading fees from {self.name_cap}." + " Check network connection.") + await self._sleep(0.5) + + async def _polling_status_loop(self): + """ + Performs all required operation to keep the connector updated and synchronized with the exchange. + It contains the backup logic to update status using API requests in case the main update source + (the user stream data source websocket) fails. + It also updates the time synchronizer. This is necessary because the exchange requires + the time of the client to be the same as the time in the exchange. + Executes when the _poll_notifier event is enabled by the `tick` function. + + _polling_status_loop + _polling_status_fetch_updates + _polling_balances + _polling_orders + """ + while True: + try: + await self._poll_notifier.wait() + await self._update_time_synchronizer() + + # the following method is implementation-specific + await self._polling_status_fetch_updates() + + self._last_poll_timestamp = self.exchange.current_timestamp + self._poll_notifier = asyncio.Event() + except asyncio.CancelledError: + raise + except NotImplementedError: + raise + except Exception: + self.logger().network( + "Unexpected error while fetching account updates.", + exc_info=True, + app_warning_msg=f"Could not fetch account updates from {self.name_cap}. " + "Check API key and network connection.") + await self._sleep(0.5) + + async def _update_time_synchronizer(self): + try: + await self._time_synchronizer.update_server_time_offset_with_time_provider( + time_provider=self.web_utils.get_current_server_time( + throttler=self._throttler, + domain=self._domain)) + except asyncio.CancelledError: + raise + except Exception: + self.logger().exception(f"Error requesting time from {self.name_cap} server") + raise + + # Exchange / Trading logic methods + # that call the API + # + async def _api_get(self, *args, **kwargs): + kwargs["method"] = RESTMethod.GET + return await self._api_request(*args, **kwargs) + + async def _api_post(self, *args, **kwargs): + kwargs["method"] = RESTMethod.POST + return await self._api_request(*args, **kwargs) + + async def _api_put(self, *args, **kwargs): + kwargs["method"] = RESTMethod.PUT + return await self._api_request(*args, **kwargs) + + async def _api_delete(self, *args, **kwargs): + kwargs["method"] = RESTMethod.DELETE + return await self._api_request(*args, **kwargs) + + async def _api_request(self, + path_url, + method: RESTMethod = RESTMethod.GET, + params: Optional[Dict[str, Any]] = None, + data: Optional[Dict[str, Any]] = None, + is_auth_required: bool = False, + limit_id: Optional[str] = None) -> Dict[str, Any]: + + return await self.web_utils.api_request( + path=path_url, + api_factory=self._api_factory, + throttler=self._throttler, + time_synchronizer=self._time_synchronizer, + domain=self._domain, + params=params, + data=data, + method=method, + is_auth_required=is_auth_required, + limit_id=limit_id) + + # TODO review naming + # data normalisation could be better encapsuled in an ApiDataModel obj + def _get_enqueue_callable(self, channel): + for ch, clb in self.USER_CHANNELS: + if channel == ch: + return clb + return None + + async def _iter_user_event_queue(self) -> AsyncIterable[Dict[str, any]]: + """ + Called by _user_stream_event_listener. + """ + while True: + try: + yield await self._user_stream_tracker.user_stream.get() + except asyncio.CancelledError: + raise + except Exception: + self.logger().exception("Error while reading user events queue. Retrying in 1s.") + await self._sleep(1.0) + + async def _user_stream_event_listener(self): + """ + Listens to messages from _user_stream_tracker.user_stream queue. + Traders, Orders, and Balance updates from the WS. + """ + async for event_message in self._iter_user_event_queue(): + channel: str = event_message.get("channel", None) + results: str = event_message.get("result", None) + try: + # TODO ApiDataModel.process_ws_message() + enqueue_callable = self._get_enqueue_callable(channel) + if not enqueue_callable: + self.logger().error( + f"Unexpected message in user stream: {event_message}.", + exc_info=True) + continue + + await enqueue_callable(results) + + except asyncio.CancelledError: + raise + except Exception: + self.logger().error( + "Unexpected error in user stream listener loop.", exc_info=True) + await self._sleep(5.0) + + # Public methods + # + def name(self): + raise NotImplementedError + + def init_auth(self): + raise NotImplementedError + + def cancel_order(self): + raise NotImplementedError + + def create_order(self): + raise NotImplementedError + + def get_fee(self): + raise NotImplementedError + + # Private methods: update data and process + # handle WS messages + # polling of loops + # + async def _format_trading_rules(self): + raise NotImplementedError + + async def _process_balance_message_ws(self): + raise NotImplementedError + + async def _polling_status_fetch_updates(self): + raise NotImplementedError + + async def _polling_orders(self): + raise NotImplementedError + + async def _polling_balances(self): + raise NotImplementedError + + async def _polling_trading_fees(self): + raise NotImplementedError + + async def _polling_trading_rules(self): + raise NotImplementedError diff --git a/hummingbot/connector/exchange_base_v2.py b/hummingbot/connector/exchange_base_v2.py index b8a7bc37c7..8444ac5267 100644 --- a/hummingbot/connector/exchange_base_v2.py +++ b/hummingbot/connector/exchange_base_v2.py @@ -6,22 +6,17 @@ from async_timeout import timeout from hummingbot.connector.exchange_base import ExchangeBase -from hummingbot.connector.constants import s_decimal_NaN, s_decimal_0, MINUTE, TWELVE_HOURS +from hummingbot.connector.constants import s_decimal_NaN, s_decimal_0 from hummingbot.connector.client_order_tracker import ClientOrderTracker -from hummingbot.connector.time_synchronizer import TimeSynchronizer from hummingbot.connector.utils import get_new_client_order_id from hummingbot.connector.trading_rule import TradingRule from hummingbot.core.data_type.limit_order import LimitOrder from hummingbot.core.data_type.trade_fee import AddedToCostTradeFee from hummingbot.core.data_type.cancellation_result import CancellationResult -from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderUpdate, OrderState +from hummingbot.core.data_type.in_flight_order import InFlightOrder, OrderUpdate, OrderState, TradeUpdate from hummingbot.core.data_type.order_book import OrderBook -from hummingbot.core.data_type.order_book_tracker import OrderBookTracker -from hummingbot.core.data_type.user_stream_tracker import UserStreamTracker -from hummingbot.core.network_iterator import NetworkStatus -from hummingbot.core.web_assistant.connections.data_types import RESTMethod -from hummingbot.core.api_throttler.async_throttler import AsyncThrottler from hummingbot.core.data_type.common import OrderType, TradeType +from hummingbot.core.data_type.trade_fee import TradeFeeBase, TokenAmount from hummingbot.core.utils.async_utils import safe_ensure_future, safe_gather from hummingbot.logger import HummingbotLogger @@ -34,72 +29,25 @@ class ExchangeBaseV2(ExchangeBase): # to cause a NameError if not defined # # DEFAULT_DOMAIN = "" - # RATE_LIMITS = None # SUPPORTED_ORDER_TYPES = [] # MAX_ORDER_ID_LEN = None # HBOT_ORDER_ID_PREFIX = None - # SYMBOLS_PATH_URL = "" - # FEE_PATH_URL = "" - # CHECK_NETWORK_URL = "" - # ORDERBOOK_DS_CLASS = None - # USERSTREAM_DS_CLASS = None - - SHORT_POLL_INTERVAL = 5.0 - LONG_POLL_INTERVAL = 120.0 - TRADING_RULES_INTERVAL = 30 * MINUTE - TRADING_FEES_INTERVAL = TWELVE_HOURS - UPDATE_ORDERS_INTERVAL = 10.0 - TICK_INTERVAL_LIMIT = 60.0 + # EXCHANGE_API_CLASS = None def __init__(self): super().__init__() - - self._last_poll_timestamp = 0 self._last_timestamp = 0 - self._trading_rules = {} - self._trading_fees = {} - - self._status_polling_task = None - self._user_stream_tracker_task = None - self._user_stream_event_listener_task = None - self._trading_rules_polling_task = None - self._trading_fees_polling_task = None - - self._time_synchronizer = TimeSynchronizer() - self._throttler = AsyncThrottler(self.RATE_LIMITS) - self._poll_notifier = asyncio.Event() - - # init Auth and Api factory - self._auth = self.init_auth() - self._api_factory = self.web_utils.build_api_factory( - throttler=self._throttler, - time_synchronizer=self._time_synchronizer, - domain=self._domain, - auth=self._auth) - - # init OrderBook Data Source and Tracker - self._orderbook_ds = self.ORDERBOOK_DS_CLASS( - trading_pairs=self._trading_pairs, - domain=self._domain, - api_factory=self._api_factory, - throttler=self._throttler, - time_synchronizer=self._time_synchronizer) - self._order_book_tracker = OrderBookTracker( - data_source=self._orderbook_ds, - trading_pairs=self._trading_pairs, - domain=self._domain) - - # init UserStream Data Source and Tracker - self._userstream_ds = self.USERSTREAM_DS_CLASS( - trading_pairs=self._trading_pairs, - auth=self._auth, - domain=self._domain, - api_factory=self._api_factory, - throttler=self._throttler) - self._user_stream_tracker = UserStreamTracker( - data_source=self._userstream_ds) - + self._api = self.EXCHANGE_API_CLASS( + self, # TODO bad + self._auth_credentials, + self._trading_pairs, + self._trading_required, + self._domain, + ) + self._api_listener_task = None self._order_tracker: ClientOrderTracker = ClientOrderTracker(connector=self) + # TODO + self._trading_rules = self._api._trading_rules @classmethod def logger(cls) -> HummingbotLogger: @@ -107,8 +55,16 @@ def logger(cls) -> HummingbotLogger: cls._logger = logging.getLogger(__name__) return cls._logger + @property + def name_cap(self) -> str: + return self.name.capitalize() + + # Implementation-specific method + # + def name(self): + raise NotImplementedError + # Price logic - # TODO trading rules creation should have logic to ensure Decimal # @staticmethod def quantize_value(value: Decimal, quantum: Decimal) -> Decimal: @@ -175,9 +131,14 @@ def quantize_order_amount(self, trading_pair: str, amount: Decimal, price: Decim def supported_order_types(self): return self.SUPPORTED_ORDER_TYPES + @property + def account_balances(self) -> Dict: + return self._account_balances + + # TODO @property def order_books(self) -> Dict[str, OrderBook]: - return self._order_book_tracker.order_books + return self._api.order_book_tracker.order_books @property def in_flight_orders(self) -> Dict[str, InFlightOrder]: @@ -204,43 +165,73 @@ def get_order_book(self, trading_pair: str) -> OrderBook: raise ValueError(f"No order book exists for '{trading_pair}'.") return self._order_book_tracker.order_books[trading_pair] + # Order Tracking + # @property - def status_dict(self) -> Dict[str, bool]: + def tracking_states(self) -> Dict[str, any]: + """ + Returns a dictionary associating current active orders client id to their JSON representation + """ return { - "symbols_mapping_initialized": self._orderbook_ds.trading_pair_symbol_map_ready( - domain=self._domain), - "order_books_initialized": self._order_book_tracker.ready, - "account_balance": not self._trading_required or len(self._account_balances) > 0, - "trading_rule_initialized": len(self._trading_rules) > 0 if self._trading_required else True, - "user_stream_initialized": - self._user_stream_tracker.data_source.last_recv_time > 0 if self._trading_required else True, + key: value.to_json() + for key, value in self.in_flight_orders.items() + if not value.is_done } - @property - def ready(self) -> bool: + def restore_tracking_states(self, saved_states: Dict[str, Any]): """ - Returns True if the connector is ready to operate (all connections established with the exchange). If it is - not ready it returns False. + Restore in-flight orders from saved tracking states, this is st the connector can pick up on where it left off + when it disconnects. + + :param saved_states: The saved tracking_states. """ - return all(self.status_dict.values()) + self._order_tracker.restore_tracking_states(tracking_states=saved_states) - def tick(self, timestamp: float): + def start_tracking_order(self, + order_id: str, + exchange_order_id: Optional[str], + trading_pair: str, + trade_type: TradeType, + price: Decimal, + amount: Decimal, + order_type: OrderType): """ - Includes the logic that has to be processed every time a new tick happens in the bot. Particularly it enables - the execution of the status update polling loop using an event. + Starts tracking an order by adding it to the order tracker. + + :param order_id: the order identifier + :param exchange_order_id: the identifier for the order in the exchange + :param trading_pair: the token pair for the operation + :param trade_type: the type of order (buy or sell) + :param price: the price for the order + :param amount: the amount for the order + :param order_type: type of execution for the order (MARKET, LIMIT, LIMIT_MAKER) """ - last_recv_diff = timestamp - self._user_stream_tracker.last_recv_time - poll_interval = (self.SHORT_POLL_INTERVAL - if last_recv_diff > self.TICK_INTERVAL_LIMIT - else self.LONG_POLL_INTERVAL) - last_tick = int(self._last_timestamp / poll_interval) - current_tick = int(timestamp / poll_interval) - if current_tick > last_tick: - if not self._poll_notifier.is_set(): - self._poll_notifier.set() - self._last_timestamp = timestamp + self._order_tracker.start_tracking_order( + InFlightOrder( + client_order_id=order_id, + exchange_order_id=exchange_order_id, + trading_pair=trading_pair, + order_type=order_type, + trade_type=trade_type, + amount=amount, + price=price, + creation_timestamp=self.current_timestamp + ) + ) - # Orders placing + def stop_tracking_order(self, order_id: str): + """ + Stops tracking an order + + :param order_id: The id of the order that will not be tracked any more + """ + self._order_tracker.stop_tracking_order(client_order_id=order_id) + + async def _sleep(self, delay: float): + await asyncio.sleep(delay) + + # Buy / Sell / Cancel Orders + # Get fee # def buy(self, trading_pair: str, @@ -273,8 +264,12 @@ def buy(self, price=price)) return order_id - def sell(self, trading_pair: str, amount: Decimal, order_type: OrderType = OrderType.MARKET, - price: Decimal = s_decimal_NaN, **kwargs) -> str: + def sell(self, + trading_pair: str, + amount: Decimal, + order_type: OrderType = OrderType.MARKET, + price: Decimal = s_decimal_NaN, + **kwargs) -> str: """ Creates a promise to create a sell order using the parameters. :param trading_pair: the token pair to operate with @@ -321,7 +316,11 @@ def get_fee(self, :return: the calculated or estimated fee """ - return self._get_fee(base_currency, quote_currency, order_type, order_side, amount, price, is_maker) + try: + return self._api.get_fee(base_currency, quote_currency, order_type, order_side, amount, price, is_maker) + except NotImplementedError: + is_maker = order_type is OrderType.LIMIT_MAKER + return AddedToCostTradeFee(percent=self.estimate_fee_pct(is_maker)) def cancel(self, trading_pair: str, order_id: str): """ @@ -332,7 +331,7 @@ def cancel(self, trading_pair: str, order_id: str): :return: the client id of the order to cancel """ - safe_ensure_future(self._execute_cancel(trading_pair, order_id)) + safe_ensure_future(self._cancel_order(trading_pair, order_id)) return order_id async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]: @@ -344,7 +343,7 @@ async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]: :return: a list of CancellationResult instances, one for each of the orders to be cancelled """ incomplete_orders = [o for o in self.in_flight_orders.values() if not o.is_done] - tasks = [self._execute_cancel(o.trading_pair, o.client_order_id) for o in incomplete_orders] + tasks = [self._cancel_order(o.trading_pair, o.client_order_id) for o in incomplete_orders] order_id_set = set([o.client_order_id for o in incomplete_orders]) successful_cancellations = [] @@ -356,14 +355,10 @@ async def cancel_all(self, timeout_seconds: float) -> List[CancellationResult]: continue client_order_id = None - # Binance allows cancellations only by using its own order id, not ours - # TODO this should go to an implementation specific method that is - # overridden by Binance and in other cases where applicable - if isinstance(cr, dict) and "origClientOrderId" in cr: - client_order_id = cr.get("origClientOrderId") if cr is not None: if not client_order_id: client_order_id = cr + order_id_set.remove(client_order_id) successful_cancellations.append(CancellationResult(client_order_id, True)) except Exception: @@ -433,7 +428,7 @@ def order_failed(): return try: - exchange_order_id, update_timestamp = await self._place_order( + exchange_order_id, update_timestamp = await self._api.create_order( order_id=order_id, trading_pair=trading_pair, amount=amount, @@ -468,7 +463,7 @@ def order_failed(): self._order_tracker.process_order_update(order_update) return order_id, exchange_order_id - async def _execute_cancel(self, trading_pair: str, order_id: str) -> str: + async def _cancel_order(self, trading_pair: str, order_id: str) -> str: """ Requests the exchange to cancel an active order @@ -478,7 +473,7 @@ async def _execute_cancel(self, trading_pair: str, order_id: str) -> str: tracked_order = self._order_tracker.fetch_tracked_order(order_id) if tracked_order is not None: try: - cancelled = await self._place_cancel(order_id, tracked_order) + client_order_id, cancelled = await self._api.cancel_order(order_id, tracked_order) if cancelled: order_update: OrderUpdate = OrderUpdate( client_order_id=order_id, @@ -501,98 +496,27 @@ async def _execute_cancel(self, trading_pair: str, order_id: str) -> str: f"Failed to cancel order {order_id}", exc_info=True) return None - # Order Tracking + # lower level API interaction methods # @property - def tracking_states(self) -> Dict[str, any]: - """ - Returns a dictionary associating current active orders client id to their JSON representation - """ - return { - key: value.to_json() - for key, value in self.in_flight_orders.items() - if not value.is_done - } - - def restore_tracking_states(self, saved_states: Dict[str, Any]): - """ - Restore in-flight orders from saved tracking states, this is st the connector can pick up on where it left off - when it disconnects. - - :param saved_states: The saved tracking_states. + def ready(self) -> bool: """ - self._order_tracker.restore_tracking_states(tracking_states=saved_states) - - def start_tracking_order(self, - order_id: str, - exchange_order_id: Optional[str], - trading_pair: str, - trade_type: TradeType, - price: Decimal, - amount: Decimal, - order_type: OrderType): + Returns True if the connector is ready to operate (all connections established with the exchange). If it is + not ready it returns False. """ - Starts tracking an order by adding it to the order tracker. + return all(self._api.status_dict.values()) - :param order_id: the order identifier - :param exchange_order_id: the identifier for the order in the exchange - :param trading_pair: the token pair for the operation - :param trade_type: the type of order (buy or sell) - :param price: the price for the order - :param amount: the amount for the order - :param order_type: type of execution for the order (MARKET, LIMIT, LIMIT_MAKER) - """ - self._order_tracker.start_tracking_order( - InFlightOrder( - client_order_id=order_id, - exchange_order_id=exchange_order_id, - trading_pair=trading_pair, - order_type=order_type, - trade_type=trade_type, - amount=amount, - price=price, - creation_timestamp=self.current_timestamp - ) - ) + @property + def status_dict(self): + return self._api.status_dict - def stop_tracking_order(self, order_id: str): + def tick(self, timestamp: float): """ - Stops tracking an order - - :param order_id: The id of the order that will not be tracked any more + Includes the logic that has to be processed every time a new tick happens in the bot. Particularly it enables + the execution of the status update polling loop using an event. """ - self._order_tracker.stop_tracking_order(client_order_id=order_id) - - async def _sleep(self, delay: float): - await asyncio.sleep(delay) - - @property - def name_cap(self) -> str: - return self.name.capitalize() - - # Implementation-specific methods - # - def name(self): - raise NotImplementedError - - def init_auth(self): - raise NotImplementedError - - def _place_cancel(self): - raise NotImplementedError - - def _place_order(self): - raise NotImplementedError - - def _get_fee(self): - raise NotImplementedError - - # Network-API-related code - # - - # overridden in implementation of exchanges - # - web_utils = None + self._api.tick(timestamp) + self._last_timestamp = timestamp async def start_network(self): """ @@ -602,214 +526,137 @@ async def start_network(self): - The polling loop to update order status and balance status using REST API (backup for main update process) - The background task to process the events received through the user stream tracker (websocket connection) """ - self._stop_network() - self._order_book_tracker.start() - self._trading_rules_polling_task = safe_ensure_future(self._trading_rules_polling_loop()) - self._trading_fees_polling_task = safe_ensure_future(self._trading_fees_polling_loop()) - if self._trading_required: - self._status_polling_task = safe_ensure_future(self._status_polling_loop()) - self._user_stream_tracker_task = safe_ensure_future(self._user_stream_tracker.start()) - self._user_stream_event_listener_task = safe_ensure_future(self._user_stream_event_listener()) + await self._api.start_network() + self._api_listener_task = safe_ensure_future(self._api_listener_loop()) def _stop_network(self): # Resets timestamps and events for status_polling_loop - self._last_poll_timestamp = 0 self._last_timestamp = 0 - self._poll_notifier = asyncio.Event() - - self._order_book_tracker.stop() - if self._status_polling_task is not None: - self._status_polling_task.cancel() - self._status_polling_task = None - if self._trading_rules_polling_task is not None: - self._trading_rules_polling_task.cancel() - self._trading_rules_polling_task = None - if self._trading_fees_polling_task is not None: - self._trading_fees_polling_task.cancel() - self._trading_fees_polling_task = None - if self._user_stream_tracker_task is not None: - self._user_stream_tracker_task.cancel() - self._user_stream_tracker_task = None - if self._user_stream_event_listener_task is not None: - self._user_stream_event_listener_task.cancel() - self._user_stream_event_listener_task = None + self._api._stop_network() + if self._api_listener_task is not None: + self._api_listener_task.cancel() + self._api_listener_task = None async def stop_network(self): """ - This function is executed when the connector is stopped. It perform a general cleanup and stops all background - tasks that require the connection with the exchange to work. + This function is executed when the connector is stopped. + It stops all background tasks that connect to the exchange. """ self._stop_network() - async def check_network(self) -> NetworkStatus: + async def check_network(self): """ - Checks connectivity with the exchange using the API + Checks connectivity with the exchange using the API. """ - try: - await self._api_get(path_url=self.CHECK_NETWORK_URL) - except asyncio.CancelledError: - raise - except Exception: - return NetworkStatus.NOT_CONNECTED - return NetworkStatus.CONNECTED + return await self._api.check_network() - # loops and sync related methods - # - async def _trading_rules_polling_loop(self): - """ - Updates the trading rules by requesting the latest definitions from the exchange. - Executes regularly every 30 minutes - """ - while True: + async def _api_listener_loop(self): + async for event in self._iter_api_queue(): try: - await safe_gather(self._update_trading_rules()) - await asyncio.sleep(self.TRADING_RULES_INTERVAL) + if event["type"] == "trade": + self._process_trade(event) + if event["type"] == "order": + self._process_order(event) + if event["type"] == "balance": + self._process_balance(event) except asyncio.CancelledError: raise except Exception: - self.logger().network( - "Unexpected error while fetching trading rules.", exc_info=True, - app_warning_msg=f"Could not fetch new trading rules from {self.name_cap}" - " Check network connection.") - await asyncio.sleep(0.5) + self.logger().error( + "Unexpected error in API listener loop. Retrying in 5s.", + exc_info=True) + await self._sleep(5) - async def _trading_fees_polling_loop(self): - """ - Only some exchanges provide a fee endpoint. - If _update_trading_fees() is not defined, we just exit the loop - """ + async def _iter_api_queue(self) -> AsyncIterable[Dict[str, any]]: while True: try: - await safe_gather(self._update_trading_fees()) - await self._sleep(self.TRADING_FEES_INTERVAL) - except NotImplementedError: - return + yield await self._api.get_api_message() except asyncio.CancelledError: raise except Exception: - self.logger().network( - "Unexpected error while fetching trading fees.", exc_info=True, - app_warning_msg=f"Could not fetch new trading fees from {self.name_cap}." - " Check network connection.") - await self._sleep(0.5) + self.logger().exception("Error while reading API queue. Retrying in 1s.") + await asyncio.sleep(1) - async def _status_polling_loop(self): + def _process_trade(self, trade: Dict[str, Any], client_order_id: Optional[str] = None): """ - Performs all required operation to keep the connector updated and synchronized with the exchange. - It contains the backup logic to update status using API requests in case the main update source - (the user stream data source websocket) fails. - It also updates the time synchronizer. This is necessary because the exchange requires - the time of the client to be the same as the time in the exchange. - Executes when the _poll_notifier event is enabled by the `tick` function. + Updates in-flight order and trigger order filled event for trade message received. Triggers order completed + event if the total executed amount equals to the specified order amount. + Example Trade: + https://www.gate.io/docs/apiv4/en/#retrieve-market-trades """ - while True: - try: - await self._poll_notifier.wait() - await self._update_time_synchronizer() - - # the following method is implementation-specific - await self._status_polling_loop_fetch_updates() + client_order_id = client_order_id or str(trade["text"]) + tracked_order = self.in_flight_orders.get(client_order_id, None) + if not tracked_order: + self.logger().debug(f"Ignoring trade message with id {client_order_id}: not in in_flight_orders.") + return - self._last_poll_timestamp = self.current_timestamp - self._poll_notifier = asyncio.Event() - except asyncio.CancelledError: - raise - except NotImplementedError: - raise - except Exception: - self.logger().network( - "Unexpected error while fetching account updates.", - exc_info=True, - app_warning_msg=f"Could not fetch account updates from {self.name_cap}. " - "Check API key and network connection.") - await self._sleep(0.5) - - async def _update_time_synchronizer(self): - try: - await self._time_synchronizer.update_server_time_offset_with_time_provider( - time_provider=self.web_utils.get_current_server_time( - throttler=self._throttler, - domain=self._domain, - ) - ) - except asyncio.CancelledError: - raise - except Exception: - self.logger().exception(f"Error requesting time from {self.name_cap} server") - raise + fee = TradeFeeBase.new_spot_fee( + fee_schema=self.trade_fee_schema(), + trade_type=tracked_order.trade_type, + percent_token=trade["fee_currency"], + flat_fees=[TokenAmount( + amount=Decimal(trade["fee_amount"]), + token=trade["fee_currency"] + )] + ) + trade_update = TradeUpdate( + trade_id=str(trade["id"]), + client_order_id=tracked_order.client_order_id, + exchange_order_id=tracked_order.exchange_order_id, + trading_pair=tracked_order.trading_pair, + fee=fee, + fill_base_amount=Decimal(trade["fill_base_amount"]), + fill_quote_amount=Decimal(trade["fill_quote_amount"]), + fill_price=Decimal(trade["fill_price"]), + fill_timestamp=trade["fill_timestamp"], + ) + self._order_tracker.process_trade_update(trade_update) - async def _iter_user_event_queue(self) -> AsyncIterable[Dict[str, any]]: - """ - Called by _user_stream_event_listener. + def _process_order(self, order_msg: Dict[str, Any]): """ - while True: - try: - yield await self._user_stream_tracker.user_stream.get() - except asyncio.CancelledError: - raise - except Exception: - self.logger().exception("Error while reading user events queue. Retrying in 1s.") - await asyncio.sleep(1.0) + Updates in-flight order and triggers cancellation or failure event if needed. - # Exchange / Trading logic methods - # that call the API - # - async def _update_trading_rules(self): - exchange_info = await self._api_get(path_url=self.SYMBOLS_PATH_URL) - trading_rules_list = await self._format_trading_rules(exchange_info) - self._trading_rules.clear() - for trading_rule in trading_rules_list: - self._trading_rules[trading_rule.trading_pair] = trading_rule - - async def _api_get(self, *args, **kwargs): - kwargs["method"] = RESTMethod.GET - return await self._api_request(*args, **kwargs) - - async def _api_post(self, *args, **kwargs): - kwargs["method"] = RESTMethod.POST - return await self._api_request(*args, **kwargs) - - async def _api_put(self, *args, **kwargs): - kwargs["method"] = RESTMethod.PUT - return await self._api_request(*args, **kwargs) - - async def _api_delete(self, *args, **kwargs): - kwargs["method"] = RESTMethod.DELETE - return await self._api_request(*args, **kwargs) - - async def _api_request(self, - path_url, - method: RESTMethod = RESTMethod.GET, - params: Optional[Dict[str, Any]] = None, - data: Optional[Dict[str, Any]] = None, - is_auth_required: bool = False, - limit_id: Optional[str] = None) -> Dict[str, Any]: - - return await self.web_utils.api_request( - path=path_url, - api_factory=self._api_factory, - throttler=self._throttler, - time_synchronizer=self._time_synchronizer, - domain=self._domain, - params=params, - data=data, - method=method, - is_auth_required=is_auth_required, - limit_id=limit_id) - - # Methods tied to specific API data formats - # - async def _update_trading_fees(self): - raise NotImplementedError + :param order_msg: The order response from either REST or web socket API (they are of the same format) - def _user_stream_event_listener(self): - raise NotImplementedError - - def _format_trading_rules(self): - raise NotImplementedError - - def _update_order_status(self): - raise NotImplementedError + Example Order: + https://www.gate.io/docs/apiv4/en/#list-orders + """ + state = None + client_order_id = str(order_msg.get("text", "")) + tracked_order = self.in_flight_orders.get(client_order_id, None) + if not tracked_order: + self.logger().debug(f"Ignoring order message with id {client_order_id}: not in in_flight_orders.") + return - def _update_balances(self): - raise NotImplementedError + state = self._normalise_order_message_state(order_msg, tracked_order) + if state: + order_update = OrderUpdate( + trading_pair=tracked_order.trading_pair, + update_timestamp=order_msg["update_timestamp"], + new_state=order_msg["state"], + client_order_id=client_order_id, + exchange_order_id=str(order_msg["id"]), + ) + self._order_tracker.process_order_update(order_update=order_update) + self.logger().info(f"Successfully updated order {tracked_order.client_order_id}.") + + # TODO + def _process_balance(self, balance_update): + local_asset_names = set(self._account_balances.keys()) + remote_asset_names = set() + for account in balance_update["accounts"]: + asset_name = account["currency"] + self._account_available_balances[asset_name] = Decimal(str(account["available"])) + self._account_balances[asset_name] = Decimal(str(account["locked"])) + Decimal(str(account["available"])) + remote_asset_names.add(asset_name) + asset_names_to_remove = local_asset_names.difference(remote_asset_names) + for asset_name in asset_names_to_remove: + del self._account_available_balances[asset_name] + del self._account_balances[asset_name] + + # TODO + def _process_balance_message_ws(self, balance_update): + for account in balance_update["accounts"]: + asset_name = account["currency"] + self._account_available_balances[asset_name] = Decimal(str(account["available"])) + self._account_balances[asset_name] = Decimal(str(account["total"])) diff --git a/test/connector/integration_test_tool.py b/test/connector/integration_test_tool.py index bab1273a69..8b03ae9214 100644 --- a/test/connector/integration_test_tool.py +++ b/test/connector/integration_test_tool.py @@ -134,7 +134,7 @@ def get_gate_io(ec): gate_io_secret_key=creds.s, trading_pairs=[ec.pair], ) - exchange.ORDERBOOK_DS_CLASS._trading_pair_symbol_map = { + exchange._api.ORDERBOOK_DS_CLASS._trading_pair_symbol_map = { exchange.DEFAULT_DOMAIN: bidict({f"{ec.base}{ec.quote}": ec.pair}) } return exchange @@ -157,12 +157,12 @@ def config_logger(obj): obj.logger().addHandler(self) objs = ( self.exchange, - self.exchange._time_synchronizer, self.exchange._order_tracker, - self.exchange._user_stream_tracker, - self.exchange._order_book_tracker, - self.exchange._userstream_ds, - self.exchange._orderbook_ds, + self.exchange._api._time_synchronizer, + self.exchange._api._user_stream_tracker, + self.exchange._api._order_book_tracker, + self.exchange._api._userstream_ds, + self.exchange._api._orderbook_ds, ) for obj in objs: config_logger(obj) @@ -191,8 +191,8 @@ async def loop_clock(self): ts = time.time() self.exchange.tick(ts) debug_msg = f"ticking {ts}\n" \ - f"UST recv time: {self.exchange._user_stream_tracker.last_recv_time}\n" \ - f"OBT recv time: {self.exchange._user_stream_tracker.last_recv_time}\n" + f"UST recv time: {self.exchange._api._user_stream_tracker.last_recv_time}\n" \ + f"OBT recv time: {self.exchange._api._user_stream_tracker.last_recv_time}\n" self.debug(debug_msg) await asyncio.sleep(1) @@ -213,8 +213,8 @@ async def start(self): # self.exchange = get_binance(self) self.exchange = get_gate_io(self) - self.exchange._time_synchronizer.add_time_offset_ms_sample(0) - self.exchange._user_stream_tracker._user_stream = PrintingQueue() + self.exchange._api._time_synchronizer.add_time_offset_ms_sample(0) + self.exchange._api._user_stream_tracker._user_stream = PrintingQueue() self.initialize_exchange_loggers() self.initialize_event_loggers() @@ -245,7 +245,7 @@ async def tests(self): print(f'\n{r}') if r == NetworkStatus.CONNECTED: print("Exchange status: ", self.exchange.status_dict) - print("Trading pair symbol map from OB DS: ", self.exchange._orderbook_ds._trading_pair_symbol_map) + print("Trading pair symbol map from OB DS: ", self.exchange._api._orderbook_ds._trading_pair_symbol_map) print('\n') break