diff --git a/TEx/core/mapper/keep_alive_entity_mapper.py b/TEx/core/mapper/keep_alive_entity_mapper.py new file mode 100644 index 0000000..29ddcf3 --- /dev/null +++ b/TEx/core/mapper/keep_alive_entity_mapper.py @@ -0,0 +1,38 @@ +"""Signal Entity Mapper.""" +from __future__ import annotations + +from configparser import SectionProxy +from typing import Optional + +from TEx.models.facade.signal_entity_model import SignalEntity + + +class SignalEntityMapper: + """Signal Entity Mapper.""" + + @staticmethod + def to_entity(section_proxy: Optional[SectionProxy]) -> SignalEntity: + """Map the Configuration KEEP_ALIVE to Entity.""" + # Build Model + if section_proxy: + return SignalEntity( + enabled=section_proxy.get('enabled', fallback='false') == 'true', + keep_alive_interval=int(section_proxy.get('keep_alive_interval', fallback='0')), + notifiers={ + 'KEEP-ALIVE': section_proxy.get('keep_alive_notifer', fallback='').split(','), + 'INITIALIZATION': section_proxy.get('initialization_notifer', fallback='').split(','), + 'SHUTDOWN': section_proxy.get('shutdown_notifer', fallback='').split(','), + 'NEW-GROUP': section_proxy.get('new_group_notifer', fallback='').split(','), + }, + ) + + return SignalEntity( + enabled=False, + keep_alive_interval=300, + notifiers={ + 'KEEP-ALIVE': [], + 'INITIALIZATION': [], + 'SHUTDOWN': [], + 'NEW-GROUP': [], + }, + ) diff --git a/TEx/finder/finder_engine.py b/TEx/finder/finder_engine.py index 48321d3..bad2fef 100644 --- a/TEx/finder/finder_engine.py +++ b/TEx/finder/finder_engine.py @@ -17,7 +17,7 @@ def __init__(self) -> None: """Initialize Finder Engine.""" self.is_finder_enabled: bool = False self.rules: List[Dict] = [] - self.notification_engine: NotifierEngine = NotifierEngine() + self.notification_engine: NotifierEngine def __is_finder_enabled(self, config: ConfigParser) -> bool: """Check if Finder Module is Enabled.""" @@ -43,11 +43,11 @@ def __load_rules(self, config: ConfigParser) -> None: 'notifier': config[sec]['notifier'], }) - def configure(self, config: ConfigParser) -> None: + def configure(self, config: ConfigParser, notification_engine: NotifierEngine) -> None: """Configure Finder.""" self.is_finder_enabled = self.__is_finder_enabled(config=config) self.__load_rules(config=config) - self.notification_engine.configure(config=config) + self.notification_engine = notification_engine async def run(self, entity: Optional[FinderNotificationMessageEntity], source: str) -> None: """Execute the Finder with Raw Text. diff --git a/TEx/models/facade/finder_notification_facade_entity.py b/TEx/models/facade/finder_notification_facade_entity.py index 82c43d1..8fb6dd5 100644 --- a/TEx/models/facade/finder_notification_facade_entity.py +++ b/TEx/models/facade/finder_notification_facade_entity.py @@ -17,10 +17,10 @@ class FinderNotificationMessageEntity(BaseModel): date_time: datetime raw_text: str group_name: Optional[str] - group_id: int + group_id: Optional[int] from_id: Optional[int] to_id: Optional[int] reply_to_msg_id: Optional[int] - message_id: int - is_reply: bool + message_id: Optional[int] + is_reply: Optional[bool] downloaded_media_info: Optional[MediaHandlingEntity] diff --git a/TEx/models/facade/signal_entity_model.py b/TEx/models/facade/signal_entity_model.py new file mode 100644 index 0000000..705904b --- /dev/null +++ b/TEx/models/facade/signal_entity_model.py @@ -0,0 +1,16 @@ +"""Signal Entity.""" +from __future__ import annotations + +from typing import Dict + +from pydantic import BaseModel, ConfigDict + + +class SignalEntity(BaseModel): + """Signal Entity.""" + + model_config = ConfigDict(extra='forbid') + + enabled: bool + keep_alive_interval: int + notifiers: Dict diff --git a/TEx/models/facade/signal_notification_model.py b/TEx/models/facade/signal_notification_model.py new file mode 100644 index 0000000..bc60b85 --- /dev/null +++ b/TEx/models/facade/signal_notification_model.py @@ -0,0 +1,16 @@ +"""Facade Entities for Signal based Notifications.""" +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, ConfigDict + + +class SignalNotificationEntityModel(BaseModel): + """Facade Entities for Signal based Notifications.""" + + model_config = ConfigDict(extra='forbid') + + signal: str + date_time: datetime + content: str diff --git a/TEx/modules/telegram_messages_listener.py b/TEx/modules/telegram_messages_listener.py index 26f949e..4589e69 100644 --- a/TEx/modules/telegram_messages_listener.py +++ b/TEx/modules/telegram_messages_listener.py @@ -1,9 +1,12 @@ """Telegram Group Listener.""" from __future__ import annotations +import asyncio +import contextlib import logging +import signal from configparser import ConfigParser -from typing import Dict, List, Optional, cast +from typing import Dict, List, Optional, Tuple, cast import pytz from telethon import TelegramClient, events @@ -22,6 +25,8 @@ from TEx.database.telegram_group_database import TelegramGroupDatabaseManager, TelegramMessageDatabaseManager, TelegramUserDatabaseManager from TEx.finder.finder_engine import FinderEngine from TEx.models.facade.media_handler_facade_entity import MediaHandlingEntity +from TEx.notifier.notifier_engine import NotifierEngine +from TEx.notifier.signals_engine import SignalsEngine, SignalsEngineFactory logger = logging.getLogger('TelegramExplorer') @@ -45,7 +50,21 @@ def __init__(self) -> None: self.media_handler: UniversalTelegramMediaHandler = UniversalTelegramMediaHandler() self.target_phone_number: str = '' self.finder: FinderEngine = FinderEngine() + self.notification_engine: NotifierEngine = NotifierEngine() self.ocr_engine: OcrEngineBase + self.signals_engine: SignalsEngine + self.term_signal: bool = False + self.sleep_task: asyncio.Task + + def __handle_term_signal(self, *args: Tuple) -> None: + """Handle the Interruption and Termination Signals.""" + self.term_signal = True + + # If Have an Active Sleep, Cancel + if self.sleep_task: + self.sleep_task.cancel() + + logger.warning('\t\tTermination Signal Received, please wait to Stop Processing Gracefully.') async def __handler(self, event: NewMessage.Event) -> None: """Handle the Message.""" @@ -113,6 +132,9 @@ async def __handler(self, event: NewMessage.Event) -> None: # Add to DB TelegramMessageDatabaseManager.insert(values) + # Update Signals Engine + self.signals_engine.inc_messages_sent() + def __build_final_message(self, message: str, ocr_data: Optional[str]) -> str: """Compute Final Message for Dict.""" h_result: str = '' @@ -178,6 +200,12 @@ async def __ensure_group_exists(self, event: NewMessage.Event) -> None: TelegramGroupDatabaseManager.insert_or_update(group_dict_data) + # Send Signal + await self.signals_engine.new_group( + group_id=str(group_dict_data['id']), + group_title=group_dict_data['title'], + ) + async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None: """Execute Module.""" if not await self.can_activate(config, args, data): @@ -190,8 +218,18 @@ async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None: self.target_phone_number = config['CONFIGURATION']['phone_number'] try: + # Attach Termination Signals + signal.signal(signal.SIGINT, self.__handle_term_signal) # type: ignore + signal.signal(signal.SIGTERM, self.__handle_term_signal) # type: ignore + + # Set Notification Engines + self.notification_engine.configure(config=config) + # Set Finder - self.finder.configure(config=config) + self.finder.configure( + config=config, + notification_engine=self.notification_engine, + ) # Setup Media Handler self.media_handler.configure(config=config) @@ -199,6 +237,13 @@ async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None: # Set OCR Engine self.ocr_engine = OcrEngineFactory.get_instance(config=config) + # Set Keep Alive Settings + self.signals_engine = SignalsEngineFactory.get_instance( + config=config, + notification_engine=self.notification_engine, + source=self.target_phone_number, + ) + except AttributeError as ex: logger.fatal(ex) data['internals']['panic'] = True @@ -221,6 +266,43 @@ async def run(self, config: ConfigParser, args: Dict, data: Dict) -> None: # Read all Messages from Now logger.info('\t\tListening New Messages...') - await client.run_until_disconnected() # Code Stops Here until telegram disconnects + + # Send Init Signal + await self.signals_engine.init() + + # Loop Until Signal Termination + while not self.term_signal: + + if client.is_connected(): + self.sleep_task = asyncio.create_task(self.__sleep()) + await self.sleep_task + + else: + break # Future: Handle Reconnection + Configure Reconnection in config file + + # Send Keep-Alive Signal + await self.signals_engine.keep_alive() + + # Disconnect Telegram Client + await self.__disconnect(client=client) + + async def __disconnect(self, client: TelegramClient) -> None: + """Disconnect Telegram Client.""" + # Disconnect the Client + client.disconnect() + + # Wait Disconnect + while client.is_connected(): + logger.info('\t\tWaiting Client Disconnection...') + await asyncio.sleep(1) logger.info('\t\tTelegram Client Disconnected...') + + # Send Shutdown Signal + await self.signals_engine.shutdown() + + async def __sleep(self) -> None: + """Allow Sleep Function to be Canceled.""" + with contextlib.suppress(asyncio.CancelledError): + await asyncio.sleep(self.signals_engine.keep_alive_interval) + diff --git a/TEx/notifier/discord_notifier.py b/TEx/notifier/discord_notifier.py index 6a0b179..b5d4632 100644 --- a/TEx/notifier/discord_notifier.py +++ b/TEx/notifier/discord_notifier.py @@ -1,9 +1,13 @@ """Discord Notifier.""" +from __future__ import annotations + from configparser import SectionProxy +from typing import Union from discord_webhook import DiscordEmbed, DiscordWebhook from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel from TEx.notifier.notifier_base import BaseNotifier @@ -20,32 +24,71 @@ def configure(self, url: str, config: SectionProxy) -> None: self.url = url self.configure_base(config=config) - async def run(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> None: + async def run(self, entity: Union[FinderNotificationMessageEntity, SignalNotificationEntityModel], rule_id: str, source: str) -> None: """Run Discord Notifier.""" - # Check and Update Deduplication Control - is_duplicated, duplication_tag = self.check_is_duplicated(message=entity.raw_text) - if is_duplicated: - return - - # Run the Notification Process. - webhook = DiscordWebhook( - url=self.url, - rate_limit_retry=True, + embed: DiscordEmbed + if isinstance(entity, FinderNotificationMessageEntity): + is_duplicated, duplication_tag = self.check_is_duplicated(message=entity.raw_text) + if is_duplicated: + return + + embed = await self.__get_finder_notification_embed( + entity=entity, + rule_id=rule_id, + source=source, + duplication_tag=duplication_tag, ) + else: + embed = await self.__get_signal_notification_embed( + entity=entity, + source=source, + ) + + # Run the Notification Process + webhook = DiscordWebhook(url=self.url, rate_limit_retry=True) + webhook.add_embed(embed) + webhook.execute() + + async def __get_signal_notification_embed(self, entity: SignalNotificationEntityModel, source: str) -> DiscordEmbed: + """Return the Embed Object for Signals.""" embed = DiscordEmbed( - title=f'**{entity.group_name}** ({entity.group_id})', + title=entity.signal, + description=entity.content, + ) + + embed.add_embed_field(name='Source', value=source, inline=True) + embed.add_embed_field(name='Message Date', value=str(entity.date_time), inline=True) + + return embed + + async def __get_finder_notification_embed(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str, duplication_tag: str) -> DiscordEmbed: + """Return the Embed Object for Notification.""" + # Build Title + title: str = '' + if entity.group_name and entity.group_id: + title = f'**{entity.group_name}** ({entity.group_id})' + elif entity.group_name: + title = f'**{entity.group_name}**' + elif entity.group_id: + title = f'**{entity.group_id}**' + + embed = DiscordEmbed( + title=title, description=entity.raw_text, ) embed.add_embed_field(name='Source', value=source, inline=True) embed.add_embed_field(name='Rule', value=rule_id, inline=True) - embed.add_embed_field(name='Message ID', value=str(entity.message_id), inline=False) - embed.add_embed_field(name='Group Name', value=entity.group_name if entity.group_name else '', inline=True) - embed.add_embed_field(name='Group ID', value=str(entity.group_id), inline=True) + + if entity.message_id: + embed.add_embed_field(name='Message ID', value=str(entity.message_id), inline=False) + + if entity.group_id: + embed.add_embed_field(name='Group Name', value=entity.group_name if entity.group_name else '', inline=True) + embed.add_embed_field(name='Group ID', value=str(entity.group_id), inline=True) + embed.add_embed_field(name='Message Date', value=str(entity.date_time), inline=False) embed.add_embed_field(name='Tag', value=duplication_tag, inline=False) - # add embed object to webhook - webhook.add_embed(embed) - webhook.execute() + return embed diff --git a/TEx/notifier/elastic_search_notifier.py b/TEx/notifier/elastic_search_notifier.py index b6c1060..8415d97 100644 --- a/TEx/notifier/elastic_search_notifier.py +++ b/TEx/notifier/elastic_search_notifier.py @@ -2,12 +2,13 @@ from __future__ import annotations from configparser import SectionProxy -from typing import Dict, Optional +from typing import Dict, Optional, Union import pytz from elasticsearch import AsyncElasticsearch from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel from TEx.notifier.notifier_base import BaseNotifier @@ -31,17 +32,40 @@ def configure(self, config: SectionProxy) -> None: api_key=config.get('api_key', fallback=None), verify_certs=config.get('verify_ssl_cert', fallback='True') == 'True', cloud_id=config.get('cloud_id', fallback=None), - request_timeout=10, - max_retries=5, + request_timeout=20, + max_retries=10, + ssl_show_warn=False, ) self.index = config['index_name'] self.pipeline = config['pipeline_name'] - async def run(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> None: + async def run(self, entity: Union[FinderNotificationMessageEntity, SignalNotificationEntityModel], rule_id: str, source: str) -> None: """Run Elastic Search Notifier.""" if not self.client: return + content: Dict + + if isinstance(entity, FinderNotificationMessageEntity): + content = await self.__get_dict_for_finder_notification( + entity=entity, + rule_id=rule_id, + source=source, + ) + else: + content = await self.__get_dict_for_signal_notification( + entity=entity, + source=source, + ) + + await self.client.index( + index=self.index, + pipeline=self.pipeline, + document=content, + ) + + async def __get_dict_for_finder_notification(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> Dict: + """Return the Dict for Finder Notifications.""" content: Dict = { 'time': entity.date_time.astimezone(tz=pytz.utc), 'source': source, @@ -65,9 +89,16 @@ async def run(self, entity: FinderNotificationMessageEntity, rule_id: str, sourc content['media_mime_type'] = None content['media_size'] = None - await self.client.index( - index=self.index, - pipeline=self.pipeline, - id=f'{str(entity.group_id)}_{str(entity.message_id)}', - document=content, - ) + return content + + async def __get_dict_for_signal_notification(self, entity: SignalNotificationEntityModel, source: str) -> Dict: + """Return the Dict for Signal Notifications.""" + content: Dict = { + 'time': entity.date_time.astimezone(tz=pytz.utc), + 'source': source, + 'signal': entity.signal, + 'content': entity.content, + } + + return content + diff --git a/TEx/notifier/notifier_base.py b/TEx/notifier/notifier_base.py index f557e11..97dfc7d 100644 --- a/TEx/notifier/notifier_base.py +++ b/TEx/notifier/notifier_base.py @@ -4,11 +4,12 @@ import abc import hashlib from configparser import SectionProxy -from typing import Optional, Tuple +from typing import Optional, Tuple, Union from cachetools import TTLCache from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel class BaseNotifier: @@ -39,5 +40,5 @@ def check_is_duplicated(self, message: str) -> Tuple[bool, str]: return False, tag @abc.abstractmethod - async def run(self, entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> None: + async def run(self, entity: Union[FinderNotificationMessageEntity, SignalNotificationEntityModel], rule_id: str, source: str) -> None: """Run the Notification Process.""" diff --git a/TEx/notifier/notifier_engine.py b/TEx/notifier/notifier_engine.py index f4308bd..30d3be0 100644 --- a/TEx/notifier/notifier_engine.py +++ b/TEx/notifier/notifier_engine.py @@ -1,14 +1,18 @@ """Notifier Modules.""" from __future__ import annotations +import logging from configparser import ConfigParser -from typing import Dict, List +from typing import Dict, List, Union from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel from TEx.notifier.discord_notifier import DiscordNotifier from TEx.notifier.elastic_search_notifier import ElasticSearchNotifier from TEx.notifier.notifier_base import BaseNotifier +logger = logging.getLogger('TelegramExplorer') + class NotifierEngine: """Primary Notification Engine.""" @@ -43,7 +47,7 @@ def configure(self, config: ConfigParser) -> None: """Configure Finder.""" self.__load_notifiers(config) - async def run(self, notifiers: List[str], entity: FinderNotificationMessageEntity, rule_id: str, source: str) -> None: + async def run(self, notifiers: List[str], entity: Union[FinderNotificationMessageEntity, SignalNotificationEntityModel], rule_id: str, source: str) -> None: """Dispatch all Notifications. :param notifiers: @@ -58,4 +62,9 @@ async def run(self, notifiers: List[str], entity: FinderNotificationMessageEntit for dispatcher_name in notifiers: target_notifier: BaseNotifier = self.notifiers[dispatcher_name]['instance'] - await target_notifier.run(entity=entity, rule_id=rule_id, source=source) + + try: + await target_notifier.run(entity=entity, rule_id=rule_id, source=source) + + except Exception: # Yes, Catch All + logging.exception('Unable to Send Notification') diff --git a/TEx/notifier/signals_engine.py b/TEx/notifier/signals_engine.py new file mode 100644 index 0000000..9db185b --- /dev/null +++ b/TEx/notifier/signals_engine.py @@ -0,0 +1,103 @@ +"""Signals Notification Engine.""" +from __future__ import annotations + +from configparser import ConfigParser +from datetime import datetime +from typing import List + +import pytz + +from TEx.core.mapper.keep_alive_entity_mapper import SignalEntityMapper +from TEx.models.facade.signal_entity_model import SignalEntity +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel +from TEx.notifier.notifier_engine import NotifierEngine + + +class SignalsEngineFactory: + """Signals Notification Engine Factory.""" + + @staticmethod + def get_instance(config: ConfigParser, notification_engine: NotifierEngine, source: str) -> SignalsEngine: + """Get the Signals Engine Instance.""" + return SignalsEngine( + entity=SignalEntityMapper.to_entity(section_proxy=config['SIGNALS'] if config.has_section('SIGNALS') else None), + notification_engine=notification_engine, + source=source, + ) + + +class SignalsEngine: + """Signals Notification Engine.""" + + def __init__(self, entity: SignalEntity, notification_engine: NotifierEngine, source: str) -> None: + """Initialize the Signals Engine.""" + self.signal_entity: SignalEntity = entity + self.messages_sent: int = 0 + self.notification_engine: NotifierEngine = notification_engine + self.source: str = source + + @property + def keep_alive_interval(self) -> int: + """Return the Keep Alive Engine Interval.""" + return self.signal_entity.keep_alive_interval + + def inc_messages_sent(self) -> None: + """Increment the Messages Sent Counter.""" + self.messages_sent += 1 + + async def keep_alive(self) -> None: + """Send the Keep Alive.""" + await self.__send_signal( + entity=SignalNotificationEntityModel( + date_time=datetime.now(tz=pytz.UTC), + content=f'Messages Processed in Period: {self.messages_sent}', + signal='KEEP-ALIVE', + ), + ) + + # Reset Messages Sent Counter + self.messages_sent = 0 + + async def shutdown(self) -> None: + """Send the Shutdown.""" + await self.__send_signal( + entity=SignalNotificationEntityModel( + date_time=datetime.now(tz=pytz.UTC), + content=f'Last Messages Processed in Period: {self.messages_sent}', + signal='SHUTDOWN', + ), + ) + + async def init(self) -> None: + """Send the Shutdown.""" + await self.__send_signal( + entity=SignalNotificationEntityModel( + date_time=datetime.now(tz=pytz.UTC), + content='', + signal='INITIALIZATION', + ), + ) + + async def new_group(self, group_id: str, group_title: str) -> None: + """Send the New Group Event.""" + await self.__send_signal( + entity=SignalNotificationEntityModel( + date_time=datetime.now(tz=pytz.UTC), + content=f'ID: {group_id} | Title: "{group_title}"', + signal='NEW-GROUP', + ), + ) + + async def __send_signal(self, entity: SignalNotificationEntityModel) -> None: + """Send the Signal.""" + signal_notifiers: List[str] = self.signal_entity.notifiers[entity.signal] + + if len(signal_notifiers) == 0: + return + + await self.notification_engine.run( + notifiers=signal_notifiers, + entity=entity, + rule_id='SIGNALS', + source=self.source, + ) diff --git a/tests/finder/test_finder_engine.py b/tests/finder/test_finder_engine.py index fe58889..13f79fc 100644 --- a/tests/finder/test_finder_engine.py +++ b/tests/finder/test_finder_engine.py @@ -44,24 +44,24 @@ def test_run_with_regex_finder(self): data: Dict = {} TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) - with mock.patch('TEx.finder.finder_engine.NotifierEngine', return_value=notifier_engine_mock): - target: FinderEngine = FinderEngine() + target: FinderEngine = FinderEngine() - # Execute Discord Notifier Configure Method - target.configure( - config=self.config - ) - target.notification_engine = notifier_engine_mock + # Execute Discord Notifier Configure Method + target.configure( + config=self.config, + notification_engine=notifier_engine_mock + ) + target.notification_engine = notifier_engine_mock - loop = asyncio.get_event_loop() - loop.run_until_complete( + loop = asyncio.get_event_loop() + loop.run_until_complete( - # Invoke Test Target - target.run( - entity=message_entity, - source='+15558987453' - ) + # Invoke Test Target + target.run( + entity=message_entity, + source='+15558987453' ) + ) # Check if Webhook was Executed target.notification_engine.run.assert_has_awaits([ @@ -95,24 +95,24 @@ def test_run_not_found(self): data: Dict = {} TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) - with mock.patch('TEx.finder.finder_engine.NotifierEngine', return_value=notifier_engine_mock): - target: FinderEngine = FinderEngine() + target: FinderEngine = FinderEngine() - # Execute Discord Notifier Configure Method - target.configure( - config=self.config - ) - target.notification_engine = notifier_engine_mock + # Execute Discord Notifier Configure Method + target.configure( + config=self.config, + notification_engine=notifier_engine_mock + ) + target.notification_engine = notifier_engine_mock - loop = asyncio.get_event_loop() - loop.run_until_complete( + loop = asyncio.get_event_loop() + loop.run_until_complete( - # Invoke Test Target - target.run( - entity=message_entity, - source='+15558987453' - ) + # Invoke Test Target + target.run( + entity=message_entity, + source='+15558987453' ) + ) # Check if Webhook was Executed target.notification_engine.run.assert_not_called() diff --git a/tests/modules/test_telegram_messages_listener.py b/tests/modules/test_telegram_messages_listener.py index 47048f0..a009d03 100644 --- a/tests/modules/test_telegram_messages_listener.py +++ b/tests/modules/test_telegram_messages_listener.py @@ -39,8 +39,9 @@ def setUp(self) -> None: def tearDown(self) -> None: DbManager.SESSIONS['data'].close() + @mock.patch('TEx.modules.telegram_messages_listener.SignalsEngineFactory') @mock.patch('TEx.modules.telegram_messages_listener.OcrEngineFactory') - def test_run_listen_messages(self, mocked_ocr_engine_factory): + def test_run_listen_messages(self, mocked_ocr_engine_factory, mocked_signals_engine_factory): """Test Run Method for Listem Telegram Messages.""" # Setup Mock @@ -54,6 +55,7 @@ async def async_generator_side_effect(items): # Mock the the Message Iterator Async Method telegram_client_mockup.iter_messages = mock.MagicMock(return_value=async_generator_side_effect(base_messages_mockup_data)) + telegram_client_mockup.is_connected = mock.MagicMock(side_effect=[True, True, False, False]) # Add the Async Mocks to Messages [message for message in base_messages_mockup_data if message.id == 183018][0].download_media = mock.AsyncMock(side_effect=self.coroutine_download_photo) @@ -64,6 +66,16 @@ async def async_generator_side_effect(items): [message for message in base_messages_mockup_data if message.id == 4622199][0].download_media = mock.AsyncMock(side_effect=self.coroutine_download_text_plain) [message for message in base_messages_mockup_data if message.id == 34357][0].download_media = mock.AsyncMock(side_effect=self.coroutine_download_pdf) + # Setup the Signals Engine Mockup + mock_signals_engine = mock.MagicMock() + mock_signals_engine.keep_alive_interval = 1 + mock_signals_engine.inc_messages_sent = mock.MagicMock() + mock_signals_engine.keep_alive = mock.AsyncMock() + mock_signals_engine.shutdown = mock.AsyncMock() + mock_signals_engine.init = mock.AsyncMock() + mock_signals_engine.new_group = mock.AsyncMock() + mocked_signals_engine_factory.get_instance = mock.MagicMock(return_value=mock_signals_engine) + # Call Test Target Method target: TelegramGroupMessageListener = TelegramGroupMessageListener() args: Dict = { @@ -138,12 +150,38 @@ async def async_generator_side_effect(items): # Assert Call catch_up telegram_client_mockup.catch_up.assert_awaited_once() - # Asset Call run_until_disconnected - telegram_client_mockup.run_until_disconnected.assert_awaited_once() + # Asset Call disconnect + telegram_client_mockup.disconnect.assert_called_once() # Check if calls OCR Engine Factory Correctly mocked_ocr_engine_factory.get_instance.assert_called_once_with(config=self.config) + # Check if calls Signals Engine Correctly + mock_signals_engine.init.assert_awaited_once() + mock_signals_engine.new_group.assert_has_awaits([ + mock.call(group_id='10981', group_title='Channel Title Alpha'), + mock.call(group_id='10984', group_title='Channel Title Echo'), + mock.call(group_id='12099', group_title='johnsnow55'), + mock.call(group_id='12000', group_title='Chat 12000'), + ]) + mock_signals_engine.inc_messages_sent.assert_has_calls([ + mock.call(), mock.call(), mock.call(), + mock.call(), mock.call(), mock.call(), + mock.call(), mock.call(), mock.call() + ]) + mock_signals_engine.keep_alive.assert_has_awaits([ + mock.call(), + mock.call() + ]) + mock_signals_engine.shutdown.assert_awaited_once() + + # Check if calls Signals Engine Factory Correctly + mocked_signals_engine_factory.get_instance.assert_called_once_with( + config=self.config, + notification_engine=target.notification_engine, + source='5526986587745' + ) + # Check Logs self.assertEqual(18, len(captured.records)) self.assertEqual('\t\tListening Past Messages...', captured.records[0].message) @@ -342,8 +380,9 @@ async def async_generator_side_effect(items): # TODO: Check if calls OCR Engine Correctly + @mock.patch('TEx.modules.telegram_messages_listener.SignalsEngineFactory') @mock.patch('TEx.modules.telegram_messages_listener.OcrEngineFactory') - def test_run_listen_messages_filtered(self, mocked_ocr_engine_factory): + def test_run_listen_messages_filtered(self, mocked_ocr_engine_factory, mocked_signals_engine_factory): """Test Run Method for Listem Filtered Messages.""" # Setup Mock @@ -356,6 +395,7 @@ async def async_generator_side_effect(items): # Mock the the Message Iterator Async Method telegram_client_mockup.iter_messages = mock.MagicMock(return_value=async_generator_side_effect(base_messages_mockup_data)) + telegram_client_mockup.is_connected = mock.MagicMock(side_effect=[True, True, False, False]) mocked_ocr_engine_factory.get_instance = mock.Mock(return_value=UnitTestEchoOcrEngine(echo_message='brown dog jumped over the lazy fox.\n')) # Add the Async Mocks to Messages @@ -367,6 +407,16 @@ async def async_generator_side_effect(items): [message for message in base_messages_mockup_data if message.id == 4622199][0].download_media = mock.AsyncMock(side_effect=self.coroutine_download_text_plain) [message for message in base_messages_mockup_data if message.id == 34357][0].download_media = mock.AsyncMock(side_effect=self.coroutine_download_pdf) + # Setup the Signals Engine Mockup + mock_signals_engine = mock.MagicMock() + mock_signals_engine.keep_alive_interval = 1 + mock_signals_engine.inc_messages_sent = mock.MagicMock() + mock_signals_engine.keep_alive = mock.AsyncMock() + mock_signals_engine.shutdown = mock.AsyncMock() + mock_signals_engine.init = mock.AsyncMock() + mock_signals_engine.new_group = mock.AsyncMock() + mocked_signals_engine_factory.get_instance = mock.MagicMock(return_value=mock_signals_engine) + # Call Test Target Method target: TelegramGroupMessageListener = TelegramGroupMessageListener() args: Dict = { @@ -422,15 +472,6 @@ async def async_generator_side_effect(items): target._TelegramGroupMessageListener__handler(event=mocked_event) ) - # Assert Event Handler Added - telegram_client_mockup.add_event_handler.assert_called_once_with(mock.ANY, NewMessage) - - # Assert Call catch_up - telegram_client_mockup.catch_up.assert_awaited_once() - - # Asset Call run_until_disconnected - telegram_client_mockup.run_until_disconnected.assert_awaited_once() - for message in captured.records: print(message.message) diff --git a/tests/notifier/test_elastic_search_notifier.py b/tests/notifier/test_elastic_search_notifier.py index 16ab7cb..78c118d 100644 --- a/tests/notifier/test_elastic_search_notifier.py +++ b/tests/notifier/test_elastic_search_notifier.py @@ -138,7 +138,6 @@ def test_run_without_downloaded_file(self): self.assertEqual(call_arg['index'], 'test_index_name') self.assertEqual(call_arg['pipeline'], 'test_pipeline_name') - self.assertEqual(call_arg['id'], '1972142108_5975883') submited_document = call_arg['document'] expected_document = { @@ -218,7 +217,6 @@ def test_run_with_downloaded_file(self): self.assertEqual(call_arg['index'], 'test_index_name') self.assertEqual(call_arg['pipeline'], 'test_pipeline_name') - self.assertEqual(call_arg['id'], '1972142108_5975883') submited_document = call_arg['document'] expected_document = { diff --git a/tests/notifier/test_signals_engine.py b/tests/notifier/test_signals_engine.py new file mode 100644 index 0000000..b98b09f --- /dev/null +++ b/tests/notifier/test_signals_engine.py @@ -0,0 +1,261 @@ +import asyncio +import datetime +import unittest +from configparser import ConfigParser +from typing import Dict +from unittest import mock +from unittest.mock import ANY, call + +import pytz + +from TEx.models.facade.signal_notification_model import SignalNotificationEntityModel +from TEx.notifier.signals_engine import SignalsEngine, SignalsEngineFactory +from tests.modules.common import TestsCommon + + +class SignalsEngineFactoryTest(unittest.TestCase): + + def setUp(self) -> None: + self.config = ConfigParser() + self.config.read('../../config.ini') + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_get_instance(self, mocked_signal_engine): + """Test get_instance Method with Success.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + h_result: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+1234567809' + ) + + # Check Results + self.assertEqual(mocked_signal_engine, h_result.notification_engine) + self.assertTrue(h_result.signal_entity.enabled) + self.assertEqual(2, h_result.signal_entity.keep_alive_interval) + self.assertEqual(['NOTIFIER.DISCORD.NOT_001'], h_result.signal_entity.notifiers['KEEP-ALIVE']) + self.assertEqual(['NOTIFIER.ELASTIC_SEARCH.UT_01'], h_result.signal_entity.notifiers['INITIALIZATION']) + self.assertEqual(['NOTIFIER.DISCORD.NOT_001', 'NOTIFIER.ELASTIC_SEARCH.UT_01'], h_result.signal_entity.notifiers['SHUTDOWN']) + self.assertEqual(['NOTIFIER.ELASTIC_SEARCH.UT_01', 'NOTIFIER.DISCORD.NOT_001'], h_result.signal_entity.notifiers['NEW-GROUP']) + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_get_instance_without_signals_on_config_file(self, mocked_signal_engine): + """Test get_instance Method with Success When Have no SIGNALS section on Config File.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + self.config.remove_section('SIGNALS') + + h_result: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+1234567809' + ) + + # Check Results + self.assertEqual(mocked_signal_engine, h_result.notification_engine) + self.assertFalse(h_result.signal_entity.enabled) + self.assertEqual(300, h_result.signal_entity.keep_alive_interval) + self.assertEqual([], h_result.signal_entity.notifiers['KEEP-ALIVE']) + self.assertEqual([], h_result.signal_entity.notifiers['INITIALIZATION']) + self.assertEqual([], h_result.signal_entity.notifiers['SHUTDOWN']) + self.assertEqual([], h_result.signal_entity.notifiers['NEW-GROUP']) + + +class SignalsEngineTest(unittest.TestCase): + + def setUp(self) -> None: + self.config = ConfigParser() + self.config.read('../../config.ini') + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_keep_alive(self, mocked_signal_engine): + """Test keep_alive method.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + # Configure Mock + mocked_signal_engine.run = mock.AsyncMock() + + target: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+1234567809' + ) + for i in range(10): + target.inc_messages_sent() + + # Invoke Test Target + loop = asyncio.get_event_loop() + loop.run_until_complete( + target.keep_alive() + ) + + # Check Results + mocked_signal_engine.run.assert_has_awaits([ + call( + notifiers=['NOTIFIER.DISCORD.NOT_001'], + entity=ANY, + rule_id='SIGNALS', + source='+1234567809' + ) + ]) + + # Check Entity Used + self.__check_result_entity( + mocked_signal_engine=mocked_signal_engine, + signal='KEEP-ALIVE', + message='Messages Processed in Period: 10' + ) + + # Check the Messages Sent Counter Reset + self.assertEqual(0, target.messages_sent) + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_shutdown(self, mocked_signal_engine): + """Test shutdown Method.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + # Configure Mock + mocked_signal_engine.run = mock.AsyncMock() + + target: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+1234567802' + ) + for i in range(9): + target.inc_messages_sent() + + # Invoke Test Target + loop = asyncio.get_event_loop() + loop.run_until_complete( + target.shutdown() + ) + + # Check Results + mocked_signal_engine.run.assert_has_awaits([ + call( + notifiers=['NOTIFIER.DISCORD.NOT_001', 'NOTIFIER.ELASTIC_SEARCH.UT_01'], + entity=ANY, + rule_id='SIGNALS', + source='+1234567802' + ) + ]) + + # Check Entity Used + self.__check_result_entity( + mocked_signal_engine=mocked_signal_engine, + signal='SHUTDOWN', + message='Last Messages Processed in Period: 9' + ) + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_init(self, mocked_signal_engine): + """Test init Method.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + # Configure Mock + mocked_signal_engine.run = mock.AsyncMock() + + target: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+1234567801' + ) + + # Invoke Test Target + loop = asyncio.get_event_loop() + loop.run_until_complete( + target.init() + ) + + # Check Results + mocked_signal_engine.run.assert_has_awaits([ + call( + notifiers=['NOTIFIER.ELASTIC_SEARCH.UT_01'], + entity=ANY, + rule_id='SIGNALS', + source='+1234567801' + ) + ]) + + # Check Entity Used + self.__check_result_entity( + mocked_signal_engine=mocked_signal_engine, + signal='INITIALIZATION', + message='' + ) + + @mock.patch('TEx.notifier.signals_engine.NotifierEngine') + def test_new_group(self, mocked_signal_engine): + """Test new_group Method.""" + args: Dict = { + 'config': 'unittest_configfile.config', + } + data: Dict = {} + TestsCommon.execute_basic_pipeline_steps_for_initialization(config=self.config, args=args, data=data) + + # Configure Mock + mocked_signal_engine.run = mock.AsyncMock() + + target: SignalsEngine = SignalsEngineFactory.get_instance( + config=self.config, + notification_engine=mocked_signal_engine, + source='+12345678451' + ) + + # Invoke Test Target + loop = asyncio.get_event_loop() + loop.run_until_complete( + target.new_group( + group_id='9988', + group_title='UT Group Title' + ) + ) + + # Check Results + mocked_signal_engine.run.assert_has_awaits([ + call( + notifiers=['NOTIFIER.ELASTIC_SEARCH.UT_01', 'NOTIFIER.DISCORD.NOT_001'], + entity=ANY, + rule_id='SIGNALS', + source='+12345678451' + ) + ]) + + # Check Entity Used + self.__check_result_entity( + mocked_signal_engine=mocked_signal_engine, + signal='NEW-GROUP', + message='ID: 9988 | Title: "UT Group Title"' + ) + + def __check_result_entity(self, mocked_signal_engine, signal: str, message: str): + + # Check Entity Used + entity: SignalNotificationEntityModel = mocked_signal_engine.run.mock_calls[0][2]['entity'] + self.assertEqual(signal, entity.signal) + self.assertEqual(message, entity.content) + + time_delta_seconds: datetime.timedelta = datetime.datetime.now(tz=pytz.UTC) - entity.date_time + self.assertTrue(time_delta_seconds.seconds <= 1) diff --git a/tests/unittest_configfile.config b/tests/unittest_configfile.config index f73eb83..0bd9cd8 100644 --- a/tests/unittest_configfile.config +++ b/tests/unittest_configfile.config @@ -84,3 +84,12 @@ api_key=test_api_key verify_ssl_cert=False index_name=test_index_name pipeline_name=test_pipeline_name + +[SIGNALS] +enabled=true +keep_alive_interval=2 + +keep_alive_notifer=NOTIFIER.DISCORD.NOT_001 +initialization_notifer=NOTIFIER.ELASTIC_SEARCH.UT_01 +shutdown_notifer=NOTIFIER.DISCORD.NOT_001,NOTIFIER.ELASTIC_SEARCH.UT_01 +new_group_notifer=NOTIFIER.ELASTIC_SEARCH.UT_01,NOTIFIER.DISCORD.NOT_001 \ No newline at end of file