Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Lock will not lock up in case of worker failures #8770

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 49 additions & 122 deletions distributed/lock.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,33 @@
from __future__ import annotations

import asyncio
import logging
import uuid
from collections import defaultdict, deque

from dask.utils import parse_timedelta

from distributed.utils import TimeoutError, log_errors, wait_for
from distributed.worker import get_client
from distributed.semaphore import Semaphore

logger = logging.getLogger(__name__)

_no_value = object()

class LockExtension:
"""An extension for the scheduler to manage Locks

This adds the following routes to the scheduler
class Lock(Semaphore):
"""Distributed Centralized Lock

* lock_acquire
* lock_release
"""
.. warning::

def __init__(self, scheduler):
self.scheduler = scheduler
self.events = defaultdict(deque)
self.ids = dict()
This is using the ``distributed.Semaphore`` as a backend, which is
susceptible to lease overbooking. For the Lock this means that if a
lease is timing out, two or more instances could acquire the lock at the
same time. To disable lease timeouts, set
``distributed.scheduler.locks.lease-timeout`` to `inf`, e.g.

self.scheduler.handlers.update(
{"lock_acquire": self.acquire, "lock_release": self.release}
)
.. code-block:: python

@log_errors
async def acquire(self, name=None, id=None, timeout=None):
if isinstance(name, list):
name = tuple(name)
if name not in self.ids:
result = True
else:
while name in self.ids:
event = asyncio.Event()
self.events[name].append(event)
future = event.wait()
if timeout is not None:
future = wait_for(future, timeout)
try:
await future
except TimeoutError:
result = False
break
else:
result = True
finally:
event2 = self.events[name].popleft()
assert event is event2
if result:
assert name not in self.ids
self.ids[name] = id
return result

@log_errors
def release(self, name=None, id=None):
if isinstance(name, list):
name = tuple(name)
if self.ids.get(name) != id:
raise ValueError("This lock has not yet been acquired")
del self.ids[name]
if self.events[name]:
self.scheduler.loop.add_callback(self.events[name][0].set)
else:
del self.events[name]


class Lock:
"""Distributed Centralized Lock
with dask.config.set({"distributed.scheduler.locks.lease-timeout": "inf"}):
lock = Lock("x")
...

Note, that without lease timeouts, the Lock may deadlock in case of
cluster downscaling or worker failures.

Parameters
----------
Expand All @@ -93,29 +47,31 @@ class Lock:
>>> lock.release() # doctest: +SKIP
"""

def __init__(self, name=None, client=None):
self._client = client
fjetter marked this conversation as resolved.
Show resolved Hide resolved
self.name = name or "lock-" + uuid.uuid4().hex
self.id = uuid.uuid4().hex
self._locked = False

@property
def client(self):
if not self._client:
try:
self._client = get_client()
except ValueError:
pass
return self._client

def _verify_running(self):
if not self.client:
raise RuntimeError(
f"{type(self)} object not properly initialized. This can happen"
" if the object is being deserialized outside of the context of"
" a Client or Worker."
def __init__(
    self,
    name=None,
    client=_no_value,
    register=True,
    scheduler_rpc=None,
    loop=None,
):
    """Create a distributed lock backed by a single-lease semaphore.

    ``client`` is deprecated and ignored; supplying it only emits a
    ``DeprecationWarning``.
    """
    deprecated_client_given = client is not _no_value
    if deprecated_client_given:
        import warnings

        warnings.warn(
            "The `client` parameter is deprecated. It is no longer necessary to pass a client to Lock.",
            DeprecationWarning,
            stacklevel=2,
        )

    # A lock is exactly a semaphore that admits one holder at a time.
    super().__init__(
        name=name,
        register=register,
        scheduler_rpc=scheduler_rpc,
        loop=loop,
        max_leases=1,
    )

def acquire(self, blocking=True, timeout=None):
"""Acquire the lock

Expand All @@ -139,50 +95,21 @@ def acquire(self, blocking=True, timeout=None):
-------
True or False whether or not it successfully acquired the lock
"""
self._verify_running()
timeout = parse_timedelta(timeout)

if not blocking:
if timeout is not None:
raise ValueError("can't specify a timeout for a non-blocking call")
timeout = 0
return super().acquire(timeout=timeout)

result = self.client.sync(
self.client.scheduler.lock_acquire,
name=self.name,
id=self.id,
timeout=timeout,
)
self._locked = True
return result

def release(self):
"""Release the lock if already acquired"""
self._verify_running()
if not self.locked():
raise ValueError("Lock is not yet acquired")
result = self.client.sync(
self.client.scheduler.lock_release, name=self.name, id=self.id
)
self._locked = False
return result
async def _locked(self):
    """Return ``True`` iff the lock is currently held.

    The lock is held exactly when the backing semaphore reports one
    active lease.
    """
    return await self.scheduler.semaphore_value(name=self.name) == 1

def locked(self):
return self._locked

def __enter__(self):
self.acquire()
return self

def __exit__(self, exc_type, exc_value, traceback):
self.release()

async def __aenter__(self):
await self.acquire()
return self
return self.sync(self._locked)

async def __aexit__(self, exc_type, exc_value, traceback):
await self.release()
def __getstate__(self):
    # Pickle only the lock's name; all connection/scheduler state is
    # reconstructed on the receiving side by __setstate__.
    return self.name

def __reduce__(self):
return (Lock, (self.name,))
def __setstate__(self, state):
    # `state` is just the name produced by __getstate__. register=False
    # skips scheduler-side registration — presumably the named lock
    # already exists there; verify against Semaphore.__init__.
    self.__init__(name=state, register=False)
2 changes: 0 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@
from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
from distributed.event import EventExtension
from distributed.http import get_handlers
from distributed.lock import LockExtension
from distributed.metrics import time
from distributed.multi_lock import MultiLockExtension
from distributed.node import ServerNode
Expand Down Expand Up @@ -179,7 +178,6 @@
STIMULUS_ID_UNSET = "<stimulus_id unset>"

DEFAULT_EXTENSIONS = {
"locks": LockExtension,
"multi_locks": MultiLockExtension,
"publish": PublishExtension,
"replay-tasks": ReplayTaskScheduler,
Expand Down
Loading
Loading