Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

12 feature request elastic search connector #34

Merged
merged 13 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions TEx/core/mapper/telethon_message_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""Telethon Event Entity Mapper."""
from __future__ import annotations

from typing import Optional, Union

from pydantic import BaseModel
from telethon.tl.patched import Message
from telethon.tl.types import Channel, Chat, PeerUser, User

from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity
from TEx.models.facade.media_handler_facade_entity import MediaHandlingEntity


class TelethonMessageEntityMapper:
"""Telethon Event Entity Mapper."""

class ChatPropsModel(BaseModel):
"""Model for __map_chat_props method."""

chat_id: int
chat_title: str

@staticmethod
async def to_finder_notification_facade_entity(message: Message, downloaded_media_info: Optional[MediaHandlingEntity]) -> \
Optional[FinderNotificationMessageEntity]:
"""Map Telethon Event to FinderNotificationMessageEntity."""
if not message:
return None

mapped_chat_props: TelethonMessageEntityMapper.ChatPropsModel = TelethonMessageEntityMapper.__map_chat_props(
entity=await message.get_chat(),
)

h_result: FinderNotificationMessageEntity = FinderNotificationMessageEntity(
date_time=message.date,
raw_text=message.raw_text,
group_name=mapped_chat_props.chat_title,
group_id=mapped_chat_props.chat_id,
from_id=message.from_id.user_id if isinstance(message.from_id, PeerUser) else None,
to_id=message.to_id.channel_id if message.to_id is not None else None,
reply_to_msg_id=message.reply_to.reply_to_msg_id if message.is_reply and message.reply_to else None,
message_id=message.id,
is_reply=message.is_reply,
downloaded_media_info=downloaded_media_info,
)

return h_result

@staticmethod
def __map_chat_props(entity: Union[Channel, User, Chat]) -> TelethonMessageEntityMapper.ChatPropsModel:
"""Map Chat Specific Props."""
if isinstance(entity, (Channel, Chat)):
return TelethonMessageEntityMapper.ChatPropsModel(
chat_id=entity.id,
chat_title=entity.title if entity.title else '',
)

if isinstance(entity, User):
return TelethonMessageEntityMapper.ChatPropsModel(
chat_id=entity.id,
chat_title=entity.username if entity.username else (entity.phone if entity.phone else ''),
)

raise AttributeError(entity, 'Invalid entity type')
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Dict

from telethon.tl.types import Message
from telethon.tl.patched import Message


class DoNothingMediaDownloader:
Expand Down
2 changes: 1 addition & 1 deletion TEx/core/media_download_handling/photo_media_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
from typing import Dict

from telethon.tl.types import Message
from telethon.tl.patched import Message


class PhotoMediaDownloader:
Expand Down
2 changes: 1 addition & 1 deletion TEx/core/media_download_handling/std_media_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import os
from typing import Dict, List

from telethon.tl.types import Message
from telethon.tl.patched import Message


class StandardMediaDownloader:
Expand Down
12 changes: 10 additions & 2 deletions TEx/core/media_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from TEx.core.media_metadata_handling.text_handler import TextPlainHandler
from TEx.core.media_metadata_handling.webimage_handler import WebImageStickerHandler
from TEx.database.telegram_group_database import TelegramMediaDatabaseManager
from TEx.models.facade.media_handler_facade_entity import MediaHandlingEntity

logger = logging.getLogger('TelegramExplorer')

Expand Down Expand Up @@ -77,7 +78,7 @@ class UniversalTelegramMediaHandler:
},
}

async def handle_medias(self, message: Message, group_id: int, data_path: str) -> Optional[int]:
async def handle_medias(self, message: Message, group_id: int, data_path: str) -> Optional[MediaHandlingEntity]:
"""Handle Message Media, Photo, File, etc."""
executor_id: Optional[str] = self.__resolve_executor_id(message=message)

Expand Down Expand Up @@ -124,7 +125,14 @@ async def handle_medias(self, message: Message, group_id: int, data_path: str) -
# Update Reference into DB
if media_metadata is not None:
media_metadata['group_id'] = group_id
return TelegramMediaDatabaseManager.insert(entity_values=media_metadata)
media_id: int = TelegramMediaDatabaseManager.insert(entity_values=media_metadata)

return MediaHandlingEntity(
media_id=media_id,
file_name=media_metadata['file_name'],
content_type=media_metadata['mime_type'],
size_bytes=media_metadata['size_bytes'],
)

return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Dict, Optional

from telethon.tl.types import Message
from telethon.tl.patched import Message


class DoNothingHandler:
Expand Down
5 changes: 3 additions & 2 deletions TEx/core/media_metadata_handling/generic_binary_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, List, Optional

from telethon.tl.types import DocumentAttributeFilename, Message, MessageMediaDocument
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, MessageMediaDocument


class GenericBinaryMediaHandler:
Expand All @@ -26,4 +27,4 @@ def handle_metadata(message: Message) -> Optional[Dict]:
'size_bytes': media.document.size,
'title': None,
'name': None,
}
}
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/geo_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, Optional

from telethon.tl.types import Message, MessageMediaGeo
from telethon.tl.patched import Message
from telethon.tl.types import MessageMediaGeo


class GeoMediaHandler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/mp4_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, List, Optional

from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeVideo, Message, MessageMediaDocument
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeVideo, MessageMediaDocument


class MediaMp4Handler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/pdf_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, Optional

from telethon.tl.types import DocumentAttributeFilename, Message, MessageMediaPhoto
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, MessageMediaPhoto


class PdfMediaHandler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/photo_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, Optional

from telethon.tl.types import Message, MessageMediaPhoto
from telethon.tl.patched import Message
from telethon.tl.types import MessageMediaPhoto


class PhotoMediaHandler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/sticker_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, List, Optional

from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeImageSize, Message, MessageMediaDocument
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeImageSize, MessageMediaDocument


class MediaStickerHandler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/text_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, Optional

from telethon.tl.types import DocumentAttributeFilename, Message, MessageMediaDocument
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, MessageMediaDocument


class TextPlainHandler:
Expand Down
3 changes: 2 additions & 1 deletion TEx/core/media_metadata_handling/webimage_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

from typing import Dict, List, Optional

from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeImageSize, Message, MessageMediaDocument
from telethon.tl.patched import Message
from telethon.tl.types import DocumentAttributeFilename, DocumentAttributeImageSize, MessageMediaDocument


class WebImageStickerHandler:
Expand Down
2 changes: 1 addition & 1 deletion TEx/core/state_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def write_file_text(path: str, content: str) -> None:
"""
# Delete if Exists
DbManager.SESSIONS['temp'].execute(
StateFileOrmEntity.__table__.delete().where(StateFileOrmEntity.path == path),
StateFileOrmEntity.__table__.delete().where(StateFileOrmEntity.path == path), # type: ignore
)

entity: StateFileOrmEntity = StateFileOrmEntity(
Expand Down
6 changes: 3 additions & 3 deletions TEx/core/temp_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def read_file_text(path: str) -> str:
def remove_expired_entries() -> int:
"""Remove all Expired Entries."""
total: int = DbManager.SESSIONS['temp'].execute(
TempDataOrmEntity.__table__.delete().where(
TempDataOrmEntity.__table__.delete().where( # type: ignore
TempDataOrmEntity.valid_at <= int(datetime.now(tz=pytz.UTC).timestamp()),
),
).rowcount
Expand All @@ -47,7 +47,7 @@ def remove_expired_entries() -> int:
@staticmethod
def purge() -> int:
"""Remove all Entries."""
total: int = DbManager.SESSIONS['temp'].execute(TempDataOrmEntity.__table__.delete()).rowcount
total: int = DbManager.SESSIONS['temp'].execute(TempDataOrmEntity.__table__.delete()).rowcount # type: ignore
DbManager.SESSIONS['temp'].flush()
DbManager.SESSIONS['temp'].commit()
return total
Expand All @@ -64,7 +64,7 @@ def write_file_text(path: str, content: str, validate_seconds: int = 3600) -> No
"""
# Delete if Exists
DbManager.SESSIONS['temp'].execute(
TempDataOrmEntity.__table__.delete().where(TempDataOrmEntity.path == path),
TempDataOrmEntity.__table__.delete().where(TempDataOrmEntity.path == path), # type: ignore
)

entity: TempDataOrmEntity = TempDataOrmEntity(
Expand Down
8 changes: 4 additions & 4 deletions TEx/database/telegram_group_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def insert(entity_values: Dict) -> None:
DbManager.SESSIONS['data'].commit()

except sqlalchemy.exc.IntegrityError as exc:
if 'UNIQUE' in exc.orig.args[0]:
if 'UNIQUE' in exc.orig.args[0]: # type: ignore
return

raise
Expand Down Expand Up @@ -294,11 +294,11 @@ def get_all_medias_from_group_and_mimetype(group_id: int, mime_type: str, file_d
parts_or_filter: List[BinaryExpression] = []

for name_part in file_name_part:
parts_or_filter.append(TelegramMediaOrmEntity.file_name.contains(name_part))
parts_or_filter.append(TelegramMediaOrmEntity.file_name.contains(name_part)) # type: ignore

select_statement = select_statement.where(or_(*parts_or_filter))

return DbManager.SESSIONS['data'].execute(select_statement)
return DbManager.SESSIONS['data'].execute(select_statement) # type: ignore

@staticmethod
def stats_all_medias_from_group_by_mimetype(group_id: int, file_datetime_limit_seconds: Optional[int] = None) -> Dict:
Expand Down Expand Up @@ -346,7 +346,7 @@ def get_all_medias_by_age(group_id: int, media_limit_days: int) -> List[Telegram
:param media_limit_days: Age of Media in Days
:return: Number of Medias Removed
"""
statement: Delete = select(TelegramMediaOrmEntity).where(
statement: Delete = select(TelegramMediaOrmEntity).where( # type: ignore
TelegramMediaOrmEntity.date_time <= (datetime.datetime.now(tz=pytz.UTC) - datetime.timedelta(days=media_limit_days)),
)

Expand Down
15 changes: 15 additions & 0 deletions TEx/finder/all_messages_finder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""All Messages Finder."""
from configparser import SectionProxy

from TEx.finder.base_finder import BaseFinder


class AllMessagesFinder(BaseFinder):
"""All Messages Based Finder."""

def __init__(self, config: SectionProxy) -> None:
"""Initialize All Messages Finder."""

async def find(self, raw_text: str) -> bool:
"""Find Message. Always Return True."""
return True
28 changes: 20 additions & 8 deletions TEx/finder/finder_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from __future__ import annotations

from configparser import ConfigParser
from typing import Dict, List

from telethon.events import NewMessage
from typing import Dict, List, Optional

from TEx.finder.all_messages_finder import AllMessagesFinder
from TEx.finder.regex_finder import RegexFinder
from TEx.models.facade.finder_notification_facade_entity import FinderNotificationMessageEntity
from TEx.notifier.notifier_engine import NotifierEngine


Expand Down Expand Up @@ -36,26 +36,38 @@ def __load_rules(self, config: ConfigParser) -> None:
'instance': RegexFinder(config=config[sec]),
'notifier': config[sec]['notifier'],
})
elif config[sec]['type'] == 'all':
self.rules.append({
'id': sec,
'instance': AllMessagesFinder(config=config[sec]),
'notifier': config[sec]['notifier'],
})

def configure(self, config: ConfigParser) -> None:
"""Configure Finder."""
self.is_finder_enabled = self.__is_finder_enabled(config=config)
self.__load_rules(config=config)
self.notification_engine.configure(config=config)

async def run(self, message: NewMessage.Event) -> None:
"""Execute the Finder with Raw Text."""
if not self.is_finder_enabled:
async def run(self, entity: Optional[FinderNotificationMessageEntity], source: str) -> None:
"""Execute the Finder with Raw Text.

:param entity: Facade Object
:param source: Source Account/Phone Number
:return:
"""
if not self.is_finder_enabled or not entity:
return

for rule in self.rules:
is_found: bool = await rule['instance'].find(raw_text=message.raw_text)
is_found: bool = await rule['instance'].find(raw_text=entity.raw_text)

if is_found:

# Runt the Notification Engine
await self.notification_engine.run(
notifiers=rule['notifier'].split(','),
message=message,
entity=entity,
rule_id=rule['id'],
source=source,
)
14 changes: 12 additions & 2 deletions TEx/logging.conf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[loggers]
keys=root,sqlalchemy,TelegramExplorer
keys=root,sqlalchemy,TelegramExplorer,elasticsearch,elastic_transport.transport

#######################

Expand All @@ -17,6 +17,16 @@ keys=simpleFormatter
level=INFO
handlers=consoleHandler

[logger_elasticsearch]
level=ERROR
handlers=consoleHandler
qualname=elasticsearch

[logger_elastic_transport.transport]
level=ERROR
handlers=consoleHandler
qualname=elastic_transport.transport

[logger_TelegramExplorer]
level=INFO
handlers=consoleHandler
Expand All @@ -40,4 +50,4 @@ args=(sys.stdout,)
#######################

[formatter_simpleFormatter]
format=%(asctime)s - %(levelname)s - %(message)s
format= %(asctime)s - %(levelname)s - %(message)s
Loading
Loading