Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tcoudray-pass committed Feb 5, 2025
1 parent 96a272e commit a18d882
Showing 1 changed file with 26 additions and 112 deletions.
138 changes: 26 additions & 112 deletions api/src/pcapi/scripts/clean_old_integration_data/main.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
import argparse
import datetime
import logging
import statistics
import time

import psycopg2.errors
import pytz
import sqlalchemy as sa

from pcapi.models import db

from pcapi.core.offers.models import Offer
from pcapi.repository import transaction

logger = logging.getLogger(__name__)


# Number of processed rows between two progress log lines.
# (The duplicate assignment was diff residue; the surviving value is 10_000.)
REPORT_EVERY = 10_000
# Dead-tuple ratio above which the script pauses and waits for autovacuum.
AUTOVACUUM_THRESHOLD = 0.22

# Rows processed per transaction while walking the `offer` table.
_BATCH_SIZE = 500
# NOTE(review): presumably the known offer-id range containing legacy-provider
# rows, used to bound the batched walk — confirm against production data.
_MIN_ID = 27_692
_MAX_ID = 266_349_987
_LEGACY_API_PROVIDERS_IDS = [
15, # TiteLive Stocks (Epagine / Place des libraires.com)
59, # Praxiel/Inférence
Expand All @@ -30,123 +27,40 @@
]


def _get_eta(end: int, current: int, elapsed_per_batch: list[int], batch_size: int) -> str:
    """Estimate the finish time of the run, expressed in Europe/Paris time.

    Args:
        end: last id that will be processed.
        current: id reached so far.
        elapsed_per_batch: seconds spent on each completed batch (must be non-empty).
        batch_size: number of rows per batch.

    Returns:
        The ETA formatted as ``dd/mm/YYYY HH:MM:SS``.
    """
    from zoneinfo import ZoneInfo  # stdlib replacement for pytz (Python >= 3.9)

    left_to_do = end - current
    seconds_eta = left_to_do / batch_size * statistics.mean(elapsed_per_batch)
    # Bug fix: datetime.utcnow() returns a *naive* datetime, which astimezone()
    # interprets as local time — skewing the ETA on any non-UTC host. Use an
    # aware UTC timestamp so the conversion to Paris time is always correct.
    eta = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=seconds_eta)
    eta = eta.astimezone(ZoneInfo("Europe/Paris"))
    return eta.strftime("%d/%m/%Y %H:%M:%S")


def _get_max_id() -> int:
    """Return the current maximum ``offer.id``, or 0 when the table is empty."""
    row = db.session.execute(sa.text("SELECT MAX(id) FROM offer")).fetchone()
    if row is None or row[0] is None:
        return 0
    return row[0]


def _get_dead_tuple_ratio(max_id: int) -> float:
# Important to not use sqlalchemy cache
db.session.rollback()
result = db.session.execute(
sa.text(
"""
SELECT n_dead_tup::float / NULLIF(:max_id + n_dead_tup, 0) AS dead_tuple_ratio
FROM pg_stat_user_tables
WHERE relname = 'offer';
"""
),
{"max_id": max_id},
).fetchone()
def clean_id_at_provider() -> None:
    """Nullify ``idAtProvider`` on offers linked to legacy API providers.

    Walks the ``offer`` table by ascending id, ``_BATCH_SIZE`` rows at a time,
    committing one transaction per batch so row locks stay short-lived.
    Progress is keyed on the last processed id, so the scan never re-reads
    already-cleaned rows.
    """
    last_id = _MIN_ID - 1

    while last_id < _MAX_ID:
        with transaction():
            offers_ids = [
                offer_id
                for offer_id, in Offer.query.filter(
                    Offer.id > last_id, Offer.lastProviderId.in_(_LEGACY_API_PROVIDERS_IDS)
                )
                .with_entities(Offer.id)
                .order_by(Offer.id)
                .limit(_BATCH_SIZE)
            ]

            if not offers_ids:
                # No legacy-provider offer left beyond last_id: we are done.
                break

            last_id = offers_ids[-1]

            db.session.execute(
                sa.text(
                    """
                    UPDATE offer
                    SET "idAtProvider" = NULL
                    WHERE id = ANY (:id_list)
                    """
                ),
                params={"id_list": offers_ids},
            )

        # Lazy %-style args avoid building the string when INFO is disabled.
        logger.info("Now at id %s...", last_id)


if __name__ == "__main__":
    from pcapi.flask_app import app

    app.app_context().push()

    # The previous argparse options (--starting-id, --ending-id, --batch-size,
    # --not-dry, --wait-autovacuum) configured helpers removed by this change;
    # the batched walk is now fully driven by module-level constants.
    clean_id_at_provider()

0 comments on commit a18d882

Please sign in to comment.