Merge branch '4-draft-data-validation-as-dag' into 'main'
Draft data validation as DAG

Closes noi-techpark#4

See merge request u-hopper/projects/industrial/open-data-hub-bz/bdp-elaborations!13
Marco Angheben committed Feb 21, 2024
2 parents b9258c7 + 77302b5 commit e18a127
Showing 9 changed files with 314 additions and 134 deletions.
2 changes: 1 addition & 1 deletion pollution_v2/src/common/data_model/__init__.py
@@ -1,6 +1,6 @@
# SPDX-FileCopyrightText: NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later
from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType
from .common import VehicleClass, MeasureCollection, Measure, Provenance, DataType, StationLatestMeasure, Station
from .traffic import TrafficSensorStation, TrafficMeasure, TrafficMeasureCollection, TrafficEntry
from .pollution import PollutionEntry, PollutantClass, PollutionMeasure, PollutionMeasureCollection
2 changes: 1 addition & 1 deletion pollution_v2/src/common/data_model/common.py
@@ -161,7 +161,7 @@ def from_json(cls, dict_data) -> Station:


@dataclass
class StationLatestMeasure():
class StationLatestMeasure:

    def __init__(self, station_code, latest_time):
        self.station_code = station_code
Empty file.
71 changes: 71 additions & 0 deletions pollution_v2/src/common/manager/traffic_station.py
@@ -0,0 +1,71 @@
# SPDX-FileCopyrightText: NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

from __future__ import absolute_import, annotations

import itertools
import logging
from datetime import datetime
from typing import List

from common.connector.collector import ConnectorCollector
from common.data_model import TrafficMeasureCollection, TrafficSensorStation, StationLatestMeasure
from common.settings import ODH_MINIMUM_STARTING_DATE

logger = logging.getLogger("common.manager.traffic_station")


class TrafficStationManager:

    def __init__(self, connector_collector: ConnectorCollector):
        self._connector_collector = connector_collector
        self._traffic_stations: List[TrafficSensorStation] = []

    def get_traffic_stations_from_cache(self) -> List[TrafficSensorStation]:
        if len(self._traffic_stations) == 0:
            logger.info("Retrieving station list from ODH")
            self._traffic_stations = self._get_station_list()
        return self._traffic_stations

    def get_all_latest_measures(self) -> List[StationLatestMeasure]:
"""
Returns a list of stations with its latest measure date.
:return: List of stations with its latest measure date.
"""
        all_measures = self._connector_collector.traffic.get_latest_measures()

        grouped = {}
        for station_code, values in itertools.groupby(all_measures, lambda m: m.station.code):
            tmp = list(values)
            if len(tmp) > 0:
                grouped[station_code] = tmp

        res = []
        for key, value in grouped.items():
            res.append(StationLatestMeasure(key, max(list(map(lambda m: m.valid_time, value)),
                                                     default=ODH_MINIMUM_STARTING_DATE)))

        return res

    def _get_station_list(self) -> List[TrafficSensorStation]:
        """
        Retrieve the list of all the available stations.
        """
        return self._connector_collector.traffic.get_station_list()

    def _download_traffic_data(self,
                               from_date: datetime,
                               to_date: datetime,
                               traffic_station: TrafficSensorStation
                               ) -> TrafficMeasureCollection:
"""
Download traffic data measures in the given interval.
:param from_date: Traffic measures before this date are discarded if there isn't any latest pollution measure available.
:param to_date: Traffic measure after this date are discarded.
:return: The resulting TrafficMeasureCollection containing the traffic data.
"""

return TrafficMeasureCollection(measures=self._connector_collector.traffic.get_measures(from_date=from_date, to_date=to_date, station=traffic_station))
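
A minimal usage sketch of the new manager, based only on the methods visible in this commit (the Airflow DAG context that normally drives it is omitted):

from common.connector.collector import ConnectorCollector
from common.manager.traffic_station import TrafficStationManager

# build the ODH connectors from the environment, as the DAG files below do
collector = ConnectorCollector.build_from_env()
manager = TrafficStationManager(collector)

# the station list is fetched from ODH once and then served from the in-memory cache
stations = manager.get_traffic_stations_from_cache()

# one StationLatestMeasure per station code, carrying the most recent valid_time
for latest in manager.get_all_latest_measures():
    print(latest.station_code, latest.latest_time)
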
1 change: 1 addition & 0 deletions pollution_v2/src/common/settings.py
@@ -34,6 +34,7 @@
ODH_COMPUTATION_BATCH_SIZE = int(Variable.get("ODH_COMPUTATION_BATCH_SIZE", 30))
ODH_MINIMUM_STARTING_DATE = dateutil.parser.parse(Variable.get("ODH_MINIMUM_STARTING_DATE", "2018-01-01"))
DAG_POLLUTION_EXECUTION_CRONTAB = Variable.get("DAG_POLLUTION_EXECUTION_CRONTAB", "0 0 * * *")
DAG_VALIDATION_EXECUTION_CRONTAB = Variable.get("DAG_VALIDATION_EXECUTION_CRONTAB", "0 0 * * *")
DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN = int(Variable.get("DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN", 24))

# General
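
The new validator crontab is read, like the other settings, from an Airflow Variable with a default of "0 0 * * *". A small sketch of overriding it programmatically follows; the value shown is only an example, and the Airflow CLI or UI can be used instead:

from airflow.models import Variable

# override the default daily schedule for the validator DAG (example value only)
Variable.set("DAG_VALIDATION_EXECUTION_CRONTAB", "30 2 * * *")
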
89 changes: 14 additions & 75 deletions pollution_v2/src/dags/aiaas_pollution_computer.py
@@ -2,31 +2,26 @@
#
# SPDX-License-Identifier: AGPL-3.0-or-later

import json
import logging
import time
from datetime import timedelta, datetime, timezone
from typing import Optional

from airflow import DAG
from datetime import timedelta, datetime

from airflow.decorators import task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from redis.client import Redis

from dags.common import TrafficStationsDAG
from pollution_connector.cache.computation_checkpoint import ComputationCheckpointCache
from common.connector.collector import ConnectorCollector
from common.data_model.common import Provenance
from common.data_model.common import Provenance, StationLatestMeasure
from common.data_model import TrafficSensorStation
from common.settings import ODH_MINIMUM_STARTING_DATE, DEFAULT_TIMEZONE, \
from common.settings import ODH_MINIMUM_STARTING_DATE, \
    COMPUTATION_CHECKPOINT_REDIS_DB, COMPUTATION_CHECKPOINT_REDIS_PORT, COMPUTATION_CHECKPOINT_REDIS_HOST, \
    PROVENANCE_ID, PROVENANCE_LINEAGE, PROVENANCE_NAME, PROVENANCE_VERSION, DAG_POLLUTION_EXECUTION_CRONTAB, \
    DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN
    DEFAULT_TIMEZONE, DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN
from pollution_connector.tasks.pollution_computation import PollutionComputationManager

# see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html

logger = logging.getLogger("pollution_connector.tasks.pollution_computation")
logger = logging.getLogger("dags.aiaas_pollution_computer")

default_args = {
    'owner': 'airflow',
@@ -41,7 +36,7 @@

THIS_DAG_ID = "pollution_computer"

with DAG(
with TrafficStationsDAG(
    THIS_DAG_ID,

    # execution interval if no backfill step length on date increment if backfill (interval determined by first slot
@@ -86,33 +81,6 @@ def _init_manager() -> PollutionComputationManager:
        return manager


    def _init_date_range(min_from_date: Optional[datetime], max_to_date: Optional[datetime]):
        """
        As starting date for the batch is used the latest pollution measure available on the ODH, if no pollution
        measures are available min_from_date is used.
        :param min_from_date: Optional, if set traffic measures before this date are discarded if no pollution measures
        are available. If not specified, the default will be taken from the environmental variable `ODH_MINIMUM_STARTING_DATE`.
        :param max_to_date: Optional, if set the traffic measure after this date are discarded.
        If not specified, the default will be the current datetime.
        :return: adjusted dates
        """

        if min_from_date is None:
            min_from_date = ODH_MINIMUM_STARTING_DATE

        if min_from_date.tzinfo is None:
            min_from_date = DEFAULT_TIMEZONE.localize(min_from_date)

        if max_to_date is None:
            max_to_date = datetime.now(tz=DEFAULT_TIMEZONE)

        if max_to_date.tzinfo is None:
            max_to_date = DEFAULT_TIMEZONE.localize(max_to_date)

        return min_from_date, max_to_date


    @task
    def get_stations_list(**kwargs) -> list[dict]:
        """
@@ -123,19 +91,7 @@ def get_stations_list(**kwargs) -> list[dict]:

        manager = _init_manager()

        traffic_stations = manager.get_traffic_stations_from_cache()

        stations_from_prev_dag = kwargs["dag_run"].conf.get('stations_to_process')
        if stations_from_prev_dag:
            logger.info(f"{len(stations_from_prev_dag)} stations with unprocessed data from previous run, using them")
            traffic_stations = list(filter(lambda station: station.code in stations_from_prev_dag, traffic_stations))

        logger.info(f"found {len(traffic_stations)} traffic stations")

        # Serialization and deserialization is dependent on speed.
        # Use built-in functions like dict as much as you can and stay away
        # from using classes and other complex structures.
        station_dicts = [station.to_json() for station in traffic_stations]
        station_dicts = dag.get_stations_list(manager, **kwargs)

        return station_dicts

@@ -153,7 +109,7 @@ def process_station(station_dict: dict, **kwargs):

        manager = _init_manager()

        min_from_date, max_to_date = _init_date_range(None, None)
        min_from_date, max_to_date = dag.init_date_range(None, None)

        computation_start_dt = datetime.now()

@@ -173,31 +129,14 @@ def whats_next(already_processed_stations, **kwargs):
        """
        manager = _init_manager()

        now = datetime.now(tz=DEFAULT_TIMEZONE)
        all_latest = manager.get_all_latest_measures()
        stations = []
        for item in all_latest:
            station_code = item.station_code
            date = item.latest_time
        def has_remaining_data(measure: StationLatestMeasure) -> bool:
            now = datetime.now(tz=DEFAULT_TIMEZONE)
            date = measure.latest_time
            if date.tzinfo is None:
                date = DEFAULT_TIMEZONE.localize(date)
            return (now - date).total_seconds() / 3600 > DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN

            if (now - date).total_seconds() / 3600 > DAG_POLLUTION_TRIGGER_DAG_HOURS_SPAN:
                stations.append(station_code)

        # True if on ODH there are lots of data to be processed (e.g. new station with old unprocessed data)
        run_again = len(stations) > 0

        logger.info(f"{'' if run_again else 'NOT '}starting another self-triggered run as {len(stations)} "
                    f"have still unprocessed data")

        if run_again:
            TriggerDagRunOperator(
                task_id="run_again_the_dag",
                trigger_dag_id=THIS_DAG_ID,
                dag=dag,
                conf={'stations_to_process': stations}
            ).execute(kwargs)
        dag.trigger_next_dag_run(manager, dag, has_remaining_data, **kwargs)


    processed_stations = process_station.expand(station_dict=get_stations_list())
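
Both DAG files now delegate shared behaviour to the TrafficStationsDAG base class in dags/common.py, which is not expanded in this view. The following is only an inferred sketch of that interface, reconstructed from the call sites above (dag.get_stations_list, dag.init_date_range, dag.trigger_next_dag_run); the real signatures and bodies may differ:

from datetime import datetime
from typing import Callable, Optional

from airflow import DAG

from common.data_model import StationLatestMeasure
from common.manager.traffic_station import TrafficStationManager


class TrafficStationsDAG(DAG):
    """Assumed shape of the shared DAG base class, inferred from its call sites."""

    def get_stations_list(self, manager: TrafficStationManager, **kwargs) -> list[dict]:
        # presumably reads the station list (or the subset passed by a previous run
        # via dag_run.conf) and serializes each station with to_json()
        ...

    def init_date_range(self, min_from_date: Optional[datetime],
                        max_to_date: Optional[datetime]) -> tuple[datetime, datetime]:
        # presumably applies the ODH_MINIMUM_STARTING_DATE / now() defaults and localizes both dates
        ...

    def trigger_next_dag_run(self, manager: TrafficStationManager, dag: DAG,
                             has_remaining_data: Callable[[StationLatestMeasure], bool],
                             **kwargs) -> None:
        # presumably re-triggers the DAG for the stations whose latest measure
        # still satisfies has_remaining_data()
        ...
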
122 changes: 122 additions & 0 deletions pollution_v2/src/dags/aiaas_validator.py
@@ -0,0 +1,122 @@
# SPDX-FileCopyrightText: NOI Techpark <digital@noi.bz.it>
#
# SPDX-License-Identifier: AGPL-3.0-or-later

import logging
from datetime import timedelta, datetime

from airflow.decorators import task

from common.manager.traffic_station import TrafficStationManager
from dags.common import TrafficStationsDAG
from common.connector.collector import ConnectorCollector
from common.data_model import TrafficSensorStation, StationLatestMeasure
from common.settings import ODH_MINIMUM_STARTING_DATE, DAG_VALIDATION_EXECUTION_CRONTAB

# see https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html

logger = logging.getLogger("dags.aiaas_validator")

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': ODH_MINIMUM_STARTING_DATE,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

THIS_DAG_ID = "validator"

with TrafficStationsDAG(
    THIS_DAG_ID,

    # execution interval if no backfill step length on date increment if backfill (interval determined by first slot
    # available in queue)
    # schedule_interval is deprecated
    schedule=DAG_VALIDATION_EXECUTION_CRONTAB,

    # execution date starting at (if needed, backfill)
    start_date=ODH_MINIMUM_STARTING_DATE,

    # if True, the scheduler creates a DAG Run for each completed interval between start_date and end_date
    # and the scheduler will execute them sequentially
    # no need to backfill with catch-up, we can rely on programmatically process-the-oldest-data-on-ODH
    catchup=False,

    tags=["aiaas", "validator"],

    # 1 as the maximum number of active DAG runs per DAG:
    # dag execution mode should be sequential to avoid periods overlapping and
    # to avoid quick and recent runs blocking slow and older ones (as ODH does not accept "older" data writing)
    max_active_runs=1,

    default_args=default_args
) as dag:

    def _init_manager() -> TrafficStationManager:

        connector_collector = ConnectorCollector.build_from_env()
        manager = TrafficStationManager(connector_collector)
        return manager

    @task
    def get_stations_list(**kwargs) -> list[dict]:
"""
Returns the complete list of stations or the filtered list based on previous DAG run
:return: list of strings containing stations list
"""
        manager = _init_manager()

        station_dicts = dag.get_stations_list(manager, **kwargs)

        return station_dicts


    @task
    def process_station(station_dict: dict, **kwargs):
        """
        Process a single station
        :param station_dict: the station to process
        """

        station = TrafficSensorStation.from_json(station_dict)
        logger.info(f"Received station {station}")

        manager = _init_manager()

        min_from_date, max_to_date = dag.init_date_range(None, None)

        computation_start_dt = datetime.now()

        logger.info(f"running validation from {min_from_date} to {max_to_date}")

        # TODO: implement validation

        computation_end_dt = datetime.now()
        logger.info(f"Completed validation in [{(computation_end_dt - computation_start_dt).seconds}]")


    @task
    def whats_next(already_processed_stations, **kwargs):
        """
        Checks if there are still data to be processed before ending DAG runs
        :param already_processed_stations: the stations already processed (not used)
        """
        manager = _init_manager()

        def has_remaining_data(measure: StationLatestMeasure) -> bool:
            # TODO: implement method to check if there are still data to be processed before ending DAG runs
            raise NotImplementedError

        dag.trigger_next_dag_run(manager, dag, has_remaining_data, **kwargs)


    processed_stations = process_station.expand(station_dict=get_stations_list())

    whats_next(processed_stations)
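
The has_remaining_data predicate above is still a TODO. One possible shape, mirroring the hours-span check used by the pollution computer, is sketched here; the 24-hour threshold is a placeholder rather than a project setting, and the whole body is hypothetical:

from datetime import datetime

from common.data_model import StationLatestMeasure
from common.settings import DEFAULT_TIMEZONE


def has_remaining_data(measure: StationLatestMeasure) -> bool:
    # hypothetical: consider a station unprocessed if its latest measure
    # is older than a placeholder 24-hour span
    now = datetime.now(tz=DEFAULT_TIMEZONE)
    date = measure.latest_time
    if date.tzinfo is None:
        date = DEFAULT_TIMEZONE.localize(date)
    return (now - date).total_seconds() / 3600 > 24
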