Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/TED-713
Browse files Browse the repository at this point in the history
  • Loading branch information
Kolea Plesco committed Oct 6, 2022
2 parents ae357b1 + 4f302d2 commit 9ab02ca
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 58 deletions.
3 changes: 2 additions & 1 deletion dags/notice_fetch_by_date_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def fetch_by_date_notice_from_ted():
execute_only_one_step=True)

def _branch_selector():
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False)
trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY,
default_value=True)
push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY))
if trigger_complete_workflow:
return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID]
Expand Down
6 changes: 4 additions & 2 deletions dags/notice_fetch_for_date_range_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY
from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY
from ted_sws.event_manager.adapters.event_log_decorator import event_log
from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \
EventMessageProcessType
Expand Down Expand Up @@ -50,7 +50,9 @@ def trigger_notice_by_date_for_each_date_in_range():
TriggerDagRunOperator(
task_id=f'trigger_notice_fetch_by_date_workflow_dag_{date_wildcard[:-1]}',
trigger_dag_id="notice_fetch_by_date_workflow",
conf={WILD_CARD_DAG_KEY: date_wildcard}
conf={WILD_CARD_DAG_KEY: date_wildcard,
TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: False
}
).execute(context=context)

trigger_notice_by_date_for_each_date_in_range()
Expand Down
17 changes: 14 additions & 3 deletions dags/notice_validation_workflow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from datetime import timedelta, datetime

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from pymongo import MongoClient

from dags import DEFAULT_DAG_ARGUMENTS
from dags.dags_utils import get_dag_param
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository
Expand All @@ -16,6 +18,15 @@
summary_validation_for_daily_supra_notice

DAG_NAME = "notice_daily_validation_workflow"
NOTICE_PUBLICATION_DATE_DAG_CONF_KEY = "notice_publication_date"


def get_notice_publication_date():
    """Resolve the notice publication date this DAG run should work on.

    The value stored under ``NOTICE_PUBLICATION_DATE_DAG_CONF_KEY`` is looked up
    through ``get_dag_param``. A truthy value is parsed as ``YYYYMMDD``;
    otherwise the date defaults to the current time minus two days.

    :return: the publication date as a ``datetime``
    """
    configured_date = get_dag_param(key=NOTICE_PUBLICATION_DATE_DAG_CONF_KEY)
    if not configured_date:
        # No explicit date was supplied for this run: fall back to two days ago.
        return datetime.now() - timedelta(days=2)
    return datetime.strptime(configured_date, "%Y%m%d")


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
Expand All @@ -31,7 +42,7 @@ def notice_daily_validation_workflow():
))
)
def validate_fetched_notices():
publication_date = datetime.now() - timedelta(days=2)
publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
validate_and_update_daily_supra_notice(notice_publication_day=publication_date,
mongodb_client=mongodb_client
Expand All @@ -45,7 +56,7 @@ def validate_fetched_notices():
))
)
def summarize_validation_for_daily_supra_notice():
publication_date = datetime.now() - timedelta(days=2)
publication_date = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
summary_validation_for_daily_supra_notice(notice_publication_day=publication_date,
mongodb_client=mongodb_client
Expand All @@ -59,7 +70,7 @@ def summarize_validation_for_daily_supra_notice():
))
)
def validate_availability_of_notice_in_cellar():
notice_publication_day = datetime.now() - timedelta(days=2)
notice_publication_day = get_notice_publication_date()
mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL)
repo = DailySupraNoticeRepository(mongodb_client=mongodb_client)
supra_notice = repo.get(reference=notice_publication_day)
Expand Down
7 changes: 4 additions & 3 deletions ted_sws/core/adapters/cmd_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ def skip_notice(self, notice_id: str) -> bool:
"""
return self.notice_ids and notice_id not in self.notice_ids

def run_cmd(self):
if self.mapping_suite_id:
def on_begin(self):
super().on_begin()
if hasattr(self, "mapping_suite_id") and self.mapping_suite_id:
self.log(LOG_WARN_TEXT.format("MappingSuite: ") + self.mapping_suite_id)
if self.notice_ids:
if hasattr(self, "notice_ids") and self.notice_ids:
self.log(LOG_WARN_TEXT.format("Notices: ") + str(self.notice_ids))
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import click

from ted_sws.core.adapters.cmd_runner import CmdRunner as BaseCmdRunner, DEFAULT_MAPPINGS_PATH
from ted_sws.core.adapters.cmd_runner import CmdRunnerForMappingSuite as BaseCmdRunner, DEFAULT_MAPPINGS_PATH
from ted_sws.mapping_suite_processor.services.mapping_suite_validation_service import validate_mapping_suite

CMD_NAME = "CMD_MAPPING_SUITE_VALIDATOR"
Expand All @@ -32,6 +32,7 @@ def __init__(
self.mappings_path = mappings_path

def run_cmd(self):
super().run_cmd()
mapping_suite_path: Path = Path(self.mappings_path).resolve() / Path(self.mapping_suite_id)
is_valid: bool = validate_mapping_suite(mapping_suite_path)
result = self.run_cmd_result(Exception("Mapping Suite has an invalid structure") if not is_valid else None)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{#- increment(dictionary, key, increment_by=1): adds `increment_by` to `dictionary[key]` in place.
    The update() call is wrapped in an empty `if` so that its return value is never rendered. -#}
{% macro increment(dictionary, key, increment_by=1) %}
{% if dictionary.update({key: dictionary[key] + increment_by}) %} {% endif %}
{% endmacro %}

{% set results_count = validation_results|length %}
<!DOCTYPE html>
<html lang="en">
<head>
Expand Down Expand Up @@ -63,6 +63,54 @@
<li>Mapping suite identifier: {{ mapping_suite_identifier }}</li>
</ul>
<hr>
<h2>Results summary</h2>
<table class="display">
    <thead class="center aligned">
    <tr>
        <th>Result</th>
        <th>Count</th>
        <th>Ratio (%)</th>
    </tr>
    </thead>
    <tbody>
    {# One row per validation outcome. The ratio falls back to 0 when the report has no results, so an empty report does not fail on a division by zero. #}
    {% for _value in ["valid", "unverifiable", "warning", "invalid", "error"] %}
    {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %}
    <tr>
        <td class="strong {{ _value }}">{{ _value }}</td>
        <td>{{ _results }}</td>
        <td>{{ ((_results / results_count * 100) | round(2)) if results_count else 0 }}%</td>
    </tr>
    {% endfor %}
    </tbody>
</table>
<hr>
<h2>Results</h2>
<table class="display">
<thead class="center aligned">
Expand All @@ -75,11 +123,6 @@
</tr>
</thead>
<tbody>
{% set counter = {
'true_executions': 0,
'false_executions': 0,
'error_executions': 0,
} %}
{% for result in validation_results %}
<tr>
<td>{{ result.query.title }}</td>
Expand Down Expand Up @@ -130,53 +173,40 @@
</div>
</td>
</tr>
{% if result.query_result == "True" %}
{{ increment(counter, 'true_executions') }}
{% elif result.error %}
{{ increment(counter, 'error_executions') }}
{% else %}
{{ increment(counter, 'false_executions') }}
{% endif %}

{% endfor %}
</tbody>
</table>
<h2>Results summary</h2>
<hr>
<h2>Query results summary</h2>
<table class="display">
<thead class="center aligned">
<tr>
<th>Result type</th>
<th>Query result</th>
<th>Number of</th>
<th>Ratio</th>
<th>Ratio (%)</th>
</tr>
</thead>
<tbody>
<tr>
<td>{{ "True" }}</td>
<td>{{ counter.true_executions }}</td>
{% if counter.true_executions == 0 %}
<td>{{ 0 }}</td>
{% else %}
<td>{{ counter.true_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}</td>
{% endif %}
{% set _value = "True" %}
{% set _results = validation_results | selectattr("query_result", "equalto", _value) | list | count %}
<td class="strong">{{ _value }}</td>
<td>{{ _results }}</td>
<td>{{ (_results / results_count * 100) | round(2) }}%</td>
</tr>
<tr>
<td>{{ "False" }}</td>
<td>{{ counter.false_executions }}</td>
{% if counter.false_executions == 0 %}
<td>{{ 0 }}</td>
{% else %}
<td>{{ counter.false_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}</td>
{% endif %}
{% set _value = "False" %}
{% set _results = validation_results | selectattr("query_result", "equalto", _value) | list | count %}
<td class="strong">{{ _value }}</td>
<td>{{ _results }}</td>
<td>{{ (_results / results_count * 100) | round(2) }}%</td>
</tr>
<tr>
<td>{{ "Error" }}</td>
<td>{{ counter.error_executions }}</td>
{% if counter.error_executions == 0 %}
<td>{{ 0 }}</td>
{% else %}
<td>{{ counter.error_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}</td>
{% endif %}
<tr>
    {# NOTE(review): the results summary table matches `result` against lower-case values such as "error", so the capitalised label is kept for display only; confirm "result" is the intended field for this row. #}
    {% set _value = "Error" %}
    {% set _results = validation_results | selectattr("result", "equalto", "error") | list | count %}
    <td class="strong">{{ _value }}</td>
    <td>{{ _results }}</td>
    {# Guard the ratio so that an empty report renders 0% instead of failing on a division by zero. #}
    <td>{{ ((_results / results_count * 100) | round(2)) if results_count else 0 }}%</td>
</tr>
</tbody>
</table>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ def generate_notice_uri_from_notice_id(notice_id: str) -> str:
return 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type-invalid'


def validate_notice_availability_in_cellar(notice: Notice) -> Notice:
def validate_notice_availability_in_cellar(notice: Notice, notice_uri: str = None) -> Notice:
if notice.status == NoticeStatus.PUBLISHED:
notice_uri = generate_notice_uri_from_notice_id(notice_id=notice.ted_id)
if not notice_uri:
notice_uri = generate_notice_uri_from_notice_id(notice_id=notice.ted_id)
if check_availability_of_notice_in_cellar(notice_uri=notice_uri):
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_AVAILABLE)
else:
Expand Down
10 changes: 9 additions & 1 deletion ted_sws/rml_to_html/services/rml_to_html.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from jinja2 import Environment, PackageLoader

from ted_sws.core.model.transform import FileResource
from ted_sws.data_manager.adapters.repository_abc import MappingSuiteRepositoryABC
from ted_sws.notice_validator.adapters.sparql_runner import SPARQLRunner
from ted_sws.rml_to_html.resources.query_registry import QueryRegistry
Expand Down Expand Up @@ -59,6 +60,10 @@ def run_queries_for_triple_map(triple_map_uri: str, query_registry: QueryRegistr
}


def _join_file_resources(files: [FileResource] = None) -> str:
return '\n\n'.join(map(lambda file: file.file_content, files))


def rml_files_to_html_report(mapping_suite_identifier: str, mapping_suite_repository: MappingSuiteRepositoryABC):
"""
Creating an html report from loaded rml files
Expand All @@ -69,7 +74,10 @@ def rml_files_to_html_report(mapping_suite_identifier: str, mapping_suite_reposi
mapping_suite_package = mapping_suite_repository.get(reference=mapping_suite_identifier)
if mapping_suite_package is None:
raise ValueError(f'Mapping suite package, with {mapping_suite_identifier} id, was not found')
rml_files = mapping_suite_package.transformation_rule_set.rml_mapping_rules
rml_files = [FileResource(
file_name="joined_rml_files",
file_content=_join_file_resources(mapping_suite_package.transformation_rule_set.rml_mapping_rules)
)]
query_registry = QueryRegistry()
sparql_runner = SPARQLRunner(files=rml_files)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
from ted_sws.core.model.notice import NoticeStatus
from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \
check_availability_of_notice_in_cellar
check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar


def test_check_availability_of_notice_in_cellar(cellar_sparql_endpoint, valid_cellar_uri, invalid_cellar_uri):
    """The valid URI must be reported as available at the given endpoint, the invalid one must not."""
    valid_uri_found = check_availability_of_notice_in_cellar(
        notice_uri=valid_cellar_uri,
        endpoint_url=cellar_sparql_endpoint,
    )
    assert valid_uri_found

    invalid_uri_found = check_availability_of_notice_in_cellar(
        notice_uri=invalid_cellar_uri,
        endpoint_url=cellar_sparql_endpoint,
    )
    assert not invalid_uri_found
def test_check_availability_of_notice_in_cellar(valid_cellar_uri, invalid_cellar_uri):
    """The valid URI must be reported as available, the invalid one must not."""
    valid_uri_found = check_availability_of_notice_in_cellar(notice_uri=valid_cellar_uri)
    assert valid_uri_found

    invalid_uri_found = check_availability_of_notice_in_cellar(notice_uri=invalid_cellar_uri)
    assert not invalid_uri_found


def test_validate_notice_availability_in_cellar(fake_notice_F03, valid_cellar_uri, invalid_cellar_uri):
    """A PUBLISHED notice moves to the availability status matching the URI that is checked."""
    notice = fake_notice_F03

    # Without an explicit URI the call is only exercised; no resulting status is asserted.
    notice._status = NoticeStatus.PUBLISHED
    validate_notice_availability_in_cellar(notice=notice)

    expected_status_by_uri = (
        (valid_cellar_uri, NoticeStatus.PUBLICLY_AVAILABLE),
        (invalid_cellar_uri, NoticeStatus.PUBLICLY_UNAVAILABLE),
    )
    for cellar_uri, expected_status in expected_status_by_uri:
        # Reset the status before each check so every case starts from PUBLISHED.
        notice._status = NoticeStatus.PUBLISHED
        validate_notice_availability_in_cellar(notice=notice, notice_uri=cellar_uri)
        assert notice.status == expected_status

0 comments on commit 9ab02ca

Please sign in to comment.