-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrecycle.py
142 lines (124 loc) · 4.96 KB
/
recycle.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import asyncio
import telegram
import requests
import json
from database_function import db
import os
from dotenv import load_dotenv
from ai_insight import ai_insight
from pymongo import MongoClient
from datetime import datetime
from messagecollection import get_token_contract_data, message_collection
load_dotenv()
mongo_uri = os.getenv("MONGO_URI")
mongo_client = MongoClient(mongo_uri)
mongodb = mongo_client["telegram_bot_db"]
token_collection = mongodb["token_contracts"]
TOKEN = os.getenv("bot_token")
# Default chat_id if no recent messages
URL_TELEGRAM_BASE = f'https://api.telegram.org/bot{TOKEN}'
URL_GET_UPDATES = f'{URL_TELEGRAM_BASE}/getUpdates'
# Flag to control the DM service
dm_task = None
async def send_message(text, chat_id):
try:
# Create a new bot instance for each message
temp_bot = telegram.Bot(token=TOKEN)
# Send async message using the temporary bot
await temp_bot.send_message(chat_id=chat_id, text=text)
return True
except telegram.error.TelegramError as e:
print(f"Failed to send message to {chat_id}: {str(e)}")
return False
async def send_dm():
try:
# Get all users from database
users = db.get_all_users()
processed_chat_ids = set()
if not users:
print("No users found in database.")
return
ai_insight_text = await ai_insight()
for user in users:
chat_id = user.get('chat_id')
if not chat_id:
print(f"Invalid chat_id for user: {user}")
continue
is_paid = user.get('is_paid', False)
username = user.get('username', 'User')
if chat_id not in processed_chat_ids:
message = (
f"Hello {username}!\n\n"
f"{' Thank you for being our premium member!' if is_paid else '💫 Upgrade to premium for more features!'}\n"
f"{f'{ai_insight_text}' if is_paid else ''}"
f"Use /help to see available commands."
)
if await send_message(text=message, chat_id=chat_id):
processed_chat_ids.add(chat_id)
print(f"Successfully sent message to {username} (ID: {chat_id})")
else:
print(f"Failed to send message to {username} (ID: {chat_id})")
except Exception as e:
print(f"Error in send_dm: {str(e)}")
async def stop_dm_service():
global dm_task
if dm_task:
dm_task.cancel()
try:
await dm_task
except asyncio.CancelledError:
pass
dm_task = None
print("DM service stopped successfully")
async def all_token_data_update():
print("💚all_token_data updating...")
cursor = token_collection.find() # Get regular cursor
token_contracts = list(cursor) # Convert cursor to list
print("💚token_contracts loaded")
for token_contract in token_contracts:
await token_data_update(token_contract)
async def token_data_update(token_contract):
token_contract_data = get_token_contract_data(token_contract["token_contracts"])
if token_contract_data == None:
return
else:
print("💚",token_contract_data["token_contracts"])
existing_entry = token_collection.find_one({"token_contracts": {"$in": [token_contract["token_contracts"]]}})
order_token_contract_data = datetime.now().hour
token_collection.update_one(
{"_id": existing_entry["_id"]},
{"$set": {
"all_data": {
**existing_entry["all_data"], # Preserve previous data
f"message_date({order_token_contract_data})": datetime.now(),
f"num_times_mentioned({order_token_contract_data})": existing_entry["num_times_mentioned"],
f"token_contract_data({order_token_contract_data})": token_contract_data,
}
}}
)
print(f"Successfully updated token data🆓")
async def periodic_dm():
while True:
try:
# await asyncio.gather(
# message_collection(),
# all_token_data_update()
# )
# print("Message collection and token data update completed")
# await asyncio.sleep(10)
print("Periodic DM service starting...")
print(datetime.now())
await send_dm()
print(datetime.now())
await asyncio.sleep(600)
print(datetime.now())
except asyncio.CancelledError:
print("DM service cancelled")
break
except Exception as e:
print(f"Error in DM service: {str(e)}")
await asyncio.sleep(50)
async def start_dm_service():
global dm_task
print("DM service starting...")
dm_task = asyncio.create_task(periodic_dm())