From e4b6e2b61893517c01a35a272806a319c845dd77 Mon Sep 17 00:00:00 2001 From: Spoked Date: Thu, 10 Oct 2024 02:00:38 -0400 Subject: [PATCH] fix: improved removing items from database --- src/controllers/items.py | 48 ++++++----- src/program/db/db_functions.py | 143 ++++++++++++++++++++++++++++----- src/program/media/item.py | 11 ++- src/program/program.py | 86 +++++++++++++++----- src/program/symlink.py | 9 +++ src/utils/event_manager.py | 44 ++++++++-- 6 files changed, 271 insertions(+), 70 deletions(-) diff --git a/src/controllers/items.py b/src/controllers/items.py index ded2335a..3894b072 100644 --- a/src/controllers/items.py +++ b/src/controllers/items.py @@ -3,27 +3,28 @@ from typing import Optional import Levenshtein -from RTN import RTN, Torrent +from RTN import Torrent from fastapi import APIRouter, HTTPException, Request -from sqlalchemy import func, select +from sqlalchemy import delete, func, select from sqlalchemy.exc import NoResultFound from program.content import Overseerr from program.db.db import db from program.db.db_functions import ( clear_streams, + clear_streams_by_id, delete_media_item, + delete_media_item_by_id, get_media_items_by_ids, + get_parent_ids, get_parent_items_by_ids, reset_media_item, ) -from program.media.item import MediaItem +from program.media.item import Episode, MediaItem, Season from program.media.state import States from program.symlink import Symlinker from program.downloaders import Downloader, get_needed_media -from program.downloaders.realdebrid import RealDebridDownloader, add_torrent_magnet, torrent_info -from program.settings.versions import models -from program.settings.manager import settings_manager +from program.downloaders.realdebrid import add_torrent_magnet, torrent_info from program.media.stream import Stream from program.scrapers.shared import rtn from program.types import Event @@ -246,29 +247,38 @@ async def retry_items(request: Request, ids: str): "/remove", summary="Remove Media Items", 
description="Remove media items based on item IDs", + operation_id="remove_item", ) async def remove_item(request: Request, ids: str): - ids = handle_ids(ids) + ids: list[int] = handle_ids(ids) try: - media_items = get_parent_items_by_ids(ids) + media_items: list[int] = get_parent_ids(ids) if not media_items: - raise ValueError("Invalid item ID(s) provided. Some items may not exist.") + raise HTTPException(status_code=404, detail="Item(s) not found") + for media_item in media_items: - logger.debug(f"Removing item {media_item.title} with ID {media_item._id}") - request.app.program.em.cancel_job(media_item) - await asyncio.sleep(0.1) # Ensure cancellation is processed - clear_streams(media_item) + logger.debug(f"Removing item with ID {media_item}") + request.app.program.em.cancel_job_by_id(media_item) + await asyncio.sleep(0.2) # Ensure cancellation is processed + clear_streams_by_id(media_item) + symlink_service = request.app.program.services.get(Symlinker) if symlink_service: - symlink_service.delete_item_symlinks(media_item) - if media_item.requested_by == "overseerr" and media_item.requested_id: - logger.debug(f"Item was originally requested by Overseerr, deleting request within Overseerr...") - Overseerr.delete_request(media_item.requested_id) - delete_media_item(media_item) + symlink_service.delete_item_symlinks_by_id(media_item) + + with db.Session() as session: + requested_id = session.execute(select(MediaItem.requested_id).where(MediaItem._id == media_item)).scalar_one() + if requested_id: + logger.debug(f"Deleting request from Overseerr with ID {requested_id}") + Overseerr.delete_request(requested_id) + + logger.debug(f"Deleting item from database with ID {media_item}") + delete_media_item_by_id(media_item) + logger.info(f"Successfully removed item with ID {media_item}") except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) - return {"success": True, "message": f"Removed items with ids {ids}"} + return {"success": True, "message": 
"Successfully removed items", "removed_ids": ids} @router.post("/{id}/set_torrent_rd_magnet", description="Set a torrent for a media item using a magnet link.") def add_torrent(request: Request, id: int, magnet: str): diff --git a/src/program/db/db_functions.py b/src/program/db/db_functions.py index 0ad9f5b9..efa9ea16 100644 --- a/src/program/db/db_functions.py +++ b/src/program/db/db_functions.py @@ -3,8 +3,8 @@ from typing import TYPE_CHECKING, List import alembic -from sqlalchemy import delete, func, insert, select, text, union_all -from sqlalchemy.orm import Session, aliased, selectinload +from sqlalchemy import delete, func, insert, select, text +from sqlalchemy.orm import Session, selectinload from program.libraries.symlink import fix_broken_symlinks from program.media.stream import Stream, StreamBlacklistRelation, StreamRelation @@ -15,7 +15,7 @@ from .db import alembic, db if TYPE_CHECKING: - from program.media.item import MediaItem + from program.media.item import MediaItem, Episode, Season def get_media_items_by_ids(media_item_ids: list[int]): @@ -69,6 +69,20 @@ def get_parent_items_by_ids(media_item_ids: list[int]): items.append(item) return items +def get_parent_ids(media_item_ids: list[int]): + """Retrieve the _ids of MediaItems of type 'movie' or 'show' by a list of MediaItem _ids.""" + from program.media.item import MediaItem + with db.Session() as session: + parent_ids = [] + for media_item_id in media_item_ids: + item_id = session.execute( + select(MediaItem._id) + .where(MediaItem._id == media_item_id, MediaItem.type.in_(["movie", "show"])) + ).scalar_one_or_none() + if item_id: + parent_ids.append(item_id) + return parent_ids + def get_item_by_imdb_id(imdb_id: str): """Retrieve a MediaItem of type 'movie' or 'show' by an IMDb ID.""" from program.media.item import MediaItem @@ -83,17 +97,68 @@ def delete_media_item(item: "MediaItem"): session.delete(item) session.commit() -def delete_media_item_by_id(media_item_id: int): - """Delete a MediaItem and all 
its associated relationships by the MediaItem _id.""" - from program.media.item import MediaItem +def delete_media_item_by_id(media_item_id: int, batch_size: int = 30): + """Delete a Movie or Show by _id. If it's a Show, delete its Seasons and Episodes in batches, committing after each batch.""" + from program.media.item import MediaItem, Show, Season, Episode with db.Session() as session: - item = session.query(MediaItem).filter_by(_id=media_item_id).first() + # First, retrieve the media item's type + media_item = session.execute( + select(MediaItem._id, MediaItem.type) + .where(MediaItem._id == media_item_id) + ).first() + + if not media_item: + logger.error(f"No item found with ID {media_item_id}") + return False + + media_item_id, media_item_type = media_item + + if media_item_type == "movie": + # Directly delete movie + logger.debug(f"Deleting Movie with ID {media_item_id}") + session.execute(delete(MediaItem).where(MediaItem._id == media_item_id)) + session.commit() # Commit movie deletion immediately + + elif media_item_type == "show": + logger.debug(f"Deleting Show with ID {media_item_id}") + + # Delete Seasons and Episodes in batches, committing after each batch + delete_seasons_and_episodes(session, media_item_id, batch_size) + + # Ensure no references exist in the Show table + session.execute(delete(Show).where(Show._id == media_item_id)) + + # Delete the Show itself + session.execute(delete(MediaItem).where(MediaItem._id == media_item_id)) + session.commit() # Commit show deletion at the end + + return True + +def delete_seasons_and_episodes(session, show_id: int, batch_size: int): + """Delete seasons and episodes of a show in batches, committing after each batch.""" + from program.media.item import Episode, Season - if item: - session.delete(item) - session.commit() - else: - raise ValueError(f"MediaItem with id {media_item_id} does not exist.") + # Delete seasons one by one + season_ids = session.execute( + select(Season._id).where(Season.parent_id == 
show_id) + ).scalars().all() + + for season_id in season_ids: + # Delete episodes in batches for each season + while True: + episode_ids = session.execute( + select(Episode._id) + .where(Episode.parent_id == season_id) + .limit(batch_size) + ).scalars().all() + + if not episode_ids: + break + + session.execute(delete(Episode).where(Episode._id.in_(episode_ids))) + session.commit() # Commit after each batch of episodes + session.execute(delete(Season).where(Season._id == season_id)) + session.commit() # Commit after deleting the season def delete_media_item_by_item_id(item_id: str): """Delete a MediaItem and all its associated relationships by the MediaItem _id.""" @@ -156,6 +221,17 @@ def clear_streams(item: "MediaItem"): ) session.commit() +def clear_streams_by_id(media_item_id: int): + """Clear all streams for a media item by the MediaItem _id.""" + with db.Session() as session: + session.execute( + delete(StreamRelation).where(StreamRelation.parent_id == media_item_id) + ) + session.execute( + delete(StreamBlacklistRelation).where(StreamBlacklistRelation.media_item_id == media_item_id) + ) + session.commit() + def blacklist_stream(item: "MediaItem", stream: Stream, session: Session = None) -> bool: """Blacklist a stream for a media item.""" close_session = False @@ -233,19 +309,37 @@ def load_streams_in_pages(session: Session, media_item_id: int, page_number: int def _get_item_ids(session, item): from program.media.item import Episode, Season + if item.type == "show": show_id = item._id - season_alias = aliased(Season, flat=True) - season_query = select(Season._id.label('id')).where(Season.parent_id == show_id) - episode_query = ( - select(Episode._id.label('id')) - .join(season_alias, Episode.parent_id == season_alias._id) - .where(season_alias.parent_id == show_id) - ) + # season_alias = aliased(Season, flat=True) + # season_query = select(Season._id.label('id')).where(Season.parent_id == show_id) + # episode_query = ( + # select(Episode._id.label('id')) + # 
.join(season_alias, Episode.parent_id == season_alias._id) + # .where(season_alias.parent_id == show_id) + # ) + + # combined_query = union_all(season_query, episode_query) + # related_ids = session.execute(combined_query).scalars().all() + # return show_id, related_ids + + # Fetch season IDs + season_ids = session.execute( + select(Season._id).where(Season.parent_id == show_id) + ).scalars().all() + + # Fetch episode IDs for each season + episode_ids = [] + for season_id in season_ids: + episode_ids.extend( + session.execute( + select(Episode._id).where(Episode.parent_id == season_id) + ).scalars().all() + ) - combined_query = union_all(season_query, episode_query) - related_ids = session.execute(combined_query).scalars().all() + related_ids = season_ids + episode_ids return show_id, related_ids elif item.type == "season": @@ -263,6 +357,13 @@ def _get_item_ids(session, item): return item._id, [] +def _get_item_ids_from_item_id(session, media_item_id: int): + from program.media.item import MediaItem + item = session.execute(select(MediaItem).where(MediaItem._id == media_item_id)).unique().scalar_one_or_none() + if not item: + return None + return _get_item_ids(session, item) + def _ensure_item_exists_in_db(item: "MediaItem") -> bool: from program.media.item import MediaItem, Movie, Show if isinstance(item, (Movie, Show)): diff --git a/src/program/media/item.py b/src/program/media/item.py index 7bb386b7..6aebf297 100644 --- a/src/program/media/item.py +++ b/src/program/media/item.py @@ -196,10 +196,13 @@ def copy_other_media_attr(self, other): def is_scraped(self): session = object_session(self) - if session: - session.refresh(self, attribute_names=['blacklisted_streams']) # Prom: Ensure these reflect the state of whats in the db. 
- return (len(self.streams) > 0 - and any(not stream in self.blacklisted_streams for stream in self.streams)) + if session and session.is_active: + try: + session.refresh(self, attribute_names=['blacklisted_streams']) + return (len(self.streams) > 0 and any(not stream in self.blacklisted_streams for stream in self.streams)) + except (sqlalchemy.exc.InvalidRequestError, sqlalchemy.orm.exc.DetachedInstanceError): + return False + return False def to_dict(self): """Convert item to dictionary (API response)""" diff --git a/src/program/program.py b/src/program/program.py index f50e20e5..a199ce44 100644 --- a/src/program/program.py +++ b/src/program/program.py @@ -172,7 +172,40 @@ def start(self): ws_manager.send_health_update("running") self.initialized = True + # def _retry_library(self) -> None: + # count = 0 + # with db.Session() as session: + # count = session.execute( + # select(func.count(MediaItem._id)) + # .where(MediaItem.last_state.not_in([States.Completed, States.Unreleased])) + # .where(MediaItem.type.in_(["movie", "show"])) + # ).scalar_one() + + # if count == 0: + # return + + # logger.log("PROGRAM", f"Found {count} items to retry") + + # number_of_rows_per_page = 10 + # for page_number in range(0, (count // number_of_rows_per_page) + 1): + # with db.Session() as session: + # items_to_submit = [] + # items_to_submit += session.execute( + # select(MediaItem) + # .where(MediaItem.last_state.not_in([States.Completed, States.Unreleased])) + # .where(MediaItem.type.in_(["movie", "show"])) + # .order_by(MediaItem.requested_at.desc()) + # .limit(number_of_rows_per_page) + # .offset(page_number * number_of_rows_per_page) + # ).unique().scalars().all() + + # session.expunge_all() + # session.close() + # for item in items_to_submit: + # self.em.add_event(Event(emitted_by="RetryLibrary", item=item)) + def _retry_library(self) -> None: + """Retry items that failed to download.""" count = 0 with db.Session() as session: count = session.execute( @@ -184,25 +217,38 @@ 
def _retry_library(self) -> None: if count == 0: return - logger.log("PROGRAM", f"Found {count} items to retry") - - number_of_rows_per_page = 10 - for page_number in range(0, (count // number_of_rows_per_page) + 1): - with db.Session() as session: - items_to_submit = [] - items_to_submit += session.execute( - select(MediaItem) - .where(MediaItem.last_state.not_in([States.Completed, States.Unreleased])) - .where(MediaItem.type.in_(["movie", "show"])) - .order_by(MediaItem.requested_at.desc()) - .limit(number_of_rows_per_page) - .offset(page_number * number_of_rows_per_page) - ).unique().scalars().all() - - session.expunge_all() - session.close() - for item in items_to_submit: + number_of_rows_per_page = 100 + max_events_to_add = 1000 # Limit the number of events to add at once + events_added = 0 + + logger.log("PROGRAM", f"Starting retry process for {count} items. Processing in batches.") + + def fetch_items_in_batches(): + for page_number in range(0, (count // number_of_rows_per_page) + 1): + with db.Session() as session: + items_to_submit = session.execute( + select(MediaItem._id) # Fetch only the IDs initially + .where(MediaItem.last_state.not_in([States.Completed, States.Unreleased])) + .where(MediaItem.type.in_(["movie", "show"])) + .order_by(MediaItem.requested_at.desc()) + .limit(number_of_rows_per_page) + .offset(page_number * number_of_rows_per_page) + ).scalars().all() + yield items_to_submit + + for batch in fetch_items_in_batches(): + if events_added >= max_events_to_add: + break + + for item_id in batch: + with db.Session() as session: + item = session.get(MediaItem, item_id) # Fetch the full item only when needed self.em.add_event(Event(emitted_by="RetryLibrary", item=item)) + events_added += 1 + + if events_added >= max_events_to_add: + logger.debug(f"Added batch of {len(batch)} items, waiting for the next batch") + break def _schedule_functions(self) -> None: """Schedule each service based on its update interval.""" @@ -218,9 +264,9 @@ def 
_schedule_functions(self) -> None: "args": [settings_manager.settings.symlink.library_path, settings_manager.settings.symlink.rclone_path] } - if settings_manager.settings.post_processing.subliminal.enabled: - pass + # if settings_manager.settings.post_processing.subliminal.enabled: # scheduled_functions[self._download_subtitles] = {"interval": 60 * 60 * 24} + for func, config in scheduled_functions.items(): self.scheduler.add_job( func, diff --git a/src/program/symlink.py b/src/program/symlink.py index 870d34f1..9ff41971 100644 --- a/src/program/symlink.py +++ b/src/program/symlink.py @@ -270,6 +270,15 @@ def delete_item_symlinks(self, item: "MediaItem") -> bool: item_path = base_path / f"{item.title.replace('/', '-')} ({item.aired_at.year}) {{imdb-{item.imdb_id}}}" return _delete_symlink(item, item_path) + def delete_item_symlinks_by_id(self, item_id: int) -> bool: + """Delete symlinks and directories based on the item ID.""" + with db.Session() as session: + item = session.execute(select(MediaItem).where(MediaItem._id == item_id)).unique().scalar_one_or_none() + if not item: + logger.error(f"No item found with ID {item_id}") + return False + return self.delete_item_symlinks(item) + def _delete_symlink(item: Union[Movie, Show], item_path: Path) -> bool: try: if item_path.exists(): diff --git a/src/utils/event_manager.py b/src/utils/event_manager.py index 80a549e2..01ce2667 100644 --- a/src/utils/event_manager.py +++ b/src/utils/event_manager.py @@ -6,13 +6,14 @@ from threading import Lock from loguru import logger +from sqlalchemy import select from sqlalchemy.orm.exc import StaleDataError from concurrent.futures import CancelledError import utils.websockets.manager as ws_manager from program.db.db import db from program.db.db_functions import _get_item_ids, _run_thread_with_db_item -from program.media.item import Season, Show +from program.media.item import MediaItem, Season, Show from program.types import Event @@ -25,6 +26,7 @@ def __init__(self): self._futures = [] 
self._queued_events = [] self._running_events = [] + self._remove_id_queue = [] self.mutex = Lock() def _find_or_create_executor(self, service_cls) -> concurrent.futures.ThreadPoolExecutor: @@ -60,7 +62,8 @@ def _process_future(self, future, service): """ try: result = next(future.result(), None) - self._futures.remove(future) + if future in self._futures: + self._futures.remove(future) ws_manager.send_event_update([future.event for future in self._futures if hasattr(future, "event")]) if isinstance(result, tuple): item, timestamp = result @@ -68,12 +71,16 @@ def _process_future(self, future, service): item, timestamp = result, datetime.now() if item: self.remove_item_from_running(item) + if item._id in self._remove_id_queue: + # Item was removed while running + logger.debug(f"Item {item.log_string} is in the removed queue, discarding result.") + self._remove_id_queue.remove(item._id) + self.remove_item_from_queue(item) + return self.add_event(Event(emitted_by=service, item=item, run_at=timestamp)) except (StaleDataError, CancelledError): # Expected behavior when cancelling tasks or when the item was removed return - except ValueError as e: - logger.error(f"Error in future for {future}: {e}") except Exception as e: logger.error(f"Error in future for {future}: {e}") logger.exception(traceback.format_exc()) @@ -201,11 +208,36 @@ def cancel_job(self, item, suppress_logs=False): self._futures.remove(future) # Clear from queued and running events - self._queued_events = [event for event in self._queued_events if event.item._id != item._id and event.item.imdb_id != item.imdb_id] - self._running_events = [event for event in self._running_events if event.item._id != item._id and event.item.imdb_id != item.imdb_id] + with self.mutex: + self._remove_id_queue.append(item._id) + self._queued_events = [event for event in self._queued_events if event.item._id != item._id and event.item.imdb_id != item.imdb_id] + self._running_events = [event for event in self._running_events if 
event.item._id != item._id and event.item.imdb_id != item.imdb_id] + self._futures = [future for future in self._futures if not hasattr(future, 'event') or future.event.item._id != item._id and future.event.item.imdb_id != item.imdb_id] logger.debug(f"Canceled jobs for item {item.log_string} and its children.") + def cancel_job_by_id(self, item_id, suppress_logs=False): + """ + Cancels a job associated with the given media item ID. + + Args: + item_id (int): The ID of the media item whose job needs to be canceled. + suppress_logs (bool): If True, suppresses debug logging for this operation. + """ + with db.Session() as session: + # Fetch only the necessary fields + item = session.execute( + select(MediaItem).where(MediaItem._id == item_id) + ).unique().scalar_one_or_none() + + if not item: + if not suppress_logs: + logger.error(f"No item found with ID {item_id}") + return + + # Use the existing cancel_job logic with just the ID + self.cancel_job(item, suppress_logs) + def next(self): """ Get the next event in the queue with an optional timeout.