Skip to content

Commit

Permalink
Added Signals Support (init, keep-alive, shutdown, and, new groups
Browse files Browse the repository at this point in the history
Signed-off-by: Guilherme Bacellar Moralez <guibacellar@gmail.com>
  • Loading branch information
guibacellar committed Nov 6, 2023
1 parent f945046 commit ed3a199
Show file tree
Hide file tree
Showing 16 changed files with 732 additions and 84 deletions.
38 changes: 38 additions & 0 deletions TEx/core/mapper/keep_alive_entity_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""Signal Entity Mapper."""
from __future__ import annotations

from configparser import SectionProxy
from typing import Optional

from TEx.models.facade.signal_entity_model import SignalEntity


class SignalEntityMapper:
"""Signal Entity Mapper."""

@staticmethod
def to_entity(section_proxy: Optional[SectionProxy]) -> SignalEntity:
"""Map the Configuration KEEP_ALIVE to Entity."""
# Build Model
if section_proxy:
return SignalEntity(
enabled=section_proxy.get('enabled', fallback='false') == 'true',
keep_alive_interval=int(section_proxy.get('keep_alive_interval', fallback='0')),
notifiers={
'KEEP-ALIVE': section_proxy.get('keep_alive_notifer', fallback='').split(','),
'INITIALIZATION': section_proxy.get('initialization_notifer', fallback='').split(','),
'SHUTDOWN': section_proxy.get('shutdown_notifer', fallback='').split(','),
'NEW-GROUP': section_proxy.get('new_group_notifer', fallback='').split(','),
},
)

return SignalEntity(
enabled=False,
keep_alive_interval=300,
notifiers={
'KEEP-ALIVE': [],
'INITIALIZATION': [],
'SHUTDOWN': [],
'NEW-GROUP': [],
},
)
6 changes: 3 additions & 3 deletions TEx/finder/finder_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def __init__(self) -> None:
"""Initialize Finder Engine."""
self.is_finder_enabled: bool = False
self.rules: List[Dict] = []
self.notification_engine: NotifierEngine = NotifierEngine()
self.notification_engine: NotifierEngine

def __is_finder_enabled(self, config: ConfigParser) -> bool:
"""Check if Finder Module is Enabled."""
Expand All @@ -43,11 +43,11 @@ def __load_rules(self, config: ConfigParser) -> None:
'notifier': config[sec]['notifier'],
})

def configure(self, config: ConfigParser) -> None:
def configure(self, config: ConfigParser, notification_engine: NotifierEngine) -> None:
"""Configure Finder."""
self.is_finder_enabled = self.__is_finder_enabled(config=config)
self.__load_rules(config=config)
self.notification_engine.configure(config=config)
self.notification_engine = notification_engine

async def run(self, entity: Optional[FinderNotificationMessageEntity], source: str) -> None:
"""Execute the Finder with Raw Text.
Expand Down
6 changes: 3 additions & 3 deletions TEx/models/facade/finder_notification_facade_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ class FinderNotificationMessageEntity(BaseModel):
date_time: datetime
raw_text: str
group_name: Optional[str]
group_id: int
group_id: Optional[int]
from_id: Optional[int]
to_id: Optional[int]
reply_to_msg_id: Optional[int]
message_id: int
is_reply: bool
message_id: Optional[int]
is_reply: Optional[bool]
downloaded_media_info: Optional[MediaHandlingEntity]
16 changes: 16 additions & 0 deletions TEx/models/facade/signal_entity_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Signal Entity."""
from __future__ import annotations

from typing import Dict

from pydantic import BaseModel, ConfigDict


class SignalEntity(BaseModel):
"""Signal Entity."""

model_config = ConfigDict(extra='forbid')

enabled: bool
keep_alive_interval: int
notifiers: Dict
16 changes: 16 additions & 0 deletions TEx/models/facade/signal_notification_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Facade Entities for Signal based Notifications."""
from __future__ import annotations

from datetime import datetime

from pydantic import BaseModel, ConfigDict


class SignalNotificationEntityModel(BaseModel):
"""Facade Entities for Signal based Notifications."""

model_config = ConfigDict(extra='forbid')

signal: str
date_time: datetime
content: str
88 changes: 85 additions & 3 deletions TEx/modules/telegram_messages_listener.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""Telegram Group Listener."""
from __future__ import annotations

import asyncio
import contextlib
import logging
import signal
from configparser import ConfigParser
from typing import Dict, List, Optional, cast
from typing import Dict, List, Optional, Tuple, cast

import pytz
from telethon import TelegramClient, events
Expand All @@ -22,6 +25,8 @@
from TEx.database.telegram_group_database import TelegramGroupDatabaseManager, TelegramMessageDatabaseManager, TelegramUserDatabaseManager
from TEx.finder.finder_engine import FinderEngine
from TEx.models.facade.media_handler_facade_entity import MediaHandlingEntity
from TEx.notifier.notifier_engine import NotifierEngine
from TEx.notifier.signals_engine import SignalsEngine, SignalsEngineFactory

logger = logging.getLogger('TelegramExplorer')

Expand All @@ -45,7 +50,21 @@ def __init__(self) -> None:
self.media_handler: UniversalTelegramMediaHandler = UniversalTelegramMediaHandler()
self.target_phone_number: str = ''
self.finder: FinderEngine = FinderEngine()
self.notification_engine: NotifierEngine = NotifierEngine()
self.ocr_engine: OcrEngineBase
self.signals_engine: SignalsEngine
self.term_signal: bool = False
self.sleep_task: asyncio.Task

def __handle_term_signal(self, *args: Tuple) -> None:
"""Handle the Interruption and Termination Signals."""
self.term_signal = True

# If Have an Active Sleep, Cancel
if self.sleep_task:
self.sleep_task.cancel()

logger.warning('\t\tTermination Signal Received, please wait to Stop Processing Gracefully.')

async def __handler(self, event: NewMessage.Event) -> None:
"""Handle the Message."""
Expand Down Expand Up @@ -113,6 +132,9 @@ async def __handler(self, event: NewMessage.Event) -> None:
# Add to DB
TelegramMessageDatabaseManager.insert(values)

# Update Signals Engine
self.signals_engine.inc_messages_sent()

def __build_final_message(self, message: str, ocr_data: Optional[str]) -> str:
"""Compute Final Message for Dict."""
h_result: str = ''
Expand Down Expand Up @@ -178,6 +200,12 @@ async def __ensure_group_exists(self, event: NewMessage.Event) -> None:

TelegramGroupDatabaseManager.insert_or_update(group_dict_data)

# Send Signal
await self.signals_engine.new_group(
group_id=str(group_dict_data['id']),
group_title=group_dict_data['title'],
)

async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None:
"""Execute Module."""
if not await self.can_activate(config, args, data):
Expand All @@ -190,15 +218,32 @@ async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None:
self.target_phone_number = config['CONFIGURATION']['phone_number']

try:
# Attach Termination Signals
signal.signal(signal.SIGINT, self.__handle_term_signal) # type: ignore
signal.signal(signal.SIGTERM, self.__handle_term_signal) # type: ignore

# Set Notification Engines
self.notification_engine.configure(config=config)

# Set Finder
self.finder.configure(config=config)
self.finder.configure(
config=config,
notification_engine=self.notification_engine,
)

# Setup Media Handler
self.media_handler.configure(config=config)

# Set OCR Engine
self.ocr_engine = OcrEngineFactory.get_instance(config=config)

# Set Keep Alive Settings
self.signals_engine = SignalsEngineFactory.get_instance(
config=config,
notification_engine=self.notification_engine,
source=self.target_phone_number,
)

except AttributeError as ex:
logger.fatal(ex)
data['internals']['panic'] = True
Expand All @@ -221,6 +266,43 @@ async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None:

# Read all Messages from Now
logger.info('\t\tListening New Messages...')
await client.run_until_disconnected() # Code Stops Here until telegram disconnects

# Send Init Signal
await self.signals_engine.init()

# Loop Until Signal Termination
while not self.term_signal:

if client.is_connected():
self.sleep_task = asyncio.create_task(self.__sleep())
await self.sleep_task

else:
break # Future: Handle Reconnection + Configure Reconnection in config file

# Send Keep-Alive Signal
await self.signals_engine.keep_alive()

# Disconnect Telegram Client
await self.__disconnect(client=client)

async def __disconnect(self, client: TelegramClient) -> None:
"""Disconnect Telegram Client."""
# Disconnect the Client
client.disconnect()

# Wait Disconnect
while client.is_connected():
logger.info('\t\tWaiting Client Disconnection...')
await asyncio.sleep(1)

logger.info('\t\tTelegram Client Disconnected...')

# Send Shutdown Signal
await self.signals_engine.shutdown()

async def __sleep(self) -> None:
"""Allow Sleep Function to be Canceled."""
with contextlib.suppress(asyncio.CancelledError):
await asyncio.sleep(self.signals_engine.keep_alive_interval)

77 changes: 60 additions & 17 deletions TEx/notifier/discord_notifier.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
"""Discord Notifier."""
from __future__ import annotations

from configparser import SectionProxy
from typing import Union

from discord_webhook import DiscordEmbed, DiscordWebhook

from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity
from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel
from TEx.notifier.notifier_base import BaseNotifier


Expand All @@ -20,32 +24,71 @@ def configure(self, url: str, config: SectionProxy) -> None:
self.url = url
self.configure_base(config=config)

async def run(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> None:
async def run(self, entity: Union[FinderNotificationMessageEntity, SignalNotificationEntityModel], rule_id: str, source: str) -> None:
"""Run Discord Notifier."""
# Check and Update Deduplication Control
is_duplicated, duplication_tag = self.check_is_duplicated(message=entity.raw_text)
if is_duplicated:
return

# Run the Notification Process.
webhook = DiscordWebhook(
url=self.url,
rate_limit_retry=True,
embed: DiscordEmbed
if isinstance(entity, FinderNotificationMessageEntity):
is_duplicated, duplication_tag = self.check_is_duplicated(message=entity.raw_text)
if is_duplicated:
return

embed = await self.__get_finder_notification_embed(
entity=entity,
rule_id=rule_id,
source=source,
duplication_tag=duplication_tag,
)

else:
embed = await self.__get_signal_notification_embed(
entity=entity,
source=source,
)

# Run the Notification Process
webhook = DiscordWebhook(url=self.url, rate_limit_retry=True)
webhook.add_embed(embed)
webhook.execute()

async def __get_signal_notification_embed(self, entity: SignalNotificationEntityModel, source: str) -> DiscordEmbed:
"""Return the Embed Object for Signals."""
embed = DiscordEmbed(
title=f'**{entity.group_name}** ({entity.group_id})',
title=entity.signal,
description=entity.content,
)

embed.add_embed_field(name='Source', value=source, inline=True)
embed.add_embed_field(name='Message Date', value=str(entity.date_time), inline=True)

return embed

async def __get_finder_notification_embed(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str, duplication_tag: str) -> DiscordEmbed:
"""Return the Embed Object for Notification."""
# Build Title
title: str = ''
if entity.group_name and entity.group_id:
title = f'**{entity.group_name}** ({entity.group_id})'
elif entity.group_name:
title = f'**{entity.group_name}**'
elif entity.group_id:
title = f'**{entity.group_id}**'

embed = DiscordEmbed(
title=title,
description=entity.raw_text,
)

embed.add_embed_field(name='Source', value=source, inline=True)
embed.add_embed_field(name='Rule', value=rule_id, inline=True)
embed.add_embed_field(name='Message ID', value=str(entity.message_id), inline=False)
embed.add_embed_field(name='Group Name', value=entity.group_name if entity.group_name else '', inline=True)
embed.add_embed_field(name='Group ID', value=str(entity.group_id), inline=True)

if entity.message_id:
embed.add_embed_field(name='Message ID', value=str(entity.message_id), inline=False)

if entity.group_id:
embed.add_embed_field(name='Group Name', value=entity.group_name if entity.group_name else '', inline=True)
embed.add_embed_field(name='Group ID', value=str(entity.group_id), inline=True)

embed.add_embed_field(name='Message Date', value=str(entity.date_time), inline=False)
embed.add_embed_field(name='Tag', value=duplication_tag, inline=False)

# add embed object to webhook
webhook.add_embed(embed)
webhook.execute()
return embed
Loading

0 comments on commit ed3a199

Please sign in to comment.