-
Notifications
You must be signed in to change notification settings - Fork 7
/
twittermd_Bot.py
133 lines (104 loc) · 3.97 KB
/
twittermd_Bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import re
from datetime import datetime
from TwitterMedia import TwitterMedia
import sqlite3 as sql
from sqlite3.dbapi2 import IntegrityError
from telegram import ParseMode
from telegram.ext import Updater, MessageHandler, CommandHandler
Pattern = r'(https?://)?(mobile\.)?(twitter|x)\.com/.+?/status/\d+'
Downloader = TwitterMedia(use_print = True)
con = sql.connect('banned.db', isolation_level = None, check_same_thread = False)
c = con.cursor()
c.execute('CREATE TABLE IF NOT EXISTS punished (id INTEGER NOT NULL PRIMARY KEY);')
def log(text):
print(f"| {datetime.now().strftime('%H:%M:%S')} | {text}")
def start(update, context):
context.bot.send_message(
chat_id = update.effective_chat.id,
text = "Hi, send a twitter status url that contains a video/gif to use this bot.\nIf you face any issues please message @ulfsjar"
)
def ban(update, context):
if int(update.message.chat_id) != 'enter your own user id here.':
return
else:
try:
prisoner = context.args[0]
log(f'Banning {prisoner}')
c.execute('INSERT INTO punished VALUES (?)', (prisoner,))
context.bot.send_message(
chat_id = update.message.chat_id,
parse_mode = ParseMode.MARKDOWN,
text = f'Banned [{prisoner}](tg://user?id={prisoner})'
)
except IntegrityError:
log('The id is probably already in the database, ignoring.')
def processor(update, context):
text = update.message.text
chat_id = update.message.chat_id
name = update.message.chat.first_name
username = update.message.chat.username
message_id = update.message.message_id
c.execute('SELECT id FROM punished WHERE id=?', (chat_id,))
exists = c.fetchall()
if exists:
log(f'{chat_id} is banned, skipping..')
return
if not text:
log(f"@{username} | {name} | {chat_id}\nRequest doesn't contain text, skipping...")
return
log(f'Request: {username} | {name} | {text}')
result = re.search(Pattern, text)
if not result:
context.bot.send_message(
chat_id = chat_id,
reply_to_message_id = message_id,
text = "_I'm sorry, I couldn't find a twitter url in your message._",
parse_mode = ParseMode.MARKDOWN
)
return
try:
tweet = Downloader.fetch_media(result.group(0))
except:
context.bot.send_message(
chat_id = chat_id,
reply_to_message_id = message_id,
text = "_I'm sorry, something went wrong while trying to find the video._",
parse_mode = ParseMode.MARKDOWN
)
return
if tweet is not None:
try:
context.bot.send_video(
chat_id = chat_id,
video = tweet.url,
reply_to_message_id = message_id,
supports_streaming = True
)
except:
context.bot.send_message(
chat_id = chat_id,
reply_to_message_id = message_id,
text = f"I'm sorry, telegram is being a lil meanie and not accepting the video.\n\nHere's the URL instead;\n\n{tweet.url}"
)
else:
context.bot.send_message(
chat_id = chat_id,
reply_to_message_id = message_id,
text = "_I'm sorry, something went wrong while trying to find the video._",
parse_mode = ParseMode.MARKDOWN
)
def main():
updater = Updater(
token = '',
use_context = True,
request_kwargs = {'read_timeout': 1000, 'connect_timeout': 1000}
)
dispatcher = updater.dispatcher
dispatcher.add_handler(CommandHandler('ban', ban, pass_args = True))
dispatcher.add_handler(CommandHandler('start', start))
dispatcher.add_handler(MessageHandler(None, processor))
updater.start_polling()
updater.idle()
if __name__ == '__main__':
log('ONLINE')
main()