Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic support for multiaddr addresses and improvement around peer id #75

Merged
merged 8 commits into from
Nov 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 13 additions & 18 deletions examples/chat/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import click
from libp2p.libp2p import *
from network.multiaddr import MultiAddr
from peer.peerinfo import info_from_p2p_addr


# TODO: change once muxed_connection supports extracting protocol id from messages
PROTOCOL_ID = '/echo/1.0.0'
Expand Down Expand Up @@ -37,7 +39,7 @@ async def write_data(stream):
async def run(port, destination):

if not destination:
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostA" % port])
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s" % port])

async def stream_handler(stream):
asyncio.ensure_future(read_data(stream))
Expand All @@ -48,38 +50,31 @@ async def stream_handler(stream):
port = None
for listener in host.network.listeners.values():
for addr in listener.get_addrs():
addr_dict = addr.to_options()
if addr_dict['transport'] == 'tcp':
port = addr_dict['port']
break
port = int(addr.value_for_protocol('tcp'))

if not port:
raise RuntimeError("was not able to find the actual local port")

print("Run './examples/chat/chat.py --port %s -d /ip4/127.0.0.1/tcp/%s/p2p/%s' on another console.\n" % (int(port)+1, port, host.get_id().pretty()))
print("Run './examples/chat/chat.py --port %s -d /ip4/127.0.0.1/tcp/%s/p2p/%s' on another console.\n" %
(int(port)+1, port, host.get_id().pretty()))
print("You can replace 127.0.0.1 with public IP as well.")
print("\nWaiting for incoming connection\n\n")


else:
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s/p2p/hostB" % port])

# TODO: improve multiaddr module to have proper function to do this
multiaddr = MultiAddr(destination)
ss = multiaddr.get_multiaddr_string().split('/')
peer_id = ss[-1]
addr = '/'.join(ss[:-2])
host = await new_node(transport_opt=["/ip4/127.0.0.1/tcp/%s" % port])

# Associate the peer with local ip address (see default parameters of Libp2p())
host.get_peerstore().add_addr(peer_id, addr, 10)
m = multiaddr.Multiaddr(destination)
info = info_from_p2p_addr(m)
# Associate the peer with local ip address
await host.connect(info)

# Start a stream with the destination.
# Multiaddress of the destination peer is fetched from the peerstore using 'peerId'.
stream = await host.new_stream(peer_id, [PROTOCOL_ID])
stream = await host.new_stream(info.peer_id, [PROTOCOL_ID])

asyncio.ensure_future(read_data(stream))
asyncio.ensure_future(write_data(stream))
print("Already connected to peer %s" % addr)
print("Already connected to peer %s" % info.addrs[0])


@click.command()
Expand Down
36 changes: 34 additions & 2 deletions host/basic_host.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from .host_interface import IHost
import multiaddr

from .host_interface import IHost

# Upon host creation, host takes in options,
# including the list of addresses on which to listen.
# Host then parses these options and delegates to its Network instance,
# telling it to listen on the given listen addresses.


class BasicHost(IHost):

# default options constructor
Expand Down Expand Up @@ -36,6 +38,18 @@ def get_mux(self):
:return: mux instance of host
"""

def get_addrs(self):
"""
:return: all the multiaddr addresses this host is listening too
"""
p2p_part = multiaddr.Multiaddr('/ipfs/{}'.format(self.get_id().pretty()))

addrs = []
for transport in self.network.listeners.values():
for addr in transport.get_addrs():
addrs.append(addr.encapsulate(p2p_part))
return addrs

def set_stream_handler(self, protocol_id, stream_handler):
"""
set stream handler for host
Expand All @@ -45,7 +59,6 @@ def set_stream_handler(self, protocol_id, stream_handler):
"""
return self.network.set_stream_handler(protocol_id, stream_handler)


# protocol_id can be a list of protocol_ids
# stream will decide which protocol_id to run on
async def new_stream(self, peer_id, protocol_ids):
Expand All @@ -56,3 +69,22 @@ async def new_stream(self, peer_id, protocol_ids):
"""
stream = await self.network.new_stream(peer_id, protocol_ids)
return stream

async def connect(self, peer_info):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding reference if anyone is wondering where this comes from.

"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.

:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""
self.peerstore.add_addrs(peer_info.peer_id, peer_info.addrs, 10)

# there is already a connection to this peer
if peer_info.peer_id in self.network.connections:
return

await self.network.dial_peer(peer_info.peer_id)
19 changes: 19 additions & 0 deletions host/host_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def get_mux(self):
:return: mux instance of host
"""

@abstractmethod
def get_addrs(self):
"""
:return: all the multiaddr addresses this host is listening too
"""

@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""
Expand All @@ -39,3 +45,16 @@ def new_stream(self, peer_id, protocol_ids):
:param protocol_ids: protocol ids that stream can run on
:return: true if successful
"""

@abstractmethod
def connect(self, peer_info):
"""
connect ensures there is a connection between this host and the peer with
given peer_info.peer_id. connect will absorb the addresses in peer_info into its internal
peerstore. If there is not an active connection, connect will issue a
dial, and block until a connection is open, or an error is
returned.

:param peer_info: peer_info of the host we want to connect to
:type peer_info: peer.peerinfo.PeerInfo
"""
5 changes: 4 additions & 1 deletion libp2p/libp2p.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from Crypto.PublicKey import RSA
import multiaddr
from peer.peerstore import PeerStore
from peer.id import id_from_public_key
from network.swarm import Swarm
from host.basic_host import BasicHost
from transport.upgrader import TransportUpgrader
Expand All @@ -11,10 +13,11 @@ async def new_node(id_opt=None, transport_opt=None, \

if id_opt is None:
new_key = RSA.generate(2048, e=65537)
id_opt = new_key.publickey().exportKey("PEM")
id_opt = id_from_public_key(new_key.publickey())
# private_key = new_key.exportKey("PEM")

transport_opt = transport_opt or ["/ip4/127.0.0.1/tcp/8001"]
transport_opt = [multiaddr.Multiaddr(t) for t in transport_opt]
muxer_opt = muxer_opt or ["mplex/6.7.0"]
sec_opt = sec_opt or ["secio"]
peerstore = peerstore or PeerStore()
Expand Down
10 changes: 10 additions & 0 deletions network/network_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@ def get_peer_id(self):
:return: the peer id
"""

@abstractmethod
def dial_peer(self, peer_id):
"""
dial_peer try to create a connection to peer_id

:param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id
:return: muxed connection
"""

@abstractmethod
def set_stream_handler(self, protocol_id, stream_handler):
"""
Expand Down
52 changes: 34 additions & 18 deletions network/swarm.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
from peer.id import ID
from protocol_muxer.multiselect_client import MultiselectClient
from protocol_muxer.multiselect import Multiselect
from .network_interface import INetwork
from .stream.net_stream import NetStream
from .multiaddr import MultiAddr
from .connection.raw_connection import RawConnection


class Swarm(INetwork):
# pylint: disable=too-many-instance-attributes, cell-var-from-loop

def __init__(self, my_peer_id, peerstore, upgrader):
self._my_peer_id = my_peer_id
self.self_id = ID(my_peer_id)
def __init__(self, peer_id, peerstore, upgrader):
self.self_id = peer_id
self.peerstore = peerstore
self.upgrader = upgrader
self.connections = dict()
Expand All @@ -36,18 +33,21 @@ def set_stream_handler(self, protocol_id, stream_handler):
self.multiselect.add_handler(protocol_id, stream_handler)
return True

async def new_stream(self, peer_id, protocol_ids):
async def dial_peer(self, peer_id):
"""
:param peer_id: peer_id of destination
:param protocol_id: protocol id
:return: net stream instance
dial_peer try to create a connection to peer_id
:param peer_id: peer if we want to dial
:raises SwarmException: raised when no address if found for peer_id
:return: muxed connection
"""

# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)

if not addrs:
raise SwarmException("No known addresses to peer")

# TODO: define logic to choose which address to use, or try them all ?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we should try them all and use whichever address responds the fastest

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what the go-libp2p repo is doing at the moment, and there are plans to upgrade it to a smarter dial. I forgot if it's called a traffic shaper or a dial manager. There is a bit of design freedom here. We should participate in the discussion with the rest of the community.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zixuanzh do you have a link to the issue where they discuss this dial manager ?

Copy link
Contributor

@zixuanzh zixuanzh Nov 26, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zaibon this is the thread libp2p/go-libp2p-swarm#88, dial manager might be a make-up name.

multiaddr = addrs[0]

if peer_id in self.connections:
Expand All @@ -56,14 +56,32 @@ async def new_stream(self, peer_id, protocol_ids):
muxed_conn = self.connections[peer_id]
else:
# Transport dials peer (gets back a raw conn)
raw_conn = await self.transport.dial(MultiAddr(multiaddr))
raw_conn = await self.transport.dial(multiaddr)

# Use upgrader to upgrade raw conn to muxed conn
muxed_conn = self.upgrader.upgrade_connection(raw_conn, True)

# Store muxed connection in connections
self.connections[peer_id] = muxed_conn

return muxed_conn

async def new_stream(self, peer_id, protocol_ids):
"""
:param peer_id: peer_id of destination
:param protocol_id: protocol id
:return: net stream instance
"""
# Get peer info from peer store
addrs = self.peerstore.addrs(peer_id)

if not addrs:
raise SwarmException("No known addresses to peer")

multiaddr = addrs[0]

muxed_conn = await self.dial_peer(peer_id)

# Use muxed conn to open stream, which returns
# a muxed stream
# TODO: Remove protocol id from being passed into muxed_conn
Expand Down Expand Up @@ -92,18 +110,15 @@ async def listen(self, *args):
Call listener listen with the multiaddr
Map multiaddr to listener
"""
for multiaddr_str in args:
if multiaddr_str in self.listeners:
for multiaddr in args:
if str(multiaddr) in self.listeners:
return True

multiaddr = MultiAddr(multiaddr_str)
multiaddr_dict = multiaddr.to_options()

async def conn_handler(reader, writer):
# Upgrade reader/write to a net_stream and pass \
# to appropriate stream handler (using multiaddr)
raw_conn = RawConnection(multiaddr_dict['host'], \
multiaddr_dict['port'], reader, writer)
raw_conn = RawConnection(multiaddr.value_for_protocol('ip4'),
multiaddr.value_for_protocol('tcp'), reader, writer)
muxed_conn = self.upgrader.upgrade_connection(raw_conn, False)

# TODO: Remove protocol id from muxed_conn accept stream or
Expand All @@ -124,7 +139,7 @@ async def conn_handler(reader, writer):
try:
# Success
listener = self.transport.create_listener(conn_handler)
self.listeners[multiaddr_str] = listener
self.listeners[str(multiaddr)] = listener
await listener.listen(multiaddr)
return True
except IOError:
Expand All @@ -138,5 +153,6 @@ def add_transport(self, transport):
# TODO: Support more than one transport
self.transport = transport


class SwarmException(Exception):
pass
49 changes: 49 additions & 0 deletions peer/id.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
import base58
import multihash

# MaxInlineKeyLength is the maximum length a key can be for it to be inlined in
# the peer ID.
# * When `len(pubKey.Bytes()) <= MaxInlineKeyLength`, the peer ID is the
# identity multihash hash of the public key.
# * When `len(pubKey.Bytes()) > MaxInlineKeyLength`, the peer ID is the
# sha2-256 multihash of the public key.
MAX_INLINE_KEY_LENGTH = 42


class ID:

Expand All @@ -15,3 +25,42 @@ def __str__(self):
return "<peer.ID %s*%s>" % (pid[:2], pid[len(pid)-6:])

__repr__ = __str__

def __eq__(self, other):
#pylint: disable=protected-access
return self._id_str == other._id_str

def __hash__(self):
return hash(self._id_str)


def id_b58_encode(peer_id):
"""
return a b58-encoded string
"""
#pylint: disable=protected-access
return base58.b58encode(peer_id._id_str).decode()


def id_b58_decode(peer_id_str):
"""
return a base58-decoded peer ID
"""
return ID(base58.b58decode(peer_id_str))


def id_from_public_key(key):
# export into binary format
key_bin = key.exportKey("DER")

algo = multihash.Func.sha2_256
# TODO: seems identity is not yet supported in pymultihash
# if len(b) <= MAX_INLINE_KEY_LENGTH:
# algo multihash.func.identity

mh_digest = multihash.digest(key_bin, algo)
return ID(mh_digest.encode())


def id_from_private_key(key):
return id_from_public_key(key.publickey())
Loading