Skip to content

Commit

Permalink
client: Fix cache corruption on loadBefore and prefetch (#169)
Browse files Browse the repository at this point in the history
Currently loadBefore and prefetch spawn async protocol.load_before task,
and, after waking up on its completion, populate the cache with received
data. But in between the time when protocol.load_before handler is run
and the time when protocol.load_before caller wakes up, there is a
window in which event loop might be running some other code, including
the code that handles invalidateTransaction messages from the server.

This means that cache updates and cache invalidations can be processed on
the client not in the order that server sent it. And such difference in
the order can lead to data corruption if e.g server sent

	<- loadBefore oid serial=tid1 next_serial=None
	<- invalidateTransaction tid2 oid

and client processed it as

	invalidateTransaction tid2 oid
	cache.store(oid, tid1, next_serial=None)

because here the end effect is that invalidation for oid@tid2 is not
applied to the cache.

The fix is simple: perform cache updates right after loadBefore reply is
received.

Fixes: #155

The fix is based on analysis and initial patch by @jmuchemb:

#155 (comment)

A tests corresponding to the fix is coming coming through

#170  and
zopefoundation/ZODB#345

For the reference, my original ZEO-specific data corruption reproducer
is here:

#155 (comment)
https://lab.nexedi.com/kirr/wendelin.core/blob/ecd0e7f0/zloadrace5.py

/cc @jamadden, @dataflake, @jimfulton
/reviewed-by @jmuchemb, @d-maurer
/reviewed-on #169
  • Loading branch information
navytux authored Apr 20, 2021
1 parent 415b1ff commit e277ba0
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 12 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Changelog
5.2.3 (unreleased)
------------------

- Fix data corruption due to race between load and external invalidations.
See `issue 155 <https://github.com/zopefoundation/ZEO/issues/155>`_.

5.2.2 (2020-08-11)
------------------
Expand Down
32 changes: 20 additions & 12 deletions src/ZEO/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,19 @@ def load_before(self, oid, tid):
message_id = (oid, tid)
future = self.futures.get(message_id)
if future is None:
future = asyncio.Future(loop=self.loop)
future = Fut()
self.futures[message_id] = future
self._write(
self.encode(message_id, False, 'loadBefore', (oid, tid)))
@future.add_done_callback
def _(future):
try:
data = future.result()
except Exception:
return
if data:
data, start, end = data
self.client.cache.store(oid, start, end, data)
return future

# Methods called by the server.
Expand Down Expand Up @@ -608,9 +617,6 @@ def load_before_threadsafe(self, future, wait_ready, oid, tid):
future.set_exception(exc)
else:
future.set_result(data)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
elif wait_ready:
self._when_ready(
self.load_before_threadsafe, future, wait_ready, oid, tid)
Expand All @@ -620,10 +626,7 @@ def load_before_threadsafe(self, future, wait_ready, oid, tid):
@future_generator
def _prefetch(self, oid, tid):
try:
data = yield self.protocol.load_before(oid, tid)
if data:
data, start, end = data
self.cache.store(oid, start, end, data)
yield self.protocol.load_before(oid, tid)
except Exception:
logger.exception("prefetch %r %r" % (oid, tid))

Expand Down Expand Up @@ -888,20 +891,25 @@ def close(self):
raise self.exception

class Fut(object):
"""Lightweight future that calls it's callback immediately rather than soon
"""Lightweight future that calls it's callbacks immediately rather than soon
"""

def __init__(self):
self.cbv = []

def add_done_callback(self, cb):
self.cb = cb
self.cbv.append(cb)

exc = None
def set_exception(self, exc):
self.exc = exc
self.cb(self)
for cb in self.cbv:
cb(self)

def set_result(self, result):
self._result = result
self.cb(self)
for cb in self.cbv:
cb(self)

def result(self):
if self.exc:
Expand Down

0 comments on commit e277ba0

Please sign in to comment.