Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean code and add improvements for backlog DAGs #361

Merged
merged 2 commits into from
Nov 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
NOTICE_IDS_KEY = "notice_ids"
START_WITH_STEP_NAME_KEY = "start_with_step_name"
EXECUTE_ONLY_ONE_STEP_KEY = "execute_only_one_step"
DEFAULT_NUBER_OF_CELERY_WORKERS = 144
DEFAULT_NUBER_OF_CELERY_WORKERS = 144 #TODO: revise this config
NOTICE_PROCESS_WORKFLOW_DAG_NAME = "notice_process_workflow"
DEFAULT_START_WITH_TASK_ID = "notice_normalisation_pipeline"
DEFAULT_PIPELINE_NAME_FOR_LOGS = "unknown_pipeline_name"
Expand Down
10 changes: 6 additions & 4 deletions dags/pipelines/notice_processor_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def notice_normalisation_pipeline(notice: Notice, mongodb_client: MongoClient) -
"""
from ted_sws.data_sampler.services.notice_xml_indexer import index_notice
from ted_sws.notice_metadata_processor.services.metadata_normalizer import normalise_notice

notice.update_status_to(new_status=NoticeStatus.RAW)
indexed_notice = index_notice(notice=notice)
normalised_notice = normalise_notice(notice=indexed_notice)

Expand All @@ -27,7 +27,7 @@ def notice_transformation_pipeline(notice: Notice, mongodb_client: MongoClient)
from ted_sws.notice_transformer.services.notice_transformer import transform_notice
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapper
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB

notice.update_status_to(new_status=NoticeStatus.NORMALISED_METADATA)
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
result = notice_eligibility_checker(notice=notice, mapping_suite_repository=mapping_suite_repository)
if not result:
Expand Down Expand Up @@ -57,7 +57,7 @@ def notice_validation_pipeline(notice: Notice, mongodb_client: MongoClient) -> N
from ted_sws.notice_validator.services.xpath_coverage_runner import validate_xpath_coverage_notice
from ted_sws.data_manager.adapters.mapping_suite_repository import MappingSuiteRepositoryMongoDB
from ted_sws.event_manager.services.log import log_notice_info

notice.update_status_to(new_status=NoticeStatus.DISTILLED)
mapping_suite_id = notice.distilled_rdf_manifestation.mapping_suite_id
mapping_suite_repository = MappingSuiteRepositoryMongoDB(mongodb_client=mongodb_client)
mapping_suite = mapping_suite_repository.get(reference=mapping_suite_id)
Expand All @@ -82,6 +82,7 @@ def notice_package_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
"""
from ted_sws.notice_packager.services.notice_packager import package_notice

notice.update_status_to(new_status=NoticeStatus.VALIDATED)
# TODO: Implement notice package eligibility
notice.set_is_eligible_for_packaging(eligibility=True)
packaged_notice = package_notice(notice=notice)
Expand All @@ -95,7 +96,7 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
from ted_sws.notice_publisher.services.notice_publisher import publish_notice, publish_notice_rdf_into_s3
from ted_sws.event_manager.services.log import log_notice_error
from ted_sws import config

notice.update_status_to(new_status=NoticeStatus.PACKAGED)
if config.S3_PUBLISH_ENABLED:
published_into_s3 = publish_notice_rdf_into_s3(notice=notice)
if not published_into_s3:
Expand All @@ -106,4 +107,5 @@ def notice_publish_pipeline(notice: Notice, mongodb_client: MongoClient) -> Noti
if result:
return NoticePipelineOutput(notice=notice)
else:
notice.set_is_eligible_for_publishing(eligibility=False)
return NoticePipelineOutput(notice=notice, processed=False)
42 changes: 28 additions & 14 deletions dags/pipelines/notice_selectors_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,20 @@
PUBLICATION_DATE = "normalised_metadata.publication_date"


def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
def build_selector_mongodb_filter(notice_statuses: List[str], form_number: str = None,
start_date: str = None, end_date: str = None,
xsd_version: str = None) -> dict:
"""

:param notice_statuses:
:param form_number:
:param start_date:
:param end_date:
:param xsd_version:
:return:
"""
from datetime import datetime
mongodb_filter = {NOTICE_STATUS: notice_status}
mongodb_filter = {NOTICE_STATUS: {"$in": notice_statuses}}
if form_number:
mongodb_filter[FORM_NUMBER] = form_number
if start_date and end_date:
Expand All @@ -26,21 +35,26 @@ def build_selector_mongodb_filter(notice_status: str, form_number: str = None,
def notice_ids_selector_by_status(notice_statuses: List[NoticeStatus], form_number: str = None,
start_date: str = None, end_date: str = None,
xsd_version: str = None) -> List[str]:
"""

:param notice_statuses:
:param form_number:
:param start_date:
:param end_date:
:param xsd_version:
:return:
"""
from pymongo import MongoClient
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository, NOTICE_TED_ID

mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
notice_ids = []
for notice_status in notice_statuses:
mongodb_filter = build_selector_mongodb_filter(notice_status=str(notice_status),
form_number=form_number,
start_date=start_date,
end_date=end_date,
xsd_version=xsd_version
)
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
notice_ids.extend([result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator])

return notice_ids
notice_statuses = [str(notice_status) for notice_status in notice_statuses]
mongodb_filter = build_selector_mongodb_filter(notice_statuses=notice_statuses,
form_number=form_number,
start_date=start_date,
end_date=end_date,
xsd_version=xsd_version)
mongodb_result_iterator = notice_repository.collection.find(mongodb_filter, {NOTICE_TED_ID: 1})
return [result_dict[NOTICE_TED_ID] for result_dict in mongodb_result_iterator]
1 change: 1 addition & 0 deletions dags/selector_repackage_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
DAG_NAME = "selector_re_package_process_orchestrator"

RE_PACKAGE_TARGET_NOTICE_STATES = [NoticeStatus.VALIDATED, NoticeStatus.INELIGIBLE_FOR_PACKAGING,
NoticeStatus.ELIGIBLE_FOR_PACKAGING,
NoticeStatus.INELIGIBLE_FOR_PUBLISHING]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
Expand Down
4 changes: 3 additions & 1 deletion dags/selector_republish_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

DAG_NAME = "selector_re_publish_process_orchestrator"

RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.PUBLICLY_UNAVAILABLE, NoticeStatus.ELIGIBLE_FOR_PUBLISHING]
RE_PUBLISH_TARGET_NOTICE_STATES = [NoticeStatus.ELIGIBLE_FOR_PUBLISHING, NoticeStatus.INELIGIBLE_FOR_PUBLISHING,
NoticeStatus.PACKAGED
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
Expand Down
5 changes: 4 additions & 1 deletion dags/selector_retransform_process_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

DAG_NAME = "selector_re_transform_process_orchestrator"

RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION]
RE_TRANSFORM_TARGET_NOTICE_STATES = [NoticeStatus.NORMALISED_METADATA, NoticeStatus.INELIGIBLE_FOR_TRANSFORMATION,
NoticeStatus.ELIGIBLE_FOR_TRANSFORMATION, NoticeStatus.PREPROCESSED_FOR_TRANSFORMATION,
NoticeStatus.TRANSFORMED, NoticeStatus.DISTILLED
]
TRIGGER_NOTICE_PROCESS_WORKFLOW_TASK_ID = "trigger_notice_process_workflow"
FORM_NUMBER_DAG_PARAM = "form_number"
START_DATE_DAG_PARAM = "start_date"
Expand Down
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ def open_local(paths, mode="r", encoding="utf8"):
"metadata_generator = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_metadata_generator:main",
"conceptual_mapping_differ = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_conceptual_mapping_differ:main",
"rdf_differ = ted_sws.rdf_differ.entrypoints.cli.cmd_rdf_differ:main",

"mapping_suite_processor = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_mapping_suite_processor:main",
"yarrrml2rml_converter = ted_sws.mapping_suite_processor.entrypoints.cli.cmd_yarrrml2rml_converter:main",
"normalisation_resource_generator = ted_sws.data_manager.entrypoints.cli.cmd_generate_mapping_resources:main",
Expand Down
6 changes: 3 additions & 3 deletions ted_sws/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def AIRFLOW__CORE__EXECUTOR(self, config_value: str) -> str:
def MONGO_DB_PORT(self, config_value: str) -> int:
return int(config_value)

@env_property()
@env_property(default_value="aggregates_db")
def MONGO_DB_AGGREGATES_DATABASE_NAME(self, config_value: str) -> str:
return config_value

Expand Down Expand Up @@ -115,7 +115,7 @@ def ELK_VERSION(self, config_value: str) -> int:


class LoggingConfig:
@env_property()
@env_property(default_value="aggregates_db")
def MONGO_DB_LOGS_DATABASE_NAME(self, config_value: str) -> str:
return config_value

Expand Down Expand Up @@ -151,7 +151,7 @@ class API:
def ID_MANAGER_PROD_API_HOST(self, config_value: str) -> str:
return config_value

@env_property(default_value="local_host")
@env_property(default_value="localhost")
def ID_MANAGER_DEV_API_HOST(self, config_value: str) -> str:
return config_value

Expand Down
4 changes: 2 additions & 2 deletions ted_sws/core/adapters/config_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,11 @@ def env_property(config_resolver_class: Type[ConfigResolverABC] = EnvConfigResol
"""
def wrap(func):
@property
def wrapped_f(self, *args, **kwargs):
def wrapped_function(self, *args, **kwargs):
config_value = config_resolver_class().concrete_config_resolve(config_name=func.__name__,
default_value=default_value)
return func(self, config_value, *args, **kwargs)

return wrapped_f
return wrapped_function

return wrap
10 changes: 9 additions & 1 deletion ted_sws/core/model/notice.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,28 @@ class NoticeStatus(IntEnum):
"""
RAW = 10
INDEXED = 15
# STATES FOR RE-TRANSFORM ---BEGIN---
NORMALISED_METADATA = 20
INELIGIBLE_FOR_TRANSFORMATION = 23 # backlog status
ELIGIBLE_FOR_TRANSFORMATION = 27 # forward status
PREPROCESSED_FOR_TRANSFORMATION = 29
TRANSFORMED = 30
# STATES FOR RE-VALIDATE---BEGIN---
DISTILLED = 35
# STATES FOR RE-TRANSFORM ---END---
# STATES FOR RE-VALIDATE---END---
# STATES FOR RE-PACKAGE ---BEGIN---
VALIDATED = 40
INELIGIBLE_FOR_PACKAGING = 43 # backlog status
ELIGIBLE_FOR_PACKAGING = 47 # forward status
# STATES FOR RE-PACKAGE ---END---
# STATES FOR RE-PUBLISH ---BEGIN---
PACKAGED = 50
INELIGIBLE_FOR_PUBLISHING = 53 # backlog status
ELIGIBLE_FOR_PUBLISHING = 57 # forward status
# STATES FOR RE-PUBLISH ---END---
PUBLISHED = 60
PUBLICLY_UNAVAILABLE = 63 # to be investigated if more fine-grained checks can be adopted
PUBLICLY_UNAVAILABLE = 63 # to be investigated if more fine-grained checks can be adopted #TODO: Revalidate for public availability.
PUBLICLY_AVAILABLE = 67 # forward status

def __lt__(self, other):
Expand Down
5 changes: 2 additions & 3 deletions ted_sws/data_manager/adapters/mapping_suite_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@ class MappingSuiteRepositoryMongoDB(MappingSuiteRepositoryABC):
"""

_collection_name = "mapping_suite_collection"
_database_name = config.MONGO_DB_AGGREGATES_DATABASE_NAME or "aggregates_db"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name):
def __init__(self, mongodb_client: MongoClient, database_name: str = None):
"""

:param mongodb_client:
:param database_name:
"""
mongodb_client = mongodb_client
self._database_name = database_name
self._database_name = database_name or config.MONGO_DB_AGGREGATES_DATABASE_NAME
notice_db = mongodb_client[self._database_name]
self.collection = notice_db[self._collection_name]

Expand Down
16 changes: 9 additions & 7 deletions ted_sws/data_manager/adapters/notice_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
NOTICE_TED_ID = "ted_id"
NOTICE_STATUS = "status"
NOTICE_CREATED_AT = "created_at"
NOTICE_ID = "notice_id"
NOTICE_NORMALISED_METADATA = "normalised_metadata"
NOTICE_PREPROCESSED_XML_MANIFESTATION = "preprocessed_xml_manifestation"
NOTICE_DISTILLED_RDF_MANIFESTATION = "distilled_rdf_manifestation"
NOTICE_RDF_MANIFESTATION = "rdf_manifestation"
NOTICE_METS_MANIFESTATION = "mets_manifestation"
METADATA_PUBLICATION_DATE = "publication_date"
METADATA_DOCUMENT_SENT_DATE = "document_sent_date"
FILE_STORAGE_COLLECTION_NAME = "fs.files"


class NoticeRepositoryInFileSystem(NoticeRepositoryABC):
Expand Down Expand Up @@ -122,18 +124,18 @@ class NoticeRepository(NoticeRepositoryABC):
"""

_collection_name = "notice_collection"
_database_name = config.MONGO_DB_AGGREGATES_DATABASE_NAME or "aggregates_db"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name):
def __init__(self, mongodb_client: MongoClient, database_name: str = None):
database_name = database_name if database_name else config.MONGO_DB_AGGREGATES_DATABASE_NAME
self._database_name = database_name
self.mongodb_client = mongodb_client
notice_db = mongodb_client[self._database_name]
self.file_storage = gridfs.GridFS(notice_db)
self.collection = notice_db[self._collection_name]
self.collection.create_index([("created_at", ASCENDING)])
self.collection.create_index([("status", ASCENDING)])
self.file_storage_collection = notice_db["fs.files"]
self.file_storage_collection.create_index([("notice_id", ASCENDING)])
self.collection.create_index([(NOTICE_CREATED_AT, ASCENDING)])
self.collection.create_index([(NOTICE_STATUS, ASCENDING)])
self.file_storage_collection = notice_db[FILE_STORAGE_COLLECTION_NAME]
self.file_storage_collection.create_index([(NOTICE_ID, ASCENDING)])

def get_file_content_from_grid_fs(self, file_id: str) -> str:
"""
Expand Down Expand Up @@ -169,7 +171,7 @@ def write_notice_fields_in_grid_fs(self, notice: Notice) -> Tuple[Notice, list,
"""
notice = copy.deepcopy(notice)
linked_file_ids = [linked_file._id for linked_file in
self.file_storage.find({"notice_id": notice.ted_id})]
self.file_storage.find({NOTICE_ID: notice.ted_id})]

new_linked_file_ids = []

Expand Down
5 changes: 2 additions & 3 deletions ted_sws/data_manager/adapters/supra_notice_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ class DailySupraNoticeRepository(DailySupraNoticeRepositoryABC):
"""

_collection_name = "daily_supra_notice_collection"
_database_name = config.MONGO_DB_AGGREGATES_DATABASE_NAME or "aggregates_db"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name):
self._database_name = database_name
def __init__(self, mongodb_client: MongoClient, database_name: str = None):
self._database_name = database_name or config.MONGO_DB_AGGREGATES_DATABASE_NAME
self.mongodb_client = mongodb_client
daily_supra_notice_db = mongodb_client[self._database_name]
self.collection = daily_supra_notice_db[self._collection_name]
Expand Down
18 changes: 5 additions & 13 deletions ted_sws/event_manager/adapters/event_logging_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ class EventLoggingRepository(EventLoggingRepositoryABC):
"""
This is the base/generic events' repository class.
"""
_database_name = config.MONGO_DB_LOGS_DATABASE_NAME or "logs_db"
_collection_name = "log_events"

def __init__(self, mongodb_client: MongoClient = None, database_name: str = _database_name,
def __init__(self, mongodb_client: MongoClient = None, database_name: str = None,
collection_name: str = _collection_name):
"""
This is the constructor/initialization of base/generic event logging repository.
Expand All @@ -41,7 +40,7 @@ def __init__(self, mongodb_client: MongoClient = None, database_name: str = _dat
:param database_name: The database name
:param collection_name: The collection name
"""
self._database_name = database_name
self._database_name = database_name or config.MONGO_DB_LOGS_DATABASE_NAME
self._collection_name = collection_name
if mongodb_client is None:
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
Expand Down Expand Up @@ -101,19 +100,14 @@ def add(self, event_message: EventMessage) -> str:
result = self.collection.insert_one(record)
return result.inserted_id

@classmethod
def get_default_database_name(cls):
return cls._database_name


class TechnicalEventRepository(EventLoggingRepository):
"""
This is the technical events' repository class.
"""
_database_name = EventLoggingRepository._database_name
_collection_name = "technical_events"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name,
def __init__(self, mongodb_client: MongoClient, database_name: str = None,
collection_name: str = _collection_name):
"""
This is the constructor/initialization of technical event logging repository.
Expand All @@ -129,10 +123,9 @@ class NoticeEventRepository(EventLoggingRepository):
"""
This is the notice events' repository class.
"""
_database_name = EventLoggingRepository._database_name
_collection_name = "notice_events"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name,
def __init__(self, mongodb_client: MongoClient, database_name: str = None,
collection_name: str = _collection_name):
"""
This is the constructor/initialization of notice event logging repository.
Expand All @@ -148,10 +141,9 @@ class MappingSuiteEventRepository(EventLoggingRepository):
"""
This is the mapping suite events' repository class.
"""
_database_name = EventLoggingRepository._database_name
_collection_name = "mapping_suite_events"

def __init__(self, mongodb_client: MongoClient, database_name: str = _database_name,
def __init__(self, mongodb_client: MongoClient, database_name: str = None,
collection_name: str = _collection_name):
"""
This is the constructor/initialization of mapping suite event logging repository.
Expand Down
8 changes: 7 additions & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pymongo
import pytest

from ted_sws import config
from ted_sws.notice_transformer.adapters.rml_mapper import RMLMapperABC, SerializationFormat as RMLSerializationFormat
from tests import TEST_DATA_PATH
from tests.fakes.fake_rml_mapper import FakeRMLMapper
Expand Down Expand Up @@ -43,4 +44,9 @@ def fake_mapping_suite_id() -> str:

@pytest.fixture
def invalid_mapping_suite_id() -> str:
return "test_invalid_package"
return "test_invalid_package"


@pytest.fixture
def aggregates_database_name():
return config.MONGO_DB_AGGREGATES_DATABASE_NAME
Loading