Skip to content

Commit

Permalink
Merge pull request #351 from OP-TED/feature/TED-938
Browse files Browse the repository at this point in the history
Feature/ted 938
  • Loading branch information
duprijil authored Nov 9, 2022
2 parents bcb0353 + 17f3ae9 commit 16a1c88
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 49 deletions.
7 changes: 3 additions & 4 deletions dags/daily_materialized_view_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, update_notice_collection_materialised_view
create_notice_collection_materialised_view

DAG_NAME = "daily_materialized_view_update"

Expand All @@ -22,12 +22,11 @@ def create_materialised_view():
create_notice_collection_materialised_view(mongo_client=mongo_client)

@task
def update_materialised_view_with_logs():
def aggregate_batch_logs():
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
update_notice_collection_materialised_view(mongo_client=mongo_client)
create_batch_collection_materialised_view(mongo_client=mongo_client)

create_materialised_view() >> update_materialised_view_with_logs()
create_materialised_view() >> aggregate_batch_logs()


dag = daily_materialized_view_update()
Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
from pymongo import MongoClient, ASCENDING, DESCENDING
from ted_sws.data_manager.services import MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME

from ted_sws import config
from ted_sws.data_manager.services import MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME

NOTICE_COLLECTION_NAME = "notice_collection"
NOTICES_MATERIALISED_VIEW_NAME = "notices_collection_materialised_view"
NOTICE_EVENTS_COLLECTION_NAME = "notice_events"


def create_notice_collection_materialised_view(mongo_client: MongoClient):
"""
Create or update a collection with materialized view used on metabase by aggregating notices collection.
:param mongo_client: MongoDB client to connect
"""
database = mongo_client[config.MONGO_DB_AGGREGATES_DATABASE_NAME or MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME]
notice_collection = database[NOTICE_COLLECTION_NAME]
notice_collection.aggregate([
{
"$lookup":
{
"from": f"{NOTICE_EVENTS_COLLECTION_NAME}",
"localField": "_id",
"foreignField": "notice_id",
"as": "notice_logs",
"pipeline": [
{
"$group": {
"_id": "$notice_id",
"exec_time": {"$sum": "$duration"}
}
}
]
},
},
{
"$project": {
"_id": True,
Expand All @@ -29,6 +51,7 @@ def create_notice_collection_materialised_view(mongo_client: MongoClient):
"notice_type": "$normalised_metadata.notice_type",
"xsd_version": "$normalised_metadata.xsd_version",
"publication_date": "$normalised_metadata.publication_date",
"execution_time": {"$first": "$notice_logs.exec_time"}
}
},
{
Expand All @@ -42,38 +65,4 @@ def create_notice_collection_materialised_view(mongo_client: MongoClient):
materialised_view.create_index([("status", ASCENDING)])
materialised_view.create_index([("form_number", ASCENDING)])
materialised_view.create_index([("form_number", ASCENDING), ("status", ASCENDING)])


def update_notice_collection_materialised_view(mongo_client: MongoClient):
"""
Updates notice collection materialised view with logs about notice execution time.
:param mongo_client: mongodb client to connect
"""
database = mongo_client[config.MONGO_DB_AGGREGATES_DATABASE_NAME or MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME]
notice_collection = database[NOTICE_COLLECTION_NAME]
notice_collection.aggregate([
{
"$lookup":
{
"from": f"{NOTICE_EVENTS_COLLECTION_NAME}",
"localField": "_id",
"foreignField": "notice_id",
"as": "notice_logs",
"pipeline": [
{
"$group": {
"_id": "$notice_id",
"exec_time": {"$sum": "$duration"}
}
}
]
},
},
{
"$unwind": '$notice_logs'
},
{
"$out": NOTICES_MATERIALISED_VIEW_NAME
}
])
materialised_view.create_index([("execution_time", ASCENDING)])
13 changes: 4 additions & 9 deletions tests/e2e/data_manager/test_mongodb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view, NOTICE_PROCESS_BATCH_COLLECTION_NAME
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, NOTICES_MATERIALISED_VIEW_NAME, \
update_notice_collection_materialised_view
create_notice_collection_materialised_view, NOTICES_MATERIALISED_VIEW_NAME


def test_mongodb_client(notice_2016):
Expand Down Expand Up @@ -142,19 +141,15 @@ def test_create_matview_for_notices():
fields_in_the_materialised_view = document.keys()
assert 'form_type' in fields_in_the_materialised_view
assert 'form_number' in fields_in_the_materialised_view
assert 'eforms_subtype' in fields_in_the_materialised_view


def test_update_matview_for_notices():
def test_create_matview_for_batches():
uri = config.MONGO_DB_AUTH_URL
mongodb_client = MongoClient(uri)
update_notice_collection_materialised_view(mongo_client=mongodb_client)
create_batch_collection_materialised_view(mongo_client=mongodb_client)
db = mongodb_client[config.MONGO_DB_AGGREGATES_DATABASE_NAME]
assert NOTICES_MATERIALISED_VIEW_NAME in db.list_collection_names()
document = db[NOTICES_MATERIALISED_VIEW_NAME].find_one()
if document is not None:
fields_in_the_materialised_view = document.keys()
assert 'notice_logs' in fields_in_the_materialised_view
assert NOTICE_PROCESS_BATCH_COLLECTION_NAME in db.list_collection_names()
document = db[NOTICE_PROCESS_BATCH_COLLECTION_NAME].find_one()
if document is not None:
fields_in_the_materialised_view = document.keys()
Expand Down

0 comments on commit 16a1c88

Please sign in to comment.