diff --git a/pollution_v2/src/common/data_model/__init__.py b/pollution_v2/src/common/data_model/__init__.py index 3c0f632c..a9d426db 100644 --- a/pollution_v2/src/common/data_model/__init__.py +++ b/pollution_v2/src/common/data_model/__init__.py @@ -1,6 +1,6 @@ # SPDX-FileCopyrightText: NOI Techpark # # SPDX-License-Identifier: AGPL-3.0-or-later -from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType +from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType, StationLatestMeasure, Station from .traffic import TrafficSensorStation, TrafficMeasure, TrafficMeasureCollection, TrafficEntry from .pollution import PollutionEntry, PollutantClass, PollutionMeasure, PollutionMeasureCollection diff --git a/pollution_v2/src/common/data_model/common.py b/pollution_v2/src/common/data_model/common.py index e31e775c..1ca7d7d8 100644 --- a/pollution_v2/src/common/data_model/common.py +++ b/pollution_v2/src/common/data_model/common.py @@ -161,7 +161,7 @@ def from_json(cls, dict_data) -> Station: @dataclass -class StationLatestMeasure(): +class StationLatestMeasure: def __init__(self, station_code, latest_time): self.station_code = station_code diff --git a/pollution_v2/src/common/manager/__init__.py b/pollution_v2/src/common/manager/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pollution_v2/src/common/manager/traffic_station.py b/pollution_v2/src/common/manager/traffic_station.py new file mode 100644 index 00000000..b19608e8 --- /dev/null +++ b/pollution_v2/src/common/manager/traffic_station.py @@ -0,0 +1,71 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +from __future__ import absolute_import, annotations + +import itertools +import logging +from datetime import datetime +from typing import List + +from common.connector.collector import ConnectorCollector +from common.data_model import TrafficMeasureCollection, TrafficSensorStation, StationLatestMeasure +from common.settings import ODH_MINIMUM_STARTING_DATE + +logger = logging.getLogger("common.manager.traffic_station") + + +class TrafficStationManager: + + def __init__(self, connector_collector: ConnectorCollector): + self._connector_collector = connector_collector + self._traffic_stations: List[TrafficSensorStation] = [] + + def get_traffic_stations_from_cache(self) -> List[TrafficSensorStation]: + if len(self._traffic_stations) == 0: + logger.info("Retrieving station list from ODH") + self._traffic_stations = self._get_station_list() + return self._traffic_stations + + def get_all_latest_measures(self) -> List[StationLatestMeasure]: + """ + Returns a list of stations with its latest measure date. + + :return: List of stations with its latest measure date. + """ + all_measures = self._connector_collector.traffic.get_latest_measures() + + grouped = {} + for station_code, values in itertools.groupby(all_measures, lambda m: m.station.code): + tmp = list(values) + if len(tmp) > 0: + grouped[station_code] = tmp + + res = [] + for key, value in grouped.items(): + res.append(StationLatestMeasure(key, max(list(map(lambda m: m.valid_time, value)), + default=ODH_MINIMUM_STARTING_DATE))) + + return res + + def _get_station_list(self) -> List[TrafficSensorStation]: + """ + Retrieve the list of all the available stations. + """ + return self._connector_collector.traffic.get_station_list() + + def _download_traffic_data(self, + from_date: datetime, + to_date: datetime, + traffic_station: TrafficSensorStation + ) -> TrafficMeasureCollection: + """ + Download traffic data measures in the given interval. + + :param from_date: Traffic measures before this date are discarded if there isn't any latest pollution measure available. + :param to_date: Traffic measure after this date are discarded. + :return: The resulting TrafficMeasureCollection containing the traffic data. + """ + + return TrafficMeasureCollection(measures=self._connector_collector.traffic.get_measures(from_date=from_date, to_date=to_date, station=traffic_station)) diff --git a/pollution_v2/src/common/settings.py b/pollution_v2/src/common/settings.py index a76cca76..c43a2c9c 100644 --- a/pollution_v2/src/common/settings.py +++ b/pollution_v2/src/common/settings.py @@ -34,6 +34,7 @@ ODH_COMPUTATION_BATCH_SIZE = int(Variable.get("ODH_COMPUTATION_BATCH_SIZE", 30)) ODH_MINIMUM_STARTING_DATE = dateutil.parser.parse(Variable.get("ODH_MINIMUM_STARTING_DATE", "2018-01-01")) DAG_POLLUTION_EXECUTION_CRONTAB = Variable.get("DAG_POLLUTION_EXECUTION_CRONTAB", "0 0 * * *") +DAG_VALIDATION_EXECUTION_CRONTAB = Variable.get("DAG_VALIDATION_EXECUTION_CRONTAB", "0 0 * * *") DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN = int(Variable.get("DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN", 24)) # General diff --git a/pollution_v2/src/dags/aiaas_pollution_computer.py b/pollution_v2/src/dags/aiaas_pollution_computer.py index 59ff3f9c..fab37e68 100644 --- a/pollution_v2/src/dags/aiaas_pollution_computer.py +++ b/pollution_v2/src/dags/aiaas_pollution_computer.py @@ -2,31 +2,26 @@ # # SPDX-License-Identifier: AGPL-3.0-or-later -import json import logging -import time -from datetime import timedelta, datetime, timezone -from typing import Optional - -from airflow import DAG +from datetime import timedelta, datetime from airflow.decorators import task -from airflow.operators.trigger_dagrun import TriggerDagRunOperator from redis.client import Redis +from dags.common import TrafficStationsDAG from pollution_connector.cache.computation_checkpoint import ComputationCheckpointCache from common.connector.collector import ConnectorCollector -from common.data_model.common import Provenance +from common.data_model.common import Provenance, StationLatestMeasure from common.data_model import TrafficSensorStation -from common.settings import ODH_MINIMUM_STARTING_DATE, DEFAULT_TIMEZONE, \ +from common.settings import ODH_MINIMUM_STARTING_DATE, \ COMPUTATION_CHECKPOINT_REDIS_DB, COMPUTATION_CHECKPOINT_REDIS_PORT, COMPUTATION_CHECKPOINT_REDIS_HOST, \ PROVENANCE_ID, PROVENANCE_LINEAGE, PROVENANCE_NAME, PROVENANCE_VERSION, DAG_POLLUTION_EXECUTION_CRONTAB, \ - DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN + DEFAULT_TIMEZONE, DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN from pollution_connector.tasks.pollution_computation import PollutionComputationManager # see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html -logger = logging.getLogger("pollution_connector.tasks.pollution_computation") +logger = logging.getLogger("dags.aiaas_pollution_computer") default_args = { 'owner': 'airflow', @@ -41,7 +36,7 @@ THIS_DAG_ID = "pollution_computer" -with DAG( +with TrafficStationsDAG( THIS_DAG_ID, # execution interval if no backfill step length on date increment if backfill (interval determined by first slot @@ -86,33 +81,6 @@ def _init_manager() -> PollutionComputationManager: return manager - def _init_date_range(min_from_date: Optional[datetime], max_to_date: Optional[datetime]): - """ - As starting date for the batch is used the latest pollution measure available on the ODH, if no pollution - measures are available min_from_date is used. - - :param min_from_date: Optional, if set traffic measures before this date are discarded if no pollution measures - are available. If not specified, the default will be taken from the environmental variable `ODH_MINIMUM_STARTING_DATE`. - :param max_to_date: Optional, if set the traffic measure after this date are discarded. - If not specified, the default will be the current datetime. - :return: adjusted dates - """ - - if min_from_date is None: - min_from_date = ODH_MINIMUM_STARTING_DATE - - if min_from_date.tzinfo is None: - min_from_date = DEFAULT_TIMEZONE.localize(min_from_date) - - if max_to_date is None: - max_to_date = datetime.now(tz=DEFAULT_TIMEZONE) - - if max_to_date.tzinfo is None: - max_to_date = DEFAULT_TIMEZONE.localize(max_to_date) - - return min_from_date, max_to_date - - @task def get_stations_list(**kwargs) -> list[dict]: """ @@ -123,19 +91,7 @@ def get_stations_list(**kwargs) -> list[dict]: manager = _init_manager() - traffic_stations = manager.get_traffic_stations_from_cache() - - stations_from_prev_dag = kwargs["dag_run"].conf.get('stations_to_process') - if stations_from_prev_dag: - logger.info(f"{len(stations_from_prev_dag)} stations with unprocessed data from previous run, using them") - traffic_stations = list(filter(lambda station: station.code in stations_from_prev_dag, traffic_stations)) - - logger.info(f"found {len(traffic_stations)} traffic stations") - - # Serialization and deserialization is dependent on speed. - # Use built-in functions like dict as much as you can and stay away - # from using classes and other complex structures. - station_dicts = [station.to_json() for station in traffic_stations] + station_dicts = dag.get_stations_list(manager, **kwargs) return station_dicts @@ -153,7 +109,7 @@ def process_station(station_dict: dict, **kwargs): manager = _init_manager() - min_from_date, max_to_date = _init_date_range(None, None) + min_from_date, max_to_date = dag.init_date_range(None, None) computation_start_dt = datetime.now() @@ -173,31 +129,14 @@ def whats_next(already_processed_stations, **kwargs): """ manager = _init_manager() - now = datetime.now(tz=DEFAULT_TIMEZONE) - all_latest = manager.get_all_latest_measures() - stations = [] - for item in all_latest: - station_code = item.station_code - date = item.latest_time + def has_remaining_data(measure: StationLatestMeasure) -> bool: + now = datetime.now(tz=DEFAULT_TIMEZONE) + date = measure.latest_time if date.tzinfo is None: date = DEFAULT_TIMEZONE.localize(date) + return (now - date).total_seconds() / 3600 > DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN - if (now - date).total_seconds() / 3600 > DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN: - stations.append(station_code) - - # True if on ODH there are lots of data to be processed (e.g. new station with old unprocessed data) - run_again = len(stations) > 0 - - logger.info(f"{'' if run_again else 'NOT '}starting another self-triggered run as {len(stations)} " - f"have still unprocessed data") - - if run_again: - TriggerDagRunOperator( - task_id="run_again_the_dag", - trigger_dag_id=THIS_DAG_ID, - dag=dag, - conf={'stations_to_process': stations} - ).execute(kwargs) + dag.trigger_next_dag_run(manager, dag, has_remaining_data, **kwargs) processed_stations = process_station.expand(station_dict=get_stations_list()) diff --git a/pollution_v2/src/dags/aiaas_validator.py b/pollution_v2/src/dags/aiaas_validator.py new file mode 100644 index 00000000..1d1dcdb4 --- /dev/null +++ b/pollution_v2/src/dags/aiaas_validator.py @@ -0,0 +1,122 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later + +import logging +from datetime import timedelta, datetime + +from airflow.decorators import task + +from common.manager.traffic_station import TrafficStationManager +from dags.common import TrafficStationsDAG +from common.connector.collector import ConnectorCollector +from common.data_model import TrafficSensorStation, StationLatestMeasure +from common.settings import ODH_MINIMUM_STARTING_DATE, DAG_VALIDATION_EXECUTION_CRONTAB + +# see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html + +logger = logging.getLogger("dags.aiaas_validator") + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': ODH_MINIMUM_STARTING_DATE, + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +THIS_DAG_ID = "validator" + +with TrafficStationsDAG( + THIS_DAG_ID, + + # execution interval if no backfill step length on date increment if backfill (interval determined by first slot + # available in queue) + # schedule_interval is deprecated + schedule=DAG_VALIDATION_EXECUTION_CRONTAB, + + # execution date starting at (if needed, backfill) + start_date=ODH_MINIMUM_STARTING_DATE, + + # if True, the scheduler creates a DAG Run for each completed interval between start_date and end_date + # and the scheduler will execute them sequentially + # no need to backfill with catch-up, we can rely on programmatically process-the-oldest-data-on-ODH + catchup=False, + + tags=["aiaas", "validator"], + + # 1 as the maximum number of active DAG runs per DAG: + # dag execution mode should be sequential to avoid periods overlapping and + # to avoid quick and recent runs blocking slow and older ones (as ODH does not accept "older" data writing) + max_active_runs=1, + + default_args=default_args +) as dag: + + def _init_manager() -> TrafficStationManager: + + connector_collector = ConnectorCollector.build_from_env() + manager = TrafficStationManager(connector_collector) + return manager + + @task + def get_stations_list(**kwargs) -> list[dict]: + """ + Returns the complete list of stations or the filtered list based on previous DAG run + + :return: list of strings containing stations list + """ + manager = _init_manager() + + station_dicts = dag.get_stations_list(manager, **kwargs) + + return station_dicts + + + @task + def process_station(station_dict: dict, **kwargs): + """ + Process a single station + + :param station_dict: the station to process + """ + + station = TrafficSensorStation.from_json(station_dict) + logger.info(f"Received station {station}") + + manager = _init_manager() + + min_from_date, max_to_date = dag.init_date_range(None, None) + + computation_start_dt = datetime.now() + + logger.info(f"running validation from {min_from_date} to {max_to_date}") + + # TODO: implement validation + + computation_end_dt = datetime.now() + logger.info(f"Completed validation in [{(computation_end_dt - computation_start_dt).seconds}]") + + + @task + def whats_next(already_processed_stations, **kwargs): + """ + Checks if there are still data to be processed before ending DAG runs + + :param already_processed_stations: the stations already processed (not used) + """ + manager = _init_manager() + + def has_remaining_data(measure: StationLatestMeasure) -> bool: + # TODO: implement method to check if there are still data to be processed before ending DAG runs + raise NotImplementedError + + dag.trigger_next_dag_run(manager, dag, has_remaining_data, **kwargs) + + + processed_stations = process_station.expand(station_dict=get_stations_list()) + + whats_next(processed_stations) diff --git a/pollution_v2/src/dags/common.py b/pollution_v2/src/dags/common.py new file mode 100644 index 00000000..ba66c2d7 --- /dev/null +++ b/pollution_v2/src/dags/common.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: NOI Techpark +# +# SPDX-License-Identifier: AGPL-3.0-or-later +import logging +from datetime import datetime +from typing import Optional, Callable + +from airflow import DAG +from airflow.operators.trigger_dagrun import TriggerDagRunOperator + +from common.data_model import StationLatestMeasure +from common.manager.traffic_station import TrafficStationManager +from common.settings import ODH_MINIMUM_STARTING_DATE, DEFAULT_TIMEZONE + +logger = logging.getLogger("dags.common") + + +class TrafficStationsDAG(DAG): + + @staticmethod + def init_date_range(min_from_date: Optional[datetime], max_to_date: Optional[datetime]): + """ + As starting date for the batch is used the latest pollution measure available on the ODH, if no pollution + measures are available min_from_date is used. + + :param min_from_date: Optional, if set traffic measures before this date are discarded if no pollution measures + are available. If not specified, the default will be taken from the environmental variable `ODH_MINIMUM_STARTING_DATE`. + :param max_to_date: Optional, if set the traffic measure after this date are discarded. + If not specified, the default will be the current datetime. + :return: adjusted dates + """ + + if min_from_date is None: + min_from_date = ODH_MINIMUM_STARTING_DATE + + if min_from_date.tzinfo is None: + min_from_date = DEFAULT_TIMEZONE.localize(min_from_date) + + if max_to_date is None: + max_to_date = datetime.now(tz=DEFAULT_TIMEZONE) + + if max_to_date.tzinfo is None: + max_to_date = DEFAULT_TIMEZONE.localize(max_to_date) + + return min_from_date, max_to_date + + @staticmethod + def get_stations_list(manager: TrafficStationManager, **kwargs): + """ + Returns the complete list of stations or the filtered list based on previous DAG run + + :return: list of strings containing stations list + """ + traffic_stations = manager.get_traffic_stations_from_cache() + + stations_from_prev_dag = kwargs["dag_run"].conf.get('stations_to_process') + if stations_from_prev_dag: + logger.info(f"{len(stations_from_prev_dag)} stations with unprocessed data from previous run, using them") + traffic_stations = list(filter(lambda station: station.code in stations_from_prev_dag, traffic_stations)) + + logger.info(f"found {len(traffic_stations)} traffic stations") + + # Serialization and deserialization is dependent on speed. + # Use built-in functions like dict as much as you can and stay away + # from using classes and other complex structures. + station_dicts = [station.to_json() for station in traffic_stations] + + return station_dicts + + @staticmethod + def trigger_next_dag_run(manager: TrafficStationManager, originator_dag: DAG, + has_remaining_data: Callable[[StationLatestMeasure], bool], **kwargs): + """ + Checks if there are still data to be processed before ending DAG runs + + :param manager: the manager to use + :param dag: the dag to trigger + :param has_remaining_data: the function to use to check if there are still data to process + """ + all_latest = manager.get_all_latest_measures() + stations = [] + for item in all_latest: + if has_remaining_data(item): + stations.append(item.station_code) + + # True if on ODH there are lots of data to be processed (e.g. new station with old unprocessed data) + run_again = len(stations) > 0 + + logger.info(f"{'' if run_again else 'NOT '}starting another self-triggered run as {len(stations)} " + f"stations have still unprocessed data") + + if run_again: + TriggerDagRunOperator( + task_id="run_again_the_dag", + trigger_dag_id=originator_dag.dag_id, + dag=originator_dag, + conf={'stations_to_process': stations} + ).execute(kwargs) diff --git a/pollution_v2/src/pollution_connector/tasks/pollution_computation.py b/pollution_v2/src/pollution_connector/tasks/pollution_computation.py index bf979766..d4ec845e 100644 --- a/pollution_v2/src/pollution_connector/tasks/pollution_computation.py +++ b/pollution_v2/src/pollution_connector/tasks/pollution_computation.py @@ -4,16 +4,16 @@ from __future__ import absolute_import, annotations -import itertools import logging from datetime import datetime, timedelta from typing import Optional, List from redis.client import Redis +from common.manager.traffic_station import TrafficStationManager from pollution_connector.cache.computation_checkpoint import ComputationCheckpointCache, ComputationCheckpoint from common.connector.collector import ConnectorCollector -from common.data_model.common import Provenance, StationLatestMeasure +from common.data_model.common import Provenance from common.data_model.pollution import PollutionMeasure, PollutionMeasureCollection, PollutionEntry from common.data_model import TrafficMeasureCollection, TrafficSensorStation from pollution_connector.pollution_computation_model.pollution_computation_model import PollutionComputationModel @@ -56,30 +56,13 @@ def compute_pollution_data(min_from_date: Optional[datetime] = None, manager.run_computation_and_upload_results(min_from_date, max_to_date) -def _station_code_func(s): - return s.station.code - - -class PollutionComputationManager: +class PollutionComputationManager(TrafficStationManager): def __init__(self, connector_collector: ConnectorCollector, provenance: Provenance, checkpoint_cache: Optional[ComputationCheckpointCache] = None): + super().__init__(connector_collector) self._checkpoint_cache = checkpoint_cache - self._connector_collector = connector_collector self._provenance = provenance self._create_data_types = True - self._traffic_stations: List[TrafficSensorStation] = [] - - def get_traffic_stations_from_cache(self) -> List[TrafficSensorStation]: - if len(self._traffic_stations) == 0: - logger.info("Retrieving station list from ODH") - self._traffic_stations = self._get_station_list() - return self._traffic_stations - - def _get_station_list(self) -> List[TrafficSensorStation]: - """ - Retrieve the list of all the available stations. - """ - return self._connector_collector.traffic.get_station_list() def _get_latest_pollution_measure(self, traffic_station: TrafficSensorStation) -> Optional[PollutionMeasure]: """ @@ -98,9 +81,11 @@ def _get_latest_pollution_measure(self, traffic_station: TrafficSensorStation) - def _get_starting_date_for_station(self, traffic_station: TrafficSensorStation, min_from_date: datetime) -> datetime: latest_pollution_measure = self._get_latest_pollution_measure(traffic_station) if latest_pollution_measure is None: + logger.info(f"No pollution measures available for station [{traffic_station.code}]") if self._checkpoint_cache is not None: checkpoint = self._checkpoint_cache.get(ComputationCheckpoint.get_id_for_station(traffic_station)) if checkpoint is not None: + logger.info(f"Using checkpoint date [{checkpoint.checkpoint_dt.isoformat()}] as starting date for station [{traffic_station.code}]") from_date = checkpoint.checkpoint_dt else: from_date = min_from_date # If there isn't any latest pollution measure available, the min_from_date is used as starting date for the batch @@ -128,42 +113,6 @@ def _get_latest_date_for_station(self, traffic_station: TrafficSensorStation) -> measures = self._connector_collector.traffic.get_latest_measures(station=traffic_station) return max(list(map(lambda m: m.valid_time, measures)), default=ODH_MINIMUM_STARTING_DATE) - def get_all_latest_measures(self) -> List[StationLatestMeasure]: - """ - Returns a list of stations with its latest measure date. - - :return: List of stations with its latest measure date. - """ - all_measures = self._connector_collector.traffic.get_latest_measures() - - grouped = {} - for station_code, values in itertools.groupby(all_measures, _station_code_func): - tmp = list(values) - if len(tmp) > 0: - grouped[station_code] = tmp - - res = [] - for key, value in grouped.items(): - res.append(StationLatestMeasure(key, max(list(map(lambda m: m.valid_time, value)), - default=ODH_MINIMUM_STARTING_DATE))) - - return res - - def _download_traffic_data(self, - from_date: datetime, - to_date: datetime, - traffic_station: TrafficSensorStation - ) -> TrafficMeasureCollection: - """ - Download traffic data measures in the given interval. - - :param from_date: Traffic measures before this date are discarded if there isn't any latest pollution measure available. - :param to_date: Traffic measure after this date are discarded. - :return: The resulting TrafficMeasureCollection containing the traffic data. - """ - - return TrafficMeasureCollection(measures=self._connector_collector.traffic.get_measures(from_date=from_date, to_date=to_date, station=traffic_station)) - @staticmethod def _compute_pollution_data(traffic_data: TrafficMeasureCollection) -> List[PollutionEntry]: """