Skip to content

Commit

Permalink
Test search interacts with changes as expected. #323
Browse files Browse the repository at this point in the history
  • Loading branch information
lemon24 committed Mar 4, 2024
1 parent 11db38a commit 6334473
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 220 deletions.
6 changes: 3 additions & 3 deletions src/reader/_storage/_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def _delete_from_search_one_chunk(self, changes: list[Change]) -> None:
with self.get_db() as db:
for change in changes:
# ignore non-entry changes
if change.tag_key or len(change.resource_id) != 2: # pragma: no cover
if change.tag_key or len(change.resource_id) != 2:
continue
assert change.action == Action.DELETE, change.action

Expand Down Expand Up @@ -305,7 +305,7 @@ def _insert_into_search_one_chunk(self, changes: list[Change]) -> None:
entries = {}
for change in changes:
# ignore non-entry changes
if change.tag_key or len(change.resource_id) != 2: # pragma: no cover
if change.tag_key or len(change.resource_id) != 2:
continue
assert change.action == Action.INSERT, change.action
entry = next(
Expand All @@ -314,7 +314,7 @@ def _insert_into_search_one_chunk(self, changes: list[Change]) -> None:
),
None,
)
if not entry: # pragma: no cover FIXME: needs test
if not entry:
continue
if entry._sequence != change.sequence:
continue
Expand Down
320 changes: 103 additions & 217 deletions tests/test_reader_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from reader import StorageError
from reader._storage import Storage
from reader._storage._search import Search
from reader._types import Action
from reader._types import Change
from reader.exceptions import ChangeTrackingNotEnabledError


Expand Down Expand Up @@ -386,6 +388,8 @@ def test_update_triggers_no_change(db_path, make_reader, monkeypatch, set_user_t
if anything else except the indexed fields changes.
"""
# TODO: maybe move this to a test_changes.py test

from reader._storage._search import Search

strip_html_called = 0
Expand Down Expand Up @@ -458,6 +462,105 @@ def strip_html(*args, **kwargs):
assert strip_html_called == 0


@pytest.mark.parametrize('changes_limit', [None, 2])
def test_update_unknown_changes_are_marked_as_done(reader, changes_limit):
reader._parser = parser = Parser()
reader.add_feed(parser.feed(1))
one = parser.entry(1, '1')
two = parser.entry(1, '2')
reader.update_feeds()

reader.enable_search()

changes = [
Change(Action.INSERT, b'wrong', two.resource_id),
Change(Action.INSERT, b'inexistent', one.resource_id),
Change(Action.DELETE, b'inexistent', one.resource_id),
Change(Action.INSERT, b'seq', ('1',)),
Change(Action.DELETE, b'seq', (), 'tag'),
Change(Action.INSERT, b'seq', ('1',), 'tag'),
Change(Action.DELETE, b'seq', ('1', '1'), 'tag'),
]
changes += reader._storage.changes.get()
reader._storage.delete_entries([one.resource_id])
changes += reader._storage.changes.get()

seen_changes = set()

def get(action=None):
rv = list(changes)
if action is not None:
rv = [c for c in rv if c.action == action and c not in seen_changes]
if changes_limit:
rv = rv[:changes_limit]
seen_changes.update(rv)
return rv

done_changes = set()

def done(changes):
done_changes.update(changes)

reader._storage.changes.get = get
reader._storage.changes.done = done

reader.update_search()

assert set(changes) == done_changes


def test_update_actually_deletes_from_database(reader):
# TODO: this is implementation-dependent, move to test_search.py?
reader._parser = parser = Parser()
reader.add_feed(parser.feed(1))
parser.entry(1, '1')
parser.entry(1, '2', summary='summary', content=[Content('content')])
reader.update_feeds()

db = reader._search.get_db()

def get_entries_search():
return sorted(
db.execute('select _feed, _id, _content_path from entries_search')
)

def get_entries_search_sync_state():
return sorted(
db.execute(
"""
select feed, id, sequence, json_array_length(es_rowids)
from entries_search_sync_state
"""
)
)

reader.update_search()
one, two = sorted(reader.get_entries(), key=lambda e: e.resource_id)
assert get_entries_search() == [
('1', '1', None),
('1', '2', '.content[0].value'),
('1', '2', '.summary'),
]
assert get_entries_search_sync_state() == [
('1', '1', one._sequence, 1),
('1', '2', two._sequence, 2),
]

reader._storage.delete_entries([two.resource_id])
reader.update_search()
assert get_entries_search() == [
('1', '1', None),
]
assert get_entries_search_sync_state() == [
('1', '1', one._sequence, 1),
]

reader.delete_feed('1')
reader.update_search()
assert get_entries_search() == []
assert get_entries_search_sync_state() == []


@with_sort
def test_search_entries_basic(reader, sort):
parser = Parser()
Expand Down Expand Up @@ -789,223 +892,6 @@ def test_search_entries_sort_error(reader):
set(reader.search_entries('one', sort='bad sort'))


# BEGIN concurrency tests


# last line fails on Python 3.11 on Windows with:
# AssertionError: assert 'three' == 'two'
# not worth finding out why, these tests will likely be removed in
# https://github.com/lemon24/reader/issues/323


@pytest.mark.flaky(max_runs=5, rerun_filter=lambda *_: os.name == 'nt')
def test_update_search_entry_changed_during_strip_html(
db_path, make_reader, monkeypatch
):
"""Test the entry can't remain out of sync if it changes
during reader.update_search() in a strip_html() call.
https://github.com/lemon24/reader/issues/175#issuecomment-652489019
"""
# This is a very intrusive test, maybe we should move it somewhere else.

reader = make_reader(db_path)
parser = reader._parser = Parser()

feed = parser.feed(1, datetime(2010, 1, 1), title='one')
parser.entry(1, 1, datetime(2010, 1, 1), title='one')
reader.add_feed(feed.url)
reader.update_feeds()

reader.enable_search()
reader.update_search()

feed = parser.feed(1, datetime(2010, 1, 2), title='two')
parser.entry(1, 1, datetime(2010, 1, 2), title='two')
reader.update_feed(feed.url)

in_strip_html = threading.Event()
can_return_from_strip_html = threading.Event()

def target():
# can't use fixture because it would run close() in a different thread
from reader import make_reader
from reader._storage._search import Search

# strip_html() may or may not be used a SQLite user-defined function,
# hence the whole subclassing thing
class MySearch(Search):
@staticmethod
def strip_html(*args, **kwargs):
in_strip_html.set()
can_return_from_strip_html.wait()
return Search.strip_html(*args, **kwargs)

# TODO: remove monkeypatching when make_reader() gets a search_cls argument
monkeypatch.setattr('reader.core.Storage.make_search', lambda s: MySearch(s))

reader = make_reader(db_path)
try:
reader.update_search()
finally:
reader.close()

thread = threading.Thread(target=target)
thread.start()

in_strip_html.wait()

try:
feed = parser.feed(1, datetime(2010, 1, 3), title='three')
parser.entry(1, 1, datetime(2010, 1, 3), title='three')
reader._storage.get_db().execute("PRAGMA busy_timeout = 0;")
reader.update_feed(feed.url)
expected_title = 'three'
except StorageError:
expected_title = 'two'
finally:
can_return_from_strip_html.set()
thread.join()

reader.update_search()

(entry,) = reader.get_entries()
(result,) = reader.search_entries('one OR two OR three')
assert entry.title == result.metadata['.title'].value == expected_title


def test_update_search_entry_changed_between_insert_loops(
db_path, make_reader, monkeypatch
):
"""Test the entry can't be added twice to the search index if it changes
during reader.update_search() between two insert loops.
This was especially relevant for the pre-change-tracking (#323) version:
https://github.com/lemon24/reader/issues/175#issuecomment-654213853
"""
# This is a very intrusive test, maybe we should move it somewhere else.

reader = make_reader(db_path)
reader.enable_search()

parser = reader._parser = Parser()

feed = parser.feed(1, datetime(2010, 1, 1))
parser.entry(1, 1, datetime(2010, 1, 1), summary='one')
reader.add_feed(feed.url)
reader.update_feeds()

in_insert_chunk = threading.Event()
can_return_from_insert_chunk = threading.Event()

def target():
# can't use fixture because it would run close() in a different thread
from reader import make_reader

reader = make_reader(db_path)
original_insert_chunk = reader._search._insert_into_search_one_chunk

def insert_chunk(*args, **kwargs):
in_insert_chunk.set()
can_return_from_insert_chunk.wait()
return original_insert_chunk(*args, **kwargs)

reader._search._insert_into_search_one_chunk = insert_chunk

try:
reader.update_search()
finally:
reader.close()

thread = threading.Thread(target=target)
thread.start()

in_insert_chunk.wait()

try:
feed = parser.feed(1, datetime(2010, 1, 2))
parser.entry(1, 1, datetime(2010, 1, 2), summary='two')
reader.update_feed(feed.url)
finally:
can_return_from_insert_chunk.set()
thread.join()

(result,) = reader.search_entries('entry')
assert len(result.content) == 1

((rowcount,),) = reader._search.get_db().execute(
"select count(*) from entries_search;"
)
assert rowcount == 1


def test_update_search_concurrent_calls(db_path, make_reader, monkeypatch):
"""Test concurrent calls to reader.update_search() don't interfere
with one another.
https://github.com/lemon24/reader/issues/175#issuecomment-652489019
"""
# This is a very intrusive test, maybe we should move it somewhere else.

reader = make_reader(db_path)
parser = reader._parser = Parser()

feed = parser.feed(1, datetime(2010, 1, 1), title='feed')
parser.entry(
1,
1,
datetime(2010, 1, 1),
title='entry',
summary='summary',
content=[Content('content')],
)
reader.add_feed(feed.url)
reader.update_feeds()
reader.enable_search()

barrier = threading.Barrier(2)

def target():
# can't use fixture because it would run close() in a different thread
from reader import make_reader
from reader._storage._search import Search

class MySearch(Search):
@staticmethod
def strip_html(*args, **kwargs):
barrier.wait()
return Search.strip_html(*args, **kwargs)

# TODO: remove monkeypatching when make_reader() gets a search_cls argument
monkeypatch.setattr('reader.core.Storage.make_search', lambda s: MySearch(s))

reader = make_reader(db_path)
try:
reader.update_search()
finally:
reader.close()

threads = [threading.Thread(target=target) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()

(result,) = reader.search_entries('entry')
assert len(result.content) == 2

((rowcount,),) = reader._search.get_db().execute(
"select count(*) from entries_search;"
)
assert rowcount == 2


# END concurrency tests


def test_add_entry_repeatedly(reader):
# this could have been found with Hypothesis

Expand Down

0 comments on commit 6334473

Please sign in to comment.