Skip to content

Commit

Permalink
(PC-33659)[API] chore: script to clean old idAtProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
tcoudray-pass committed Jan 31, 2025
1 parent 9f4fdce commit 0a253e8
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 61 deletions.
152 changes: 152 additions & 0 deletions api/src/pcapi/scripts/clean_old_integration_data/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import argparse
import datetime
import logging
import statistics
import time

import psycopg2.errors
import pytz
import sqlalchemy as sa

from pcapi.models import db


logger = logging.getLogger(__name__)


REPORT_EVERY = 100_000
AUTOVACUUM_THRESHOLD = 0.22

_LEGACY_API_PROVIDERS_IDS = [
15, # TiteLive Stocks (Epagine / Place des libraires.com)
59, # Praxiel/Inférence
58, # FNAC
23, # www.leslibraires.fr
66, # Decitre
63, # Librisoft
68, # TMIC-Ellipses
65, # Mollat
67, # CDI-Bookshop
]


def _get_eta(end: int, current: int, elapsed_per_batch: list[int], batch_size: int) -> str:
left_to_do = end - current
seconds_eta = left_to_do / batch_size * statistics.mean(elapsed_per_batch)
eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=seconds_eta)
eta = eta.astimezone(pytz.timezone("Europe/Paris"))
str_eta = eta.strftime("%d/%m/%Y %H:%M:%S")
return str_eta


def _get_max_id() -> int:
result = db.session.execute(sa.text("SELECT MAX(id) FROM offer")).fetchone()
return result[0] if result and result[0] is not None else 0


def _get_dead_tuple_ratio(max_id: int) -> float:
# Important to not use sqlalchemy cache
db.session.rollback()
result = db.session.execute(
sa.text(
"""
SELECT n_dead_tup::float / NULLIF(:max_id + n_dead_tup, 0) AS dead_tuple_ratio
FROM pg_stat_user_tables
WHERE relname = 'offer';
"""
),
{"max_id": max_id},
).fetchone()

return result[0] if result else 0.0


def _wait_for_autovacuum(max_id: int) -> None:
while True:
ratio = _get_dead_tuple_ratio(max_id)

if ratio < AUTOVACUUM_THRESHOLD:
logger.info("Dead tuple ratio is back to %.2f%%, resuming...", ratio * 100)
break
logger.info("Dead tuple ratio is %.2f%%, sleeping for 60 seconds...", ratio * 100)
time.sleep(600)


def execute_query(i: int, batch_size: int) -> None:
nb_retry = 3
while nb_retry > 0:
try:
db.session.execute(
sa.text(
"""
UPDATE offer
SET idAtProvider = NULL
WHERE id BETWEEN :start AND :end
AND "lastProviderId" = ANY (:provider_id_list)
"""
),
params={"start": i, "end": i + batch_size, "provider_id_list": _LEGACY_API_PROVIDERS_IDS},
)
return
except psycopg2.errors.OperationalError as e:
logger.info("Erreur de type %s sur les lignes entre %s et %s", type(e).__name__, i, i + batch_size)
db.session.rollback()
nb_retry -= 1
if nb_retry == 0:
raise


def _clean_old_idAtProvider(
starting_id: int, ending_id: int, batch_size: int, not_dry: bool = False, wait_autovacuum: bool = False
) -> None:
db.session.execute(sa.text("SET SESSION statement_timeout = '300s'"))
max_id = _get_max_id()
logger.info("Max ID for the 'offer' table: %d", max_id)

elapsed_per_batch = []
to_report = 0
for i in range(starting_id, ending_id, batch_size + 1):
start_time = time.perf_counter()

execute_query(i, batch_size)

if not_dry:
db.session.commit()
else:
db.session.rollback()

if wait_autovacuum:
ratio = _get_dead_tuple_ratio(max_id)
if ratio >= AUTOVACUUM_THRESHOLD:
logger.info("Dead tuple ratio %.2f%% exceeds threshold (20%%). Pausing execution...", ratio * 100)
_wait_for_autovacuum(max_id)

elapsed_per_batch.append(int(time.perf_counter() - start_time))
eta = _get_eta(ending_id, starting_id, elapsed_per_batch, batch_size)
to_report += batch_size
if to_report >= REPORT_EVERY:
to_report = 0
logger.info("BATCH : id from %s | eta = %s", i, eta)


if __name__ == "__main__":
from pcapi.flask_app import app

app.app_context().push()

parser = argparse.ArgumentParser(
description="Nullify idAtProvider of offers linked to old provider synchronization"
)
parser.add_argument("--starting-id", type=int, default=0, help="starting offer id")
parser.add_argument("--ending-id", type=int, default=0, help="ending offer id")
parser.add_argument("--batch-size", type=int, default=500)
parser.add_argument("--not-dry", action="store_true", help="set to really process (dry-run by default)")
parser.add_argument(
"--wait-autovacuum", action="store_true", help="Let database start autovacuum when threshold has been hit"
)
args = parser.parse_args()

if args.starting_id > args.ending_id:
raise ValueError('"start" must be less than "end"')

_clean_old_idAtProvider(args.starting_id, args.ending_id, args.batch_size, args.not_dry, args.wait_autovacuum)
28 changes: 0 additions & 28 deletions api/src/pcapi/scripts/provider_clean_old_integration_data/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import logging

import sqlalchemy as sa

from pcapi.core.offers import models as offers_models
from pcapi.core.providers import models as providers_models
from pcapi.flask_app import app
from pcapi.repository import transaction
Expand All @@ -22,29 +19,6 @@
67, # CDI-Bookshop
]

_BATCH_SIZE = 1000


def _clean_id_a_provider_for_provider(provider_id: int, batch_size: int = _BATCH_SIZE) -> None:
while True:
with transaction():
offers = (
offers_models.Offer.query.filter(
offers_models.Offer.lastProviderId == provider_id,
sa.not_(offers_models.Offer.idAtProvider.is_(None)),
)
.limit(batch_size)
.all()
)

if not offers:
break

offers_models.Offer.query.filter(offers_models.Offer.id.in_([offer.id for offer in offers])).update(
{"idAtProvider": None},
synchronize_session=False,
)


def clean_old_provider_data(provider_ids: list[int]) -> None:
# Update providers
Expand All @@ -58,8 +32,6 @@ def clean_old_provider_data(provider_ids: list[int]) -> None:
provider.name = f"[DÉPRÉCIÉ] {provider.name}"
provider.enabledForPro = False
provider.isActive = False
logger.info("Cleaning offers data for provider %s (id: %s)", provider.name, provider.id)
_clean_id_a_provider_for_provider(provider_id=provider_id)


if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import pytest

from pcapi.core.offers import factories as offers_factories
from pcapi.core.providers import factories as providers_factories
from pcapi.models import db
from pcapi.scripts.provider_clean_old_integration_data.main import _clean_id_a_provider_for_provider
from pcapi.scripts.provider_clean_old_integration_data.main import clean_old_provider_data


Expand All @@ -12,47 +9,18 @@ def test_clean_old_provider_data():
provider_1 = providers_factories.ProviderFactory(name="Old Provider that should be deprecated")
provider_already_deprecated = providers_factories.ProviderFactory(name="[DÉPRÉCIÉ] Old Provider")
provider_3 = providers_factories.ProviderFactory()
offer_provider_1 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12345")
offer_provider_2 = offers_factories.EventOfferFactory(lastProvider=provider_already_deprecated, idAtProvider=None)
offer_provider_3 = offers_factories.ThingOfferFactory(lastProvider=provider_3, idAtProvider="offerId3")

clean_old_provider_data([provider_1.id, provider_already_deprecated.id])

db.session.refresh(offer_provider_1)
db.session.refresh(offer_provider_2)
db.session.refresh(offer_provider_3)

# should be deprecated
assert provider_1.name == "[DÉPRÉCIÉ] Old Provider that should be deprecated"
assert not provider_1.enabledForPro
assert not provider_1.isActive
assert not offer_provider_1.idAtProvider

assert provider_already_deprecated.name == "[DÉPRÉCIÉ] Old Provider"
assert not provider_already_deprecated.enabledForPro
assert not provider_already_deprecated.isActive
assert not offer_provider_2.idAtProvider

# should stay the same
assert offer_provider_3.idAtProvider
assert provider_3.enabledForPro
assert provider_3.isActive


@pytest.mark.usefixtures("db_session")
def test_clean_id_at_providers():
provider_1 = providers_factories.ProviderFactory(name="Old Provider that should be deprecated")
provider_3 = providers_factories.ProviderFactory()
offer_provider_1 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12345")
offer_provider_2 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12346")
offer_provider_3 = offers_factories.ThingOfferFactory(lastProvider=provider_1, idAtProvider="12347")
offer_provider_4 = offers_factories.ThingOfferFactory(lastProvider=provider_3, idAtProvider="offerId3")

_clean_id_a_provider_for_provider(provider_1.id, batch_size=2)

# should be deprecated
assert not offer_provider_1.idAtProvider
assert not offer_provider_2.idAtProvider
assert not offer_provider_3.idAtProvider

# should stay the same
assert offer_provider_4.idAtProvider

0 comments on commit 0a253e8

Please sign in to comment.