Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

namesys/pubsub: pubsub Publisher and Resolver #4047

Merged
merged 1 commit into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
unrestrictedApiAccessKwd = "unrestricted-api"
writableKwd = "writable"
enableFloodSubKwd = "enable-pubsub-experiment"
enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment"
// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
Expand Down Expand Up @@ -157,6 +158,7 @@ Headers.
cmdkit.BoolOption(offlineKwd, "Run offline. Do not connect to the rest of the network but provide local API."),
cmdkit.BoolOption(migrateKwd, "If true, assume yes at the migrate prompt. If false, assume no."),
cmdkit.BoolOption(enableFloodSubKwd, "Instantiate the ipfs daemon with the experimental pubsub feature enabled."),
cmdkit.BoolOption(enableIPNSPubSubKwd, "Enable IPNS record distribution through pubsub; enables pubsub."),
cmdkit.BoolOption(enableMultiplexKwd, "Add the experimental 'go-multiplex' stream muxer to libp2p on construction.").WithDefault(true),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
Expand Down Expand Up @@ -283,6 +285,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {

offline, _, _ := req.Option(offlineKwd).Bool()
pubsub, _, _ := req.Option(enableFloodSubKwd).Bool()
ipnsps, _, _ := req.Option(enableIPNSPubSubKwd).Bool()
mplex, _, _ := req.Option(enableMultiplexKwd).Bool()

// Start assembling node config
Expand All @@ -292,6 +295,7 @@ func daemonFunc(req cmds.Request, re cmds.ResponseEmitter) {
Online: !offline,
ExtraOpts: map[string]bool{
"pubsub": pubsub,
"ipnsps": ipnsps,
"mplex": mplex,
},
//TODO(Kubuxu): refactor Online vs Offline by adding Permanent vs Ephemeral
Expand Down
2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

if cfg.Online {
do := setupDiscoveryOption(rcfg.Discovery)
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("mplex")); err != nil {
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do, cfg.getOpt("pubsub"), cfg.getOpt("ipnsps"), cfg.getOpt("mplex")); err != nil {
return err
}
} else {
Expand Down
163 changes: 163 additions & 0 deletions core/commands/ipnsps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package commands

import (
"errors"
"fmt"
"io"
"strings"

cmds "github.com/ipfs/go-ipfs/commands"
e "github.com/ipfs/go-ipfs/core/commands/e"
ns "github.com/ipfs/go-ipfs/namesys"

cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
)

type ipnsPubsubState struct {
Enabled bool
}

type ipnsPubsubCancel struct {
Canceled bool
}

// IpnsPubsubCmd is the subcommand that allows us to manage the IPNS pubsub system
var IpnsPubsubCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "IPNS pubsub management",
ShortDescription: `
Manage and inspect the state of the IPNS pubsub resolver.

Note: this command is experimental and subject to change as the system is refined
`,
},
Subcommands: map[string]*cmds.Command{
"state": ipnspsStateCmd,
"subs": ipnspsSubsCmd,
"cancel": ipnspsCancelCmd,
},
}

var ipnspsStateCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Query the state of IPNS pubsub",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

_, ok := n.Namesys.GetResolver("pubsub")
res.SetOutput(&ipnsPubsubState{ok})
},
Type: ipnsPubsubState{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

output, ok := v.(*ipnsPubsubState)
if !ok {
return nil, e.TypeErr(output, v)
}

var state string
if output.Enabled {
state = "enabled"
} else {
state = "disabled"
}

return strings.NewReader(state + "\n"), nil
},
},
}

var ipnspsSubsCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Show current name subscriptions",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}

res.SetOutput(&stringList{psr.GetSubscriptions()})
},
Type: stringList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: stringListMarshaler,
},
}

var ipnspsCancelCmd = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Cancel a name subscription",
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

r, ok := n.Namesys.GetResolver("pubsub")
if !ok {
res.SetError(errors.New("IPNS pubsub subsystem is not enabled"), cmdkit.ErrClient)
return
}

psr, ok := r.(*ns.PubsubResolver)
if !ok {
res.SetError(fmt.Errorf("unexpected resolver type: %v", r), cmdkit.ErrNormal)
return
}

ok = psr.Cancel(req.Arguments()[0])
res.SetOutput(&ipnsPubsubCancel{ok})
},
Arguments: []cmdkit.Argument{
cmdkit.StringArg("name", true, false, "Name to cancel the subscription for."),
},
Type: ipnsPubsubCancel{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}

output, ok := v.(*ipnsPubsubCancel)
if !ok {
return nil, e.TypeErr(output, v)
}

var state string
if output.Canceled {
state = "canceled"
} else {
state = "no subscription"
}

return strings.NewReader(state + "\n"), nil
},
},
}
2 changes: 1 addition & 1 deletion core/commands/mount_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

cmds "github.com/ipfs/go-ipfs/commands"

cmdkit "gx/ipfs/QmSNbH2A1evCCbJSDC6u3RV3GGDhgu6pRGbXHvrN89tMKf/go-ipfs-cmdkit"
cmdkit "gx/ipfs/QmUyfy4QSr3NXym4etEiRyxBLqqAeKHJuRdi8AACxg63fZ/go-ipfs-cmdkit"
)

var MountCmd = &cmds.Command{
Expand Down
1 change: 1 addition & 0 deletions core/commands/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,6 @@ Resolve the value of a dnslink:
Subcommands: map[string]*cmds.Command{
"publish": PublishCmd,
"resolve": IpnsCmd,
"pubsub": IpnsPubsubCmd,
},
}
11 changes: 9 additions & 2 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type Mounts struct {
Ipns mount.Mount
}

func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, mplex bool) error {
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption, pubsub, ipnsps, mplex bool) error {

if n.PeerHost != nil { // already online.
return errors.New("node already online")
Expand Down Expand Up @@ -249,10 +249,17 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
return err
}

if pubsub {
if pubsub || ipnsps {
n.Floodsub = floodsub.NewFloodSub(ctx, peerhost)
}

if ipnsps {
err = namesys.AddPubsubNameSystem(ctx, n.Namesys, n.PeerHost, n.Routing, n.Repo.Datastore(), n.Floodsub)
if err != nil {
return err
}
}

n.P2P = p2p.NewP2P(n.Identity, n.PeerHost, n.Peerstore)

// setup local discovery
Expand Down
4 changes: 4 additions & 0 deletions core/corehttp/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ func (m mockNamesys) PublishWithEOL(ctx context.Context, name ci.PrivKey, value
return errors.New("not implemented for mockNamesys")
}

func (m mockNamesys) GetResolver(subs string) (namesys.Resolver, bool) {
return nil, false
}

func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) {
c := config.Config{
Identity: config.Identity{
Expand Down
8 changes: 8 additions & 0 deletions namesys/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var ErrPublishFailed = errors.New("Could not publish name.")
type NameSystem interface {
Resolver
Publisher
ResolverLookup
}

// Resolver is an object capable of resolving names.
Expand Down Expand Up @@ -112,3 +113,10 @@ type Publisher interface {
// call once the records spec is implemented
PublishWithEOL(ctx context.Context, name ci.PrivKey, value path.Path, eol time.Time) error
}

// ResolverLookup is an object capable of finding resolvers for a subsystem
type ResolverLookup interface {

// GetResolver retrieves a resolver associated with a subsystem
GetResolver(subs string) (Resolver, bool)
}
Loading