Skip to content

Commit

Permalink
Do ingest updates for existing records that have been set to downloads disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
stacimc committed Jan 3, 2024
1 parent 7b8e9eb commit bc41375
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 19 deletions.
2 changes: 1 addition & 1 deletion catalog/dags/providers/factory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def pull_media_wrapper(
# Initialize the ProviderDataIngester class, which will initialize the
# media stores and DelayedRequester.
logger.info(f"Initializing ProviderIngester {ingester_class.__name__}")
ingester = ingester_class(dag_run.conf, dag_run.dag_id, *args)
ingester = ingester_class(ti, dag_run.conf, dag_run.dag_id, *args)
stores: dict[MediaType, MediaStore] = ingester.media_stores

# Check that the ProviderDataIngester class has a store configuration for each
Expand Down
65 changes: 58 additions & 7 deletions catalog/dags/providers/provider_api_scripts/jamendo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
channels: 1/2
"""
import logging
from datetime import datetime, timezone
from urllib.parse import parse_qs, urlencode, urlsplit

from airflow.models import Variable

import common
from common import constants
from common import constants, slack
from common.licenses import get_license_info
from common.loader import provider_details as prov
from common.urls import rewrite_redirected_url
Expand All @@ -38,19 +39,55 @@ class JamendoDataIngester(ProviderDataIngester):
batch_limit = 200
headers = {"Accept": "application/json"}

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Keep track of the number of records which are not ingested due
def __init__(self, ti=None, *args, **kwargs):
    """
    Initialize the ingester and work out the downloads-disabled cutoff date.

    :param ti: optional Airflow TaskInstance. When given, its template
        context is used to look up the end of the data interval of the last
        successful DagRun.
    :param args: passed through to the parent initializer.
    :param kwargs: passed through to the parent initializer.
    """
    super().__init__(ti, *args, **kwargs)

    # We will not ingest new Jamendo records that have downloads disabled.
    # However, if a record that was previously ingested has been updated
    # to have downloads disabled, we do want to ingest it in order to
    # process the updates. We use the date of the last successful DagRun
    # as the cutoff date for determining whether a record is a new
    # addition to the catalog, or if it is an update to a record that was
    # ingested previously.
    cutoff_date = None
    if ti:
        context = ti.get_template_context()
        cutoff_date = context.get("prev_data_interval_end_success")
        logger.info(
            "Previous successful DagRun start date:"
            f" {context.get('prev_start_date_success')}"
        )

    if cutoff_date is None:
        # No TaskInstance, or no previous successful DagRun. Fall back to
        # the current time. It must be timezone-aware: record release dates
        # are parsed as UTC-aware datetimes, and comparing naive with aware
        # datetimes raises a TypeError.
        # NOTE(review): on a first-ever run this treats every already
        # released record as "existing" -- confirm that is the desired
        # behaviour.
        cutoff_date = datetime.now(timezone.utc)
    elif cutoff_date.tzinfo is None:
        # Defensive: interpret a naive value as UTC so comparisons are safe.
        cutoff_date = cutoff_date.replace(tzinfo=timezone.utc)

    self.cutoff_date = cutoff_date
    logger.info(f"Using cutoff date: {self.cutoff_date}.")

    # Legacy name of the discarded-records counter, kept so that any code
    # still reading the old attribute does not break.
    self.download_disabled_count = 0

    # Keep track of the number of new records which were not ingested due
    # to downloads being disabled
    self.download_disabled_discarded_count = 0

    # Keep track of the number of existing records which _were_ ingested
    # that now have downloads disabled. If this count is greater than 0,
    # we will report to Slack after ingestion.
    self.download_disabled_updated_count = 0

def ingest_records(self, **kwargs):
    """
    Run ingestion, then report on records that have downloads disabled.

    After the parent class has finished ingesting, log how many new records
    were discarded because downloads are disabled, and send a Slack alert if
    any previously ingested records were updated to disallow downloads.

    :param kwargs: passed through to the parent `ingest_records`.
    """
    super().ingest_records(**kwargs)

    # Report the number of new records which were not ingested due to having
    # downloads disabled. `get_record_data` increments
    # `download_disabled_discarded_count` for these, so that is the counter
    # that must be reported here.
    logger.info(
        f"Discarded {self.download_disabled_discarded_count} records with"
        " `audiodownload_allowed` = False."
    )

    # If some pre-existing records had their downloads disabled, we will need
    # to manually run the `delete_records` DAG to remove them.
    if self.download_disabled_updated_count > 0:
        message = (
            f"{self.download_disabled_updated_count} Jamendo records have"
            " been updated to disallow audio downloads. Please run the"
            " `delete_records` DAG to remove them. See"
            " https://github.com/WordPress/openverse/pull/3618 for"
            " additional context."
        )
        slack.send_alert(message, dag_id=self.dag_id)

def get_media_type(self, record):
    """Return the media type for `record`; this is always audio."""
    # The record itself is not inspected.
    return constants.AUDIO

Expand Down Expand Up @@ -168,6 +205,7 @@ def _get_metadata(data):
"downloads": stats.get("rate_download_total", 0),
"listens": stats.get("rate_listened_total", 0),
"playlists": stats.get("rate_playlisted_total", 0),
"audiodownload_allowed": data.get("audiodownload_allowed", True),
}
return {k: v for k, v in metadata.items() if v is not None}

Expand Down Expand Up @@ -204,8 +242,21 @@ def get_record_data(self, data):
return None

if data.get("audiodownload_allowed") is False:
self.download_disabled_count += 1
return None
release_date = datetime.strptime(
data.get("releasedate"), "%Y-%m-%d"
).replace(tzinfo=timezone.utc)

if release_date > self.cutoff_date:
# Do not ingest any new records with downloads disabled.
self.download_disabled_discarded_count += 1
logger.info("Discarding record")
return None
else:
# Previously ingested records whose downloads have been
# disabled need to be updated, so they can be removed
# from the catalog.
logger.info("Updating existing record with download disabled.")
self.download_disabled_updated_count += 1

if duration := data.get("duration"):
duration = int(duration) * 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import TypedDict

from airflow.exceptions import AirflowException
from airflow.models import Variable
from airflow.models import TaskInstance, Variable

from common.requester import DelayedRequester
from common.storage.media import MediaStore
Expand Down Expand Up @@ -115,10 +115,12 @@ def endpoint(self):

def __init__(
self,
ti: TaskInstance = None,
conf: dict = None,
dag_id: str = None,
date: str = None,
day_shift: int = None,
**kwargs,
):
"""
Initialize the provider configuration.
Expand Down
31 changes: 24 additions & 7 deletions catalog/tests/dags/providers/provider_api_scripts/test_jamendo.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from unittest.mock import patch
from datetime import datetime, timezone

import pytest

Expand Down Expand Up @@ -89,6 +90,7 @@ def test_get_record_data():
"listens": 5616,
"playlists": 0,
"release_date": "2005-04-12",
"audiodownload_allowed": True,
},
"raw_tags": ["instrumental", "speed_medium"],
"audio_set_foreign_identifier": "119",
Expand Down Expand Up @@ -210,21 +212,36 @@ def test_remove_track_id_handles_data(thumbnail_url, expected):


@pytest.mark.parametrize(
"audiodownload_allowed, should_ingest",
"audiodownload_allowed, release_date, should_ingest",
[
# Happy path, download is allowed
(True, True),
# Always ingest if audiodownload_allowed is True
(True, "2024-01-01", True),
(True, "2020-09-04", True),
# Only prevent ingestion if audiodownload_allowed is explicitly False.
(None, True),
# Download disabled; prevent ingestion
(False, False),
# If None, continue to ingest.
(None, "2024-01-01", True),
(None, "2020-09-04", True),
# Do not prevent ingestion if the record's release_date
# is earlier than the end interval of the last successful DagRun
# (meaning this record has been previously ingested), even if
# downloads are disabled
(False, "2020-09-04", True),
# Prevent ingestion when downloads are disabled and the release date
# is later than the end interval of the last successful run
(False, "2024-01-01", False),
],
)
def test_get_record_data_discards_records_with_downloads_disabled(
audiodownload_allowed, should_ingest
audiodownload_allowed, release_date, should_ingest
):
# Mock date between the test parameters
jamendo.cutoff_date = datetime.strptime("2023-12-31", "%Y-%m-%d").replace(
tzinfo=timezone.utc
)

item_data = _get_resource_json("audio_data_example.json")
item_data["audiodownload_allowed"] = audiodownload_allowed
item_data["releasedate"] = release_date

record_data = jamendo.get_record_data(item_data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ def test_ingest_records_commits_on_exception():
def test_ingest_records_uses_initial_query_params_from_dagrun_conf():
# Initialize the ingester with a conf
ingester = MockProviderDataIngester(
{"initial_query_params": {"has_image": 1, "page": 5}}
conf={"initial_query_params": {"has_image": 1, "page": 5}}
)

# Mock get_batch to halt ingestion after a single batch
Expand All @@ -365,7 +365,7 @@ def test_ingest_records_uses_initial_query_params_from_dagrun_conf():
def test_ingest_records_uses_query_params_list_from_dagrun_conf():
# Initialize the ingester with a conf
ingester = MockProviderDataIngester(
{
conf={
"query_params_list": [
{"has_image": 1, "page": 5},
{"has_image": 1, "page": 12},
Expand Down
2 changes: 1 addition & 1 deletion catalog/tests/dags/providers/test_factory_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def internal_func_mock():
fdi = FakeDataIngester()


def _set_up_ingester(mock_conf, mock_dag_id, mock_func, value):
def _set_up_ingester(mock_ti, mock_conf, mock_dag_id, mock_func, value):
"""
Set up ingest records as a proxy for calling the mock function, then return
the instance. This is necessary because the args are only handed in during
Expand Down

0 comments on commit bc41375

Please sign in to comment.