Skip to content

Commit

Permalink
Exploratory refactor-2 of libp2p + HTTP
Browse files Browse the repository at this point in the history
This is another version of the libp2p + HTTP refactor in #102. The key difference is that a new `NamespacedClient`, having the publisher's address, is created for each sync within the per-sync Syncer.
  • Loading branch information
gammazero committed Aug 9, 2023
1 parent 52d79f6 commit d2700c9
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 26 deletions.
21 changes: 21 additions & 0 deletions dagsync/httpsync/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ func NewPublisherWithoutServer(address string, handlerPath string, lsys ipld.Lin
}, nil
}

// NewPublisherHandler returns a Publisher for use as an http.Handler. Doesn't
// listen or know about a url prefix.
func NewPublisherHandler(lsys ipld.LinkSystem, privKey ic.PrivKey) (*Publisher, error) {
if privKey == nil {
return nil, errors.New("private key required to sign head requests")
}
peerID, err := peer.IDFromPrivateKey(privKey)
if err != nil {
return nil, fmt.Errorf("could not get peer id from private key: %w", err)
}

return &Publisher{
addr: nil,
closer: io.NopCloser(nil),
lsys: lsys,
handlerPath: "",
peerID: peerID,
privKey: privKey,
}, nil
}

// Addrs returns the addresses, as []multiaddress, that the Publisher is
// listening on.
func (p *Publisher) Addrs() []multiaddr.Multiaddr {
Expand Down
150 changes: 150 additions & 0 deletions dagsync/httpsync/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@ import (
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagjson"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/fluent"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/storage/memstore"
"github.com/ipld/go-ipld-prime/traversal"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/message"
"github.com/ipni/go-libipni/dagsync/httpsync"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -96,6 +103,149 @@ func TestNewPublisherForListener(t *testing.T) {
}
}

func TestPublisherWithLibp2pHTTP(t *testing.T) {
ctx := context.Background()
req := require.New(t)

publisherStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
publisherLsys := cidlink.DefaultLinkSystem()
publisherLsys.TrustedStorage = true
publisherLsys.SetReadStorage(publisherStore)
publisherLsys.SetWriteStorage(publisherStore)

privKey, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 256, rand.Reader)
req.NoError(err)

publisher, err := httpsync.NewPublisherHandler(publisherLsys, privKey)
req.NoError(err)

// Use same identity as publisher. This is necessary so that same ID that
// the publisher uses to sign head/ query responses is the same as the ID
// used to identify the publisherStreamHost. Otherwise, it would be
// necessary for the sync client to know both IDs: one for the stream host
// to connect to, and one for the publisher to validate the dignatuse with.
publisherStreamHost, err := libp2p.New(libp2p.Identity(privKey), libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"))
req.NoError(err)

// This is the "HTTP Host". It's like the libp2p "stream host" (aka core
// host.Host), but it uses HTTP semantics instead of stream semantics.
//
// You can pass in options on creation like a stream host to do HTTP over
// libp2p streams, and multiaddrs to create listeners on.
publisherHost, err := libp2phttp.New(
libp2phttp.StreamHost(publisherStreamHost),
libp2phttp.ListenAddrs([]multiaddr.Multiaddr{multiaddr.StringCast("/ip4/127.0.0.1/tcp/0/http")}),
)
req.NoError(err)

go publisherHost.Serve()
defer publisherHost.Close()

protoID := protocol.ID("/ipni-sync/1")

serverStreamMa := publisherHost.Addrs()[0]
serverHTTPMa := publisherHost.Addrs()[1]
req.Contains(serverHTTPMa.String(), "/http")

t.Log("libp2p stream server address:", serverStreamMa.String())
t.Log("libp2p http server address:", serverHTTPMa.String())

// Here is where we attach our request handler. Note that we are mounting
// the "/ipni-sync/1" protocol at /ipni/. libp2phttp manages this mapping
// and clients can learn about the mapping at .well-known/libp2p.
//
// In this case we also want out HTTP handler to not even know about the
// prefix, so we use the stdlib http.StripPrefix.
publisherHost.SetHttpHandlerAtPath(protoID, "/ipni/", http.StripPrefix("/ipni/", publisher))

link, err := publisherLsys.Store(
ipld.LinkContext{Ctx: ctx},
cidlink.LinkPrototype{
Prefix: cid.Prefix{
Version: 1,
Codec: uint64(multicodec.DagJson),
MhType: uint64(multicodec.Sha2_256),
MhLength: -1,
},
},
fluent.MustBuildMap(basicnode.Prototype.Map, 4, func(na fluent.MapAssembler) {
na.AssembleEntry("fish").AssignString("lobster")
na.AssembleEntry("fish1").AssignString("lobster1")
na.AssembleEntry("fish2").AssignString("lobster2")
na.AssembleEntry("fish0").AssignString("lobster0")
}))
req.NoError(err)
publisher.SetRoot(link.(cidlink.Link).Cid)

testCases := []struct {
name string
publisher peer.AddrInfo
newClientHost func(t *testing.T) *libp2phttp.HTTPHost
}{
{
"HTTP transport",
peer.AddrInfo{Addrs: []multiaddr.Multiaddr{serverHTTPMa}},
func(t *testing.T) *libp2phttp.HTTPHost {
clientHost, err := libp2phttp.New()
req.NoError(err)
return clientHost
},
},
{
"libp2p stream transport",
peer.AddrInfo{ID: publisherStreamHost.ID(), Addrs: []multiaddr.Multiaddr{serverStreamMa}},
func(t *testing.T) *libp2phttp.HTTPHost {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
req.NoError(err)
clientHost, err := libp2phttp.New(libp2phttp.StreamHost(clientStreamHost))
req.NoError(err)
return clientHost
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Plumbing to set up the test.
clientStore := &correctedMemStore{&memstore.Store{
Bag: make(map[string][]byte),
}}
clientLsys := cidlink.DefaultLinkSystem()
clientLsys.TrustedStorage = true
clientLsys.SetReadStorage(clientStore)
clientLsys.SetWriteStorage(clientStore)
clientSync := httpsync.NewLibp2pSync(clientLsys, tc.newClientHost(t), protoID, nil)

clientSyncer, err := clientSync.NewSyncer(tc.publisher)
req.NoError(err)
wk := clientSyncer.PeerProtoMap()
if wk != nil {
req.Contains(wk, protoID)
}

headCid, err := clientSyncer.GetHead(ctx)
req.NoError(err)

req.Equal(link.(cidlink.Link).Cid, headCid)

clientSyncer.Sync(ctx, headCid, selectorparse.CommonSelector_MatchPoint)
require.NoError(t, err)

// Assert that data is loadable from the link system.
wantLink := cidlink.Link{Cid: headCid}
node, err := clientLsys.Load(ipld.LinkContext{Ctx: ctx}, wantLink, basicnode.Prototype.Any)
require.NoError(t, err)

// Assert synced node link matches the computed link, i.e. is spec-compliant.
gotLink, err := clientLsys.ComputeLink(wantLink.Prototype(), node)
require.NoError(t, err)
require.Equal(t, gotLink, wantLink, "computed %s but got %s", gotLink.String(), wantLink.String())
})
}
}

func mapKeys(t *testing.T, n ipld.Node) []string {
var keys []string
require.Equal(t, n.Kind(), datamodel.Kind_Map)
Expand Down
89 changes: 72 additions & 17 deletions dagsync/httpsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
"github.com/ipni/go-libipni/maurl"
ic "github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multihash"
)

Expand All @@ -34,6 +35,10 @@ type Sync struct {
blockHook func(peer.ID, cid.Cid)
client *http.Client
lsys ipld.LinkSystem

// libp2phttp
clientHost *libp2phttp.HTTPHost
protoID protocol.ID
}

// NewSync creates a new Sync.
Expand All @@ -50,19 +55,73 @@ func NewSync(lsys ipld.LinkSystem, client *http.Client, blockHook func(peer.ID,
}
}

var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
client *http.Client
peerID peer.ID
protos libp2phttp.WellKnownProtoMap
rootURL url.URL
urls []*url.URL
sync *Sync
}

func NewLibp2pSync(lsys ipld.LinkSystem, clientHost *libp2phttp.HTTPHost, protoID protocol.ID, blockHook func(peer.ID, cid.Cid)) *Sync {
return &Sync{
blockHook: blockHook,
lsys: lsys,

clientHost: clientHost,
protoID: protoID,
}
}

// NewSyncer creates a new Syncer to use for a single sync operation against a peer.
func (s *Sync) NewSyncer(peerID peer.ID, peerAddrs []multiaddr.Multiaddr) (*Syncer, error) {
urls := make([]*url.URL, len(peerAddrs))
for i := range peerAddrs {
func (s *Sync) NewSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
if s.clientHost != nil {
return s.newLibp2pSyncer(peerInfo)
}
return s.newSyncer(peerInfo)
}

func (s *Sync) newLibp2pSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
httpClient, err := s.clientHost.NamespacedClient(s.protoID, peerInfo)
if err != nil {
return nil, err
}

sr := &Syncer{
client: &httpClient,
peerID: peerInfo.ID,
rootURL: url.URL{Path: "/"},
urls: nil,
sync: s,
}

if peerInfo.ID != "" {
sr.protos, err = s.clientHost.GetAndStorePeerProtoMap(httpClient.Transport, peerInfo.ID)
if err != nil {
return nil, err
}
}

return sr, nil
}

func (s *Sync) newSyncer(peerInfo peer.AddrInfo) (*Syncer, error) {
urls := make([]*url.URL, len(peerInfo.Addrs))
for i, addr := range peerInfo.Addrs {
var err error
urls[i], err = maurl.ToURL(peerAddrs[i])
urls[i], err = maurl.ToURL(addr)
if err != nil {
return nil, err
}
}

return &Syncer{
peerID: peerID,
client: s.client,
peerID: peerInfo.ID,
rootURL: *urls[0],
urls: urls[1:],
sync: s,
Expand All @@ -73,14 +132,8 @@ func (s *Sync) Close() {
s.client.CloseIdleConnections()
}

var errHeadFromUnexpectedPeer = errors.New("found head signed from an unexpected peer")

// Syncer provides sync functionality for a single sync with a peer.
type Syncer struct {
peerID peer.ID
rootURL url.URL
urls []*url.URL
sync *Sync
func (s *Syncer) PeerProtoMap() libp2phttp.WellKnownProtoMap {
return s.protos
}

// GetHead fetches the head of the peer's advertisement chain.
Expand All @@ -102,7 +155,9 @@ func (s *Syncer) GetHead(ctx context.Context) (cid.Cid, error) {
return cid.Undef, err
}

if peerIDFromSig != s.peerID {
if s.peerID == "" {
log.Warn("cannot verify publisher signature without peer ID")
} else if peerIDFromSig != s.peerID {
return cid.Undef, errHeadFromUnexpectedPeer
}

Expand Down Expand Up @@ -136,7 +191,7 @@ func (s *Syncer) Sync(ctx context.Context, nextCid cid.Cid, sel ipld.Node) error
}
}

s.sync.client.CloseIdleConnections()
s.client.CloseIdleConnections()
return nil
}

Expand Down Expand Up @@ -205,7 +260,7 @@ nextURL:
return err
}

resp, err := s.sync.client.Do(req)
resp, err := s.client.Do(req)
if err != nil {
if len(s.urls) != 0 {
log.Errorw("Fetch request failed, will retry with next address", "err", err)
Expand Down
6 changes: 3 additions & 3 deletions dagsync/httpsync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestHttpSync_NFTStorage_DigestCheck(t *testing.T) {
require.NoError(t, err)

sync := httpsync.NewSync(ls, http.DefaultClient, nil)
syncer, err := sync.NewSyncer(pubid, []multiaddr.Multiaddr{pubmaddr})
syncer, err := sync.NewSyncer(peer.AddrInfo{pubid, []multiaddr.Multiaddr{pubmaddr}})
require.NoError(t, err)

head, err := syncer.GetHead(ctx)
Expand Down Expand Up @@ -161,7 +161,7 @@ func TestHttpsync_AcceptsSpecCompliantDagJson(t *testing.T) {
ls.SetReadStorage(store)

sync := httpsync.NewSync(ls, http.DefaultClient, nil)
syncer, err := sync.NewSyncer(pubID, pub.Addrs())
syncer, err := sync.NewSyncer(peer.AddrInfo{pubID, pub.Addrs()})
require.NoError(t, err)

head, err := syncer.GetHead(ctx)
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestHttpsync_NotFoundReturnsContentNotFoundErr(t *testing.T) {
ls.SetReadStorage(store)

sync := httpsync.NewSync(ls, http.DefaultClient, nil)
syncer, err := sync.NewSyncer(pubID, pub.Addrs())
syncer, err := sync.NewSyncer(peer.AddrInfo{pubID, pub.Addrs()})
require.NoError(t, err)

mh, err := multihash.Sum([]byte("fish"), multihash.SHA2_256, -1)
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/ipfs/go-ipld-format v0.3.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.20.0
github.com/libp2p/go-libp2p v0.29.1
github.com/libp2p/go-libp2p v0.29.1-0.20230804182920-49d7db486c5e
github.com/libp2p/go-libp2p-gostream v0.6.0
github.com/libp2p/go-libp2p-pubsub v0.9.3
github.com/libp2p/go-msgio v0.3.0
Expand Down Expand Up @@ -113,7 +113,7 @@ require (
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/qtls-go1-19 v0.3.3 // indirect
github.com/quic-go/qtls-go1-20 v0.2.3 // indirect
github.com/quic-go/quic-go v0.36.3 // indirect
github.com/quic-go/quic-go v0.36.4 // indirect
github.com/quic-go/webtransport-go v0.5.3 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/russross/blackfriday/v2 v2.0.1 // indirect
Expand Down
Loading

0 comments on commit d2700c9

Please sign in to comment.