diff --git a/catalog/dags/providers/factory_utils.py b/catalog/dags/providers/factory_utils.py
index eccc73ad891..88d69044ea6 100644
--- a/catalog/dags/providers/factory_utils.py
+++ b/catalog/dags/providers/factory_utils.py
@@ -29,7 +29,7 @@ def pull_media_wrapper(
     # Initialize the ProviderDataIngester class, which will initialize the
     # media stores and DelayedRequester.
     logger.info(f"Initializing ProviderIngester {ingester_class.__name__}")
-    ingester = ingester_class(dag_run.conf, dag_run.dag_id, *args)
+    ingester = ingester_class(ti, dag_run.conf, dag_run.dag_id, *args)
     stores: dict[MediaType, MediaStore] = ingester.media_stores
 
     # Check that the ProviderDataIngester class has a store configuration for each
diff --git a/catalog/dags/providers/provider_api_scripts/jamendo.py b/catalog/dags/providers/provider_api_scripts/jamendo.py
index 9524205b3a3..9fbb08d51e0 100644
--- a/catalog/dags/providers/provider_api_scripts/jamendo.py
+++ b/catalog/dags/providers/provider_api_scripts/jamendo.py
@@ -16,12 +16,13 @@
 channels: 1/2
 """
 import logging
+from datetime import datetime, timezone
 from urllib.parse import parse_qs, urlencode, urlsplit
 
 from airflow.models import Variable
 
 import common
-from common import constants
+from common import constants, slack
 from common.licenses import get_license_info
 from common.loader import provider_details as prov
 from common.urls import rewrite_redirected_url
@@ -38,19 +39,55 @@ class JamendoDataIngester(ProviderDataIngester):
     batch_limit = 200
     headers = {"Accept": "application/json"}
 
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-        # Keep track of the number of records which are not ingested due
+    def __init__(self, ti=None, *args, **kwargs):
+        super().__init__(ti, *args, **kwargs)
+
+        # We will not ingest new Jamendo records that have downloads disabled.
+        # However, if a record that was previously ingested has been updated
+        # to have downloads disabled, we do want to ingest it in order to
+        # process the updates. We use the date of the last successful DagRun
+        # as the cutoff date for determining whether a record is a new
+        # addition to the catalog, or if it is an update to a record that was
+        # ingested previously (falling back to "now" when it is unavailable).
+        if ti:
+            self.cutoff_date = ti.get_template_context().get(
+                "prev_data_interval_end_success"
+            ) or datetime.now(timezone.utc)
+            logger.debug(ti.get_template_context().get("prev_start_date_success"))
+        else:
+            self.cutoff_date = datetime.now(timezone.utc)
+        logger.info(f"Using cutoff date: {self.cutoff_date}.")
+
+        # Keep track of the number of new records which were not ingested due
         # to downloads being disabled
-        self.download_disabled_count = 0
+        self.download_disabled_discarded_count = 0
+
+        # Keep track of the number of existing records which _were_ ingested
+        # that now have downloads disabled. If this count is greater than 0,
+        # we will report to Slack after ingestion.
+        self.download_disabled_updated_count = 0
 
     def ingest_records(self, **kwargs):
         super().ingest_records(**kwargs)
+        # Report the number of new records which were not ingested due to having
+        # downloads disabled.
         logger.info(
-            f"Discarded {self.download_disabled_count} records with"
+            f"Discarded {self.download_disabled_discarded_count} records with"
             " `audiodownload_allowed` = False."
         )
+        # If some pre-existing records had their downloads disabled, we will need
+        # to manually run the `delete_records` DAG to remove them.
+        if self.download_disabled_updated_count > 0:
+            message = (
+                f"{self.download_disabled_updated_count} Jamendo records have"
+                " been updated to disallow audio downloads. Please run the"
+                " `delete_records` DAG to remove them. See"
+                " https://github.com/WordPress/openverse/pull/3618 for"
+                " additional context."
+            )
+            slack.send_alert(message, dag_id=self.dag_id)
 
     def get_media_type(self, record):
         return constants.AUDIO
@@ -168,6 +205,7 @@ def _get_metadata(data):
         "downloads": stats.get("rate_download_total", 0),
         "listens": stats.get("rate_listened_total", 0),
         "playlists": stats.get("rate_playlisted_total", 0),
+        "audiodownload_allowed": data.get("audiodownload_allowed", True),
     }
 
     return {k: v for k, v in metadata.items() if v is not None}
@@ -204,8 +242,27 @@ def get_record_data(self, data):
             return None
 
         if data.get("audiodownload_allowed") is False:
-            self.download_disabled_count += 1
-            return None
+            release_date_str = data.get("releasedate")
+            if not release_date_str:
+                # Without a release date we cannot tell whether the record
+                # predates the cutoff; treat it as new and discard it.
+                self.download_disabled_discarded_count += 1
+                return None
+            release_date = datetime.strptime(
+                release_date_str, "%Y-%m-%d"
+            ).replace(tzinfo=timezone.utc)
+
+            if release_date > self.cutoff_date:
+                # Do not ingest any new records with downloads disabled.
+                self.download_disabled_discarded_count += 1
+                logger.info("Discarding record")
+                return None
+            else:
+                # Previously ingested records whose downloads have been
+                # disabled need to be updated, so they can be removed
+                # from the catalog.
+                logger.info("Updating existing record with download disabled.")
+                self.download_disabled_updated_count += 1
 
         if duration := data.get("duration"):
             duration = int(duration) * 1000
diff --git a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
index 8c248d1392a..918256a4ff5 100644
--- a/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
+++ b/catalog/dags/providers/provider_api_scripts/provider_data_ingester.py
@@ -6,7 +6,7 @@
 from typing import TypedDict
 
 from airflow.exceptions import AirflowException
-from airflow.models import Variable
+from airflow.models import TaskInstance, Variable
 
 from common.requester import DelayedRequester
 from common.storage.media import MediaStore
@@ -115,10 +115,12 @@ def endpoint(self):
 
     def __init__(
         self,
+        ti: TaskInstance = None,
         conf: dict = None,
         dag_id: str = None,
         date: str = None,
         day_shift: int = None,
+        **kwargs,
     ):
         """
         Initialize the provider configuration.
diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_jamendo.py b/catalog/tests/dags/providers/provider_api_scripts/test_jamendo.py
index 6d1511988bd..6164c319d8e 100644
--- a/catalog/tests/dags/providers/provider_api_scripts/test_jamendo.py
+++ b/catalog/tests/dags/providers/provider_api_scripts/test_jamendo.py
@@ -1,4 +1,5 @@
+from datetime import datetime, timezone
 from unittest.mock import patch
 
 import pytest
 
@@ -89,6 +90,7 @@ def test_get_record_data():
             "listens": 5616,
             "playlists": 0,
             "release_date": "2005-04-12",
+            "audiodownload_allowed": True,
         },
         "raw_tags": ["instrumental", "speed_medium"],
         "audio_set_foreign_identifier": "119",
@@ -210,21 +212,36 @@ def test_remove_track_id_handles_data(thumbnail_url, expected):
 
 
 @pytest.mark.parametrize(
-    "audiodownload_allowed, should_ingest",
+    "audiodownload_allowed, release_date, should_ingest",
     [
-        # Happy path, download is allowed
-        (True, True),
+        # Always ingest if audiodownload_allowed is True
+        (True, "2024-01-01", True),
+        (True, "2020-09-04", True),
         # Only prevent ingestion if audiodownload_allowed is explicitly False.
-        (None, True),
-        # Download disabled; prevent ingestion
-        (False, False),
+        # If None, continue to ingest.
+        (None, "2024-01-01", True),
+        (None, "2020-09-04", True),
+        # Do not prevent ingestion if the record's release_date
+        # is earlier than the end interval of the last successful DagRun
+        # (meaning this record has been previously ingested), even if
+        # downloads are disabled
+        (False, "2020-09-04", True),
+        # Prevent ingestion when downloads are disabled and the release date
+        # is later than the end interval of the last successful run
+        (False, "2024-01-01", False),
     ],
 )
 def test_get_record_data_discards_records_with_downloads_disabled(
-    audiodownload_allowed, should_ingest
+    audiodownload_allowed, release_date, should_ingest
 ):
+    # Mock date between the test parameters
+    jamendo.cutoff_date = datetime.strptime("2023-12-31", "%Y-%m-%d").replace(
+        tzinfo=timezone.utc
+    )
+
     item_data = _get_resource_json("audio_data_example.json")
     item_data["audiodownload_allowed"] = audiodownload_allowed
+    item_data["releasedate"] = release_date
 
     record_data = jamendo.get_record_data(item_data)
diff --git a/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py b/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
index 1afb08bf96d..5626296a529 100644
--- a/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
+++ b/catalog/tests/dags/providers/provider_api_scripts/test_provider_data_ingester.py
@@ -349,7 +349,7 @@ def test_ingest_records_commits_on_exception():
 def test_ingest_records_uses_initial_query_params_from_dagrun_conf():
     # Initialize the ingester with a conf
     ingester = MockProviderDataIngester(
-        {"initial_query_params": {"has_image": 1, "page": 5}}
+        conf={"initial_query_params": {"has_image": 1, "page": 5}}
     )
 
     # Mock get_batch to halt ingestion after a single batch
@@ -365,7 +365,7 @@ def test_ingest_records_uses_initial_query_params_from_dagrun_conf():
 def test_ingest_records_uses_query_params_list_from_dagrun_conf():
     # Initialize the ingester with a conf
     ingester = MockProviderDataIngester(
-        {
+        conf={
             "query_params_list": [
                 {"has_image": 1, "page": 5},
                 {"has_image": 1, "page": 12},
diff --git a/catalog/tests/dags/providers/test_factory_utils.py b/catalog/tests/dags/providers/test_factory_utils.py
index b7deb40f9a2..86513175bda 100644
--- a/catalog/tests/dags/providers/test_factory_utils.py
+++ b/catalog/tests/dags/providers/test_factory_utils.py
@@ -34,7 +34,7 @@ def internal_func_mock():
 fdi = FakeDataIngester()
 
 
-def _set_up_ingester(mock_conf, mock_dag_id, mock_func, value):
+def _set_up_ingester(mock_ti, mock_conf, mock_dag_id, mock_func, value):
     """
     Set up ingest records as a proxy for calling the mock function, then
     return the instance. This is necessary because the args are only handed in during