test_tasks_manager.py
from typing import TYPE_CHECKING, cast
from unittest.mock import MagicMock, patch
import gevent
import pytest
from rotkehlchen.assets.asset import EvmToken
from rotkehlchen.chain.bitcoin.hdkey import HDKey
from rotkehlchen.chain.bitcoin.xpub import XpubData
from rotkehlchen.constants.assets import A_YFI
from rotkehlchen.constants.resolver import evm_address_to_identifier
from rotkehlchen.constants.timing import DATA_UPDATES_REFRESH
from rotkehlchen.db.cache import DBCacheDynamic, DBCacheStatic
from rotkehlchen.db.evmtx import DBEvmTx
from rotkehlchen.db.settings import ModifiableDBSettings
from rotkehlchen.errors.misc import RemoteError
from rotkehlchen.globaldb.handler import GlobalDBHandler
from rotkehlchen.premium.premium import Premium, PremiumCredentials, SubscriptionStatus
from rotkehlchen.serialization.deserialize import deserialize_timestamp
from rotkehlchen.tasks.manager import PREMIUM_STATUS_CHECK, TaskManager
from rotkehlchen.tasks.utils import should_run_periodic_task
from rotkehlchen.tests.utils.ethereum import (
    TEST_ADDR1,
    TEST_ADDR2,
    get_decoded_events_of_transaction,
    setup_ethereum_transactions_test,
)
from rotkehlchen.tests.utils.factories import make_evm_address
from rotkehlchen.tests.utils.mock import mock_evm_chains_with_transactions
from rotkehlchen.tests.utils.premium import VALID_PREMIUM_KEY, VALID_PREMIUM_SECRET
from rotkehlchen.types import (
    SPAM_PROTOCOL,
    ChainID,
    EvmTokenKind,
    Location,
    SupportedBlockchain,
    deserialize_evm_tx_hash,
)
from rotkehlchen.utils.hexbytes import hexstring_to_bytes
from rotkehlchen.utils.misc import ts_now

if TYPE_CHECKING:
    from rotkehlchen.db.dbhandler import DBHandler
    from rotkehlchen.exchanges.exchange import ExchangeInterface
    from rotkehlchen.exchanges.manager import ExchangeManager


@pytest.mark.parametrize('number_of_eth_accounts', [2])
@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_query_ethereum_transactions(task_manager, ethereum_accounts):
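    """Check that the task manager schedules the evm transaction query once per tracked
    ethereum account and that a further schedule() call does not re-query anything."""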
    task_manager.potential_tasks = [task_manager._maybe_query_evm_transactions]
    now = ts_now()

    def tx_query_mock(address, start_ts, end_ts):
        assert address in ethereum_accounts
        assert start_ts == 0
        assert end_ts >= now

    ethereum = task_manager.chains_aggregator.get_chain_manager(SupportedBlockchain.ETHEREUM)
    tx_query_patch = patch.object(
        ethereum.transactions,
        'single_address_query_transactions',
        wraps=tx_query_mock,
    )
    timeout = 8
    try:
        with gevent.Timeout(timeout), tx_query_patch as tx_mock:
            # First two calls to schedule should handle the addresses
            for i in range(2):
                task_manager.schedule()
                while tx_mock.call_count != i + 1:
                    gevent.sleep(.2)

            task_manager.schedule()
            gevent.sleep(.5)
            assert tx_mock.call_count == 2, '3rd schedule should do nothing'
    except gevent.Timeout as e:
        raise AssertionError(f'The transaction query was not scheduled within {timeout} seconds') from e  # noqa: E501


@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_schedule_xpub_derivation(task_manager, database):
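    """Check that xpub address derivation gets scheduled for both the bitcoin and
    bitcoin cash xpubs that are stored in the DB."""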
    xpub = 'xpub68V4ZQQ62mea7ZUKn2urQu47Bdn2Wr7SxrBxBDDwE3kjytj361YBGSKDT4WoBrE5htrSB8eAMe59NPnKrcAbiv2veN5GQUmfdjRddD1Hxrk'  # noqa: E501
    xpub_data1 = XpubData(
        xpub=HDKey.from_xpub(xpub=xpub, path='m'),
        blockchain=SupportedBlockchain.BITCOIN,
        derivation_path='m/0/0',
    )
    xpub_data2 = XpubData(
        xpub=HDKey.from_xpub(xpub=xpub, path='m'),
        blockchain=SupportedBlockchain.BITCOIN_CASH,
        derivation_path='m/0/0',
    )
    with database.user_write() as cursor:
        database.add_bitcoin_xpub(cursor, xpub_data1)
        database.add_bitcoin_xpub(cursor, xpub_data2)

    task_manager.potential_tasks = [task_manager._maybe_schedule_xpub_derivation]
    xpub_derive_patch = patch(
        'rotkehlchen.chain.bitcoin.xpub.XpubManager.check_for_new_xpub_addresses',
        return_value=None,
    )
    timeout = 4
    try:
        with gevent.Timeout(timeout), xpub_derive_patch as xpub_mock:
            task_manager.schedule()
            while xpub_mock.call_count != 2:
                gevent.sleep(.2)
    except gevent.Timeout as e:
        raise AssertionError(f'xpub derivation query was not scheduled within {timeout} seconds') from e  # noqa: E501


@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_schedule_exchange_query(task_manager, exchange_manager, poloniex):
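    """Check that the exchange history query gets scheduled once for the connected
    exchange and that a second schedule() call right after does nothing."""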
    now = ts_now()
    task_manager.potential_tasks = [task_manager._maybe_schedule_exchange_history_query]

    def mock_query_history(start_ts, end_ts, only_cache):
        assert start_ts == 0
        assert end_ts >= now
        assert only_cache is False

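    # register poloniex as a connected exchange so the scheduler has something to query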
    exchange_manager.connected_exchanges[Location.POLONIEX] = [poloniex]
    poloniex_patch = patch.object(poloniex, 'query_trade_history', wraps=mock_query_history)
    timeout = 5
    try:
        with gevent.Timeout(timeout), poloniex_patch as poloniex_mock:
            task_manager.schedule()
            while poloniex_mock.call_count != 1:
                gevent.sleep(.2)

            task_manager.schedule()
            gevent.sleep(.5)
            assert poloniex_mock.call_count == 1, '2nd schedule should do nothing'
    except gevent.Timeout as e:
        raise AssertionError(f'exchange query was not scheduled within {timeout} seconds') from e


def test_maybe_schedule_exchange_query_ignore_exchanges(
        task_manager: 'TaskManager',
        exchange_manager: 'ExchangeManager',
        poloniex: 'ExchangeInterface',
) -> None:
"""Verify that task manager respects the ignored exchanges when querying trades"""
exchange_manager.connected_exchanges[Location.POLONIEX] = [poloniex]
task_manager.exchange_manager = exchange_manager
with task_manager.database.user_write() as cursor:
task_manager.database.set_settings(cursor, ModifiableDBSettings(
non_syncing_exchanges=[poloniex.location_id()],
))
assert task_manager._maybe_schedule_exchange_history_query() is None


@pytest.mark.vcr(filter_query_parameters=['apikey'])
@pytest.mark.parametrize('one_receipt_in_db', [True, False])
@pytest.mark.parametrize('ethereum_accounts', [[TEST_ADDR1, TEST_ADDR2]])
@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_schedule_ethereum_txreceipts(
        task_manager,
        ethereum_manager,
        eth_transactions,
        database,
        one_receipt_in_db,
):
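    """Check that the task queries the receipts missing from the DB and that a second
    schedule() call right after does not re-query anything."""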
    task_manager.potential_tasks = [task_manager._maybe_schedule_evm_txreceipts]  # pylint: disable=protected-access
    _, receipts = setup_ethereum_transactions_test(
        database=database,
        transaction_already_queried=True,
        one_receipt_in_db=one_receipt_in_db,
    )
    dbevmtx = DBEvmTx(database)
    timeout = 10
    tx_hash_1 = hexstring_to_bytes('0x692f9a6083e905bdeca4f0293f3473d7a287260547f8cbccc38c5cb01591fcda')  # noqa: E501
    tx_hash_2 = hexstring_to_bytes('0x6beab9409a8f3bd11f82081e99e856466a7daf5f04cca173192f79e78ed53a77')  # noqa: E501
    receipt_get_patch = patch.object(ethereum_manager.node_inquirer, 'get_transaction_receipt', wraps=ethereum_manager.node_inquirer.get_transaction_receipt)  # noqa: E501
    queried_receipts = set()
    try:
        with gevent.Timeout(timeout), receipt_get_patch as receipt_task_mock, mock_evm_chains_with_transactions():  # noqa: E501
            task_manager.schedule()
            with database.conn.read_ctx() as cursor:
                while len(queried_receipts) != 2:
                    for txhash in (tx_hash_1, tx_hash_2):
                        if dbevmtx.get_receipt(cursor, txhash, ChainID.ETHEREUM) is not None:
                            queried_receipts.add(txhash)
                    gevent.sleep(.3)

            task_manager.schedule()
            gevent.sleep(.5)
            assert receipt_task_mock.call_count == (1 if one_receipt_in_db else 2), '2nd schedule should do nothing'  # noqa: E501
    except gevent.Timeout as e:
        raise AssertionError(f'receipts query was not completed within {timeout} seconds') from e

    receipt1 = eth_transactions.get_or_query_transaction_receipt(tx_hash_1)
    assert receipt1 == receipts[0]
    receipt2 = eth_transactions.get_or_query_transaction_receipt(tx_hash_2)
    assert receipt2 == receipts[1]


@pytest.mark.parametrize('max_tasks_num', [7])
@pytest.mark.parametrize('start_with_valid_premium', [True])
def test_check_premium_status(rotkehlchen_api_server, username):
"""
Test that the premium check tasks works correctly. The tests creates a valid subscription
and verifies that after the task was scheduled the users premium is deactivated.
"""
    rotki = rotkehlchen_api_server.rest_api.rotkehlchen
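    # kill any already spawned api task greenlets so they can't interfere with this test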
    gevent.killall(rotki.api_task_greenlets)
    task_manager = rotki.task_manager
    task_manager.potential_tasks = [task_manager._maybe_check_premium_status]
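    # pretend the last status check happened over an hour ago so the periodic task is due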
    task_manager.last_premium_status_check = ts_now() - 3601
    premium_credentials = PremiumCredentials(VALID_PREMIUM_KEY, VALID_PREMIUM_SECRET)
    premium = Premium(credentials=premium_credentials, username=username)
    premium.status = SubscriptionStatus.ACTIVE

    def mock_check_premium_status():
        task_manager.last_premium_status_check = ts_now() - PREMIUM_STATUS_CHECK
        task_manager._maybe_check_premium_status()

    with patch(
        'rotkehlchen.db.dbhandler.DBHandler.get_rotkehlchen_premium',
        MagicMock(return_value=premium_credentials),
    ):
        assert premium.is_active() is True
        assert rotki.premium is not None

        with patch('rotkehlchen.premium.premium.Premium.is_active', MagicMock(return_value=False)):
            mock_check_premium_status()
            assert rotki.premium is None, (
                'Premium object is not None and should be '
                'deactivated after invalid premium credentials'
            )

        with patch('rotkehlchen.premium.premium.Premium.is_active', MagicMock(return_value=True)):
            mock_check_premium_status()
            assert rotki.premium is not None, (
                'Premium object is None and the periodic check '
                "didn't reactivate the premium status"
            )

        with patch(
            'rotkehlchen.premium.premium.Premium.is_active',
            MagicMock(side_effect=RemoteError()),
        ):
            for check_trial in range(3):
                mock_check_premium_status()
                assert rotki.premium is not None, f'Premium object is None and should be active after periodic check {check_trial + 1}'  # noqa: E501

            mock_check_premium_status()
            assert rotki.premium is None, 'Premium object is not None and should be deactivated after the 4th periodic check'  # noqa: E501

        with patch('rotkehlchen.premium.premium.Premium.is_active', MagicMock(return_value=True)):
            mock_check_premium_status()
            assert rotki.premium is not None, "Premium object is None and the periodic check didn't reactivate the premium status"  # noqa: E501


@pytest.mark.parametrize('max_tasks_num', [5])
def test_update_snapshot_balances(task_manager):
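    """Check that the balance snapshot task gets scheduled and calls query_balances
    with the expected arguments."""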
    task_manager.potential_tasks = [task_manager._maybe_update_snapshot_balances]
    query_balances_patch = patch.object(
        task_manager,
        'query_balances',
    )
    timeout = 5
    try:
        with gevent.Timeout(timeout), query_balances_patch as query_mock:
            task_manager.schedule()
            while query_mock.call_count != 1:
                gevent.sleep(.2)

            query_mock.assert_called_once_with(
                requested_save_data=True,
                save_despite_errors=False,
                timestamp=None,
                ignore_cache=True,
            )
    except gevent.Timeout as e:
        raise AssertionError(f'Update snapshot balances was not completed within {timeout} seconds') from e  # noqa: E501


@pytest.mark.parametrize('max_tasks_num', [5])
def test_try_start_same_task(rotkehlchen_api_server):
"""
1. Checks that it is not possible to start 2 same tasks
2. Checks that is possible to start second same task when the first one finishes
"""
    rotki = rotkehlchen_api_server.rest_api.rotkehlchen
    # Using rotki.greenlet_manager instead of a fresh GreenletManager() since patch.object
    # needs it for proper mocking
    spawn_patch = patch.object(
        rotki.greenlet_manager,
        'spawn_and_track',
        wraps=rotki.greenlet_manager.spawn_and_track,
    )

    def simple_task():
        return [rotki.greenlet_manager.spawn_and_track(
            method=lambda: gevent.sleep(0.1),
            after_seconds=None,
            task_name='Lol kek',
            exception_is_error=True,
        )]

    with spawn_patch as patched:
        rotki.task_manager.potential_tasks = [rotki.task_manager._maybe_update_snapshot_balances]
        rotki.task_manager.schedule()
        rotki.task_manager.schedule()
        assert patched.call_count == 1
        # Check that the mapping in the task manager is correct
        assert rotki.task_manager.running_greenlets.keys() == {
            rotki.task_manager._maybe_update_snapshot_balances,
        }
        rotki.task_manager.potential_tasks = [simple_task]
        rotki.task_manager.schedule()  # start a small greenlet
        assert patched.call_count == 2
        assert rotki.task_manager.running_greenlets.keys() == {
            rotki.task_manager._maybe_update_snapshot_balances,
            simple_task,  # check that the mapping was updated
        }
        # Wait until our small greenlet finishes
        gevent.wait(rotki.task_manager.running_greenlets[simple_task])
        rotki.task_manager.potential_tasks = []
        rotki.task_manager.schedule()  # clear the mapping
        assert rotki.task_manager.running_greenlets.keys() == {  # and check that it was removed
            rotki.task_manager._maybe_update_snapshot_balances,
        }
        # And make sure that now we are able to start it again
        rotki.task_manager.potential_tasks = [simple_task]
        rotki.task_manager.schedule()
        assert patched.call_count == 3
        assert rotki.task_manager.running_greenlets.keys() == {
            rotki.task_manager._maybe_update_snapshot_balances,
            simple_task,
        }


def test_should_run_periodic_task(database: 'DBHandler') -> None:
"""
Check that should_run_periodic_task correctly reads the key_value_cache when they have been
set and where the database doesn't have them yet.
"""
    assert should_run_periodic_task(
        database=database,
        key_name=DBCacheStatic.LAST_DATA_UPDATES_TS,
        refresh_period=DATA_UPDATES_REFRESH,
    ) is True

    with database.user_write() as write_cursor:
        write_cursor.execute(
            'INSERT INTO key_value_cache(name, value) VALUES (?, ?)',
            (DBCacheStatic.LAST_DATA_UPDATES_TS.value, str(ts_now())),
        )

    assert should_run_periodic_task(
        database=database,
        key_name=DBCacheStatic.LAST_DATA_UPDATES_TS,
        refresh_period=DATA_UPDATES_REFRESH,
    ) is False

    # Make the function return True again by setting an old enough timestamp
    with database.user_write() as write_cursor:
        write_cursor.execute(
            'UPDATE key_value_cache SET value=? WHERE NAME=?',
            (str(ts_now() - DATA_UPDATES_REFRESH), DBCacheStatic.LAST_DATA_UPDATES_TS.value),
        )

    assert should_run_periodic_task(
        database=database,
        key_name=DBCacheStatic.LAST_DATA_UPDATES_TS,
        refresh_period=DATA_UPDATES_REFRESH,
    ) is True


@pytest.mark.parametrize('ethereum_accounts', [[make_evm_address()]])
@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_kill_running_tx_query_tasks(rotkehlchen_api_server, ethereum_accounts):
"""Test that using maybe_kill_running_tx_query_tasks deletes greenlet from the running tasks
Also test that if called two times without a schedule() in between, no KeyErrors happen.
These used to happen before a fix was introduced since the killed greenlet
was not removed from the tx_query_task_greenlets and/or api_task_greenlets.
"""
    rotki = rotkehlchen_api_server.rest_api.rotkehlchen
    address = ethereum_accounts[0]
    rotki.task_manager.potential_tasks = [rotki.task_manager._maybe_query_evm_transactions]
    eth_manager = rotki.chains_aggregator.get_chain_manager(SupportedBlockchain.ETHEREUM)

    def patched_address_query_transactions(self, address, start_ts, end_ts):  # pylint: disable=unused-argument
        while True:  # busy wait :D just for the test
            gevent.sleep(1)

    query_patch = patch.object(
        eth_manager.transactions,
        'single_address_query_transactions',
        wraps=patched_address_query_transactions,
    )

    with query_patch:
        rotki.task_manager.schedule()  # Schedule the query
        greenlet = rotki.task_manager.running_greenlets[rotki.task_manager._maybe_query_evm_transactions][0]  # noqa: E501
        assert greenlet.dead is False
        assert 'Query ethereum transaction' in greenlet.task_name
        # Run it twice to see that it's handled properly and the dead greenlet raises no KeyError
        rotki.maybe_kill_running_tx_query_tasks(SupportedBlockchain.ETHEREUM, [address])
        assert greenlet.dead is True
        rotki.maybe_kill_running_tx_query_tasks(SupportedBlockchain.ETHEREUM, [address])
        assert greenlet.dead is True

    # Do a reschedule to see that this clears the running greenlets
    rotki.task_manager.potential_tasks = []
    rotki.task_manager.schedule()
    assert len(rotki.task_manager.running_greenlets) == 0


@pytest.mark.parametrize('ethereum_accounts', [['0x2B888954421b424C5D3D9Ce9bB67c9bD47537d12', '0x9531C059098e3d194fF87FebB587aB07B30B1306']])  # noqa: E501
@pytest.mark.parametrize('ethereum_modules', [['eth2']])
@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_query_ethereum_withdrawals(task_manager, ethereum_accounts):
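    """Check that the eth2 withdrawals query gets scheduled and that the per-address
    timestamp cache controls whether it runs again."""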
    task_manager.potential_tasks = [task_manager._maybe_query_withdrawals]
    query_patch = patch.object(
        task_manager.chains_aggregator.get_module('eth2'),
        'query_services_for_validator_withdrawals',
        side_effect=lambda *args, **kwargs: None,
    )
    with query_patch as query_mock:
        task_manager.schedule()
        gevent.sleep(0)  # context switch for execution of task
        assert query_mock.call_count == 1

        # test the used query ranges
        for hours_ago, expected_call_count, msg in (
            (5, 2, 'should have run again'),
            (1, 2, 'should not have run again'),
        ):
            with task_manager.database.user_write() as write_cursor:
                for address in ethereum_accounts:
                    task_manager.database.set_dynamic_cache(
                        write_cursor=write_cursor,
                        name=DBCacheDynamic.WITHDRAWALS_TS,
                        value=ts_now() - 3600 * hours_ago,
                        address=address,
                    )
            task_manager.schedule()
            gevent.sleep(0)  # context switch for execution of task
            assert query_mock.call_count == expected_call_count, msg


@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_detect_new_spam_tokens(
        task_manager: TaskManager,
        database: 'DBHandler',
        globaldb: GlobalDBHandler,
) -> None:
"""Test that the task updating the list of known spam assets works correctly"""
    token = EvmToken.initialize(  # add a token that will be detected as spam
        address=make_evm_address(),
        chain_id=ChainID.ETHEREUM,
        token_kind=EvmTokenKind.ERC20,
        name='$ vanityeth.org ($ vanityeth.org)',
        symbol='VANITYTOKEN',
    )
    globaldb.add_asset(asset=token)

    task_manager.potential_tasks = [task_manager._maybe_detect_new_spam_tokens]
    task_manager.schedule()
    gevent.joinall(task_manager.running_greenlets[task_manager._maybe_detect_new_spam_tokens])  # wait for the task to finish since it might context switch while running  # noqa: E501

    updated_token = EvmToken(token.identifier)
    assert updated_token.protocol == SPAM_PROTOCOL
    with database.conn.read_ctx() as cursor:
        assert token.identifier in database.get_ignored_asset_ids(cursor=cursor)
        cursor.execute(
            'SELECT value FROM key_value_cache WHERE name=?',
            (DBCacheStatic.LAST_SPAM_ASSETS_DETECT_KEY.value,),
        )
        assert ts_now() - deserialize_timestamp(cursor.fetchone()[0]) < 2  # saved timestamp should be recent  # noqa: E501


@pytest.mark.vcr(filter_query_parameters=['apikey'])
@pytest.mark.parametrize('gnosis_accounts', [['0xcC3Da35614E6CAEEA7947d7Cd58000C350E7FF84']])
@pytest.mark.parametrize('ethereum_accounts', [['0xb524c787669185E11d01C645D1910631e04Fa5Eb']])
@pytest.mark.parametrize('max_tasks_num', [5])
def test_maybe_augmented_detect_new_spam_tokens(
        task_manager: TaskManager,
        database: 'DBHandler',
        globaldb: GlobalDBHandler,
        gnosis_inquirer,
        ethereum_inquirer,
) -> None:
"""
Test the augmented spam detection schedule and behaviour. We use a token that is not detected
in the fast checks that we do and that is airdropped in a multisend transaction.
"""
    tx_hex = deserialize_evm_tx_hash('0x6c10aaafec60e012316f54e2ac691b0a64d8744c21382fd3eb5013b4d1935bab')  # noqa: E501
    get_decoded_events_of_transaction(
        evm_inquirer=gnosis_inquirer,
        database=database,
        tx_hash=tx_hex,
    )
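    # the token airdropped in the multisend transaction decoded above, not yet marked as spam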
    token = EvmToken(evm_address_to_identifier(
        address='0x456FEb37ca5F087f7B59F5F684437cf1dd6e968f',
        chain_id=ChainID.GNOSIS,
        token_type=EvmTokenKind.ERC20,
    ))
    assert token.protocol is None

    # add a transaction for an asset that will get deleted from the globaldb
    # while its events are kept, to verify that nothing breaks
    tx_hex = deserialize_evm_tx_hash('0x5d7e7646e3749fcd575ea76e35763fa8eeb6dfb83c4c242a4448ee1495f695ba')  # noqa: E501
    get_decoded_events_of_transaction(
        evm_inquirer=ethereum_inquirer,
        database=database,
        tx_hash=tx_hex,
    )
    globaldb.delete_asset_by_identifier(A_YFI.identifier)

    task_manager.potential_tasks = [task_manager._maybe_augmented_detect_new_spam_tokens]
    task_manager.schedule()
    gevent.joinall(task_manager.running_greenlets[task_manager._maybe_augmented_detect_new_spam_tokens])  # wait for the task to finish since it might context switch while running  # noqa: E501

    updated_token = cast(EvmToken, globaldb.resolve_asset(identifier=token.identifier))
    assert updated_token.protocol == SPAM_PROTOCOL
    with database.conn.read_ctx() as cursor:
        assert token.identifier in database.get_ignored_asset_ids(cursor=cursor)
        cursor.execute(
            'SELECT value FROM key_value_cache WHERE name=?',
            (DBCacheStatic.LAST_AUGMENTED_SPAM_ASSETS_DETECT_KEY.value,),
        )
        assert ts_now() - deserialize_timestamp(cursor.fetchone()[0]) < 2  # saved timestamp should be recent  # noqa: E501