Skip to content

Commit

Permalink
fix: resolve potential deadlock in AsyncInMemoryCache (langflow-ai#4464)
Browse files Browse the repository at this point in the history
* Fix potential lock misuse and deadlock in AsyncInMemoryCache.

* Recover async lock handling logic.

* Remove unused lock parameter in upsert.

* Fix potential lock misuse and deadlock in AsyncInMemoryCache.

* Recover async lock handling logic.

* Remove unused lock parameter in upsert.

* Add lock parameter to prevent errors.

* Fix ARG002 rule error.

* Lock passed to get and set method.

---------

Co-authored-by: Gabriel Luiz Freitas Almeida <gabriel@langflow.org>
  • Loading branch information
2 people authored and diogocabral committed Nov 26, 2024
1 parent 0ee9e35 commit 55502de
Showing 1 changed file with 8 additions and 27 deletions.
35 changes: 8 additions & 27 deletions src/backend/base/langflow/services/cache/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,10 +298,7 @@ def __init__(self, max_size=None, expiration_time=3600) -> None:
self.expiration_time = expiration_time

async def get(self, key, lock: asyncio.Lock | None = None):
if not lock:
async with self.lock:
return await self._get(key)
else:
async with lock or self.lock:
return await self._get(key)

async def _get(self, key):
Expand All @@ -315,13 +312,7 @@ async def _get(self, key):
return CACHE_MISS

async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._set(
key,
value,
)
else:
async with lock or self.lock:
await self._set(
key,
value,
Expand All @@ -334,39 +325,29 @@ async def _set(self, key, value) -> None:
self.cache.move_to_end(key)

async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._delete(key)
else:
async with lock or self.lock:
await self._delete(key)

async def _delete(self, key) -> None:
if key in self.cache:
del self.cache[key]

async def clear(self, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._clear()
else:
async with lock or self.lock:
await self._clear()

async def _clear(self) -> None:
self.cache.clear()

async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
if not lock:
async with self.lock:
await self._upsert(key, value)
else:
await self._upsert(key, value)
await self._upsert(key, value, lock)

async def _upsert(self, key, value) -> None:
existing_value = await self.get(key)
async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
existing_value = await self.get(key, lock)
if existing_value is not None and isinstance(existing_value, dict) and isinstance(value, dict):
existing_value.update(value)
value = existing_value
await self.set(key, value)
await self.set(key, value, lock)

async def contains(self, key) -> bool:
return key in self.cache

0 comments on commit 55502de

Please sign in to comment.