Skip to content

Commit

Permalink
Extract make_protocol_decorator to use with different IPv8-based prot…
Browse files Browse the repository at this point in the history
…ocols
  • Loading branch information
kozlovsky committed Apr 26, 2022
1 parent d97c30f commit 002fbd2
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 38 deletions.
45 changes: 7 additions & 38 deletions src/tribler/core/components/ipv8/eva_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,16 @@
from collections import defaultdict, deque
from collections.abc import Coroutine
from dataclasses import dataclass
from functools import wraps
from itertools import chain
from random import SystemRandom
from typing import Awaitable, Callable, Dict, Iterable, List, Optional, Type, Union

from ipv8.community import Community
from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.types import Peer

from tribler.core.components.ipv8.protocol_decorator import make_protocol_decorator

__version__ = '2.1.0'

logger = logging.getLogger('EVA')
Expand Down Expand Up @@ -114,38 +114,7 @@ def __str__(self):
TransferErrorCallback = Callable[[Peer, TransferException], Coroutine]


def ipv8_message_handler(packet_type):
def decorator(func):
def inner(community, peer, payload):
# IPv8 always calls a registered packet handler with a Community instance as a first argument.
# In order to call the EVAProtocol method we need to replace the community with the protocol instance.
# When injecting EVAProtocol to a community it should always be named `eva` so the decorator can find it.
eva = getattr(community, 'eva', None)
if not eva:
raise TypeError(f'The {community.__class__.__name__} community does not have `eva` attribute!')
return func(eva, peer, payload)

lazy_wrapped = lazy_wrapper(packet_type)(inner)

@wraps(func)
def outer(protocol, peer, payload):
if isinstance(payload, bytes):
# The function was called by IPv8 for processing an incoming message.
# Let's use the lazy_wrapper machinery to deserialize the payload and call the decorated function.
if not hasattr(protocol, 'community'):
raise TypeError('The protocol instance should have a `community` attribute')

return lazy_wrapped(protocol.community, peer, payload)

if isinstance(payload, VariablePayload):
# The function is called manually (for example, in tests) and the payload is already deserialized.
# Let's call the function directly, no further preprocessing is necessary.
return func(protocol, peer, payload)

raise TypeError(f'Incorrect payload type: {payload.__class__.__name__}')

return outer
return decorator
message_handler = make_protocol_decorator('eva')


class EVAProtocol: # pylint: disable=too-many-instance-attributes
Expand Down Expand Up @@ -309,7 +278,7 @@ def start_outgoing_transfer(self, transfer: OutgoingTransfer):
self.community.register_anonymous_task('eva_terminate_by_timeout', self._terminate_by_timeout_task, transfer)
self.community.register_anonymous_task('eva_send_write_request', self._send_write_request_task, transfer)

@ipv8_message_handler(WriteRequest)
@message_handler(WriteRequest)
async def on_write_request_packet(self, peer: Peer, payload: WriteRequest):
logger.debug(f'On write request. Peer: {peer}. Info: {payload.info}. Size: {payload.data_size}')

Expand Down Expand Up @@ -338,7 +307,7 @@ async def on_write_request_packet(self, peer: Peer, payload: WriteRequest):
self.community.register_anonymous_task('eva_resend_acknowledge', self._resend_acknowledge_task, transfer)
self.send_message(peer, transfer.make_acknowledgement())

@ipv8_message_handler(Acknowledgement)
@message_handler(Acknowledgement)
async def on_acknowledgement_packet(self, peer: Peer, payload: Acknowledgement):
logger.debug(f'On acknowledgement({payload.number}). Window size: {payload.window_size}. Peer: {peer}.')

Expand All @@ -361,7 +330,7 @@ async def on_acknowledgement_packet(self, peer: Peer, payload: Acknowledgement):
logger.debug(f'Transmit({data.number}). Peer: {peer}.')
self.send_message(peer, data)

@ipv8_message_handler(Data)
@message_handler(Data)
async def on_data_packet(self, peer, payload):
await self._on_data_packet(peer, payload)

Expand All @@ -384,7 +353,7 @@ async def _on_data_packet(self, peer, payload):
if transfer.finished:
self.send_scheduled()

@ipv8_message_handler(Error)
@message_handler(Error)
async def on_error_packet(self, peer: Peer, error: Error):
message = error.message.decode('utf-8')
logger.debug(f'On error. Peer: {peer}. Message: "{message}"')
Expand Down
70 changes: 70 additions & 0 deletions src/tribler/core/components/ipv8/protocol_decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from functools import wraps

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload


def make_protocol_decorator(protocol_attr_name):
"""
A decorator factory that generates a lazy_wrapper-analog decorator for a specific IPv8 protocol.
IPv8 has `lazy_wrapper` decorator that can be applied to a community methods to handle deserialization
of incoming IPv8 messages. It cannot be used in classes that are not instances of Community.
make_prococol_decorator generates a similar decorator to a protocol class that is not a community,
but used inside a community. A protocol should be an attribute of a community, and you need to specify
the name of this attribute when calling make_protocol_decorator.
Example of usage:
>>> from ipv8.community import Community
>>> message_handler = make_protocol_decorator('my_protocol')
>>> class MyProtocol:
... @message_handler(VariablePayload1)
... def on_receive_message1(self, peer, payload):
... ...
... @message_handler(VariablePayload2)
... def on_receive_message2(self, peer, payload):
... ...
>>> class MyCommunity(Community):
... def __init__(self, *args, **kwargs):
... super().__init__()
... self.my_protocol = MyProtocol(...) # the name should be the same as in make_protocol_decorator
...
"""

def protocol_decorator(packet_type):
def actual_decorator(func):
def inner(community, peer, payload):
# IPv8 always calls a registered packet handler with a Community instance as a first argument.
# In order to call the protocol method we need to replace the community with the protocol instance.
# We try to find the protocol instance in the community. It should be stored in the attribute
# that name is specified in the `protocol_atr_name` parameter of the decorator's factory.
protocol = getattr(community, protocol_attr_name, None)
if not protocol:
raise TypeError(f'The {community.__class__.__name__} community does not have `{protocol_attr_name}` attribute!')

return func(protocol, peer, payload)

lazy_wrapped = lazy_wrapper(packet_type)(inner)

@wraps(func)
def outer(protocol, peer, payload):
if isinstance(payload, bytes):
# The function was called by IPv8 for processing an incoming message.
# Let's use the lazy_wrapper machinery to deserialize the payload and call the decorated function.
if not hasattr(protocol, 'community'):
raise TypeError('The protocol instance should have a `community` attribute')

return lazy_wrapped(protocol.community, peer, payload)

if isinstance(payload, VariablePayload):
# The function is called manually (for example, in tests) and the payload is already deserialized.
# Let's call the function directly, no further preprocessing is necessary.
return func(protocol, peer, payload)

raise TypeError(f'Incorrect payload type: {payload.__class__.__name__}')

return outer
return actual_decorator
return protocol_decorator

0 comments on commit 002fbd2

Please sign in to comment.