From 275505beccad40c6481ace22e3b7d445d91d2cc0 Mon Sep 17 00:00:00 2001 From: Kolea Plesco Date: Tue, 27 Sep 2022 01:51:05 +0300 Subject: [PATCH 1/4] Updated SPARQL report --- .../sparql_query_results_report.jinja2 | 108 +++++++++++------- 1 file changed, 69 insertions(+), 39 deletions(-) diff --git a/ted_sws/notice_validator/resources/templates/sparql_query_results_report.jinja2 b/ted_sws/notice_validator/resources/templates/sparql_query_results_report.jinja2 index ae35b4179..b9e0d3b5f 100644 --- a/ted_sws/notice_validator/resources/templates/sparql_query_results_report.jinja2 +++ b/ted_sws/notice_validator/resources/templates/sparql_query_results_report.jinja2 @@ -1,7 +1,7 @@ {% macro increment(dictionary, key, increment_by=1) %} {% if dictionary.update({key: dictionary[key] + increment_by}) %} {% endif %} {% endmacro %} - +{% set results_count = validation_results|length %} @@ -63,6 +63,54 @@
  • Mapping suite identifier: {{ mapping_suite_identifier }}

  • +

    Results summary

    + + + + + + + + + + + {% set _value = "valid" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + + + + + {% set _value = "unverifiable" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + + + + + {% set _value = "warning" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + + + + + {% set _value = "invalid" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + + + + + {% set _value = "error" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + + + + +
    ResultCountRatio (%)
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    +

    Results

    @@ -75,11 +123,6 @@ - {% set counter = { - 'true_executions': 0, - 'false_executions': 0, - 'error_executions': 0, - } %} {% for result in validation_results %} @@ -130,53 +173,40 @@ - {% if result.query_result == "True" %} - {{ increment(counter, 'true_executions') }} - {% elif result.error %} - {{ increment(counter, 'error_executions') }} - {% else %} - {{ increment(counter, 'false_executions') }} - {% endif %} - {% endfor %}
    {{ result.query.title }}
    -

    Results summary

    +
    +

    Query results summary

    - + - + - - - {% if counter.true_executions == 0 %} - - {% else %} - - {% endif %} + {% set _value = "True" %} + {% set _results = validation_results | selectattr("query_result", "equalto", _value) | list | count %} + + + - - - {% if counter.false_executions == 0 %} - - {% else %} - - {% endif %} + {% set _value = "False" %} + {% set _results = validation_results | selectattr("query_result", "equalto", _value) | list | count %} + + + - - - - {% if counter.error_executions == 0 %} - - {% else %} - - {% endif %} + + {% set _value = "Error" %} + {% set _results = validation_results | selectattr("result", "equalto", _value) | list | count %} + + +
    Result typeQuery result Number ofRatioRatio (%)
    {{ "True" }}{{ counter.true_executions }}{{ 0 }}{{ counter.true_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}{{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ "False" }}{{ counter.false_executions }}{{ 0 }}{{ counter.false_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}{{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    {{ "Error" }}{{ counter.error_executions }}{{ 0 }}{{ counter.error_executions / (counter.true_executions + counter.false_executions + counter.error_executions) }}
    {{ _value }}{{ _results }}{{ (_results / results_count * 100) | round(2) }}%
    From d6edcee48513fbc6bc6177d4414270c5b031dc75 Mon Sep 17 00:00:00 2001 From: Kolea Plesco Date: Tue, 27 Sep 2022 16:51:14 +0300 Subject: [PATCH 2/4] WIP --- ted_sws/core/adapters/cmd_runner.py | 7 ++++--- .../entrypoints/cli/cmd_mapping_suite_validator.py | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ted_sws/core/adapters/cmd_runner.py b/ted_sws/core/adapters/cmd_runner.py index 70ae5278e..bb3830de6 100644 --- a/ted_sws/core/adapters/cmd_runner.py +++ b/ted_sws/core/adapters/cmd_runner.py @@ -165,8 +165,9 @@ def skip_notice(self, notice_id: str) -> bool: """ return self.notice_ids and notice_id not in self.notice_ids - def run_cmd(self): - if self.mapping_suite_id: + def on_begin(self): + super().on_begin() + if hasattr(self, "mapping_suite_id") and self.mapping_suite_id: self.log(LOG_WARN_TEXT.format("MappingSuite: ") + self.mapping_suite_id) - if self.notice_ids: + if hasattr(self, "notice_ids") and self.notice_ids: self.log(LOG_WARN_TEXT.format("Notices: ") + str(self.notice_ids)) diff --git a/ted_sws/mapping_suite_processor/entrypoints/cli/cmd_mapping_suite_validator.py b/ted_sws/mapping_suite_processor/entrypoints/cli/cmd_mapping_suite_validator.py index 8ca467d55..8cc810162 100755 --- a/ted_sws/mapping_suite_processor/entrypoints/cli/cmd_mapping_suite_validator.py +++ b/ted_sws/mapping_suite_processor/entrypoints/cli/cmd_mapping_suite_validator.py @@ -5,7 +5,7 @@ import click -from ted_sws.core.adapters.cmd_runner import CmdRunner as BaseCmdRunner, DEFAULT_MAPPINGS_PATH +from ted_sws.core.adapters.cmd_runner import CmdRunnerForMappingSuite as BaseCmdRunner, DEFAULT_MAPPINGS_PATH from ted_sws.mapping_suite_processor.services.mapping_suite_validation_service import validate_mapping_suite CMD_NAME = "CMD_MAPPING_SUITE_VALIDATOR" @@ -32,6 +32,7 @@ def __init__( self.mappings_path = mappings_path def run_cmd(self): + super().run_cmd() mapping_suite_path: Path = Path(self.mappings_path).resolve() / Path(self.mapping_suite_id) is_valid: bool = validate_mapping_suite(mapping_suite_path) result = self.run_cmd_result(Exception("Mapping Suite has an invalid structure") if not is_valid else None) From 01b7a4dd8496eeefd1f2986af219780fd70bb483 Mon Sep 17 00:00:00 2001 From: Kolea Plesco Date: Fri, 30 Sep 2022 09:11:19 +0300 Subject: [PATCH 3/4] Updated tests --- .../check_availability_of_notice_in_cellar.py | 5 ++-- ted_sws/rml_to_html/services/rml_to_html.py | 10 +++++++- ... check_availability_of_notice_in_cellar.py | 24 ++++++++++++++----- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/ted_sws/notice_validator/services/check_availability_of_notice_in_cellar.py b/ted_sws/notice_validator/services/check_availability_of_notice_in_cellar.py index 34c537b0c..3f9837c1d 100644 --- a/ted_sws/notice_validator/services/check_availability_of_notice_in_cellar.py +++ b/ted_sws/notice_validator/services/check_availability_of_notice_in_cellar.py @@ -19,9 +19,10 @@ def generate_notice_uri_from_notice_id(notice_id: str) -> str: return 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type-invalid' -def validate_notice_availability_in_cellar(notice: Notice) -> Notice: +def validate_notice_availability_in_cellar(notice: Notice, notice_uri: str = None) -> Notice: if notice.status == NoticeStatus.PUBLISHED: - notice_uri = generate_notice_uri_from_notice_id(notice_id=notice.ted_id) + if not notice_uri: + notice_uri = generate_notice_uri_from_notice_id(notice_id=notice.ted_id) if check_availability_of_notice_in_cellar(notice_uri=notice_uri): notice.update_status_to(new_status=NoticeStatus.PUBLICLY_AVAILABLE) else: diff --git a/ted_sws/rml_to_html/services/rml_to_html.py b/ted_sws/rml_to_html/services/rml_to_html.py index 926e2c3c0..2460f3a97 100644 --- a/ted_sws/rml_to_html/services/rml_to_html.py +++ b/ted_sws/rml_to_html/services/rml_to_html.py @@ -3,6 +3,7 @@ from jinja2 import Environment, PackageLoader +from ted_sws.core.model.transform import FileResource from ted_sws.data_manager.adapters.repository_abc import MappingSuiteRepositoryABC from ted_sws.notice_validator.adapters.sparql_runner import SPARQLRunner from ted_sws.rml_to_html.resources.query_registry import QueryRegistry @@ -59,6 +60,10 @@ def run_queries_for_triple_map(triple_map_uri: str, query_registry: QueryRegistr } +def _join_file_resources(files: [FileResource] = None) -> str: + return '\n\n'.join(map(lambda file: file.file_content, files)) + + def rml_files_to_html_report(mapping_suite_identifier: str, mapping_suite_repository: MappingSuiteRepositoryABC): """ Creating an html report from loaded rml files @@ -69,7 +74,10 @@ def rml_files_to_html_report(mapping_suite_identifier: str, mapping_suite_reposi mapping_suite_package = mapping_suite_repository.get(reference=mapping_suite_identifier) if mapping_suite_package is None: raise ValueError(f'Mapping suite package, with {mapping_suite_identifier} id, was not found') - rml_files = mapping_suite_package.transformation_rule_set.rml_mapping_rules + rml_files = [FileResource( + file_name="joined_rml_files", + file_content=_join_file_resources(mapping_suite_package.transformation_rule_set.rml_mapping_rules) + )] query_registry = QueryRegistry() sparql_runner = SPARQLRunner(files=rml_files) diff --git a/tests/e2e/notice_validator/test_ check_availability_of_notice_in_cellar.py b/tests/e2e/notice_validator/test_ check_availability_of_notice_in_cellar.py index 19a6822de..98ec3d907 100644 --- a/tests/e2e/notice_validator/test_ check_availability_of_notice_in_cellar.py +++ b/tests/e2e/notice_validator/test_ check_availability_of_notice_in_cellar.py @@ -1,9 +1,21 @@ +from ted_sws.core.model.notice import NoticeStatus from ted_sws.notice_validator.services.check_availability_of_notice_in_cellar import \ - check_availability_of_notice_in_cellar + check_availability_of_notice_in_cellar, validate_notice_availability_in_cellar -def test_check_availability_of_notice_in_cellar(cellar_sparql_endpoint, valid_cellar_uri, invalid_cellar_uri): - assert check_availability_of_notice_in_cellar(notice_uri=valid_cellar_uri, - endpoint_url=cellar_sparql_endpoint) - assert not check_availability_of_notice_in_cellar(notice_uri=invalid_cellar_uri, - endpoint_url=cellar_sparql_endpoint) +def test_check_availability_of_notice_in_cellar(valid_cellar_uri, invalid_cellar_uri): + assert check_availability_of_notice_in_cellar(notice_uri=valid_cellar_uri) + assert not check_availability_of_notice_in_cellar(notice_uri=invalid_cellar_uri) + + +def test_validate_notice_availability_in_cellar(fake_notice_F03, valid_cellar_uri, invalid_cellar_uri): + fake_notice_F03._status = NoticeStatus.PUBLISHED + validate_notice_availability_in_cellar(notice=fake_notice_F03) + + fake_notice_F03._status = NoticeStatus.PUBLISHED + validate_notice_availability_in_cellar(notice=fake_notice_F03, notice_uri=valid_cellar_uri) + assert fake_notice_F03.status == NoticeStatus.PUBLICLY_AVAILABLE + + fake_notice_F03._status = NoticeStatus.PUBLISHED + validate_notice_availability_in_cellar(notice=fake_notice_F03, notice_uri=invalid_cellar_uri) + assert fake_notice_F03.status == NoticeStatus.PUBLICLY_UNAVAILABLE From 0ebc05bad0038498b445c3043cc21808132745a0 Mon Sep 17 00:00:00 2001 From: CaptainOfHacks <39195263+CaptainOfHacks@users.noreply.github.com> Date: Tue, 4 Oct 2022 22:55:26 +0300 Subject: [PATCH 4/4] Add Capability to run Supra notice DAG in manual mode --- dags/notice_fetch_by_date_workflow.py | 3 ++- .../notice_fetch_for_date_range_orchestrator.py | 6 ++++-- dags/notice_validation_workflow.py | 17 ++++++++++++++--- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/dags/notice_fetch_by_date_workflow.py b/dags/notice_fetch_by_date_workflow.py index b78d95aee..32492800b 100644 --- a/dags/notice_fetch_by_date_workflow.py +++ b/dags/notice_fetch_by_date_workflow.py @@ -48,7 +48,8 @@ def fetch_by_date_notice_from_ted(): execute_only_one_step=True) def _branch_selector(): - trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, default_value=False) + trigger_complete_workflow = get_dag_param(key=TRIGGER_COMPLETE_WORKFLOW_DAG_KEY, + default_value=True) push_dag_downstream(key=NOTICE_IDS_KEY, value=pull_dag_upstream(key=NOTICE_IDS_KEY)) if trigger_complete_workflow: return [TRIGGER_COMPLETE_WORKFLOW_TASK_ID] diff --git a/dags/notice_fetch_for_date_range_orchestrator.py b/dags/notice_fetch_for_date_range_orchestrator.py index e9c0289a7..e77258bc5 100644 --- a/dags/notice_fetch_for_date_range_orchestrator.py +++ b/dags/notice_fetch_for_date_range_orchestrator.py @@ -8,7 +8,7 @@ from dags import DEFAULT_DAG_ARGUMENTS from dags.dags_utils import get_dag_param -from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY +from dags.notice_fetch_by_date_workflow import WILD_CARD_DAG_KEY, TRIGGER_COMPLETE_WORKFLOW_DAG_KEY from ted_sws.event_manager.adapters.event_log_decorator import event_log from ted_sws.event_manager.model.event_message import TechnicalEventMessage, EventMessageMetadata, \ EventMessageProcessType @@ -50,7 +50,9 @@ def trigger_notice_by_date_for_each_date_in_range(): TriggerDagRunOperator( task_id=f'trigger_notice_fetch_by_date_workflow_dag_{date_wildcard[:-1]}', trigger_dag_id="notice_fetch_by_date_workflow", - conf={WILD_CARD_DAG_KEY: date_wildcard} + conf={WILD_CARD_DAG_KEY: date_wildcard, + TRIGGER_COMPLETE_WORKFLOW_DAG_KEY: False + } ).execute(context=context) trigger_notice_by_date_for_each_date_in_range() diff --git a/dags/notice_validation_workflow.py b/dags/notice_validation_workflow.py index 5804cc1e6..0554cce84 100644 --- a/dags/notice_validation_workflow.py +++ b/dags/notice_validation_workflow.py @@ -1,9 +1,11 @@ from datetime import timedelta, datetime from airflow.decorators import dag, task +from airflow.operators.python import get_current_context from pymongo import MongoClient from dags import DEFAULT_DAG_ARGUMENTS +from dags.dags_utils import get_dag_param from ted_sws import config from ted_sws.data_manager.adapters.notice_repository import NoticeRepository from ted_sws.data_manager.adapters.supra_notice_repository import DailySupraNoticeRepository @@ -16,6 +18,15 @@ summary_validation_for_daily_supra_notice DAG_NAME = "notice_daily_validation_workflow" +NOTICE_PUBLICATION_DATE_DAG_CONF_KEY = "notice_publication_date" + + +def get_notice_publication_date(): + notice_publication_date = get_dag_param(key=NOTICE_PUBLICATION_DATE_DAG_CONF_KEY) + if notice_publication_date: + return datetime.strptime(notice_publication_date, "%Y%m%d") + else: + return datetime.now() - timedelta(days=2) @dag(default_args=DEFAULT_DAG_ARGUMENTS, @@ -31,7 +42,7 @@ def notice_daily_validation_workflow(): )) ) def validate_fetched_notices(): - publication_date = datetime.now() - timedelta(days=2) + publication_date = get_notice_publication_date() mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) validate_and_update_daily_supra_notice(notice_publication_day=publication_date, mongodb_client=mongodb_client @@ -45,7 +56,7 @@ def validate_fetched_notices(): )) ) def summarize_validation_for_daily_supra_notice(): - publication_date = datetime.now() - timedelta(days=2) + publication_date = get_notice_publication_date() mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) summary_validation_for_daily_supra_notice(notice_publication_day=publication_date, mongodb_client=mongodb_client @@ -59,7 +70,7 @@ def summarize_validation_for_daily_supra_notice(): )) ) def validate_availability_of_notice_in_cellar(): - notice_publication_day = datetime.now() - timedelta(days=2) + notice_publication_day = get_notice_publication_date() mongodb_client = MongoClient(config.MONGO_DB_AUTH_URL) repo = DailySupraNoticeRepository(mongodb_client=mongodb_client) supra_notice = repo.get(reference=notice_publication_day)