diff --git a/core/commands/pubsub.go b/core/commands/pubsub.go index c9a723a8554..ac3234c7879 100644 --- a/core/commands/pubsub.go +++ b/core/commands/pubsub.go @@ -3,24 +3,17 @@ package commands import ( "context" "encoding/binary" - "errors" "fmt" "io" "net/http" "sort" - "sync" - "time" - core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" e "github.com/ipfs/go-ipfs/core/commands/e" + options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" - floodsub "gx/ipfs/QmPbYVFhKxamZAN9MyrQMDeoGYa6zkQkhAPguwFfzxPM1J/go-libp2p-floodsub" - blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format" cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit" cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds" - pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" ) var PubsubCmd = &cmds.Command{ @@ -44,6 +37,13 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. }, } +type pubsubMessage struct { + From []byte `json:"from,omitempty"` + Data []byte `json:"data,omitempty"` + Seqno []byte `json:"seqno,omitempty"` + TopicIDs []string `json:"topicIDs,omitempty"` +} + var PubsubSubCmd = &cmds.Command{ Helptext: cmdkit.HelpText{ Tagline: "Subscribe to messages on a given topic.", @@ -75,40 +75,16 @@ This command outputs data in the following encodings: cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - // Must be online! - if !n.OnlineMode() { - return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error()) - } - - if n.Floodsub == nil { - return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use") - } - topic := req.Arguments[0] - sub, err := n.Floodsub.Subscribe(topic) - if err != nil { - return err - } - defer sub.Cancel() - discover, _ := req.Options["discover"].(bool) - if discover { - go func() { - blk := blocks.NewBlock([]byte("floodsub:" + topic)) - err := n.Blocks.AddBlock(blk) - if err != nil { - log.Error("pubsub discovery: ", err) - return - } - - connectToPubSubPeers(req.Context, n, blk.Cid()) - }() - } + + sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover)) + defer sub.Close() if f, ok := res.(http.Flusher); ok { f.Flush() @@ -122,15 +98,17 @@ This command outputs data in the following encodings: return err } - err = res.Emit(msg) - if err != nil { - return err - } + res.Emit(&pubsubMessage{ + Data: msg.Data(), + From: []byte(msg.From()), + Seqno: msg.Seq(), + TopicIDs: msg.Topics(), + }) } }, Encoders: cmds.EncoderMap{ cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { - m, ok := v.(*floodsub.Message) + m, ok := v.(*pubsubMessage) if !ok { return fmt.Errorf("unexpected type: %T", v) } @@ -139,7 +117,7 @@ This command outputs data in the following encodings: return err }), "ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { - m, ok := v.(*floodsub.Message) + m, ok := v.(*pubsubMessage) if !ok { return fmt.Errorf("unexpected type: %T", v) } @@ -149,7 +127,7 @@ This command outputs data in the following encodings: return err }), "lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error { - m, ok := v.(*floodsub.Message) + m, ok := v.(*pubsubMessage) if !ok { return fmt.Errorf("unexpected type: %T", v) } @@ -162,31 +140,7 @@ This command outputs data in the following encodings: return err }), }, - Type: floodsub.Message{}, -} - -func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - provs := n.Routing.FindProvidersAsync(ctx, cid, 10) - wg := &sync.WaitGroup{} - for p := range provs { - wg.Add(1) - go func(pi pstore.PeerInfo) { - defer wg.Done() - ctx, cancel := context.WithTimeout(ctx, time.Second*10) - defer cancel() - err := n.PeerHost.Connect(ctx, pi) - if err != nil { - log.Info("pubsub discover: ", err) - return - } - log.Info("connected to pubsub peer:", pi.ID) - }(p) - } - - wg.Wait() + Type: pubsubMessage{}, } var PubsubPubCmd = &cmds.Command{ @@ -206,20 +160,11 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - // Must be online! - if !n.OnlineMode() { - return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error()) - } - - if n.Floodsub == nil { - return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") - } - topic := req.Arguments[0] err = req.ParseBodyArgs() @@ -228,7 +173,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. } for _, data := range req.Arguments[1:] { - if err := n.Floodsub.Publish(topic, []byte(data)); err != nil { + if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil { return err } } @@ -250,21 +195,17 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. `, }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - // Must be online! - if !n.OnlineMode() { - return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error()) - } - - if n.Floodsub == nil { - return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") + l, err := api.PubSub().Ls(req.Context) + if err != nil { + return err } - return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()}) + return cmds.EmitOnce(res, stringList{l}) }, Type: stringList{}, Encoders: cmds.EncoderMap{ @@ -304,26 +245,21 @@ To use, the daemon must be run with '--enable-pubsub-experiment'. cmdkit.StringArg("topic", false, false, "topic to list connected peers of"), }, Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error { - n, err := cmdenv.GetNode(env) + api, err := cmdenv.GetApi(env) if err != nil { return err } - // Must be online! - if !n.OnlineMode() { - return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error()) - } - - if n.Floodsub == nil { - return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use") - } - var topic string if len(req.Arguments) == 1 { topic = req.Arguments[0] } - peers := n.Floodsub.ListPeers(topic) + peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic)) + if err != nil { + return err + } + list := &stringList{make([]string, 0, len(peers))} for _, peer := range peers { diff --git a/core/coreapi/coreapi.go b/core/coreapi/coreapi.go index 46731a0c620..84d34015510 100644 --- a/core/coreapi/coreapi.go +++ b/core/coreapi/coreapi.go @@ -16,8 +16,12 @@ package coreapi import ( core "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" + + logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" ) +var log = logging.Logger("core/coreapi") + type CoreAPI struct { node *core.IpfsNode } diff --git a/core/coreapi/interface/pubsub.go b/core/coreapi/interface/pubsub.go index 4b52ed6d644..4c9a1d73e5b 100644 --- a/core/coreapi/interface/pubsub.go +++ b/core/coreapi/interface/pubsub.go @@ -6,7 +6,7 @@ import ( options "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" ) // PubSubSubscription is an active PubSub subscription @@ -24,6 +24,12 @@ type PubSubMessage interface { // Data returns the message body Data() []byte + + // Seq returns message identifier + Seq() []byte + + // Topics returns list of topics this message was set to + Topics() []string } // PubSubAPI specifies the interface to PubSub diff --git a/core/coreapi/pubsub.go b/core/coreapi/pubsub.go index 66edd6907dd..f24aba69b32 100644 --- a/core/coreapi/pubsub.go +++ b/core/coreapi/pubsub.go @@ -3,12 +3,18 @@ package coreapi import ( "context" "errors" + "strings" + "sync" + "time" + core "github.com/ipfs/go-ipfs/core" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" - peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer" - floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub" + cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" + floodsub "gx/ipfs/QmPbYVFhKxamZAN9MyrQMDeoGYa6zkQkhAPguwFfzxPM1J/go-libp2p-floodsub" + peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer" + pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore" ) type PubSubAPI CoreAPI @@ -58,6 +64,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er } func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { + options, err := caopts.PubSubSubscribeOptions(opts...) + if err := api.checkNode(); err != nil { return nil, err } @@ -67,9 +75,45 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt return nil, err } + if options.Discover { + go func() { + blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic)) + if err != nil { + log.Error("pubsub discovery: ", err) + return + } + + connectToPubSubPeers(ctx, api.node, blk.Path().Cid()) + }() + } + return &pubSubSubscription{sub}, nil } +func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + provs := n.Routing.FindProvidersAsync(ctx, cid, 10) + wg := &sync.WaitGroup{} + for p := range provs { + wg.Add(1) + go func(pi pstore.PeerInfo) { + defer wg.Done() + ctx, cancel := context.WithTimeout(ctx, time.Second*10) + defer cancel() + err := n.PeerHost.Connect(ctx, pi) + if err != nil { + log.Info("pubsub discover: ", err) + return + } + log.Info("connected to pubsub peer:", pi.ID) + }(p) + } + + wg.Wait() +} + func (api *PubSubAPI) checkNode() error { if !api.node.OnlineMode() { return coreiface.ErrOffline @@ -103,3 +147,15 @@ func (msg *pubSubMessage) From() peer.ID { func (msg *pubSubMessage) Data() []byte { return msg.msg.Data } + +func (msg *pubSubMessage) Seq() []byte { + return msg.msg.Seqno +} + +func (msg *pubSubMessage) Topics() []string { + return msg.msg.TopicIDs +} + +func (api *PubSubAPI) core() coreiface.CoreAPI { + return (*CoreAPI)(api) +}