Skip to content

Commit

Permalink
Handle database corruption exception at the SQLite connection level
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovsky committed Nov 23, 2023
1 parent 8924ca4 commit e6d2c58
Show file tree
Hide file tree
Showing 20 changed files with 400 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

from tribler.core.components.bandwidth_accounting.db import history, misc, transaction as db_transaction
from tribler.core.components.bandwidth_accounting.db.transaction import BandwidthTransactionData
from tribler.core.utilities.pony_utils import TriblerDatabase, handle_db_if_corrupted
from tribler.core.utilities.db_corruption_handling.base import handle_db_if_corrupted
from tribler.core.utilities.pony_utils import TriblerDatabase
from tribler.core.utilities.utilities import MEMORY_DB


Expand Down Expand Up @@ -50,6 +51,7 @@ def sqlite_sync_pragmas(_, connection):
create_db = True
db_path_string = ":memory:"
else:
# We need to handle the database corruption case before determining the state of the create_db flag.
handle_db_if_corrupted(db_path)
create_db = not db_path.is_file()
db_path_string = str(db_path)
Expand Down
14 changes: 13 additions & 1 deletion src/tribler/core/components/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
from tribler.core.components.exceptions import ComponentStartupException, MissedDependency, NoneComponent
from tribler.core.components.reporter.exception_handler import default_core_exception_handler
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.exit_codes import EXITCODE_DATABASE_IS_CORRUPTED
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted
from tribler.core.utilities.process_manager import get_global_process_manager

if TYPE_CHECKING:
Expand Down Expand Up @@ -52,8 +52,20 @@ async def start(self):
self.started_event.set()

if isinstance(e, DatabaseIsCorrupted):
# When the database corruption is detected, we should stop the process immediately.
# Tribler GUI will restart the process and the database will be recreated.

# Usually we wrap an exception into ComponentStartupException, and allow
# CoreExceptionHandler.unhandled_error_observer to handle it after all components are started,
# but in this case we don't do it. The reason is that handling ComponentStartupException
# starts the shutting down of Tribler, and due to some obscure reasons it is not possible to
# raise any exception, even SystemExit, from CoreExceptionHandler.unhandled_error_observer when
# Tribler is shutting down. It looks like in this case unhandled_error_observer is called from
# Task.__del__ method and all exceptions that are raised from __del__ are ignored.
# See https://bugs.python.org/issue25489 for similar case.
process_manager = get_global_process_manager()
process_manager.sys_exit(EXITCODE_DATABASE_IS_CORRUPTED, e)
return # Added for clarity; actually, the code raised SystemExit on the previous line

if self.session.failfast:
raise e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@
from tribler.core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted, run_threaded
from tribler.core.utilities.pony_utils import run_threaded
from tribler.core.utilities.simpledefs import DownloadStatus
from tribler.core.utilities.unicode import hexlify

Expand Down
6 changes: 3 additions & 3 deletions src/tribler/core/components/metadata_store/db/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,10 @@
from tribler.core.components.metadata_store.remote_query_community.payload_checker import process_payload
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.exceptions import InvalidSignatureException
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
from tribler.core.utilities.notifier import Notifier
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted, TriblerDatabase, get_max, get_or_create, \
handle_db_if_corrupted, \
run_threaded
from tribler.core.utilities.pony_utils import TriblerDatabase, get_max, get_or_create, run_threaded
from tribler.core.utilities.search_utils import torrent_rank
from tribler.core.utilities.unicode import hexlify
from tribler.core.utilities.utilities import MEMORY_DB
Expand Down Expand Up @@ -220,6 +219,7 @@ def on_connect(_, connection):
create_db = True
db_path_string = ":memory:"
else:
# We need to handle the database corruption case before determining the state of the create_db flag.
handle_db_if_corrupted(db_filename)
create_db = not db_filename.is_file()
db_path_string = str(db_filename)
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/components/reporter/exception_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
from tribler.core.components.exceptions import ComponentStartupException
from tribler.core.components.reporter.reported_error import ReportedError
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.exit_codes import EXITCODE_DATABASE_IS_CORRUPTED
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted
from tribler.core.utilities.process_manager import get_global_process_manager

# There are some errors that we are ignoring.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tribler.core.components.reporter.exception_handler import CoreExceptionHandler
from tribler.core.sentry_reporter import sentry_reporter
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted


# pylint: disable=protected-access, redefined-outer-name
Expand Down Expand Up @@ -85,6 +86,17 @@ def test_unhandled_error_observer_exception(exception_handler):
assert reported_error.should_stop


@patch('tribler.core.components.reporter.exception_handler.get_global_process_manager')
def test_unhandled_error_observer_database_corrupted(get_global_process_manager, exception_handler):
# test that database corruption exception reported to the GUI
exception = DatabaseIsCorrupted('db_path_string')
exception_handler.report_callback = MagicMock()
exception_handler.unhandled_error_observer(None, {'exception': exception})

get_global_process_manager().sys_exit.assert_called_once_with(99, exception)
exception_handler.report_callback.assert_not_called()


def test_unhandled_error_observer_only_message(exception_handler):
# test that unhandled exception, represented by message, reported to the GUI
context = {'message': 'Any'}
Expand Down
17 changes: 17 additions & 0 deletions src/tribler/core/components/tests/test_base_component.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from unittest.mock import patch

import pytest

from tribler.core.components.component import Component
from tribler.core.components.exceptions import MissedDependency, MultipleComponentsFound, NoneComponent
from tribler.core.components.session import Session
from tribler.core.config.tribler_config import TriblerConfig
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted


class ComponentTestException(Exception):
Expand Down Expand Up @@ -46,6 +49,20 @@ class TestComponentB(TestComponent):
assert component.stopped


@patch('tribler.core.components.component.get_global_process_manager')
async def test_session_start_database_corruption_detected(get_global_process_manager):
exception = DatabaseIsCorrupted('db_path_string')

class TestComponent(Component):
async def run(self):
raise exception

component = TestComponent()

await component.start()
get_global_process_manager().sys_exit.assert_called_once_with(99, exception)


class ComponentA(Component):
pass

Expand Down
15 changes: 7 additions & 8 deletions src/tribler/core/upgrade/db8_to_db10.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import contextlib
import datetime
import logging
import sqlite3
from collections import deque
from time import time as now

from pony.orm import db_session

from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.pony_utils import marking_corrupted_db
from tribler.core.utilities.db_corruption_handling import sqlite_replacement

TABLE_NAMES = (
"ChannelNode", "TorrentState", "TorrentState_TrackerState", "ChannelPeer", "ChannelVote", "TrackerState", "Vsids")
Expand Down Expand Up @@ -127,12 +126,12 @@ def convert_command(offset, batch_size):
def do_migration(self):
result = None # estimated duration in seconds of ChannelNode table copying time
try:
with marking_corrupted_db(self.old_db_path):
old_table_columns = {}
for table_name in TABLE_NAMES:
old_table_columns[table_name] = get_table_columns(self.old_db_path, table_name)
old_table_columns = {}
for table_name in TABLE_NAMES:
old_table_columns[table_name] = get_table_columns(self.old_db_path, table_name)

with contextlib.closing(sqlite3.connect(self.new_db_path)) as connection, connection:
with contextlib.closing(sqlite_replacement.connect(self.new_db_path)) as connection:
with connection:
cursor = connection.cursor()
cursor.execute("PRAGMA journal_mode = OFF;")
cursor.execute("PRAGMA synchronous = OFF;")
Expand Down Expand Up @@ -235,7 +234,7 @@ def calc_progress(duration_now, duration_half=60.0):


def get_table_columns(db_path, table_name):
with contextlib.closing(sqlite3.connect(db_path)) as connection, connection:
with contextlib.closing(sqlite_replacement.connect(db_path)) as connection, connection:
cursor = connection.cursor()
cursor.execute(f'SELECT * FROM {table_name} LIMIT 1')
names = [description[0] for description in cursor.description]
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/upgrade/tests/test_upgrader.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from tribler.core.upgrade.upgrade import TriblerUpgrader, catch_db_is_corrupted_exception, \
cleanup_noncompliant_channel_torrents
from tribler.core.utilities.configparser import CallbackConfigParser
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.utilities import random_infohash


Expand Down
25 changes: 13 additions & 12 deletions src/tribler/core/upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from tribler.core.upgrade.tags_to_knowledge.migration import MigrationTagsToKnowledge
from tribler.core.upgrade.tags_to_knowledge.tags_db import TagDatabase
from tribler.core.utilities.configparser import CallbackConfigParser
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.pony_utils import DatabaseIsCorrupted, get_db_version
from tribler.core.utilities.pony_utils import get_db_version
from tribler.core.utilities.simpledefs import STATEDIR_CHANNELS_DIR, STATEDIR_DB_DIR


Expand Down Expand Up @@ -73,17 +74,17 @@ def cleanup_noncompliant_channel_torrents(state_dir):
def catch_db_is_corrupted_exception(upgrader_method):
# This decorator applied for TriblerUpgrader methods. It suppresses and remembers the DatabaseIsCorrupted exception.
# As a result, if one upgrade method raises an exception, the following upgrade methods are still executed.

# The reason for this is the following: it is possible that one upgrade methods upgrades a database A, while
# the next upgrade method upgrades a database B. If a corruption detected in the database A, the database B still
# need to be upgraded. So we want to temporarily suppress DatabaseIsCorrupted exception until all upgrades are
# executed.

# If an upgrade found the database to be corrupted, the database is marked as corrupted. Then, the next upgrade
# will rename the corrupted database file (this is handled by the get_db_version call) and immediately return
# because there is no database to upgrade. So, if one upgrade function detects the database corruption, all the
# following upgrade functions for this specific database will skip the actual upgrade. As a result, a new
# database with the current DB version will be created on the Tribler Core start.
#
# The reason for this is the following: it is possible that one upgrade method upgrades database A
# while the following upgrade method upgrades database B. If a corruption is detected in the database A,
# the database B still needs to be upgraded. So, we want to temporarily suppress the DatabaseIsCorrupted exception
# until all upgrades are executed.
#
# If an upgrade finds the database to be corrupted, the database is marked as corrupted. Then, the next upgrade
# will rename the corrupted database file (the get_db_version call handles this) and immediately return because
# there is no database to upgrade. So, if one upgrade function detects database corruption, all the following
# upgrade functions for this specific database will skip the actual upgrade. As a result, a new database with
# the current DB version will be created on the Tribler Core start.

@wraps(upgrader_method)
def new_method(*args, **kwargs):
Expand Down
Empty file.
59 changes: 59 additions & 0 deletions src/tribler/core/utilities/db_corruption_handling/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

import logging
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Union

logger = logging.getLogger('db_corruption_handling')


class DatabaseIsCorrupted(Exception):
pass


@contextmanager
def handling_malformed_db_error(db_filepath: Path):
# Used in all methods of Connection and Cursor classes where the database corruption error can occur
try:
yield
except Exception as e:
if _is_malformed_db_exception(e):
_mark_db_as_corrupted(db_filepath)
raise DatabaseIsCorrupted(str(db_filepath)) from e
raise


def handle_db_if_corrupted(db_filename: Union[str, Path]):
# Checks if the database is marked as corrupted and handles it by removing the database file and the marker file
db_path = Path(db_filename)
marker_path = get_corrupted_db_marker_path(db_path)
if marker_path.exists():
_handle_corrupted_db(db_path)


def get_corrupted_db_marker_path(db_filepath: Path) -> Path:
return Path(str(db_filepath) + '.is_corrupted')


def _is_malformed_db_exception(exception):
return isinstance(exception, sqlite3.DatabaseError) and 'malformed' in str(exception)


def _mark_db_as_corrupted(db_filepath: Path):
# Creates a new `*.is_corrupted` marker file alongside the database file
marker_path = get_corrupted_db_marker_path(db_filepath)
marker_path.touch()


def _handle_corrupted_db(db_path: Path):
# Removes the database file and the marker file
if db_path.exists():
logger.warning(f'Database file was marked as corrupted, removing it: {db_path}')
db_path.unlink()

marker_path = get_corrupted_db_marker_path(db_path)
if marker_path.exists():
logger.warning(f'Removing the corrupted database marker: {marker_path}')
marker_path.unlink()
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from __future__ import annotations

import sqlite3
import sys
from pathlib import Path
from sqlite3 import DataError, DatabaseError, Error, IntegrityError, InterfaceError, InternalError, NotSupportedError, \
OperationalError, ProgrammingError, Warning, sqlite_version_info # pylint: disable=unused-import, redefined-builtin

from tribler.core.utilities.db_corruption_handling.base import handling_malformed_db_error


# This module serves as a replacement to the sqlite3 module and handles the case when the database is corrupted.
# It provides the `connect` function that should be used instead of `sqlite3.connect` and the `Cursor` and `Connection`
# classes that replaces `sqlite3.Cursor` and `sqlite3.Connection` classes respectively. If the `connect` function or
# any Connectoin or Cursor method is called and the database is corrupted, the database is marked as corrupted and
# the DatabaseIsCorrupted exception is raised. It should be handled by terminating the Tribler Core with the exit code
# EXITCODE_DATABASE_IS_CORRUPTED (99). After the Core restarts, the `handle_db_if_corrupted` function checks the
# presense of the database corruption marker and handles it by removing the database file and the corruption marker.
# After that, the database is recreated upon the next attempt to connect to it.


def connect(db_filename: str, **kwargs) -> sqlite3.Connection:
# Replaces the sqlite3.connect function
kwargs['factory'] = Connection
with handling_malformed_db_error(Path(db_filename)):
return sqlite3.connect(db_filename, **kwargs)


def _add_method_wrapper_that_handles_malformed_db_exception(cls, method_name: str):
# Creates a wrapper for the given method that handles the case when the database is corrupted

def wrapper(self, *args, **kwargs):
with handling_malformed_db_error(self._db_filepath): # pylint: disable=protected-access
return getattr(super(cls, self), method_name)(*args, **kwargs)

wrapper.__name__ = method_name
wrapper.is_wrapped = True # for testing purposes
setattr(cls, method_name, wrapper)


class Cursor(sqlite3.Cursor):
# Handles the case when the database is corrupted in all relevant methods.
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._db_filepath = self.connection._db_filepath


for method_name_ in ['execute', 'executemany', 'executescript', 'fetchall', 'fetchmany', 'fetchone', '__next__']:
_add_method_wrapper_that_handles_malformed_db_exception(Cursor, method_name_)



class ConnectionBase(sqlite3.Connection):
# This class simplifies testing of the Connection class by allowing mocking of base class methods.
# Direct mocking of sqlite3.Connection methods is not possible because they are C functions.

if sys.version_info < (3, 11):
def blobopen(self, *args, **kwargs) -> Blob:
raise NotImplementedError


class Connection(ConnectionBase):
# Handles the case when the database is corrupted in all relevant methods.
def __init__(self, db_filepath: str, *args, **kwargs):
super().__init__(db_filepath, *args, **kwargs)
self._db_filepath = Path(db_filepath)

def cursor(self, factory=None) -> Cursor:
return super().cursor(factory or Cursor)

def iterdump(self):
# Not implemented because it is not used in Tribler.
# Can be added later with an iterator class that handles the malformed db error during the iteration
raise NotImplementedError

def blobopen(self, *args, **kwargs) -> Blob: # Works for Python >= 3.11
with handling_malformed_db_error(self._db_filepath):
blob = super().blobopen(*args, **kwargs)
return Blob(blob, self._db_filepath)


for method_name_ in ['commit', 'execute', 'executemany', 'executescript', 'backup', '__enter__', '__exit__',
'serialize', 'deserialize']:
_add_method_wrapper_that_handles_malformed_db_exception(Connection, method_name_)


class Blob: # For Python >= 3.11. Added now, so we do not forgot to add it later when upgrading to 3.11.
def __init__(self, blob, db_filepath: Path):
self._blob = blob
self._db_filepath = db_filepath


for method_name_ in ['close', 'read', 'write', 'seek', '__len__', '__enter__', '__exit__', '__getitem__',
'__setitem__']:
_add_method_wrapper_that_handles_malformed_db_exception(Blob, method_name_)
Empty file.
Loading

0 comments on commit e6d2c58

Please sign in to comment.