Skip to content

Commit

Permalink
Patch Release: v1.4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexV525 authored May 6, 2024
2 parents dd15e23 + 341abdc commit 47fd473
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ jobs:
- name: Run tests
run: |
pip install pytest
pytest --cov=electrumx --ignore=tests/test_blocks.py
pytest --cov=electrumx --ignore=tests/test_blocks.py --ignore=tests/server/test_compaction.py
76 changes: 41 additions & 35 deletions electrumx/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
LOCATION_ID_LEN = 36
TX_OUTPUT_IDX_LEN = 4


class Prefetcher:
'''Prefetches blocks (in the forward direction only).'''

Expand Down Expand Up @@ -248,6 +249,7 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat
# Meta
self.next_cache_check = 0
self.touched = set()
self.semaphore = asyncio.Semaphore()
self.reorg_count = 0
self.height = -1
self.tip = None # type: Optional[bytes]
Expand Down Expand Up @@ -339,6 +341,7 @@ async def check_and_advance_blocks(self, raw_blocks):
await self.notifications.on_block(self.touched, self.height)
self.touched = set()
elif hprevs[0] != chain[0]:
self.logger.info(f'check_and_advance_blocks reorg: {first}')
await self.reorg_chain()
else:
# It is probably possible but extremely rare that what
Expand All @@ -351,42 +354,44 @@ async def check_and_advance_blocks(self, raw_blocks):
await self.prefetcher.reset_height(self.height)

async def reorg_chain(self, count=None):
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
a real reorg.'''
if count is None:
self.logger.info('chain reorg detected')
else:
self.logger.info(f'faking a reorg of {count:,d} blocks')
await self.flush(True)
# Use Semaphore to ensure only one reorg signal was held.
async with self.semaphore:
'''Handle a chain reorganisation.
Count is the number of blocks to simulate a reorg, or None for
a real reorg.'''
if count is None:
self.logger.info('chain reorg detected')
else:
self.logger.info(f'faking a reorg of {count:,d} blocks')
await self.flush(True)

async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
heights = range(last_height, last_height - len(hex_hashes), -1)
try:
blocks = [self.db.read_raw_block(height) for height in heights]
self.logger.info(f'read {len(blocks)} blocks from disk')
return blocks
except FileNotFoundError:
return await self.daemon.raw_blocks(hex_hashes)

def flush_backup():
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)

_start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)
await self.prefetcher.reset_height(self.height)
self.backed_up_event.set()
self.backed_up_event.clear()
async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
heights = range(last_height, last_height - len(hex_hashes), -1)
try:
blocks = [self.db.read_raw_block(height) for height in heights]
self.logger.info(f'read {len(blocks)} blocks from disk')
return blocks
except FileNotFoundError:
return await self.daemon.raw_blocks(hex_hashes)

def flush_backup():
# self.touched can include other addresses which is
# harmless, but remove None.
self.touched.discard(None)
self.db.flush_backup(self.flush_data(), self.touched)

_start, last, hashes = await self.reorg_hashes(count)
# Reverse and convert to hex strings.
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
for hex_hashes in chunks(hashes, 50):
raw_blocks = await get_raw_blocks(last, hex_hashes)
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
await self.run_in_thread_with_lock(flush_backup)
last -= len(raw_blocks)
await self.prefetcher.reset_height(self.height)
self.backed_up_event.set()
self.backed_up_event.clear()

async def reorg_hashes(self, count):
'''Return a pair (start, last, hashes) of blocks to back up during a
Expand Down Expand Up @@ -3556,6 +3561,7 @@ async def _process_prefetched_blocks(self):
await self.blocks_event.wait()
self.blocks_event.clear()
if self.reorg_count:
self.logger.info(f'_process_prefetched_blocks reorg: {self.reorg_count}')
await self.reorg_chain(self.reorg_count)
self.reorg_count = 0
else:
Expand Down
4 changes: 2 additions & 2 deletions electrumx/server/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import electrumx.lib.util as util
from electrumx.lib.hash import HASHX_LEN, hash_to_hex_str
from electrumx.lib.util import (pack_be_uint16, pack_le_uint64,
from electrumx.lib.util import (pack_be_uint16, pack_be_uint32, pack_le_uint64,
unpack_be_uint16_from, unpack_le_uint64)

if TYPE_CHECKING:
Expand Down Expand Up @@ -157,7 +157,7 @@ def assert_flushed(self):
def flush(self):
start_time = time.monotonic()
self.flush_count += 1
flush_id = pack_be_uint16(self.flush_count)
flush_id = pack_be_uint32(self.flush_count)
unflushed = self.unflushed

with self.db.write_batch() as batch:
Expand Down
10 changes: 8 additions & 2 deletions electrumx/server/http_middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,14 @@ def success_resp(data) -> web.Response:

def request_middleware(self) -> web_middlewares:
async def factory(app: web.Application, handler):
async def middleware_handler(request):
self.logger.info('Request {} comming'.format(request))
async def middleware_handler(request: web.Request):
# Log request details as a future.
async def log_request():
method = request.path
params = await request.json() if request.content_length else None
self.logger.debug(f'HTTP request handling: [method] {method}, [params]: {params}')
asyncio.ensure_future(log_request())

if not self.env.enable_rate_limit:
response = await handler(request)
if isinstance(response, web.Response):
Expand Down
20 changes: 13 additions & 7 deletions electrumx/server/http_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,15 @@ def __init__(self, session_mgr, db, mempool, peer_mgr, kind):
self.MAX_CHUNK_SIZE = 2016
self.hashX_subs = {}

async def format_params(self, request):
async def format_params(self, request: web.Request):
params: list
if request.method == "GET":
params = json.loads(request.query.get("params", "[]"))
else:
elif request.content_length:
json_data = await request.json()
params = json_data.get("params", [])
else:
params = []
return dict(zip(range(len(params)), params))

async def get_rpc_server(self):
Expand Down Expand Up @@ -860,11 +863,8 @@ async def transaction_broadcast(self, request):
# verified
async def scripthash_get_history(self, request):
'''Return the confirmed and unconfirmed history of a scripthash.'''
try:
params = await self.format_params(request)
scripthash = params.get(0, "")
except Exception as e:
scripthash = request
params = await self.format_params(request)
scripthash = params.get(0)

hashX = scripthash_to_hashX(scripthash)
return await self.confirmed_and_unconfirmed_history(hashX)
Expand Down Expand Up @@ -1166,6 +1166,12 @@ async def atomicals_num_to_id(self, request):
atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id)
return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted }

async def atomicals_block_hash(self, request):
params = await self.format_params(request)
height = params.get(0, self.session_mgr.bp.height)
block_hash = self.db.get_atomicals_block_hash(height)
return {'result': block_hash}

async def atomicals_block_txs(self, request):
params = await self.format_params(request)
height = params.get(0, "")
Expand Down
19 changes: 16 additions & 3 deletions electrumx/server/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ async def _start_servers(self, services):
app.router.add_get('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash)
app.router.add_get('/proxy/blockchain.atomicals.list', handler.atomicals_list)
app.router.add_get('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id)
app.router.add_get('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash)
app.router.add_get('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs)
app.router.add_get('/proxy/blockchain.atomicals.dump', handler.atomicals_dump)
app.router.add_get('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location)
Expand Down Expand Up @@ -334,6 +335,7 @@ async def _start_servers(self, services):
app.router.add_post('/proxy/blockchain.atomicals.listscripthash', handler.atomicals_listscripthash)
app.router.add_post('/proxy/blockchain.atomicals.list', handler.atomicals_list)
app.router.add_post('/proxy/blockchain.atomicals.get_numbers', handler.atomicals_num_to_id)
app.router.add_post('/proxy/blockchain.atomicals.get_block_hash', handler.atomicals_block_hash)
app.router.add_post('/proxy/blockchain.atomicals.get_block_txs', handler.atomicals_block_txs)
app.router.add_post('/proxy/blockchain.atomicals.dump', handler.atomicals_dump)
app.router.add_post('/proxy/blockchain.atomicals.at_location', handler.atomicals_at_location)
Expand Down Expand Up @@ -1191,14 +1193,18 @@ def sub_count(self):
return 0

async def handle_request(self, request):
'''Handle an incoming request. ElectrumX doesn't receive
"""Handle an incoming request. ElectrumX doesn't receive
notifications from client sessions.
'''
"""
if isinstance(request, Request):
handler = self.request_handlers.get(request.method)
method = request.method
args = request.args
else:
handler = None
method = 'invalid method' if handler is None else request.method
method = 'invalid method'
args = None
self.logger.debug(f'Session request handling: [method] {method}, [args] {args}')

# If DROP_CLIENT_UNKNOWN is enabled, check if the client identified
# by calling server.version previously. If not, disconnect the session
Expand Down Expand Up @@ -1544,6 +1550,12 @@ async def atomicals_num_to_id(self, limit, offset, asc):
atomicals_num_to_id_map_reformatted[num] = location_id_bytes_to_compact(id)
return {'global': await self.get_summary_info(), 'result': atomicals_num_to_id_map_reformatted }

async def atomicals_block_hash(self, height):
if not height:
height = self.session_mgr.bp.height
block_hash = self.db.get_atomicals_block_hash(height)
return {'result': block_hash}

async def atomicals_block_txs(self, height):
tx_list = self.session_mgr.bp.get_atomicals_block_txs(height)
return {'global': await self.get_summary_info(), 'result': tx_list }
Expand Down Expand Up @@ -3102,6 +3114,7 @@ def set_request_handlers(self, ptuple):
'blockchain.atomicals.listscripthash': self.atomicals_listscripthash,
'blockchain.atomicals.list': self.atomicals_list,
'blockchain.atomicals.get_numbers': self.atomicals_num_to_id,
'blockchain.atomicals.get_block_hash': self.atomicals_block_hash,
'blockchain.atomicals.get_block_txs': self.atomicals_block_txs,
'blockchain.atomicals.dump': self.atomicals_dump,
'blockchain.atomicals.at_location': self.atomicals_at_location,
Expand Down
2 changes: 1 addition & 1 deletion electrumx_server
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ load_dotenv()
def main():
'''Set up logging and run the server.'''
log_fmt = Env.default('LOG_FORMAT', '%(levelname)s:%(name)s:%(message)s')
log_level = Env.default('LOG_LEVEL', 'INFO')
log_level = Env.default('LOG_LEVEL', 'INFO').upper()
log_path = Env.default('LOG_PATH', None)
if log_path:
exist =os.path.exists(log_path)
Expand Down
6 changes: 3 additions & 3 deletions tests/server/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pytest

from electrumx.lib.hash import HASHX_LEN
from electrumx.lib.util import pack_be_uint16, pack_le_uint64
from electrumx.lib.util import pack_be_uint16, pack_be_uint32, pack_le_uint64
from electrumx.server.env import Env
from electrumx.server.db import DB

Expand Down Expand Up @@ -49,7 +49,7 @@ def check_hashX_compaction(history):
hist_list = []
hist_map = {}
for flush_count, count in pairs:
key = hashX + pack_be_uint16(flush_count)
key = hashX + pack_be_uint32(flush_count)
hist = full_hist[cum * 5: (cum+count) * 5]
hist_map[key] = hist
hist_list.append(hist)
Expand All @@ -68,7 +68,7 @@ def check_hashX_compaction(history):
assert item == (hashX + pack_be_uint16(n),
full_hist[n * row_size: (n + 1) * row_size])
for flush_count, count in pairs:
assert hashX + pack_be_uint16(flush_count) in keys_to_delete
assert hashX + pack_be_uint32(flush_count) in keys_to_delete

# Check re-compaction is null
hist_map = {key: value for key, value in write_items}
Expand Down

0 comments on commit 47fd473

Please sign in to comment.