Skip to content

Commit

Permalink
thread lock initialization
Browse files Browse the repository at this point in the history
  • Loading branch information
malciller committed Feb 14, 2025
1 parent 1d3646e commit 38879b2
Showing 1 changed file with 22 additions and 35 deletions.
57 changes: 22 additions & 35 deletions diogrid.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def __init__(self):
self.jitter = 0.1 # Add 10% random jitter
self.earn_strategies = {}
self.strategy_ids = {}
self._public_ping_lock = asyncio.Lock()
"""
Generates a Kraken API signature for authentication.
Expand Down Expand Up @@ -226,15 +227,14 @@ async def connect_with_backoff(self):
)
self.connection_status['public']['connected'] = True
Logger.success("CONNECTION: PUBLIC - 200 - OK")
return token # Return token only after both connections are successful
except Exception as e:
Logger.error(f"Public WebSocket connection failed: {str(e)}")
# Close private connection if public fails
if self.websocket:
await self.websocket.close()
raise

return token

except websockets.exceptions.InvalidStatus as e:
if "HTTP 429" in str(e):
Logger.warning(f"Rate limit hit. Backing off for {current_backoff} seconds...")
Expand Down Expand Up @@ -472,7 +472,7 @@ async def reconnect(self):
if self.active_trading_pairs:
await self.subscribe_ticker(list(self.active_trading_pairs))

Logger.success("Successfully reconnected to WebSocket")
Logger.success("CONNECTION: RECONNECT - 200 - OK")
return True

except Exception as e:
Expand Down Expand Up @@ -924,7 +924,7 @@ async def handle_execution_updates(self, data):
self.orders[order_id].update(execution)
else:
self.orders[order_id] = execution
Logger.info(f"New order {order_id} for {symbol}")
Logger.info(f"ORDER: NEW {symbol} - {order_id}")
elif order_id in self.orders:
self.orders[order_id].update(execution)
"""
Expand Down Expand Up @@ -1319,7 +1319,7 @@ async def update_order_price(self, trading_pair: str, order: dict, new_price: fl
await self.client.websocket.send(json.dumps(amend_message))
response = await self.client.wait_for_response(req_id, timeout=AMEND_RESPONSE_TIMEOUT)
if response and response.get('success') is True:
Logger.success(f"ORDER: Successfully updated order price to ${formatted_price}")
Logger.success(f"ORDER: UPDATE PRICE - 200 - OK")
return True
else:
if attempt < MAX_AMEND_RETRIES - 1:
Expand Down Expand Up @@ -1404,7 +1404,7 @@ async def place_orders(self, trading_pair: str):
return
self.grid_settings[trading_pair]['active_orders'].add(order_id)
self.grid_orders[trading_pair]['buy'] = order_id
Logger.success(f"ORDER: Grid buy order placed for {trading_pair} with ID: {order_id}")
Logger.success(f"ORDER: {trading_pair} ID: {order_id}")
# Try to place corresponding sell order
try:
sell_req_id = int(time.time() * 1000)
Expand All @@ -1426,7 +1426,7 @@ async def place_orders(self, trading_pair: str):
sell_order_id = sell_response.get('result', {}).get('order_id')
if sell_order_id:
self.grid_orders[trading_pair]['sell'] = sell_order_id
Logger.success(f"ORDER: Grid sell order placed for {trading_pair} with ID: {sell_order_id}")
Logger.success(f"ORDER: {trading_pair} ID: {sell_order_id}")
else:
Logger.warning(f"ORDER: Sell order placed but no ID received for {trading_pair}")
else:
Expand All @@ -1447,79 +1447,66 @@ async def main():
max_retries = 3
retry_delay = 5
retry_count = 0
while True:

while retry_count < max_retries:
try:
# Connect and set up WebSocket handlers
token = await client.connect()
if not token:
raise Exception("Failed to obtain connection token")

client.set_handler('balances', client.handle_balance_updates)
client.set_handler('executions', client.handle_execution_updates)
client.set_handler('ticker', client.handle_ticker)
await client.subscribe(['balances', 'executions'], token)

# Reset retry count on successful connection
retry_count = 0

# Start the grid bot
await grid_bot.start()

while client.running:
try:
all_orders_valid = True
for pair in TRADING_PAIRS:
# Check if we have valid orders within our grid
current_order = await grid_bot.check_open_orders(pair)
if current_order:
# If we have an order, check if it's still valid
order_valid = await grid_bot.check_open_orders_open_order_interval(pair, current_order)
all_orders_valid = all_orders_valid and order_valid
if all_orders_valid:
Logger.success("ORDERS: 200 - OK")
await asyncio.sleep(LONG_SLEEP_TIME)
except websockets.exceptions.ConnectionClosed:
Logger.warning("Connection lost, attempting to reconnect...")
raise # Propagate to outer try block for reconnection
except asyncio.CancelledError:
raise
except Exception as e:
Logger.error(f"Error in trading loop: {str(e)}")
await asyncio.sleep(LONG_SLEEP_TIME)

except (websockets.exceptions.ConnectionClosed, KrakenAPIError) as e:
retry_count += 1
if retry_count < max_retries:
retry_count += 1
Logger.warning(f"Connection lost. Attempting reconnection {retry_count}/{max_retries}")
# Clean up existing connections
try:
await client.disconnect()
except Exception as cleanup_error:
Logger.error(f"Error during cleanup: {str(cleanup_error)}")
# Wait before retrying
await client.disconnect()
await asyncio.sleep(retry_delay)
continue
else:
Logger.error("Max reconnection attempts reached. Exiting.")
break
break

except KeyboardInterrupt:
Logger.info("Keyboard interrupt received. Shutting down...")
break

except Exception as e:
Logger.error(f"Unexpected error: {str(e)}\n{traceback.format_exc()}")
retry_count += 1
if retry_count < max_retries:
retry_count += 1
await asyncio.sleep(retry_delay)
continue
else:
break
# Final cleanup
try:
# First unsubscribe from all channels if connection is still open
if client.websocket:
await client.unsubscribe(['balances', 'executions'])
if client.active_trading_pairs:
await client.unsubscribe_ticker(list(client.active_trading_pairs))
# Wait briefly for unsubscribe confirmations
await asyncio.sleep(LONG_SLEEP_TIME)
# Then disconnect
await client.disconnect()
Logger.success("Successfully disconnected from all Kraken WebSocket streams.")
except Exception as e:
Logger.error(f"Shutdown error details:\n{traceback.format_exc()}")

if __name__ == "__main__":
try:
Expand Down

0 comments on commit 38879b2

Please sign in to comment.