Skip to content

Commit

Permalink
pubsub cmd: switch to coreapi
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
  • Loading branch information
magik6k committed Sep 25, 2018
1 parent 3bedefb commit a232340
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 102 deletions.
134 changes: 35 additions & 99 deletions core/commands/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,17 @@ package commands
import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"sort"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
floodsub "gx/ipfs/QmPbYVFhKxamZAN9MyrQMDeoGYa6zkQkhAPguwFfzxPM1J/go-libp2p-floodsub"
blocks "gx/ipfs/QmRcHuYzAyswytBuMF78rj3LTChYszomRFXNg4685ZN1WM/go-block-format"
cmdkit "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit"
cmds "gx/ipfs/QmXTmUCBtDUrzDYVzASogLiNph7EBuYqEgPL7QoHNMzUnz/go-ipfs-cmds"
pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore"
)

var PubsubCmd = &cmds.Command{
Expand All @@ -44,6 +37,13 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
},
}

type pubsubMessage struct {
From []byte `json:"from,omitempty"`
Data []byte `json:"data,omitempty"`
Seqno []byte `json:"seqno,omitempty"`
TopicIDs []string `json:"topicIDs,omitempty"`
}

var PubsubSubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Subscribe to messages on a given topic.",
Expand Down Expand Up @@ -75,40 +75,16 @@ This command outputs data in the following encodings:
cmdkit.BoolOption("discover", "try to discover other peers subscribed to the same topic"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return fmt.Errorf("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

topic := req.Arguments[0]
sub, err := n.Floodsub.Subscribe(topic)
if err != nil {
return err
}
defer sub.Cancel()

discover, _ := req.Options["discover"].(bool)
if discover {
go func() {
blk := blocks.NewBlock([]byte("floodsub:" + topic))
err := n.Blocks.AddBlock(blk)
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(req.Context, n, blk.Cid())
}()
}

sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
defer sub.Close()

if f, ok := res.(http.Flusher); ok {
f.Flush()
Expand All @@ -122,15 +98,17 @@ This command outputs data in the following encodings:
return err
}

err = res.Emit(msg)
if err != nil {
return err
}
res.Emit(&pubsubMessage{
Data: msg.Data(),
From: []byte(msg.From()),
Seqno: msg.Seq(),
TopicIDs: msg.Topics(),
})
}
},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -139,7 +117,7 @@ This command outputs data in the following encodings:
return err
}),
"ndpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -149,7 +127,7 @@ This command outputs data in the following encodings:
return err
}),
"lenpayload": cmds.MakeEncoder(func(req *cmds.Request, w io.Writer, v interface{}) error {
m, ok := v.(*floodsub.Message)
m, ok := v.(*pubsubMessage)
if !ok {
return fmt.Errorf("unexpected type: %T", v)
}
Expand All @@ -162,31 +140,7 @@ This command outputs data in the following encodings:
return err
}),
},
Type: floodsub.Message{},
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
Type: pubsubMessage{},
}

var PubsubPubCmd = &cmds.Command{
Expand All @@ -206,20 +160,11 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("data", true, true, "Payload of message to publish.").EnableStdin(),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
}

topic := req.Arguments[0]

err = req.ParseBodyArgs()
Expand All @@ -228,7 +173,7 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
}

for _, data := range req.Arguments[1:] {
if err := n.Floodsub.Publish(topic, []byte(data)); err != nil {
if err := api.PubSub().Publish(req.Context, topic, []byte(data)); err != nil {
return err
}
}
Expand All @@ -250,21 +195,17 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
`,
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
l, err := api.PubSub().Ls(req.Context)
if err != nil {
return err
}

return cmds.EmitOnce(res, stringList{n.Floodsub.GetTopics()})
return cmds.EmitOnce(res, stringList{l})
},
Type: stringList{},
Encoders: cmds.EncoderMap{
Expand Down Expand Up @@ -304,26 +245,21 @@ To use, the daemon must be run with '--enable-pubsub-experiment'.
cmdkit.StringArg("topic", false, false, "topic to list connected peers of"),
},
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
n, err := cmdenv.GetNode(env)
api, err := cmdenv.GetApi(env)
if err != nil {
return err
}

// Must be online!
if !n.OnlineMode() {
return cmdkit.Errorf(cmdkit.ErrClient, ErrNotOnline.Error())
}

if n.Floodsub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use")
}

var topic string
if len(req.Arguments) == 1 {
topic = req.Arguments[0]
}

peers := n.Floodsub.ListPeers(topic)
peers, err := api.PubSub().Peers(req.Context, options.PubSub.Topic(topic))
if err != nil {
return err
}

list := &stringList{make([]string, 0, len(peers))}

for _, peer := range peers {
Expand Down
4 changes: 4 additions & 0 deletions core/coreapi/coreapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@ package coreapi
import (
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"

logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
)

var log = logging.Logger("core/coreapi")

type CoreAPI struct {
node *core.IpfsNode
}
Expand Down
8 changes: 7 additions & 1 deletion core/coreapi/interface/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
)

// PubSubSubscription is an active PubSub subscription
Expand All @@ -24,6 +24,12 @@ type PubSubMessage interface {

// Data returns the message body
Data() []byte

// Seq returns message identifier
Seq() []byte

// Topics returns list of topics this message was set to
Topics() []string
}

// PubSubAPI specifies the interface to PubSub
Expand Down
60 changes: 58 additions & 2 deletions core/coreapi/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package coreapi
import (
"context"
"errors"
"strings"
"sync"
"time"

core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"

peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
floodsub "gx/ipfs/QmY1L5krVk8dv8d74uESmJTXGpoigVYqBVxXXz1aS8aFSb/go-libp2p-floodsub"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
floodsub "gx/ipfs/QmPbYVFhKxamZAN9MyrQMDeoGYa6zkQkhAPguwFfzxPM1J/go-libp2p-floodsub"
peer "gx/ipfs/QmbNepETomvmXfz1X5pHNFD2QuPqnqi47dTd94QJWSorQ3/go-libp2p-peer"
pstore "gx/ipfs/QmfAQMFpgDU2U4BXG64qVr8HSiictfWvkSBz7Y2oDj65st/go-libp2p-peerstore"
)

type PubSubAPI CoreAPI
Expand Down Expand Up @@ -58,6 +64,8 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
}

func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
options, err := caopts.PubSubSubscribeOptions(opts...)

if err := api.checkNode(); err != nil {
return nil, err
}
Expand All @@ -67,9 +75,45 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err
}

if options.Discover {
go func() {
blk, err := api.core().Block().Put(ctx, strings.NewReader("floodsub:"+topic))
if err != nil {
log.Error("pubsub discovery: ", err)
return
}

connectToPubSubPeers(ctx, api.node, blk.Path().Cid())
}()
}

return &pubSubSubscription{sub}, nil
}

func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

provs := n.Routing.FindProvidersAsync(ctx, cid, 10)
wg := &sync.WaitGroup{}
for p := range provs {
wg.Add(1)
go func(pi pstore.PeerInfo) {
defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
err := n.PeerHost.Connect(ctx, pi)
if err != nil {
log.Info("pubsub discover: ", err)
return
}
log.Info("connected to pubsub peer:", pi.ID)
}(p)
}

wg.Wait()
}

func (api *PubSubAPI) checkNode() error {
if !api.node.OnlineMode() {
return coreiface.ErrOffline
Expand Down Expand Up @@ -103,3 +147,15 @@ func (msg *pubSubMessage) From() peer.ID {
func (msg *pubSubMessage) Data() []byte {
return msg.msg.Data
}

func (msg *pubSubMessage) Seq() []byte {
return msg.msg.Seqno
}

func (msg *pubSubMessage) Topics() []string {
return msg.msg.TopicIDs
}

func (api *PubSubAPI) core() coreiface.CoreAPI {
return (*CoreAPI)(api)
}

0 comments on commit a232340

Please sign in to comment.