Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the filelock library to determine the primary GUI/Core process #7660

Merged
merged 1 commit into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions requirements-core.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ libtorrent==1.2.15
file-read-backwards==2.0.0
Brotli==1.0.9 # to prevent AttributeError on macOs: module 'brotli' has no attribute 'error' (in urllib3.response)
human-readable==1.3.2
filelock==3.13.0
18 changes: 12 additions & 6 deletions src/run_tribler_headless.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
from socket import inet_aton
from typing import Optional

from filelock import FileLock

from tribler.core.components.session import Session
from tribler.core.config.tribler_config import TriblerConfig
from tribler.core.start_core import components_gen
from tribler.core.utilities.osutils import get_appstate_dir, get_root_state_directory
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.process_manager import ProcessKind, ProcessManager, TriblerProcess, \
set_global_process_manager
from tribler.core.utilities.process_locking import CORE_LOCK_FILENAME, try_acquire_file_lock
from tribler.core.utilities.process_manager import ProcessKind, ProcessManager
from tribler.core.utilities.process_manager.manager import setup_process_manager


class IPPortAction(argparse.Action):
Expand Down Expand Up @@ -47,6 +50,7 @@ def __init__(self):
"""
self.session = None
self._stopping = False
self.process_lock: Optional[FileLock] = None
self.process_manager: Optional[ProcessManager] = None

def log_incoming_remote_search(self, sock_addr, keywords):
Expand All @@ -68,6 +72,7 @@ async def signal_handler(sig):
print("Tribler shut down")
get_event_loop().stop()
self.process_manager.current_process.finish()
self.process_lock.release()

signal.signal(signal.SIGINT, lambda sig, _: ensure_future(signal_handler(sig)))
signal.signal(signal.SIGTERM, lambda sig, _: ensure_future(signal_handler(sig)))
Expand All @@ -78,11 +83,12 @@ async def signal_handler(sig):
# Check if we are already running a Tribler instance
root_state_dir = get_root_state_directory(create=True)

current_process = TriblerProcess.current_process(ProcessKind.Core)
self.process_manager = ProcessManager(root_state_dir, current_process)
set_global_process_manager(self.process_manager)
self.process_lock = try_acquire_file_lock(root_state_dir / CORE_LOCK_FILENAME)
current_process_owns_lock = bool(self.process_lock)

self.process_manager = setup_process_manager(root_state_dir, ProcessKind.Core, current_process_owns_lock)

if not self.process_manager.current_process.become_primary():
if not current_process_owns_lock:
msg = 'Another Core process is already running'
print(msg)
self.process_manager.sys_exit(1, msg)
Expand Down
21 changes: 12 additions & 9 deletions src/tribler/core/start_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@
from tribler.core.sentry_reporter.sentry_reporter import SentryReporter, SentryStrategy
from tribler.core.upgrade.version_manager import VersionHistory
from tribler.core.utilities import slow_coro_detection
from tribler.core.utilities.process_manager import ProcessKind, ProcessManager, TriblerProcess, \
set_global_process_manager
from tribler.core.utilities.process_locking import CORE_LOCK_FILENAME, try_acquire_file_lock
from tribler.core.utilities.process_manager import ProcessKind
from tribler.core.utilities.process_manager.manager import setup_process_manager

logger = logging.getLogger(__name__)
CONFIG_FILE_NAME = 'triblerd.conf'
Expand Down Expand Up @@ -186,15 +187,17 @@ def run_tribler_core_session(api_port: Optional[int], api_key: str,
def run_core(api_port: Optional[int], api_key: Optional[str], root_state_dir, parsed_args):
logger.info(f"Running Core in {'gui_test_mode' if parsed_args.gui_test_mode else 'normal mode'}")

gui_pid = GuiProcessWatcher.get_gui_pid()
current_process = TriblerProcess.current_process(ProcessKind.Core, creator_pid=gui_pid)
process_manager = ProcessManager(root_state_dir, current_process)
set_global_process_manager(process_manager)
current_process_is_primary = process_manager.current_process.become_primary()
process_lock = try_acquire_file_lock(root_state_dir / CORE_LOCK_FILENAME)
current_process_owns_lock = bool(process_lock)

load_logger_config('tribler-core', root_state_dir, current_process_is_primary)
gui_process_pid = GuiProcessWatcher.get_gui_pid()

if not current_process_is_primary:
process_manager = setup_process_manager(root_state_dir, ProcessKind.Core, current_process_owns_lock,
creator_pid=gui_process_pid)

load_logger_config('tribler-core', root_state_dir, current_process_owns_lock)

if not current_process_owns_lock:
msg = 'Another Core process is already running'
logger.warning(msg)
process_manager.sys_exit(1, msg)
Expand Down
18 changes: 18 additions & 0 deletions src/tribler/core/utilities/process_locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from typing import Optional

from filelock import FileLock, Timeout


GUI_LOCK_FILENAME = 'tribler-gui.lock'
CORE_LOCK_FILENAME = 'tribler-core.lock'


def try_acquire_file_lock(lock_file_name) -> Optional[FileLock]:
lock = FileLock(lock_file_name)
try:
lock.acquire(blocking=False)
except Timeout:
return None
return lock
63 changes: 50 additions & 13 deletions src/tribler/core/utilities/process_manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,40 @@ def get_global_process_manager() -> Optional[ProcessManager]:
return global_process_manager


def setup_process_manager(root_state_dir: Path, process_kind: ProcessKind, current_process_owns_lock: bool,
creator_pid: Optional[int] = None) -> ProcessManager:
process_manager = ProcessManager(root_state_dir)
process_manager.setup_current_process(kind=process_kind, owns_lock=current_process_owns_lock,
creator_pid=creator_pid)
set_global_process_manager(process_manager)
return process_manager


class ProcessManager:
def __init__(self, root_dir: Path, current_process: TriblerProcess, db_filename: str = DB_FILENAME):
def __init__(self, root_dir: Path, db_filename: str = DB_FILENAME):
self.logger = logger # Used by the `with_retry` decorator
self.root_dir = root_dir
self.db_filepath = root_dir / db_filename
self.connection: Optional[sqlite3.Connection] = None
self.current_process = current_process
current_process.manager = self
self._current_process: Optional[TriblerProcess] = None

@with_retry
def setup_current_process(self, kind: ProcessKind, owns_lock: bool, creator_pid: Optional[int] = None):
current_process = TriblerProcess.current_process(manager=self, kind=kind, owns_lock=owns_lock,
creator_pid=creator_pid)
self._current_process = current_process
with self.connect():
if current_process.primary:
if primary_process := self.get_primary_process(current_process.kind):
raise RuntimeError(f'Previous primary process still active: {primary_process}. '
f'Current process: {current_process}')
current_process.save()

@property
def current_process(self):
if self._current_process is None:
raise RuntimeError('Current process is not set')
return self._current_process

@contextmanager
def connect(self) -> ContextManager[sqlite3.Connection]:
Expand Down Expand Up @@ -105,27 +131,38 @@ def _unable_to_open_db_file_get_reason(self):

return 'unknown reason'

def primary_process_rowid(self, kind: ProcessKind) -> Optional[int]:
def get_primary_process(self, kind: ProcessKind) -> Optional[TriblerProcess]:
"""
A helper method to load the current primary process of the specified kind from the database.

Returns rowid of the existing process or None.
Returns existing primary process or None.
"""
with self.connect() as connection:
cursor = connection.execute(f"""
SELECT {sql_scripts.SELECT_COLUMNS}
FROM processes WHERE kind = ? and "primary" = 1 ORDER BY rowid DESC LIMIT 1
FROM processes WHERE kind = ? and "primary" = 1 ORDER BY rowid
""", [kind.value])
row = cursor.fetchone()
if row is not None:

primary_processes = [] # In normal situation there should be at most one primary process
rows = cursor.fetchall()
for row in rows:
process = TriblerProcess.from_row(self, row)
if process.is_running():
return process.rowid
primary_processes.append(process)
else:
# Process is not running anymore; mark it as not primary
process.primary = False
process.save()

if not primary_processes:
return None

if len(primary_processes) > 1:
for process in primary_processes:
process.error_msg = "Multiple primary processes found in the database"
process.save()

# Process is not running anymore; mark it as not primary
process.primary = False
process.save()
return None
return primary_processes[0]

def sys_exit(self, exit_code: Optional[int] = None, error: Optional[str | Exception] = None, replace: bool = False):
"""
Expand Down
39 changes: 6 additions & 33 deletions src/tribler/core/utilities/process_manager/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ class ProcessKind(Enum):


class TriblerProcess:
def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: int,
def __init__(self, manager: ProcessManager, pid: int, kind: ProcessKind, app_version: str, started_at: int,
row_version: int = 0, rowid: Optional[int] = None, creator_pid: Optional[int] = None,
primary: bool = False, canceled: bool = False, api_port: Optional[int] = None,
finished_at: Optional[int] = None, exit_code: Optional[int] = None, error_msg: Optional[str] = None,
manager: Optional[ProcessManager] = None):
self._manager = manager
finished_at: Optional[int] = None, exit_code: Optional[int] = None, error_msg: Optional[str] = None):
self.manager = manager
self.rowid = rowid
self.row_version = row_version
self.pid = pid
Expand All @@ -44,16 +43,6 @@ def __init__(self, pid: int, kind: ProcessKind, app_version: str, started_at: in
self.exit_code = exit_code
self.error_msg = error_msg

@property
def manager(self) -> ProcessManager:
if self._manager is None:
raise RuntimeError('Tribler process manager is not set in process object')
return self._manager

@manager.setter
def manager(self, manager: ProcessManager):
self._manager = manager

@property
def logger(self) -> logging.Logger:
"""Used by the `with_retry` decorator"""
Expand Down Expand Up @@ -128,35 +117,19 @@ def __str__(self) -> str:
return ''.join(result)

@classmethod
def current_process(cls, kind: ProcessKind,
creator_pid: Optional[int] = None,
manager: Optional[ProcessManager] = None) -> TriblerProcess:
def current_process(cls, manager: ProcessManager, kind: ProcessKind, owns_lock: bool,
creator_pid: Optional[int] = None) -> TriblerProcess:
"""Constructs an object for a current process, specifying the PID value of the current process"""
pid = os.getpid()
psutil_process = psutil.Process(pid)
started_at = int(psutil_process.create_time())
return cls(manager=manager, row_version=0, pid=pid, kind=kind,
return cls(manager=manager, row_version=0, pid=pid, kind=kind, primary=owns_lock,
app_version=version_id, started_at=started_at, creator_pid=creator_pid)

def is_current_process(self) -> bool:
"""Returns True if the object represents the current process"""
return self.pid == os.getpid() and self.is_running()

@with_retry
def become_primary(self) -> bool:
"""
If there is no primary process already, makes the current process primary and returns the primary status
"""
with self.manager.connect():
# for a new process object self.rowid is None
primary_rowid = self.manager.primary_process_rowid(self.kind)
if primary_rowid is None or primary_rowid == self.rowid:
self.primary = True
else:
self.canceled = True
self.save()
return bool(self.primary)

def is_running(self):
"""Returns True if the object represents a running process"""
if not psutil.pid_exists(self.pid):
Expand Down
5 changes: 2 additions & 3 deletions src/tribler/core/utilities/process_manager/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
@pytest.fixture(name='process_manager')
def process_manager_fixture(tmp_path: Path) -> ProcessManager:
# Creates a process manager with a new database and adds a primary current process to it
current_process = TriblerProcess.current_process(ProcessKind.Core)
process_manager = ProcessManager(tmp_path, current_process)
current_process.become_primary()
process_manager = ProcessManager(tmp_path)
process_manager.setup_current_process(kind=ProcessKind.Core, owns_lock=True)
return process_manager
Loading
Loading