diff --git a/api/src/pcapi/scripts/clean_old_integration_data/main.py b/api/src/pcapi/scripts/clean_old_integration_data/main.py new file mode 100644 index 00000000000..b0f488b0b1c --- /dev/null +++ b/api/src/pcapi/scripts/clean_old_integration_data/main.py @@ -0,0 +1,152 @@ +import argparse +import datetime +import logging +import statistics +import time + +import psycopg2.errors +import pytz +import sqlalchemy as sa + +from pcapi.models import db + + +logger = logging.getLogger(__name__) + + +REPORT_EVERY = 100_000 +AUTOVACUUM_THRESHOLD = 0.22 + +_LEGACY_API_PROVIDERS_IDS = [ + 15, # TiteLive Stocks (Epagine / Place des libraires.com) + 59, # Praxiel/Inférence + 58, # FNAC + 23, # www.leslibraires.fr + 66, # Decitre + 63, # Librisoft + 68, # TMIC-Ellipses + 65, # Mollat + 67, # CDI-Bookshop +] + + +def _get_eta(end: int, current: int, elapsed_per_batch: list[int], batch_size: int) -> str: + left_to_do = end - current + seconds_eta = left_to_do / batch_size * statistics.mean(elapsed_per_batch) + eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_eta) + eta = eta.astimezone(pytz.timezone("Europe/Paris")) + str_eta = eta.strftime("%d/%m/%Y %H:%M:%S") + return str_eta + + +def _get_max_id() -> int: + result = db.session.execute(sa.text("SELECT MAX(id) FROM offer")).fetchone() + return result[0] if result and result[0] is not None else 0 + + +def _get_dead_tuple_ratio(max_id: int) -> float: + # Important to not use sqlalchemy cache + db.session.rollback() + result = db.session.execute( + sa.text( + """ + SELECT n_dead_tup::float / NULLIF(:max_id + n_dead_tup, 0) AS dead_tuple_ratio + FROM pg_stat_user_tables + WHERE relname = 'offer'; + """ + ), + {"max_id": max_id}, + ).fetchone() + + return result[0] if result else 0.0 + + +def _wait_for_autovacuum(max_id: int) -> None: + while True: + ratio = _get_dead_tuple_ratio(max_id) + + if ratio < AUTOVACUUM_THRESHOLD: + logger.info("Dead tuple ratio is back to %.2f%%, resuming...", ratio * 100) + break + 
logger.info("Dead tuple ratio is %.2f%%, sleeping for 600 seconds...", ratio * 100) + time.sleep(600) + + +def execute_query(i: int, batch_size: int) -> None: + nb_retry = 3 + while nb_retry > 0: + try: + db.session.execute( + sa.text( + """ + UPDATE offer + SET idAtProvider = NULL + WHERE id BETWEEN :start AND :end + AND "lastProviderId" = ANY (:provider_id_list) + """ + ), + params={"start": i, "end": i + batch_size, "provider_id_list": _LEGACY_API_PROVIDERS_IDS}, + ) + return + except sa.exc.OperationalError as e: + logger.info("Erreur de type %s sur les lignes entre %s et %s", type(e).__name__, i, i + batch_size) + db.session.rollback() + nb_retry -= 1 + if nb_retry == 0: + raise + + +def _clean_old_idAtProvider( + starting_id: int, ending_id: int, batch_size: int, not_dry: bool = False, wait_autovacuum: bool = False +) -> None: + db.session.execute(sa.text("SET SESSION statement_timeout = '300s'")) + max_id = _get_max_id() + logger.info("Max ID for the 'offer' table: %d", max_id) + + elapsed_per_batch = [] + to_report = 0 + for i in range(starting_id, ending_id, batch_size + 1): + start_time = time.perf_counter() + + execute_query(i, batch_size) + + if not_dry: + db.session.commit() + else: + db.session.rollback() + + if wait_autovacuum: + ratio = _get_dead_tuple_ratio(max_id) + if ratio >= AUTOVACUUM_THRESHOLD: + logger.info("Dead tuple ratio %.2f%% exceeds threshold (22%%). 
Pausing execution...", ratio * 100) + _wait_for_autovacuum(max_id) + + elapsed_per_batch.append(int(time.perf_counter() - start_time)) + eta = _get_eta(ending_id, i, elapsed_per_batch, batch_size) + to_report += batch_size + if to_report >= REPORT_EVERY: + to_report = 0 + logger.info("BATCH : id from %s | eta = %s", i, eta) + + +if __name__ == "__main__": + from pcapi.flask_app import app + + app.app_context().push() + + parser = argparse.ArgumentParser( + description="Nullify idAtProvider of offers linked to old provider synchronization" + ) + parser.add_argument("--starting-id", type=int, default=0, help="starting offer id") + parser.add_argument("--ending-id", type=int, default=0, help="ending offer id") + parser.add_argument("--batch-size", type=int, default=500) + parser.add_argument("--not-dry", action="store_true", help="set to really process (dry-run by default)") + parser.add_argument( + "--wait-autovacuum", action="store_true", help="Let database start autovacuum when threshold has been hit" + ) + args = parser.parse_args() + + if args.starting_id > args.ending_id: + raise ValueError('"start" must be less than "end"') + + _clean_old_idAtProvider(args.starting_id, args.ending_id, args.batch_size, args.not_dry, args.wait_autovacuum) diff --git a/api/src/pcapi/scripts/provider_clean_old_integration_data/main.py b/api/src/pcapi/scripts/provider_clean_old_integration_data/main.py index 822c5bd8a1d..792e724652c 100644 --- a/api/src/pcapi/scripts/provider_clean_old_integration_data/main.py +++ b/api/src/pcapi/scripts/provider_clean_old_integration_data/main.py @@ -1,8 +1,5 @@ import logging -import sqlalchemy as sa - -from pcapi.core.offers import models as offers_models from pcapi.core.providers import models as providers_models from pcapi.flask_app import app from pcapi.repository import transaction @@ -22,29 +19,6 @@ 67, # CDI-Bookshop ] -_BATCH_SIZE = 1000 - - -def _clean_id_a_provider_for_provider(provider_id: int, batch_size: int = _BATCH_SIZE) 
-> None: - while True: - with transaction(): - offers = ( - offers_models.Offer.query.filter( - offers_models.Offer.lastProviderId == provider_id, - sa.not_(offers_models.Offer.idAtProvider.is_(None)), - ) - .limit(batch_size) - .all() - ) - - if not offers: - break - - offers_models.Offer.query.filter(offers_models.Offer.id.in_([offer.id for offer in offers])).update( - {"idAtProvider": None}, - synchronize_session=False, - ) - def clean_old_provider_data(provider_ids: list[int]) -> None: # Update providers @@ -58,8 +32,6 @@ def clean_old_provider_data(provider_ids: list[int]) -> None: provider.name = f"[DÉPRÉCIÉ] {provider.name}" provider.enabledForPro = False provider.isActive = False - logger.info("Cleaning offers data for provider %s (id: %s)", provider.name, provider.id) - _clean_id_a_provider_for_provider(provider_id=provider_id) if __name__ == "__main__": diff --git a/api/tests/scripts/provider_clean_old_integraiton_data/main_test.py b/api/tests/scripts/provider_clean_old_integraiton_data/main_test.py index 439a326080b..8df8b0efbcd 100644 --- a/api/tests/scripts/provider_clean_old_integraiton_data/main_test.py +++ b/api/tests/scripts/provider_clean_old_integraiton_data/main_test.py @@ -1,9 +1,6 @@ import pytest -from pcapi.core.offers import factories as offers_factories from pcapi.core.providers import factories as providers_factories -from pcapi.models import db -from pcapi.scripts.provider_clean_old_integration_data.main import _clean_id_a_provider_for_provider from pcapi.scripts.provider_clean_old_integration_data.main import clean_old_provider_data @@ -12,47 +9,18 @@ def test_clean_old_provider_data(): provider_1 = providers_factories.ProviderFactory(name="Old Provider that should be deprecated") provider_already_deprecated = providers_factories.ProviderFactory(name="[DÉPRÉCIÉ] Old Provider") provider_3 = providers_factories.ProviderFactory() - offer_provider_1 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12345") - 
offer_provider_2 = offers_factories.EventOfferFactory(lastProvider=provider_already_deprecated, idAtProvider=None) - offer_provider_3 = offers_factories.ThingOfferFactory(lastProvider=provider_3, idAtProvider="offerId3") clean_old_provider_data([provider_1.id, provider_already_deprecated.id]) - db.session.refresh(offer_provider_1) - db.session.refresh(offer_provider_2) - db.session.refresh(offer_provider_3) - # should be deprecated assert provider_1.name == "[DÉPRÉCIÉ] Old Provider that should be deprecated" assert not provider_1.enabledForPro assert not provider_1.isActive - assert not offer_provider_1.idAtProvider + assert provider_already_deprecated.name == "[DÉPRÉCIÉ] Old Provider" assert not provider_already_deprecated.enabledForPro assert not provider_already_deprecated.isActive - assert not offer_provider_2.idAtProvider # should stay the same - assert offer_provider_3.idAtProvider assert provider_3.enabledForPro assert provider_3.isActive - - -@pytest.mark.usefixtures("db_session") -def test_clean_id_at_providers(): - provider_1 = providers_factories.ProviderFactory(name="Old Provider that should be deprecated") - provider_3 = providers_factories.ProviderFactory() - offer_provider_1 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12345") - offer_provider_2 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12346") - offer_provider_3 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12347") - offer_provider_4 = offers_factories.ThingOfferFactory(lastProvider=provider_3, idAtProvider="offerId3") - - _clean_id_a_provider_for_provider(provider_1.id, batch_size=2) - - # should be deprecated - assert not offer_provider_1.idAtProvider - assert not offer_provider_2.idAtProvider - assert not offer_provider_3.idAtProvider - - # should stay the same - assert offer_provider_4.idAtProvider