Skip to content

Commit

Permalink
asyncio: stop using get_event_loop(). introduce ~singleton loop.
Browse files Browse the repository at this point in the history
asyncio.get_event_loop() became deprecated in python3.10. (see python/cpython#83710)
```
.../electrum/electrum/daemon.py:470: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
.../electrum/electrum/network.py:276: DeprecationWarning: There is no current event loop
  self.asyncio_loop = asyncio.get_event_loop()
```
Also, according to that thread, "set_event_loop() [... is] not deprecated by oversight".
So, we stop using get_event_loop() and set_event_loop() in our own code.
Note that libraries we use (such as the stdlib for python <3.10), might call get_event_loop,
which then relies on us having called set_event_loop e.g. for the GUI thread. To work around
this, a custom event loop policy providing a get_event_loop implementation is used.

Previously, we have been using a single asyncio event loop, created with
util.create_and_start_event_loop, and code in many places got a reference to this loop
using asyncio.get_event_loop().
Now, we still use a single asyncio event loop, but it is now stored as a global in
util._asyncio_event_loop (access with util.get_asyncio_loop()).

I believe these changes also fix #5376
  • Loading branch information
SomberNight committed Apr 29, 2022
1 parent 334da24 commit b6a14a2
Show file tree
Hide file tree
Showing 13 changed files with 111 additions and 58 deletions.
2 changes: 1 addition & 1 deletion electrum/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def _run(self, method, args, password_getter=None, **kwargs):
kwargs.pop('wallet')

coro = f(*args, **kwargs)
fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop())
fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop())
result = fut.result()

if self._callback:
Expand Down
4 changes: 2 additions & 2 deletions electrum/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60):
rpc_user, rpc_password = get_rpc_credentials(config)
server_url = 'http://%s:%d' % (host, port)
auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
async def request_coroutine():
if socktype == 'unix':
connector = aiohttp.UnixConnector(path=path)
Expand Down Expand Up @@ -467,7 +467,7 @@ def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
if 'wallet_path' in config.cmdline_options:
self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
"Use the load_wallet command instead.")
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.network = None
if not config.get('offline'):
self.network = Network(config, daemon=self)
Expand Down
4 changes: 2 additions & 2 deletions electrum/exchange_rate.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def get_historical_rates(self, ccy: str, cache_dir: str) -> None:
if h is None:
h = self.read_historical_rates(ccy, cache_dir)
if h is None or h['timestamp'] < time.time() - 24*3600:
asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))

def history_ccys(self) -> Sequence[str]:
return []
Expand Down Expand Up @@ -471,7 +471,7 @@ async def query_all_exchanges_for_their_ccys_over_network():
for name, klass in exchanges.items():
exchange = klass(None, None)
await group.spawn(get_currencies_safe(name, exchange))
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
try:
loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network())
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion electrum/gui/kivy/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def __init__(self, **kwargs):
self.is_exit = False
self.wallet = None # type: Optional[Abstract_Wallet]
self.pause_time = 0
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.password = None
self._use_single_password = False
self.resume_dialog = None
Expand Down
2 changes: 1 addition & 1 deletion electrum/lnworker.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ async def add_peer(self, connect_str: str) -> Peer:
# Try DNS-resolving the host (if needed). This is simply so that
# the caller gets a nice exception if it cannot be resolved.
try:
await asyncio.get_event_loop().getaddrinfo(host, port)
await asyncio.get_running_loop().getaddrinfo(host, port)
except socket.gaierror:
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
# add peer
Expand Down
12 changes: 7 additions & 5 deletions electrum/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
init_retry_delay_urgent=1,
)

self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
assert self.asyncio_loop.is_running(), "event loop not running"

assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
Expand Down Expand Up @@ -381,9 +381,11 @@ async def stop_gossip(self, *, full_shutdown: bool = False):
self.channel_db = None
self.path_finder = None

def run_from_another_thread(self, coro, *, timeout=None):
assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
@classmethod
def run_from_another_thread(cls, coro, *, timeout=None):
loop = util.get_asyncio_loop()
assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
fut = asyncio.run_coroutine_threadsafe(coro, loop)
return fut.result(timeout)

@staticmethod
Expand Down Expand Up @@ -1321,7 +1323,7 @@ def send_http_on_proxy(cls, method, url, **kwargs):
assert util.get_running_loop() != network.asyncio_loop
loop = network.asyncio_loop
else:
loop = asyncio.get_event_loop()
loop = util.get_asyncio_loop()
coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
# note: _send_http_on_proxy has its own timeout, so no timeout here:
return coro.result()
Expand Down
1 change: 0 additions & 1 deletion electrum/sql_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class SqlDB(Logger):
def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
Logger.__init__(self)
self.asyncio_loop = asyncio_loop
asyncio.set_event_loop(asyncio_loop)
self.stopping = False
self.stopped_event = asyncio.Event()
self.path = path
Expand Down
5 changes: 3 additions & 2 deletions electrum/tests/test_lnpeer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import electrum
import electrum.trampoline
from electrum import bitcoin
from electrum import util
from electrum import constants
from electrum.network import Network
from electrum.ecc import ECPrivkey
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, tx_queue):
user_config = {}
user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-")
self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir)
self.asyncio_loop = asyncio.get_event_loop()
self.asyncio_loop = util.get_asyncio_loop()
self.channel_db = ChannelDB(self)
self.channel_db.data_loaded.set()
self.path_finder = LNPathFinder(self.channel_db)
Expand Down Expand Up @@ -1361,4 +1362,4 @@ async def f():


def run(coro):
return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result()
return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result()
3 changes: 2 additions & 1 deletion electrum/tests/test_lnrouter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import asyncio

from electrum import util
from electrum.util import bh2u, bfh, create_and_start_event_loop
from electrum.lnutil import ShortChannelID
from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet,
Expand Down Expand Up @@ -64,7 +65,7 @@ def prepare_graph(self):
"""
class fake_network:
config = self.config
asyncio_loop = asyncio.get_event_loop()
asyncio_loop = util.get_asyncio_loop()
trigger_callback = lambda *args: None
register_callback = lambda *args: None
interface = None
Expand Down
16 changes: 13 additions & 3 deletions electrum/tests/test_lntransport.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio

from electrum import util
from electrum.ecc import ECPrivkey
from electrum.lnutil import LNPeerAddr
from electrum.lntransport import LNResponderTransport, LNTransport
Expand All @@ -11,6 +12,15 @@

class TestLNTransport(ElectrumTestCase):

def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()

def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()

@needs_test_with_all_chacha20_implementations
def test_responder(self):
# local static
Expand Down Expand Up @@ -38,11 +48,11 @@ async def read(self, num_bytes):
assert num_bytes == 66
return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba')
transport = LNResponderTransport(ls_priv, Reader(), Writer())
asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))
asyncio.run_coroutine_threadsafe(
transport.handshake(epriv=e_priv), self.asyncio_loop).result()

@needs_test_with_all_chacha20_implementations
def test_loop(self):
loop = asyncio.get_event_loop()
responder_shaked = asyncio.Event()
server_shaked = asyncio.Event()
responder_key = ECPrivkey.generate_random_key()
Expand Down Expand Up @@ -96,4 +106,4 @@ async def f():
server.close()
await server.wait_closed()

loop.run_until_complete(f())
asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result()
23 changes: 18 additions & 5 deletions electrum/tests/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from electrum.interface import Interface, ServerAddr
from electrum.crypto import sha256
from electrum.util import bh2u
from electrum import util

from . import ElectrumTestCase

Expand All @@ -17,7 +18,9 @@ async def spawn(self, x): return

class MockNetwork:
taskgroup = MockTaskGroup()
asyncio_loop = asyncio.get_event_loop()

def __init__(self):
self.asyncio_loop = util.get_asyncio_loop()

class MockInterface(Interface):
def __init__(self, config):
Expand Down Expand Up @@ -52,9 +55,15 @@ def tearDownClass(cls):

def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})
self.interface = MockInterface(self.config)

def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()

def test_fork_noconflict(self):
blockchain.blockchains = {}
self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}})
Expand All @@ -66,7 +75,8 @@ def mock_connect(height):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)

def test_fork_conflict(self):
Expand All @@ -80,7 +90,8 @@ def mock_connect(height):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
self.assertEqual(('fork', 8), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)

def test_can_connect_during_backward(self):
Expand All @@ -93,7 +104,8 @@ def mock_connect(height):
self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop())
self.assertEqual(('catchup', 5), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)

def mock_fork(self, bad_header):
Expand All @@ -113,7 +125,8 @@ def test_chain_false_during_binary(self):
self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
ifa = self.interface
self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6)))
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop())
self.assertEqual(('catchup', 7), fut.result())
self.assertEqual(self.interface.q.qsize(), 0)


Expand Down
39 changes: 13 additions & 26 deletions electrum/tests/test_wallet_vertical.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from electrum import storage, bitcoin, keystore, bip32, slip39, wallet
from electrum import Transaction
from electrum import SimpleConfig
from electrum import util
from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet,
restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy)
Expand All @@ -18,6 +19,7 @@
from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput,
PartialTxInput, tx_from_any, TxOutpoint)
from electrum.mnemonic import seed_type
from electrum.network import Network

from electrum.plugins.trustedcoin import trustedcoin

Expand Down Expand Up @@ -699,8 +701,14 @@ class TestWalletSending(TestCaseForTestnet):

def setUp(self):
super().setUp()
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
self.config = SimpleConfig({'electrum_path': self.electrum_path})

def tearDown(self):
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
self._loop_thread.join(timeout=1)
super().tearDown()

def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2):
if config is None:
config = self.config
Expand Down Expand Up @@ -1369,14 +1377,7 @@ async def get_transaction(self, txid, timeout=None):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
Expand Down Expand Up @@ -1429,14 +1430,7 @@ async def get_transaction(self, txid, timeout=None):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
Expand Down Expand Up @@ -1844,8 +1838,8 @@ async def get_transaction(self, txid):
network = NetworkMock()
dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2'
sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1)
loop = asyncio.get_event_loop()
tx = loop.run_until_complete(sweep_coro)
loop = util.get_asyncio_loop()
tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result()

tx_copy = tx_from_any(tx.serialize())
self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400',
Expand Down Expand Up @@ -2199,14 +2193,7 @@ async def get_transaction(self, txid, timeout=None):
raise Exception("unexpected txid")
def has_internet_connection(self):
return True
def run_from_another_thread(self, coro, *, timeout=None):
loop, stop_loop, loop_thread = create_and_start_event_loop()
fut = asyncio.run_coroutine_threadsafe(coro, loop)
try:
return fut.result(timeout)
finally:
loop.call_soon_threadsafe(stop_loop.set_result, 1)
loop_thread.join(timeout=1)
run_from_another_thread = Network.run_from_another_thread
def get_local_height(self):
return 0
def blockchain(self):
Expand Down
Loading

0 comments on commit b6a14a2

Please sign in to comment.