Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ted 938 #351

Merged
merged 11 commits into from
Nov 9, 2022
7 changes: 3 additions & 4 deletions dags/daily_materialized_view_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, update_notice_collection_materialised_view
create_notice_collection_materialised_view

DAG_NAME = "daily_materialized_view_update"

Expand All @@ -22,12 +22,11 @@ def create_materialised_view():
create_notice_collection_materialised_view(mongo_client=mongo_client)

@task
def update_materialised_view_with_logs():
def aggregate_batch_logs():
mongo_client = MongoClient(config.MONGO_DB_AUTH_URL)
update_notice_collection_materialised_view(mongo_client=mongo_client)
create_batch_collection_materialised_view(mongo_client=mongo_client)

create_materialised_view() >> update_materialised_view_with_logs()
create_materialised_view() >> aggregate_batch_logs()


dag = daily_materialized_view_update()
Original file line number Diff line number Diff line change
@@ -1,16 +1,38 @@
from pymongo import MongoClient, ASCENDING, DESCENDING
from ted_sws.data_manager.services import MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME

from ted_sws import config
from ted_sws.data_manager.services import MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME

# Source collection that the aggregation pipelines in this module run against.
NOTICE_COLLECTION_NAME = "notice_collection"
# Target collection that the aggregation pipeline writes its result to via `$out`.
NOTICES_MATERIALISED_VIEW_NAME = "notices_collection_materialised_view"
# Collection joined through `$lookup` (on `notice_id`) to sum up the event durations per notice.
NOTICE_EVENTS_COLLECTION_NAME = "notice_events"


def create_notice_collection_materialised_view(mongo_client: MongoClient):
"""
Create or update a collection with materialized view used on metabase by aggregating notices collection.
:param mongo_client: MongoDB client to connect
"""
database = mongo_client[config.MONGO_DB_AGGREGATES_DATABASE_NAME or MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME]
notice_collection = database[NOTICE_COLLECTION_NAME]
notice_collection.aggregate([
{
"$lookup":
{
"from": f"{NOTICE_EVENTS_COLLECTION_NAME}",
"localField": "_id",
"foreignField": "notice_id",
"as": "notice_logs",
"pipeline": [
{
"$group": {
"_id": "$notice_id",
"exec_time": {"$sum": "$duration"}
}
}
]
},
},
{
"$project": {
"_id": True,
Expand All @@ -29,6 +51,7 @@ def create_notice_collection_materialised_view(mongo_client: MongoClient):
"notice_type": "$normalised_metadata.notice_type",
"xsd_version": "$normalised_metadata.xsd_version",
"publication_date": "$normalised_metadata.publication_date",
"execution_time": {"$first": "$notice_logs.exec_time"}
}
},
{
Expand All @@ -42,38 +65,4 @@ def create_notice_collection_materialised_view(mongo_client: MongoClient):
materialised_view.create_index([("status", ASCENDING)])
materialised_view.create_index([("form_number", ASCENDING)])
materialised_view.create_index([("form_number", ASCENDING), ("status", ASCENDING)])


def update_notice_collection_materialised_view(mongo_client: MongoClient):
    """
    Join the summed event durations onto the notices and write the result to the materialised view.

    For every document of the notice collection the events whose ``notice_id`` equals the notice
    ``_id`` are looked up, grouped by ``notice_id`` and their ``duration`` values summed into
    ``exec_time``. The looked-up result is stored under ``notice_logs``, unwound, and the
    resulting documents are written with ``$out`` to the notices materialised view collection.

    :param mongo_client: mongodb client to connect
    """
    # NOTE(review): `$out` replaces the complete content of its target collection, so this
    # presumably overwrites whatever was stored in the view before -- confirm this is intended.
    aggregates_db_name = config.MONGO_DB_AGGREGATES_DATABASE_NAME or MONGO_DB_AGGREGATES_DATABASE_DEFAULT_NAME
    notices = mongo_client[aggregates_db_name][NOTICE_COLLECTION_NAME]

    # Sub-pipeline executed on the joined events: one total duration per notice.
    sum_durations_per_notice = {
        "$group": {
            "_id": "$notice_id",
            "exec_time": {"$sum": "$duration"},
        }
    }
    join_notice_logs = {
        "$lookup": {
            "from": f"{NOTICE_EVENTS_COLLECTION_NAME}",
            "localField": "_id",
            "foreignField": "notice_id",
            "as": "notice_logs",
            "pipeline": [sum_durations_per_notice],
        }
    }
    # Turns the `notice_logs` array produced by the lookup into a single embedded document.
    flatten_notice_logs = {"$unwind": '$notice_logs'}
    write_to_view = {"$out": NOTICES_MATERIALISED_VIEW_NAME}

    notices.aggregate([join_notice_logs, flatten_notice_logs, write_to_view])
materialised_view.create_index([("execution_time", ASCENDING)])
13 changes: 4 additions & 9 deletions tests/e2e/data_manager/test_mongodb_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
from ted_sws.data_manager.services.create_batch_collection_materialised_view import \
create_batch_collection_materialised_view, NOTICE_PROCESS_BATCH_COLLECTION_NAME
from ted_sws.data_manager.services.create_notice_collection_materialised_view import \
create_notice_collection_materialised_view, NOTICES_MATERIALISED_VIEW_NAME, \
update_notice_collection_materialised_view
create_notice_collection_materialised_view, NOTICES_MATERIALISED_VIEW_NAME


def test_mongodb_client(notice_2016):
Expand Down Expand Up @@ -142,19 +141,15 @@ def test_create_matview_for_notices():
fields_in_the_materialised_view = document.keys()
assert 'form_type' in fields_in_the_materialised_view
assert 'form_number' in fields_in_the_materialised_view
assert 'eforms_subtype' in fields_in_the_materialised_view


def test_update_matview_for_notices():
def test_create_matview_for_batches():
uri = config.MONGO_DB_AUTH_URL
mongodb_client = MongoClient(uri)
update_notice_collection_materialised_view(mongo_client=mongodb_client)
create_batch_collection_materialised_view(mongo_client=mongodb_client)
db = mongodb_client[config.MONGO_DB_AGGREGATES_DATABASE_NAME]
assert NOTICES_MATERIALISED_VIEW_NAME in db.list_collection_names()
document = db[NOTICES_MATERIALISED_VIEW_NAME].find_one()
if document is not None:
fields_in_the_materialised_view = document.keys()
assert 'notice_logs' in fields_in_the_materialised_view
assert NOTICE_PROCESS_BATCH_COLLECTION_NAME in db.list_collection_names()
document = db[NOTICE_PROCESS_BATCH_COLLECTION_NAME].find_one()
if document is not None:
fields_in_the_materialised_view = document.keys()
Expand Down