Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: chunk initial symlinks on re-ingest #882

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 52 additions & 42 deletions src/program/program.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,51 +397,61 @@ def _init_db_from_symlinks(self):
errors = []
added_items = set()

progress, console = create_progress_bar(len(items))
task = progress.add_task("Enriching items with metadata", total=len(items), log="")
# Convert items to list and get total count
items_list = [item for item in items if isinstance(item, (Movie, Show))]
total_items = len(items_list)

progress, console = create_progress_bar(total_items)
task = progress.add_task("Enriching items with metadata", total=total_items, log="")

# Process in chunks of 100 items
chunk_size = 100
with Live(progress, console=console, refresh_per_second=10):
workers = int(os.getenv("SYMLINK_MAX_WORKERS", 4))
with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in items
if isinstance(item, (Movie, Show))
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

# Check for existing item using your db_functions
if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

progress.update(task, log="Finished Indexing Symlinks!")

for i in range(0, total_items, chunk_size):
chunk = items_list[i:i + chunk_size]

with ThreadPoolExecutor(thread_name_prefix="EnhanceSymlinks", max_workers=workers) as executor:
future_to_item = {
executor.submit(self._enhance_item, item): item
for item in chunk
}

for future in as_completed(future_to_item):
item = future_to_item[future]
log_message = ""

try:
if not item or item.imdb_id in added_items:
errors.append(f"Duplicate symlink directory found for {item.log_string}")
continue

if db_functions.get_item_by_id(item.id, session=session):
errors.append(f"Duplicate item found in database for id: {item.id}")
continue

enhanced_item = future.result()
if not enhanced_item:
errors.append(f"Failed to enhance {item.log_string} ({item.imdb_id}) with Trakt Indexer")
continue

enhanced_item.store_state()
session.add(enhanced_item)
added_items.add(item.imdb_id)

log_message = f"Indexed IMDb Id: {enhanced_item.id} as {enhanced_item.type.title()}: {enhanced_item.log_string}"
except NotADirectoryError:
errors.append(f"Skipping {item.log_string} as it is not a valid directory")
except Exception as e:
logger.exception(f"Error processing {item.log_string}: {e}")
finally:
progress.update(task, advance=1, log=log_message)

# Commit after each chunk
session.commit()

progress.update(task, log="Finished Indexing Symlinks!")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider adding transaction rollback on errors

While committing after each chunk is good, consider adding explicit transaction rollback on errors to ensure data consistency.

-# Commit after each chunk
-session.commit()
+try:
+    # Commit after each chunk
+    session.commit()
+except Exception as e:
+    session.rollback()
+    logger.error(f"Failed to commit chunk: {e}")
+    errors.append(f"Failed to commit chunk: {e}")

Committable suggestion skipped: line range outside the PR's diff.


if errors:
logger.error("Errors encountered during initialization")
Expand All @@ -452,4 +462,4 @@ def _init_db_from_symlinks(self):
total_seconds = elapsed_time.total_seconds()
hours, remainder = divmod(total_seconds, 3600)
minutes, seconds = divmod(remainder, 60)
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
logger.success(f"Database initialized, time taken: h{int(hours):02d}:m{int(minutes):02d}:s{int(seconds):02d}")
Loading