-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmeowgram.py
237 lines (191 loc) · 9.01 KB
/
meowgram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import os
import time
import json
import asyncio
import logging
import requests
import tempfile
import soundfile as sf
from typing import Dict
import ffmpeg
from telegram import Update, Bot
from telegram.ext import (
Application,
ApplicationBuilder,
ContextTypes,
MessageHandler,
ApplicationHandlerStop,
filters
)
from telegram.constants import ChatAction
from ccat_connection import CCatConnection
class Meowgram():
def __init__(self, telegram_token: str, ccat_url: str = "localhost", ccat_port: int = 1865) -> None:
self.ccat_url = ccat_url
self.ccat_port = ccat_port
self._loop = asyncio.get_running_loop()
# Queue of the messages to send on telegram
self._out_queue = asyncio.Queue()
# Used to store for each connection when the last typing action is sended
self.last_typing_action = {}
self._connections: Dict[str, CCatConnection] = {}
# Create telegram application
self.telegram: Application = ApplicationBuilder().token(telegram_token).build()
self.bot: Bot = self.telegram.bot
# This handler open a connection to the cheshire cat for the user if it doesn't exist yet
self.connect_to_ccat = MessageHandler(filters.ALL, self._open_ccat_connection)
self.telegram.add_handler(self.connect_to_ccat)
# Handlers to manage different types of messages after the connection to the cheshire cat is opened
# in the previous handler group
self.text_message_handler = MessageHandler(filters.TEXT & (~filters.COMMAND), self._text_handler)
self.voice_message_handler = MessageHandler(filters.VOICE & (~filters.COMMAND), self._voice_note_handler)
self.document_message_handler = MessageHandler(filters.Document.ALL & (~filters.COMMAND), self._document_handler)
self.telegram.add_handler(handler=self.document_message_handler, group=1)
self.telegram.add_handler(handler=self.voice_message_handler, group=1)
self.telegram.add_handler(handler=self.text_message_handler, group=1)
self.telegram.add_handler(handler=self.document_message_handler, group=1)
async def run(self):
# https://docs.python-telegram-bot.org/en/stable/telegram.ext.application.html#telegram.ext.Application.run_polling
# Initializing and starting the app
try:
await self.telegram.initialize()
await self.telegram.updater.start_polling(read_timeout=10)
await self.telegram.start()
# Start main loop
responce_loop = self._loop.create_task(self._out_queue_dispatcher())
await responce_loop
except asyncio.CancelledError:
# Cancelled error from _out_queue_dispatcher
logging.info("STOPPING THE APPLICATION")
# Stop telegram updater
await self.telegram.updater.stop()
# Stop telegram bot application
await self.telegram.stop()
except Exception as e:
logging.exception(f"Unexpectet exeption occured: {e}")
finally:
# Shutdown telergram bot application
await self.telegram.shutdown()
# Close open ws connections
for connection in self._connections.values():
if connection.ccat is not None:
connection.ccat.close()
async def _open_ccat_connection(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
if chat_id not in self._connections:
self._connections[chat_id] = CCatConnection(
user_id=chat_id,
out_queue=self._out_queue,
ccat_url=self.ccat_url,
ccat_port=self.ccat_port
)
# waiting for websocket connection
if not self._connections[chat_id].is_connected:
await self._connections[chat_id].connect()
# If the connection is not successful, message handling is interrupted
if not self._connections[chat_id].is_connected:
logging.warning("Interrupt handling this message, failed to connect to the Cheshire Cat")
raise ApplicationHandlerStop
async def _text_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
# Send mesage to the cat
self._connections[chat_id].send(
message=update.message.text,
meowgram = {
"update": update.to_json()
},
)
async def _voice_note_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
chat_id = update.effective_chat.id
voice_message_file = await update.message.voice.get_file()
# Send mesage to the cat
self._connections[chat_id].ccat.send(
message="*[Voice Note]* (You can't hear)",
meowgram_voice=voice_message_file.file_path,
meowgram = {
"update": update.to_json()
},
)
async def _document_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
pass
async def _out_queue_dispatcher(self):
while True:
message, user_id = await self._out_queue.get()
logging.debug(f"Message from {user_id}: {json.dumps(message, indent=4)}")
try:
if message["type"] == "chat":
# send the message in chat
await self._dispatch_chat_message(message=message, user_id=user_id)
elif message['type'] == "chat_token":
# Send the chat action TYPING every 5 seconds
# during the tokens streaming
await self._dispatch_chat_token(user_id)
except Exception as e:
logging.error(f"An error occurred sending a telegram message: {e}")
async def _dispatch_chat_message(self, message, user_id):
send_params = message.get("meowgram", {}).get("send_params", {})
settings = message.get("meowgram", {}).get("settings", {
"show_tts_text": False
})
tts_url = message.get("tts", None)
if tts_url:
# Get audio file
response = requests.get(tts_url)
if response.status_code != 200:
# If there is an error in retrieving the audio file, it sends a text message
await self.bot.send_message(
chat_id=user_id,
text=message["content"],
**send_params
)
return
with tempfile.NamedTemporaryFile() as speech_file:
# Write the content of the response to the temporary file
speech_file.write(response.content)
# Convet audio to telegram voice note fromat
speech_file_ogg_path = await self._loop.run_in_executor(None, self.convert_audio_to_voice, speech_file.name)
# Check if converted file exists
if not os.path.exists(speech_file_ogg_path):
return
# Send voice note
await self.bot.send_voice(
chat_id=user_id,
voice=open(speech_file_ogg_path, "rb"),
duration=sf.info(speech_file_ogg_path).duration,
caption=message["content"] if settings["show_tts_text"] else None,
filename=speech_file_ogg_path,
**send_params
)
# Remove converted file
os.remove(speech_file_ogg_path)
else:
# If there is no TTS URL, simply send a text message
await self.bot.send_message(
chat_id=user_id,
text=message["content"],
**send_params
)
def convert_audio_to_voice(self, input_path: str) -> str:
# https://stackoverflow.com/questions/56448384/telegram-bot-api-voice-message-audio-spectrogram-is-missing-a-bug
logging.info("Convert audio file to Telegram voice note format")
output_path = os.path.splitext(input_path)[0] + "_converted.ogg"
(
ffmpeg.input(input_path)
.output(output_path, codec="libopus", audio_bitrate="32k", vbr="on", compression_level=10, frame_duration=60, application="voip")
.run()
)
return output_path
async def _dispatch_chat_token(self, user_id):
t = time.time()
if user_id not in self.last_typing_action:
self.last_typing_action[user_id] = t - 5
# Check elapsed time
if t - self.last_typing_action[user_id] > 5:
logging.info(f"Sending chat action Typing to user {user_id}")
# Update the time of the last typing action
self.last_typing_action[user_id] = t
# Send the action
await self.bot.send_chat_action(
chat_id=user_id,
action=ChatAction.TYPING
)