Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added additional test for hierarchical lock #768

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from contextlib import asynccontextmanager
from typing import Set

from loguru import logger


class HierarchicalLock:
"""A hierarchical lock for asyncio.
Expand Down Expand Up @@ -43,13 +45,18 @@ async def acquire(self, path: str):

# Wait until there is no conflict with existing locked paths
while any(self._is_conflicting(path, lp) for lp in self._locked_paths):
logger.debug(
f"Found conflicting path with {path!r}, waiting for release to check again..."
)
# Condition.wait() releases the lock and waits for notify_all()
await self._cond.wait()

# Acquire the path
self._locked_paths.add(path)
if task not in self._task_locks:
self._task_locks[task] = set()
self._task_locks[task].add(path)
logger.debug("Acquired lock for path: {}", path)

async def release(self, path: str):
"""Release the lock for the given path and notify waiting tasks."""
Expand All @@ -74,6 +81,7 @@ async def release(self, path: str):

# Notify all tasks that something was released
self._cond.notify_all()
logger.debug("Released lock for path: {}", path)

@asynccontextmanager
async def lock(self, path: str) -> "HierarchicalLock":
Expand Down
26 changes: 26 additions & 0 deletions packages/opal-common/opal_common/tests/hierarchical_lock_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ async def lock_sibling(path):
assert duration < 0.2, "Both siblings should acquire lock concurrently"


@pytest.mark.asyncio
async def test_conflict_do_not_block_unrelated():
lock = HierarchicalLock()

# Acquire two sibling paths concurrently
# They should not block each other
async def lock_sibling(path: str, delay: float = 0.1):
async with lock.lock(path):
await asyncio.sleep(delay)
return path

parent = asyncio.create_task(lock_sibling("parent", 0.2))
child = lock_sibling("parent.child", 0.1)
unrelated = lock_sibling("unrelated", 0.1)

# Wait for all tasks to complete, in the order they complete
order = []
for coro in asyncio.as_completed([child, parent, unrelated], timeout=10):
order.append(await coro)
assert order == [
"unrelated",
"parent",
"parent.child",
], "Unrelated paths should not block"


@pytest.mark.asyncio
async def test_parent_blocks_child():
lock = HierarchicalLock()
Expand Down
Loading