Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for announcing and querying LBRY streams over BEP15 (BitTorrent Trackers) #3576

Merged
merged 50 commits into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
4a0bf8a
add torrent udp tracker client, server and tests
shyba Mar 5, 2022
006391d
move udp server to test file, add link to BEP15
shyba Mar 5, 2022
4ea858f
add new conf: tracker_servers
shyba Mar 5, 2022
2df8a1d
make a helper function to announce
shyba Mar 5, 2022
dc6f8c4
add arg to announce stopped, removing the announcement
shyba Mar 5, 2022
3989eef
return whole announcement so the caller knows the interval
shyba Mar 5, 2022
30e8728
use tracker on download
shyba Mar 5, 2022
7b425eb
add tracker announcer component
shyba Mar 5, 2022
758f9de
fix unit tests
shyba Mar 5, 2022
2344aca
fix component property
shyba Mar 5, 2022
24eb189
skip component on test cli
shyba Mar 5, 2022
7acaeca
managed_stream: remove unused imports
shyba Mar 8, 2022
9e9a64d
evented system for tracker announcements
shyba Mar 8, 2022
888e991
improve timeout handling
shyba Mar 8, 2022
43e50f7
fix subscribe_hash
shyba Mar 8, 2022
2d9c574
cache results, save interval on tracker
shyba Mar 8, 2022
30acde0
at most 10 announces concurrently
shyba Mar 8, 2022
3855db6
pause announcer for 1 minute each round
shyba Mar 8, 2022
28fdd62
move concurreny control to lower layer
shyba Mar 8, 2022
61c99ab
avoid readding the same hash when tracker is busy with too many files
shyba Mar 8, 2022
47e432b
make it less verbose, only log after all events are fired
shyba Mar 8, 2022
42fd1c9
stop tracker tasks on shutdown
shyba Mar 8, 2022
05124d4
only log when really announcing, stop counting cached ones
shyba Mar 8, 2022
2f1617e
less verbose on timeouts, dont count timeouts, fix stop
shyba Mar 9, 2022
a3da041
fix exceptions on shutdown, stop using cancel_tasks
shyba Mar 9, 2022
eccf0e6
fix reusing result interval from failed expired attempt
shyba Mar 9, 2022
0e4f1ea
reduce timeout to 10, fix lints
shyba Mar 9, 2022
cc4a578
tests: add support for multiple trackers
shyba Mar 9, 2022
e299a9c
tests: multiple trackers, simple case
shyba Mar 9, 2022
407c570
tests: lower timeout, add test with bad and good mixed
shyba Mar 9, 2022
2918d8c
tracker component is running only if the task is alive
shyba Mar 9, 2022
d4aca89
handle multiple results from multiple trackers
shyba Mar 9, 2022
99fc717
better way to batch announce + handle different intervals for differe…
shyba Mar 9, 2022
16a2023
stop tasks before removing transport
shyba Mar 9, 2022
4e09b35
remove unused import and dead code
shyba Mar 9, 2022
6e5c7a1
use cache_concurrent to avoid requesting the same connection_id multi…
shyba Mar 9, 2022
3c46cc4
expire connection id quicker as some trackers have it set low
shyba Mar 9, 2022
7e6ea97
make peer id according to BEP20
shyba Mar 11, 2022
a7cea40
tracker:log DNS errors as warning instead of trace
shyba Mar 11, 2022
1169a02
make client server updatable from conf
shyba Mar 12, 2022
2e85e29
peer id PREFIX is a constant
shyba Mar 12, 2022
c276053
move server implementation to tracker module
shyba Mar 12, 2022
235cc5d
results are indexed by ip, setdefault after resolve
shyba Mar 12, 2022
b3f894e
add integration test for tracker discovery
shyba Mar 12, 2022
7d560df
use same arg name as overriden datagram_received (linting)
shyba Mar 12, 2022
3dc145f
make peer list query trackers
shyba Apr 4, 2022
7cba51c
update tests, query with port 0, filter bad ports earlier, make unit …
shyba Apr 4, 2022
e54cc88
return KademliaPeers directly into the queue instead of exposing Anno…
shyba Apr 5, 2022
6298123
changes from review
shyba May 6, 2022
03b0d5e
tracker client: extract default timeout and concurreny. Bump concurre…
shyba May 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lbry/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,6 +681,10 @@ class Config(CLIConfig):
('cdn.reflector.lbry.com', 5567)
])

tracker_servers = Servers("BitTorrent-compatible (BEP15) UDP trackers for helping P2P discovery", [
('tracker.lbry.com', 1337)
])

lbryum_servers = Servers("SPV wallet servers", [
('spv11.lbry.com', 50001),
('spv12.lbry.com', 50001),
Expand Down
49 changes: 49 additions & 0 deletions lbry/extras/daemon/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
from lbry.torrent.torrent_manager import TorrentManager
from lbry.wallet import WalletManager
from lbry.wallet.usage_payment import WalletServerPayer
from lbry.torrent.tracker import TrackerClient

try:
from lbry.torrent.session import TorrentSession
except ImportError:
Expand All @@ -48,6 +50,7 @@
PEER_PROTOCOL_SERVER_COMPONENT = "peer_protocol_server"
UPNP_COMPONENT = "upnp"
EXCHANGE_RATE_MANAGER_COMPONENT = "exchange_rate_manager"
TRACKER_ANNOUNCER_COMPONENT = "tracker_announcer_component"
LIBTORRENT_COMPONENT = "libtorrent_component"


Expand Down Expand Up @@ -708,3 +711,49 @@ async def start(self):

async def stop(self):
self.exchange_rate_manager.stop()


class TrackerAnnouncerComponent(Component):
component_name = TRACKER_ANNOUNCER_COMPONENT
depends_on = [FILE_MANAGER_COMPONENT]

def __init__(self, component_manager):
super().__init__(component_manager)
self.file_manager = None
self.announce_task = None
self.tracker_client: typing.Optional[TrackerClient] = None

@property
def component(self):
return self.tracker_client

@property
def running(self):
return self._running and self.announce_task and not self.announce_task.done()

async def announce_forever(self):
while True:
sleep_seconds = 60.0
announce_sd_hashes = []
for file in self.file_manager.get_filtered():
if not file.downloader:
continue
announce_sd_hashes.append(bytes.fromhex(file.sd_hash))
await self.tracker_client.announce_many(*announce_sd_hashes)
await asyncio.sleep(sleep_seconds)

async def start(self):
node = self.component_manager.get_component(DHT_COMPONENT) \
if self.component_manager.has_component(DHT_COMPONENT) else None
node_id = node.protocol.node_id if node else None
self.tracker_client = TrackerClient(node_id, self.conf.tcp_port, lambda: self.conf.tracker_servers)
await self.tracker_client.start()
self.file_manager = self.component_manager.get_component(FILE_MANAGER_COMPONENT)
self.announce_task = asyncio.create_task(self.announce_forever())

async def stop(self):
self.file_manager = None
if self.announce_task and not self.announce_task.done():
self.announce_task.cancel()
self.announce_task = None
self.tracker_client.stop()
25 changes: 16 additions & 9 deletions lbry/extras/daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from lbry.extras import system_info
from lbry.extras.daemon import analytics
from lbry.extras.daemon.components import WALLET_COMPONENT, DATABASE_COMPONENT, DHT_COMPONENT, BLOB_COMPONENT
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT
from lbry.extras.daemon.components import FILE_MANAGER_COMPONENT, DISK_SPACE_COMPONENT, TRACKER_ANNOUNCER_COMPONENT
from lbry.extras.daemon.components import EXCHANGE_RATE_MANAGER_COMPONENT, UPNP_COMPONENT
from lbry.extras.daemon.componentmanager import RequiredCondition
from lbry.extras.daemon.componentmanager import ComponentManager
Expand Down Expand Up @@ -4949,7 +4949,6 @@ async def jsonrpc_blob_delete(self, blob_hash):
DHT / Blob Exchange peer commands.
"""

@requires(DHT_COMPONENT)
async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
"""
Get peers for blob hash
Expand All @@ -4971,21 +4970,29 @@ async def jsonrpc_peer_list(self, blob_hash, page=None, page_size=None):
if not is_valid_blobhash(blob_hash):
# TODO: use error from lbry.error
raise Exception("invalid blob hash")
peers = []
peer_q = asyncio.Queue(loop=self.component_manager.loop)
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
if self.component_manager.has_component(TRACKER_ANNOUNCER_COMPONENT):
tracker = self.component_manager.get_component(TRACKER_ANNOUNCER_COMPONENT)
tracker_peers = await tracker.get_kademlia_peer_list(bytes.fromhex(blob_hash))
log.info("Found %d peers for %s from trackers.", len(tracker_peers), blob_hash[:8])
peer_q.put_nowait(tracker_peers)
elif not self.component_manager.has_component(DHT_COMPONENT):
raise Exception("Peer list needs, at least, either a DHT component or a Tracker component for discovery.")
peers = []
if self.component_manager.has_component(DHT_COMPONENT):
await self.dht_node._peers_for_value_producer(blob_hash, peer_q)
while not peer_q.empty():
peers.extend(peer_q.get_nowait())
results = [
{
"node_id": hexlify(peer.node_id).decode(),
results = {
(peer.address, peer.tcp_port): {
"node_id": hexlify(peer.node_id).decode() if peer.node_id else None,
"address": peer.address,
"udp_port": peer.udp_port,
"tcp_port": peer.tcp_port,
}
for peer in peers
]
return paginate_list(results, page, page_size)
}
return paginate_list(list(results.values()), page, page_size)

@requires(DATABASE_COMPONENT)
async def jsonrpc_blob_announce(self, blob_hash=None, stream_hash=None, sd_hash=None):
Expand Down
1 change: 1 addition & 0 deletions lbry/file/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop, config: 'Config', storage: '
self.purchase_receipt = None
self._added_on = added_on
self.analytics_manager = analytics_manager
self.downloader = None

self.saving = asyncio.Event(loop=self.loop)
self.finished_writing = asyncio.Event(loop=self.loop)
Expand Down
3 changes: 3 additions & 0 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
from lbry.torrent.tracker import enqueue_tracker_search

if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.dht.node import Node
Expand Down Expand Up @@ -91,6 +93,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
self.accumulate_task.cancel()
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
await self.add_fixed_peers()
enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue)
# start searching for peers for the sd hash
self.search_queue.put_nowait(self.sd_hash)
log.info("searching for peers for stream %s", self.sd_hash)
Expand Down
2 changes: 0 additions & 2 deletions lbry/stream/managed_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

if typing.TYPE_CHECKING:
from lbry.conf import Config
from lbry.schema.claim import Claim
from lbry.blob.blob_manager import BlobManager
from lbry.blob.blob_info import BlobInfo
from lbry.dht.node import Node
from lbry.extras.daemon.analytics import AnalyticsManager
from lbry.wallet.transaction import Transaction

Expand Down
Loading