Do not ingest Jamendo records with downloads disabled #3618

Merged · 11 commits · Jan 18, 2024
2 changes: 2 additions & 0 deletions catalog/dags/database/delete_records/constants.py
@@ -14,6 +14,8 @@
SELECT {source_cols}
FROM {source_table}
{select_query}
ON CONFLICT {unique_cols}
DO NOTHING
"""
DELETE_RECORDS_QUERY = """
DELETE FROM {table}
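As a hedged illustration of the new `ON CONFLICT` clause, the tail of the template could render roughly as below once `unique_cols` is supplied; the values passed to `format` are illustrative placeholders, not the exact strings the DAG builds.

```python
# Illustrative rendering of the query tail above. With ON CONFLICT ... DO NOTHING,
# a row whose (provider, md5(foreign_identifier)) pair already exists in the
# deleted media table is skipped instead of raising a unique-violation error.
rendered_tail = (
    "SELECT {source_cols}\n"
    "FROM {source_table}\n"
    "{select_query}\n"
    "ON CONFLICT {unique_cols}\n"
    "DO NOTHING"
).format(
    source_cols="identifier, foreign_identifier, /* ... */ NOW(), 'download_disabled'",
    source_table="audio",
    select_query="WHERE provider='some_provider'",
    unique_cols="(provider, md5(foreign_identifier))",
)
print(rendered_tail)
```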
11 changes: 10 additions & 1 deletion catalog/dags/database/delete_records/delete_records.py
@@ -7,7 +7,7 @@
from common import slack
from common.constants import POSTGRES_CONN_ID
from common.sql import RETURN_ROW_COUNT, PostgresHook
from common.storage.columns import DELETED_ON, Column
from common.storage.columns import DELETED_ON, Column, PROVIDER, FOREIGN_ID
from common.storage.db_columns import (
setup_db_columns_for_media_type,
setup_deleted_db_columns_for_media_type,
@@ -60,12 +60,20 @@ def create_deleted_records(

# To build the source columns, we first list all columns in the main media table
source_cols = ", ".join([col.db_name for col in db_columns])

# Then add the deleted-media specific columns.
# `deleted_on` is set to its insert value to get the current timestamp:
source_cols += f", {DELETED_ON.get_insert_value()}"
# `deleted_reason` is set to the given string
source_cols += f", '{deleted_reason}'"

# The provider, foreign_id pair uniquely identifies a record. When trying to
# add a record to the deleted_media table, if the record's (provider, foreign_id)
# pair is already present in the table, no additional record will be added and the
# existing record in the deleted_media table will not be updated. This preserves the
# record exactly as it was when it was first deleted.
unique_cols = f"({PROVIDER.db_name}, md5({FOREIGN_ID.db_name}))"

return run_sql(
sql_template=constants.CREATE_RECORDS_QUERY,
postgres_conn_id=postgres_conn_id,
@@ -75,6 +83,7 @@
source_table=media_type,
source_cols=source_cols,
select_query=select_query,
unique_cols=unique_cols,
)


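Worth noting: PostgreSQL only accepts an `ON CONFLICT (expressions)` target when it matches an existing unique index or constraint, which is why `unique_cols` mirrors the `(provider, md5(foreign_identifier))` expression indexes added to the deleted media tables later in this PR. A minimal, self-contained sketch of the resulting behaviour (psycopg2 and the connection details are assumptions for illustration):

```python
# Demo of the dedup behaviour on a throwaway table: the second insert for the same
# (provider, md5(foreign_identifier)) pair is a no-op, so the first-deleted row is
# preserved unchanged.
import psycopg2

conn = psycopg2.connect("dbname=test")  # hypothetical connection details
with conn, conn.cursor() as cur:
    cur.execute(
        """
        CREATE TEMP TABLE deleted_demo (
            provider text, foreign_identifier text, title text
        );
        CREATE UNIQUE INDEX deleted_demo_provider_fid_idx
            ON deleted_demo (provider, md5(foreign_identifier));
        """
    )
    insert = """
        INSERT INTO deleted_demo VALUES (%s, %s, %s)
        ON CONFLICT (provider, md5(foreign_identifier)) DO NOTHING
    """
    cur.execute(insert, ("jamendo", "track-1", "Original title"))
    cur.execute(insert, ("jamendo", "track-1", "New title"))  # skipped: pair exists
    cur.execute("SELECT title FROM deleted_demo;")
    print(cur.fetchall())  # [('Original title',)]
conn.close()
```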
13 changes: 13 additions & 0 deletions catalog/dags/database/delete_records/delete_records_dag.py
@@ -24,6 +24,19 @@
}
```

## Multiple deletions

When a record is deleted, it is added to the corresponding Deleted Media table. If the
record is reingested back into the media table, the delete_records DAG may be run
additional times to delete the same record. When this occurs, only one row will be kept
in the Deleted Media table for the record (as uniquely identified by the provider and
foreign identifier pair). This row is not updated, so the `deleted_on` time will reflect
the _first_ time the record was deleted.

When restoring records from the Deleted Media table, it is important to note that these
records have not been updated through reingestion, so fields such as popularity data may
be out of date.

## Warnings

Presently, there is no logic to prevent records that have an entry in a Deleted Media
46 changes: 46 additions & 0 deletions catalog/dags/providers/provider_api_scripts/jamendo.py
@@ -16,15 +16,21 @@
channels: 1/2
"""
import logging
from datetime import timedelta
from urllib.parse import parse_qs, urlencode, urlsplit

from airflow.decorators import task_group
from airflow.models import Variable

import common
from common import constants
from common.licenses import get_license_info
from common.loader import provider_details as prov
from common.urls import rewrite_redirected_url
from database.delete_records.delete_records import (
create_deleted_records,
delete_records_from_media_table,
)
from providers.provider_api_scripts.provider_data_ingester import ProviderDataIngester


@@ -238,6 +244,46 @@ def get_record_data(self, data):
"set_thumbnail": set_thumbnail,
}

@staticmethod
def create_postingestion_tasks():
"""
Create postingestion tasks to delete records that have downloads
disabled from the audio table and preserve them in the
deleted_audio table.

If we instead simply discarded these records during ingestion,
existing records which have had their downloads disabled since their
last ingestion would remain in the catalog. This approach ensures
all records with downloads disabled are removed.
"""
Review comment (on lines +248 to +258): Nice to make use of this function again. Clever setup! Also because the reason makes perfect sense 💯

select_query = (
f"WHERE provider='{prov.JAMENDO_DEFAULT_PROVIDER}' "
"AND meta_data->>'audiodownload_allowed' = 'False'"
)

@task_group(group_id="delete_records_with_downloads_disabled")
def delete_download_disabled_records():
# Select all records with downloads disabled and copy them into
# the deleted_audio table
insert_into_deleted_media_table = create_deleted_records.override(
task_id="update_deleted_media_table",
execution_timeout=timedelta(hours=1),
)(
select_query=select_query,
deleted_reason="download_disabled",
media_type=constants.AUDIO,
)

# If successful, delete the records from the audio table
delete_records = delete_records_from_media_table.override(
execution_timeout=timedelta(hours=1)
)(table=constants.AUDIO, select_query=select_query)

insert_into_deleted_media_table >> delete_records

return delete_download_disabled_records()


def main():
logger.info("Begin: Jamendo data ingestion.")
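Putting the pieces together, the task group above effectively issues two statements; the sketch below is a simplified, hedged rendering (column lists elided, `'jamendo'` standing in for `prov.JAMENDO_DEFAULT_PROVIDER`), not the literal SQL produced by the templates.

```python
# Simplified rendering of the two post-ingestion steps for Jamendo.
select_query = (
    "WHERE provider='jamendo' "
    "AND meta_data->>'audiodownload_allowed' = 'False'"
)

# Step 1: copy matching rows into deleted_audio, skipping any
# (provider, md5(foreign_identifier)) pair that is already there.
copy_into_deleted_audio = f"""
    INSERT INTO deleted_audio (/* full column list elided */)
    SELECT /* audio columns */, NOW(), 'download_disabled'
    FROM audio
    {select_query}
    ON CONFLICT (provider, md5(foreign_identifier))
    DO NOTHING
"""

# Step 2: only if step 1 succeeds, remove the same rows from the audio table.
delete_from_audio = f"""
    DELETE FROM audio
    {select_query}
"""
```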
2 changes: 1 addition & 1 deletion catalog/dags/providers/provider_dag_factory.py
@@ -304,7 +304,7 @@ def append_day_shift(id_str):

if conf.create_postingestion_tasks:
postingestion_tasks = conf.create_postingestion_tasks()
pull_data >> postingestion_tasks
load_tasks >> postingestion_tasks

ingestion_metrics = {
"duration": XCOM_PULL_TEMPLATE.format(pull_data.task_id, "duration"),
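A minimal sketch of the reordered dependency, assuming a recent Airflow 2.x; the task ids are simplified stand-ins for the factory's real tasks. Running post-ingestion only after the load step means records just (re)ingested with downloads disabled are already in the media table when the deletion task group fires.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG("provider_dag_sketch", start_date=datetime(2024, 1, 1), schedule=None):
    pull_data = EmptyOperator(task_id="pull_data")
    load_tasks = EmptyOperator(task_id="load_tasks")
    postingestion_tasks = EmptyOperator(task_id="postingestion_tasks")

    # Before this change: pull_data >> postingestion_tasks, which let post-ingestion
    # start without waiting for the load step. After:
    pull_data >> load_tasks >> postingestion_tasks
```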
1 change: 1 addition & 0 deletions catalog/dags/providers/provider_workflows.py
@@ -241,6 +241,7 @@ def __post_init__(self):
),
ProviderWorkflow(
ingester_class=JamendoDataIngester,
create_postingestion_tasks=JamendoDataIngester.create_postingestion_tasks,
),
ProviderWorkflow(
ingester_class=JusttakeitfreeDataIngester,
120 changes: 100 additions & 20 deletions catalog/tests/dags/database/test_delete_records.py
@@ -53,6 +53,9 @@ def postgres_with_image_and_deleted_image_table(image_table, deleted_image_table
cur.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp" WITH SCHEMA public;')
cur.execute(sql.CREATE_IMAGE_TABLE_QUERY.format(image_table))
cur.execute(sql.CREATE_DELETED_IMAGE_TABLE_QUERY.format(deleted_image_table))
cur.execute(
sql.DELETED_IMAGE_TABLE_UNIQUE_CONDITION_QUERY.format(table=deleted_image_table)
)

conn.commit()

@@ -65,35 +68,36 @@ def postgres_with_image_and_deleted_image_table(image_table, deleted_image_table
conn.close()


def _load_sample_data_into_image_table(image_table, postgres):
def _load_sample_data_into_image_table(image_table, postgres, sample_records=None):
DEFAULT_COLS = {
col.LICENSE.db_name: LICENSE,
col.UPDATED_ON.db_name: "NOW()",
col.CREATED_ON.db_name: "NOW()",
col.TITLE.db_name: TITLE,
}

# Load sample data into the image table
sample_records = [
{
col.FOREIGN_ID.db_name: FID_A,
col.DIRECT_URL.db_name: f"https://images.com/{FID_A}/img.jpg",
col.PROVIDER.db_name: MATCHING_PROVIDER,
},
{
col.FOREIGN_ID.db_name: FID_B,
col.DIRECT_URL.db_name: f"https://images.com/{FID_B}/img.jpg",
col.PROVIDER.db_name: MATCHING_PROVIDER,
},
{
col.FOREIGN_ID.db_name: FID_C,
col.DIRECT_URL.db_name: f"https://images.com/{FID_C}/img.jpg",
col.PROVIDER.db_name: NOT_MATCHING_PROVIDER,
},
]
if not sample_records:
sample_records = [
{
col.FOREIGN_ID.db_name: FID_A,
col.DIRECT_URL.db_name: f"https://images.com/{FID_A}/img.jpg",
col.PROVIDER.db_name: MATCHING_PROVIDER,
},
{
col.FOREIGN_ID.db_name: FID_B,
col.DIRECT_URL.db_name: f"https://images.com/{FID_B}/img.jpg",
col.PROVIDER.db_name: MATCHING_PROVIDER,
},
{
col.FOREIGN_ID.db_name: FID_C,
col.DIRECT_URL.db_name: f"https://images.com/{FID_C}/img.jpg",
col.PROVIDER.db_name: NOT_MATCHING_PROVIDER,
},
]

# Load sample data into the image table
sql.load_sample_data_into_image_table(
image_table, postgres, [{**record, **DEFAULT_COLS} for record in sample_records]
image_table, postgres, [{**DEFAULT_COLS, **record} for record in sample_records]
)


@@ -144,6 +148,82 @@ def test_create_deleted_records(
assert actual_rows[1][sql.deleted_reason_idx] == deleted_reason


def test_create_deleted_records_does_not_add_duplicates(
postgres_with_image_and_deleted_image_table,
image_table,
deleted_image_table,
identifier,
):
sample_record = {
col.FOREIGN_ID.db_name: FID_A,
col.DIRECT_URL.db_name: f"https://images.com/{FID_A}/img.jpg",
col.PROVIDER.db_name: MATCHING_PROVIDER,
col.TITLE.db_name: "Original title",
}
# Load sample data into the image table
_load_sample_data_into_image_table(
image_table,
postgres_with_image_and_deleted_image_table,
sample_records=[
sample_record,
],
)

# Delete the record
select_query = f"WHERE provider='{MATCHING_PROVIDER}'"
original_deleted_reason = "FOO"
deleted_count = create_deleted_records.function(
media_type=image_table,
select_query=select_query,
deleted_reason=original_deleted_reason,
db_columns=IMAGE_TABLE_COLUMNS,
deleted_db_columns=DELETED_IMAGE_TABLE_COLUMNS,
postgres_conn_id=sql.POSTGRES_CONN_ID,
)

assert deleted_count == 1

# Load a record with the same (provider, foreign_id), but a new title into
# the image table
_load_sample_data_into_image_table(
image_table,
postgres_with_image_and_deleted_image_table,
sample_records=[
# Sample record, modified to have a new title
{**sample_record, col.TITLE.db_name: "New title"},
],
)

# Delete the record again
select_query = f"WHERE provider='{MATCHING_PROVIDER}'"
new_deleted_reason = "BAR"
deleted_count = create_deleted_records.function(
media_type=image_table,
select_query=select_query,
deleted_reason=new_deleted_reason,
db_columns=IMAGE_TABLE_COLUMNS,
deleted_db_columns=DELETED_IMAGE_TABLE_COLUMNS,
postgres_conn_id=sql.POSTGRES_CONN_ID,
)

# 0 records were added to the deleted_image table
assert deleted_count == 0

# There should only be one record in the deleted image table; a duplicate
# (provider, foreign_id) should not have been added
postgres_with_image_and_deleted_image_table.cursor.execute(
f"SELECT * FROM {deleted_image_table};"
)
actual_rows = postgres_with_image_and_deleted_image_table.cursor.fetchall()
assert len(actual_rows) == 1

# The record should reflect the first deletion, so it should have the original
# deleted_reason and updates should not have been made to the record.
assert actual_rows[0][sql.fid_idx] == FID_A
assert actual_rows[0][sql.title_idx] == "Original title"
assert actual_rows[0][sql.deleted_reason_idx] == original_deleted_reason


def test_create_deleted_records_with_query_matching_no_rows(
postgres_with_image_and_deleted_image_table,
image_table,
5 changes: 5 additions & 0 deletions catalog/tests/test_utils/sql.py
@@ -52,6 +52,11 @@
{DELETED_IMAGE_TABLE_COLUMN_DEFINITIONS}
);"""

DELETED_IMAGE_TABLE_UNIQUE_CONDITION_QUERY = (
"CREATE UNIQUE INDEX {table}_provider_fid_idx"
" ON public.{table}"
" USING btree (provider, md5(foreign_identifier));"
)

PostgresRef = namedtuple("PostgresRef", ["cursor", "connection"])
ti = mock.Mock(spec=TaskInstance)
6 changes: 6 additions & 0 deletions docker/upstream_db/0003_openledger_image_schema.sql
@@ -61,3 +61,9 @@ CREATE TABLE public.deleted_image (
deleted_reason character varying(80)
);
ALTER TABLE public.deleted_image OWNER TO deploy;
CREATE UNIQUE INDEX deleted_image_provider_fid_idx
ON public.deleted_image
USING btree (provider, md5(foreign_identifier));
CREATE UNIQUE INDEX deleted_image_identifier_key
ON public.deleted_image
USING btree (identifier);
6 changes: 6 additions & 0 deletions docker/upstream_db/0006_openledger_audio_schema.sql
@@ -68,3 +68,9 @@ CREATE TABLE public.deleted_audio (
deleted_reason character varying(80)
);
ALTER TABLE public.deleted_audio OWNER TO deploy;
CREATE UNIQUE INDEX deleted_audio_provider_fid_idx
ON public.deleted_audio
USING btree (provider, md5(foreign_identifier));
CREATE UNIQUE INDEX deleted_audio_identifier_key
ON public.deleted_audio
USING btree (identifier);
14 changes: 14 additions & 0 deletions documentation/catalog/reference/DAGs.md
@@ -492,6 +492,20 @@ provider due to deadlinks would look like this:
}
```

##### Multiple deletions

When a record is deleted, it is added to the corresponding Deleted Media table.
If the record is reingested back into the media table, the delete_records DAG
may be run additional times to delete the same record. When this occurs, only
one row will be kept in the Deleted Media table for the record (as uniquely
identified by the provider and foreign identifier pair). This row is not
updated, so the `deleted_on` time will reflect the _first_ time the record was
deleted.

When restoring records from the Deleted Media table, it is important to note
that these records have not been updated through reingestion, so fields such as
popularity data may be out of date.
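
As a quick, hedged example, the preserved entry for a record can be inspected
before restoring it (the DSN and identifiers below are illustrative; the columns
come from the deleted media table schema):

```python
import psycopg2

# Inspect what was kept for a deleted record; deleted_on reflects the first
# deletion, and other fields may be stale relative to the upstream provider.
with psycopg2.connect("dbname=openledger") as conn, conn.cursor() as cur:  # hypothetical DSN
    cur.execute(
        """
        SELECT deleted_on, deleted_reason
        FROM deleted_audio
        WHERE provider = %s AND foreign_identifier = %s;
        """,
        ("jamendo", "some-foreign-id"),
    )
    print(cur.fetchone())
```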

##### Warnings

Presently, there is no logic to prevent records that have an entry in a Deleted