Skip to content

Commit

Permalink
Merge pull request #422 from OP-TED/feature/TED-1173
Browse files Browse the repository at this point in the history
check availability in Cellar with a batch of URIes
  • Loading branch information
CaptainOfHacks authored Jan 17, 2023
2 parents 43990b8 + 0eee7d7 commit d31b1f6
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 13 deletions.
9 changes: 1 addition & 8 deletions dags/dags_utils.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,10 @@
from itertools import islice, chain
from typing import Any, Iterable
from typing import Any

from airflow.operators.python import get_current_context

TASK_INSTANCE = "ti"


def chunks(iterable: Iterable, chunk_size: int):
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, chunk_size - 1))


def select_first_non_none(data):
"""
Expand Down
3 changes: 2 additions & 1 deletion dags/operators/DagBatchPipelineOperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from pymongo import MongoClient

from dags.dags_utils import pull_dag_upstream, push_dag_downstream, chunks, get_dag_param, smart_xcom_pull, \
from dags.dags_utils import pull_dag_upstream, push_dag_downstream, get_dag_param, smart_xcom_pull, \
smart_xcom_push
from ted_sws.core.service.batch_processing import chunks
from dags.pipelines.pipeline_protocols import NoticePipelineCallable
from ted_sws import config
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
Expand Down
14 changes: 14 additions & 0 deletions ted_sws/core/service/batch_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from itertools import chain, islice
from typing import Iterable


def chunks(iterable: Iterable, chunk_size: int):
"""
This function split in chunks a iterable structure based on chunk_size parameter.
:param iterable:
:param chunk_size:
:return:
"""
iterator = iter(iterable)
for first in iterator:
yield chain([first], islice(iterator, chunk_size - 1))
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
from typing import List
from typing import List, Set

from pymongo import MongoClient
from ted_sws.core.model.notice import Notice, NoticeStatus
from ted_sws.core.service.batch_processing import chunks
from ted_sws.data_manager.adapters.notice_repository import NoticeRepository
from ted_sws.data_manager.adapters.sparql_endpoint import SPARQLTripleStoreEndpoint

WEBAPI_SPARQL_URL = "https://publications.europa.eu/webapi/rdf/sparql"
CELLAR_NOTICE_AVAILABILITY_QUERY = "ASK {{ VALUES ?instance {{<{notice_uri}>}} ?instance ?predicate [] . }}"
CELLAR_NOTICES_AVAILABILITY_QUERY = "select distinct ?s {{VALUES ?s {{$notice_uries}} ?s ?p ?o . }}"
WEBAPI_SPARQL_RUN_FORMAT = "application/sparql-results+json"
INVALID_NOTICE_URI = 'https://www.w3.org/1999/02/22-rdf-syntax-ns#type-invalid'
DEFAULT_NOTICES_BATCH_SIZE = 1000


def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str = WEBAPI_SPARQL_URL) -> bool:
Expand All @@ -23,6 +26,19 @@ def check_availability_of_notice_in_cellar(notice_uri: str, endpoint_url: str =
return result['boolean']


def check_availability_of_notices_in_cellar(notice_uries: List[str], endpoint_url: str = WEBAPI_SPARQL_URL) -> Set[str]:
"""
This service check the notices availability in Cellar, and return available set of notice uries.
:param notice_uries:
:param endpoint_url:
:return:
"""
notice_uries = " ".join([f"<{notice_uri}>" for notice_uri in notice_uries])
query = CELLAR_NOTICE_AVAILABILITY_QUERY.format(notice_uri=notice_uries)
result = SPARQLTripleStoreEndpoint(endpoint_url=endpoint_url).with_query(sparql_query=query).fetch_tabular()
return set(result['s'].to_list())


def generate_notice_uri_from_notice_id(notice_id: str) -> str:
"""
This service generates Cellar URI for a notice, determined by notice_id
Expand Down Expand Up @@ -60,6 +76,17 @@ def validate_notices_availability_in_cellar(notice_statuses: List[NoticeStatus],
notice_repository = NoticeRepository(mongodb_client=mongodb_client)
for notice_status in notice_statuses:
selected_notices = notice_repository.get_notices_by_status(notice_status=notice_status)
for selected_notice in selected_notices:
validate_notice_availability_in_cellar(notice=selected_notice)
notice_repository.update(notice=selected_notice)
for selected_notices_chunk in chunks(selected_notices, chunk_size=DEFAULT_NOTICES_BATCH_SIZE):
selected_notices_map = {
generate_notice_uri_from_notice_id(notice_id=notice.ted_id): notice
for notice in selected_notices_chunk
}
selected_notices_uries = list(selected_notices_map.keys())
available_notice_uries_in_cellar = check_availability_of_notices_in_cellar(
notice_uries=selected_notices_uries)
for notice_uri, notice in selected_notices_map.items():
if notice_uri in available_notice_uries_in_cellar:
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_AVAILABLE)
else:
notice.update_status_to(new_status=NoticeStatus.PUBLICLY_UNAVAILABLE)
notice_repository.update(notice=notice)

0 comments on commit d31b1f6

Please sign in to comment.