-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbot.py
257 lines (219 loc) · 9.13 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import os
import sys
import logging
import requests
import re
import asyncio
from dotenv import load_dotenv
from telegram import Update
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
filters,
ContextTypes,
ConversationHandler
)
# Load environment variables
load_dotenv()

# Set up logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

# States
WAITING_FOR_TAGS = 1  # conversation state: a URL was received, tags may still follow

# Configuration (every value is read from the process environment)
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
MY_CHAT_ID = os.getenv("MY_CHAT_ID")
# Strip trailing slashes so that f"{WALLABAG_URL}/api/..." never contains "//".
# A missing variable becomes "" and is still caught by the validation below.
WALLABAG_URL = (os.getenv("WALLABAG_URL") or "").rstrip("/")
WALLABAG_CLIENT_ID = os.getenv("WALLABAG_CLIENT_ID")
WALLABAG_CLIENT_SECRET = os.getenv("WALLABAG_CLIENT_SECRET")
WALLABAG_USERNAME = os.getenv("WALLABAG_USERNAME")
WALLABAG_PASSWORD = os.getenv("WALLABAG_PASSWORD")
WALLABAG_DEFAULT_ARCHIVE = os.getenv("WALLABAG_DEFAULT_ARCHIVE", "1")

# Validate required environment variables and report exactly which are missing
_REQUIRED_SETTINGS = {
    "TELEGRAM_TOKEN": TELEGRAM_TOKEN,
    "MY_CHAT_ID": MY_CHAT_ID,
    "WALLABAG_URL": WALLABAG_URL,
    "WALLABAG_CLIENT_ID": WALLABAG_CLIENT_ID,
    "WALLABAG_CLIENT_SECRET": WALLABAG_CLIENT_SECRET,
    "WALLABAG_USERNAME": WALLABAG_USERNAME,
    "WALLABAG_PASSWORD": WALLABAG_PASSWORD,
}
_missing_settings = [name for name, value in _REQUIRED_SETTINGS.items() if not value]
if _missing_settings:
    logger.error("Missing required environment variables: %s", ", ".join(_missing_settings))
    sys.exit(1)
def chat_id_restricted(func):
    """Decorator to restrict bot usage to specific chat ID.

    The wrapped handler only runs when the chat the update originates from
    matches ``MY_CHAT_ID``. Otherwise the attempt is logged, the sender is
    told they are not authorized, and ``ConversationHandler.END`` is returned.

    Args:
        func: Async handler taking ``(update, context, *args, **kwargs)``.

    Returns:
        The guarded async handler, carrying the name and docstring of ``func``.
    """
    from functools import wraps  # local import keeps this block self-contained

    @wraps(func)
    async def wrapped(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
        chat = update.effective_chat
        chat_id = chat.id if chat is not None else None
        # MY_CHAT_ID comes from the environment and is a string, hence str().
        if str(chat_id) != MY_CHAT_ID:
            logger.warning(f"Unauthorized access denied for {chat_id}")
            # update.message is None for edited messages; effective_message
            # resolves to whichever message object the update carries.
            message = update.effective_message
            if message is not None:
                await message.reply_text("You are not authorized to use this bot.")
            return ConversationHandler.END
        return await func(update, context, *args, **kwargs)

    return wrapped
def get_wallabag_token(timeout: float = 10.0) -> str:
    """Get Wallabag access token.

    Posts the configured client credentials and user login to the
    ``/oauth/v2/token`` endpoint using the ``password`` grant type.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on an unresponsive host.

    Returns:
        The ``access_token`` value from the JSON response.

    Raises:
        requests.RequestException: On connection errors, timeouts, or a
            non-success HTTP status (via ``raise_for_status``).
        KeyError: If the response JSON has no ``access_token`` key.
    """
    token_url = f"{WALLABAG_URL}/oauth/v2/token"
    credentials = {
        "grant_type": "password",
        "client_id": WALLABAG_CLIENT_ID,
        "client_secret": WALLABAG_CLIENT_SECRET,
        "username": WALLABAG_USERNAME,
        "password": WALLABAG_PASSWORD,
    }
    response = requests.post(token_url, data=credentials, timeout=timeout)
    response.raise_for_status()
    return response.json()["access_token"]
# Compiled once at import time instead of on every call.
_URL_PATTERN = re.compile(
    r'^(?:http|ftp)s?://'  # http://, https://, ftp:// or ftps://
    r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
    r'localhost|'  # localhost...
    r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|'  # ...or ipv4
    r'\[?[A-F0-9]*:[A-F0-9:]+\]?)'  # ...or ipv6
    r'(?::\d+)?'  # optional port
    r'(?:/?|[/?]\S+)$', re.IGNORECASE)


def is_valid_url(url: str) -> bool:
    """Validate the provided URL.

    Args:
        url: Candidate URL; it must carry an explicit scheme.

    Returns:
        True when the whole string matches the URL pattern, False otherwise.
    """
    # fullmatch instead of match: with match(), the trailing "$" also succeeds
    # just before a final newline, so "http://host\n" used to be accepted.
    return _URL_PATTERN.fullmatch(url) is not None
# Tags may only consist of ASCII letters, digits, underscores and hyphens.
_TAG_PATTERN = re.compile(r'^[a-zA-Z0-9_-]+$')


def is_valid_tag(tag: str) -> bool:
    """Validate the provided tag.

    Args:
        tag: A single tag (not a list of tags).

    Returns:
        True when the tag is non-empty and contains only letters, digits,
        ``_`` or ``-``; False otherwise.
    """
    # fullmatch instead of match: with match(), "$" also succeeds just before
    # a trailing newline, so "tag\n" used to be accepted.
    return _TAG_PATTERN.fullmatch(tag) is not None
@chat_id_restricted
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Send a message when the command /start is issued.

    Args:
        update: Incoming update carrying the /start command.
        context: Handler context (unused).

    Returns:
        ``ConversationHandler.END``.
    """
    # effective_message also resolves for an edited /start command, where
    # update.message is None.
    await update.effective_message.reply_text(
        "Send me a URL to save it to Wallabag. "
        "You can add tags in the same message after the URL (in a new line) "
        "or in a follow-up message within 5 seconds."
    )
    return ConversationHandler.END
def _post_wallabag_entry(url: str, tags: str):
    """Blocking helper: fetch a token and POST the entry; returns the HTTP response."""
    token = get_wallabag_token()
    headers = {'Authorization': f'Bearer {token}'}
    # Any configured value other than "0" or "1" falls back to 1.
    archive_value = int(WALLABAG_DEFAULT_ARCHIVE) if WALLABAG_DEFAULT_ARCHIVE in ("0", "1") else 1
    payload = {
        'url': url,
        'tags': tags,
        'archive': archive_value,
    }
    # A timeout keeps an unresponsive server from blocking the worker forever.
    return requests.post(f'{WALLABAG_URL}/api/entries.json',
                         headers=headers,
                         json=payload,
                         timeout=30)


async def process_url_and_tags(update: Update, url: str, tags: str) -> None:
    """Process URL and tags with Wallabag.

    Saves ``url`` through ``/api/entries.json`` and reports the outcome as a
    reply to the message contained in ``update``.

    Args:
        update: Update whose message receives the status reply.
        url: Address of the article to save.
        tags: Tag string forwarded unchanged to Wallabag; may be empty.
    """
    message = update.effective_message
    try:
        # requests is synchronous; run it in the default executor so the bot's
        # event loop is not blocked while the HTTP calls are in flight.
        loop = asyncio.get_running_loop()
        response = await loop.run_in_executor(None, _post_wallabag_entry, url, tags)
        if response.status_code == 200:
            if tags:
                await message.reply_text(f"Article saved with tags: {tags}")
            else:
                await message.reply_text("Article saved without tags")
        else:
            await message.reply_text(f"Failed to save the article. Status code: {response.status_code}")
            logger.error(f"Failed to save article. Response: {response.text}")
    except Exception as e:
        # Top-level boundary for this operation: log with traceback, tell the user.
        logger.exception("Error saving article: %s", e)
        await message.reply_text("An error occurred while saving the article")
@chat_id_restricted
async def handle_url(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle the initial URL message.

    The first whitespace-separated token of the first line is taken as the
    URL. Tags may follow on the same line or on the second line, separated by
    commas and/or whitespace. When tags are present the article is saved
    immediately; otherwise a 5-second timeout job is scheduled and the
    conversation waits for a follow-up message with tags.

    Args:
        update: Incoming text update.
        context: Handler context; ``user_data`` holds the pending URL.

    Returns:
        ``WAITING_FOR_TAGS`` while waiting for tags, else
        ``ConversationHandler.END``.
    """
    # Clear any previous state
    context.user_data.clear()

    # effective_message also resolves for edited messages (update.message is None there).
    message = update.effective_message
    message_text = (message.text or "").strip()

    # Split message into lines
    lines = message_text.split('\n')
    first_line_parts = lines[0].split()
    # An empty message yields no tokens; treat that as an invalid URL
    # instead of raising IndexError.
    url = first_line_parts[0] if first_line_parts else ""
    if not is_valid_url(url):
        await message.reply_text("The provided URL is invalid. Please provide a valid URL.")
        return ConversationHandler.END

    # Tags on the same line take precedence over tags on the next line.
    if len(first_line_parts) > 1:
        raw_tags = " ".join(first_line_parts[1:])
    elif len(lines) > 1:
        raw_tags = lines[1]
    else:
        raw_tags = None

    if raw_tags is not None:
        # Validate every tag on its own. Checking the whole string at once
        # rejected any message that carried more than one tag.
        tag_list = [tag for tag in re.split(r'[,\s]+', raw_tags.strip()) if tag]
        if not tag_list or not all(is_valid_tag(tag) for tag in tag_list):
            await message.reply_text("One or more provided tags are invalid. Please provide valid tags.")
            return ConversationHandler.END
        # Tags are sent as one comma-separated string.
        await process_url_and_tags(update, url, ",".join(tag_list))
        return ConversationHandler.END

    job_queue = context.job_queue
    if job_queue is None:
        # No job queue available, so there is no way to wait: save right away.
        await process_url_and_tags(update, url, "")
        return ConversationHandler.END

    # Store URL in context so the follow-up tags message can find it
    context.user_data['current_url'] = url
    # Wait for potential tags
    await message.reply_text("Waiting 5 seconds for tags...")
    job_queue.run_once(
        callback=timeout_callback,
        when=5,
        data={'update': update, 'url': url},
        name='timeout'
    )
    return WAITING_FOR_TAGS
@chat_id_restricted
async def handle_tags(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle the tags message.

    Runs for text received while the conversation is in ``WAITING_FOR_TAGS``.
    If a URL is pending, the message text is used as its tags, the pending
    timeout job is cancelled and the article is saved.

    Args:
        update: Incoming text update.
        context: Handler context; ``user_data['current_url']`` is the pending URL.

    Returns:
        ``ConversationHandler.END`` after saving, or whatever ``handle_url``
        returns when the message is processed as a new URL.
    """
    url = context.user_data.get('current_url')
    if not url:
        # No pending URL: the timeout job has already saved it and cleared
        # user_data, but as a job callback it does not return a new
        # conversation state. Treat this message as a fresh request instead
        # of silently dropping it.
        return await handle_url(update, context)

    # effective_message also resolves for edited messages (update.message is None there).
    tags = (update.effective_message.text or "").strip()

    # Remove the scheduled timeout job so the article is not saved twice
    if context.job_queue is not None:
        for job in context.job_queue.get_jobs_by_name('timeout'):
            job.schedule_removal()

    await process_url_and_tags(update, url, tags)
    context.user_data.clear()  # Clear state after processing
    return ConversationHandler.END
async def timeout_callback(context: ContextTypes.DEFAULT_TYPE) -> None:
    """Job callback fired when no tags arrived in time.

    Reads the original update and URL from the job's ``data`` dict, saves the
    article with an empty tag string, then empties that user's ``user_data``.
    """
    payload = context.job.data
    pending_update = payload['update']
    pending_url = payload['url']

    await process_url_and_tags(pending_update, pending_url, "")

    # Clear state after timeout
    user_id = pending_update.effective_user.id
    context.application.user_data[user_id].clear()
@chat_id_restricted
async def timeout(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Handle timeout when waiting for tags.

    Saves the pending URL (if any) without tags and ends the conversation.
    NOTE(review): no handler registration for this function is visible in
    this file; the job-based ``timeout_callback`` appears to be what is
    actually used. Confirm before relying on it.

    Returns:
        ``ConversationHandler.END``.
    """
    url = context.user_data.get('current_url')
    # Without a pending URL there is nothing to save; previously None was
    # passed on as the URL.
    if url:
        await process_url_and_tags(update, url, "")
    context.user_data.clear()  # Clear state, as the other handlers do
    return ConversationHandler.END
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
    """Cancel the conversation.

    Removes the pending timeout job and forgets the stored URL, so that the
    article is not saved after the user has been told it was cancelled.

    Returns:
        ``ConversationHandler.END``.
    """
    # Without this, the job scheduled under the name 'timeout' would still
    # fire and save the URL.
    if context.job_queue is not None:
        for job in context.job_queue.get_jobs_by_name('timeout'):
            job.schedule_removal()
    context.user_data.clear()

    await update.effective_message.reply_text("Operation cancelled.")
    return ConversationHandler.END
def main() -> None:
    """Start the bot.

    Builds the application, registers the error handler, the /start command
    and the URL/tags conversation, then polls for updates until stopped.
    Any exception raised during setup or polling is logged with its
    traceback and the process exits with status 1.
    """
    try:
        logger.info("Starting bot...")

        application = Application.builder().token(TELEGRAM_TOKEN).build()

        # The "wait for tags" step schedules a job; warn early if that
        # cannot work instead of failing on the first URL.
        if application.job_queue is None:
            logger.warning("No JobQueue available; URLs sent without tags "
                           "cannot wait for a follow-up tags message.")

        # Add error handler
        application.add_error_handler(error_handler)

        # Plain text only: commands are handled by CommandHandlers
        plain_text = filters.TEXT & ~filters.COMMAND

        # Add conversation handler
        conv_handler = ConversationHandler(
            entry_points=[MessageHandler(plain_text, handle_url)],
            states={
                WAITING_FOR_TAGS: [
                    MessageHandler(plain_text, handle_tags),
                ]
            },
            fallbacks=[CommandHandler("cancel", cancel)]
        )

        # Add handlers
        application.add_handler(CommandHandler("start", start))
        application.add_handler(conv_handler)

        # Start the bot
        application.run_polling()
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("Error running bot: %s", e)
        sys.exit(1)
async def error_handler(update: object, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle errors.

    Logs the exception raised while processing an update, including its
    traceback.

    Args:
        update: The object being processed when the error occurred; not
            necessarily an ``Update`` instance, so it is not inspected here.
        context: Handler context; ``context.error`` is the raised exception.
    """
    # exc_info attaches the traceback; the message alone only had str(error).
    logger.error("Exception while handling an update: %s", context.error, exc_info=context.error)
# Start the bot only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    main()