-
Notifications
You must be signed in to change notification settings - Fork 47
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added Notification & Finder Modules + UnitTests
Signed-off-by: Guilherme Bacellar Moralez <guibacellar@gmail.com>
- Loading branch information
1 parent
35ea745
commit d7981fd
Showing
21 changed files
with
512 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""TEx Finder Modules.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
"""Base Class for All Finders.""" | ||
import abc | ||
|
||
|
||
class BaseFinder: | ||
"""Base Finder Class.""" | ||
|
||
@abc.abstractmethod | ||
async def find(self, raw_text: str) -> bool: | ||
"""Apply Find Logic.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
"""Finder Engine.""" | ||
from configparser import ConfigParser | ||
from typing import Dict, List | ||
|
||
from telethon.events import NewMessage | ||
|
||
from TEx.finder.regex_finder import RegexFinder | ||
from TEx.notifier.notifier_engine import NotifierEngine | ||
|
||
|
||
class FinderEngine: | ||
"""Primary Finder Engine.""" | ||
|
||
def __init__(self) -> None: | ||
"""Initialize Finder Engine.""" | ||
self.is_finder_enabled: bool = False | ||
self.rules: List[Dict] = [] | ||
self.notification_engine: NotifierEngine = NotifierEngine() | ||
|
||
def __is_finder_enabled(self, config: ConfigParser) -> bool: | ||
"""Check if Finder Module is Enabled.""" | ||
return ( | ||
config.has_option('FINDER', 'enabled') and config['FINDER']['enabled'] == 'true' | ||
) | ||
|
||
def __load_rules(self, config: ConfigParser) -> None: | ||
"""Load Finder Rules.""" | ||
rules_sections: List[str] = [item for item in config.sections() if 'FINDER.RULE.' in item] | ||
|
||
for sec in rules_sections: | ||
if config[sec]['type'] == 'regex': | ||
self.rules.append({ | ||
'id': sec, | ||
'instance': RegexFinder(config=config[sec]), | ||
'notifier': config[sec]['notifier'] | ||
}) | ||
|
||
def configure(self, config: ConfigParser) -> None: | ||
"""Configure Finder.""" | ||
self.is_finder_enabled = self.__is_finder_enabled(config=config) | ||
self.__load_rules(config=config) | ||
self.notification_engine.configure(config=config) | ||
|
||
async def run(self, message: NewMessage.Event) -> None: | ||
"""Execute the Finder with Raw Text.""" | ||
if not self.is_finder_enabled: | ||
return | ||
|
||
for rule in self.rules: | ||
is_found: bool = await rule['instance'].find(raw_text=message.raw_text) | ||
|
||
if is_found: | ||
|
||
# Runt the Notification Engine | ||
await self.notification_engine.run( | ||
notifiers=rule['notifier'].split(','), | ||
message=message, | ||
rule_id=rule['id'] | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
"""Regex Finder.""" | ||
import re | ||
from configparser import SectionProxy | ||
|
||
from TEx.finder.base_finder import BaseFinder | ||
|
||
|
||
class RegexFinder(BaseFinder): | ||
"""Regex Based Finder.""" | ||
|
||
def __init__(self, config: SectionProxy) -> None: | ||
"""Initialize RegEx Finder.""" | ||
self.regex: re.Pattern = re.compile(config['regex'], flags=re.IGNORECASE | re.MULTILINE) | ||
|
||
async def find(self, raw_text: str) -> bool: | ||
"""Apply Find Logic.""" | ||
if not raw_text or len(raw_text) == 0: | ||
return False | ||
|
||
if len(self.regex.findall(raw_text)) > 0: | ||
return True | ||
|
||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Notifier Modules.""" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
"""Discord Notifier.""" | ||
from configparser import SectionProxy | ||
|
||
from discord_webhook import DiscordEmbed, DiscordWebhook | ||
from telethon.events import NewMessage | ||
|
||
from TEx.notifier.notifier_base import BaseNotifier | ||
|
||
|
||
class DiscordNotifier(BaseNotifier): | ||
"""Basic Discord Notifier.""" | ||
|
||
def __init__(self) -> None: | ||
"""Initialize Discord Notifier.""" | ||
super().__init__() | ||
self.url: str = '' | ||
|
||
def configure(self, url: str, config: SectionProxy) -> None: | ||
"""Configure the Notifier.""" | ||
self.url = url | ||
self.configure_base(config=config) | ||
|
||
async def run(self, message: NewMessage.Event, rule_id: str) -> None: | ||
"""Run Discord Notifier.""" | ||
# Check and Update Deduplication Control | ||
is_duplicated, duplication_tag = self.check_is_duplicated(message=message.raw_text) | ||
if is_duplicated: | ||
return | ||
|
||
# Run the Notification Process. | ||
webhook = DiscordWebhook( | ||
url=self.url, | ||
rate_limit_retry=True | ||
) | ||
|
||
embed = DiscordEmbed( | ||
title=f'**{message.chat.title}** ({message.chat.id})', | ||
description=message.raw_text | ||
) | ||
|
||
embed.add_embed_field(name="Rule", value=rule_id, inline=False) | ||
embed.add_embed_field(name="Message ID", value=str(message.id), inline=False) | ||
embed.add_embed_field(name="Group Name", value=message.chat.title, inline=True) | ||
embed.add_embed_field(name="Group ID", value=message.chat.id, inline=True) | ||
embed.add_embed_field(name="Message Date", value=str(message.date), inline=False) | ||
embed.add_embed_field(name="Tag", value=duplication_tag, inline=False) | ||
|
||
# add embed object to webhook | ||
webhook.add_embed(embed) | ||
webhook.execute() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
"""Base Class for All Notifiers.""" | ||
import abc | ||
import hashlib | ||
from configparser import SectionProxy | ||
from typing import Optional, Tuple | ||
|
||
from cachetools import TTLCache | ||
from telethon.events import NewMessage | ||
|
||
|
||
class BaseNotifier: | ||
"""Base Notifier.""" | ||
|
||
def __init__(self) -> None: | ||
"""Initialize the Base Notifier.""" | ||
self.cache: Optional[TTLCache] = None | ||
|
||
def configure_base(self, config: SectionProxy) -> None: | ||
"""Configure Base Notifier.""" | ||
self.cache = TTLCache(maxsize=4096, ttl=int(config['prevent_duplication_for_minutes']) * 60) | ||
|
||
def check_is_duplicated(self, message: str) -> Tuple[bool, str]: | ||
"""Check if Message is Duplicated on Notifier.""" | ||
if not message or self.cache is None: | ||
return False, '' | ||
|
||
# Compute Deduplication Tag | ||
tag: str = hashlib.md5(message.encode('UTF-8')).hexdigest() # nosec | ||
|
||
# If Found, Return True | ||
if self.cache.get(tag): | ||
return True, tag | ||
|
||
# Otherwise, Just Insert and Return False | ||
self.cache[tag] = True | ||
return False, tag | ||
|
||
@abc.abstractmethod | ||
async def run(self, message: NewMessage.Event, rule_id: str) -> None: | ||
"""Run the Notification Process.""" |
Oops, something went wrong.