Skip to content

Commit

Permalink
fix: remove pubsub discovery hack
Browse files Browse the repository at this point in the history
Pubsub handles this internally now.
  • Loading branch information
Stebalien committed May 29, 2020
1 parent ac8a88d commit c58e3e4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 59 deletions.
6 changes: 2 additions & 4 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ This command outputs data in the following encodings:
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
},
Options: []cmds.Option{
cmds.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
Expand All @@ -83,9 +83,7 @@ This command outputs data in the following encodings:
}

topic := req.Arguments[0]
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)

sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
sub, err := api.PubSub().Subscribe(req.Context, topic)
if err != nil {
return err
}
Expand Down
3 changes: 0 additions & 3 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/ipfs/go-ipfs-provider"
offlineroute "github.com/ipfs/go-ipfs-routing/offline"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
dag "github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
Expand All @@ -44,8 +43,6 @@ import (
"github.com/ipfs/go-ipfs/repo"
)

var log = logging.Logger("core/coreapi")

type CoreAPI struct {
nctx context.Context

Expand Down
59 changes: 7 additions & 52 deletions core/coreapi/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,9 @@ package coreapi
import (
"context"
"errors"
"strings"
"sync"
"time"

cid "github.com/ipfs/go-cid"
coreiface "github.com/ipfs/interface-go-ipfs-core"
caopts "github.com/ipfs/interface-go-ipfs-core/options"
p2phost "github.com/libp2p/go-libp2p-core/host"
peer "github.com/libp2p/go-libp2p-core/peer"
routing "github.com/libp2p/go-libp2p-core/routing"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -19,7 +14,6 @@ import (
type PubSubAPI CoreAPI

type pubSubSubscription struct {
cancel context.CancelFunc
subscription *pubsub.Subscription
}

Expand Down Expand Up @@ -61,12 +55,16 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
}

func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)
// Parse the options to avoid introducing silent failures for invalid
// options. However, we don't currently have any use for them. The only
// subscription option, discovery, is now a no-op as it's handled by
// pubsub itself.
_, err := caopts.PubSubSubscribeOptions(opts...)
if err != nil {
return nil, err
}

r, err := api.checkNode()
_, err = api.checkNode()
if err != nil {
return nil, err
}
Expand All @@ -77,45 +75,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}

pubctx, cancel := context.WithCancel(api.nctx)

if options.Discover {
go func() {
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid())
}()
}

return &pubSubSubscription{cancel, sub}, nil
}

func connectToPubSubPeers(ctx context.Context, r routing.Routing, ph p2phost.Host, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := r.FindProvidersAsync(ctx, cid, 10)
var wg sync.WaitGroup
for p := range provs {
wg.Add(1)
go func(pi peer.AddrInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := ph.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
return &pubSubSubscription{sub}, nil
}

func (api *PubSubAPI) checkNode() (routing.Routing, error) {
Expand All @@ -132,7 +92,6 @@ func (api *PubSubAPI) checkNode() (routing.Routing, error) {
}

func (sub *pubSubSubscription) Close() error {
sub.cancel()
sub.subscription.Cancel()
return nil
}
Expand Down Expand Up @@ -161,7 +120,3 @@ func (msg *pubSubMessage) Seq() []byte {
func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}

func (api *PubSubAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

0 comments on commit c58e3e4

Please sign in to comment.