diff --git a/bus/bus.go b/bus/bus.go index 56ccfbc41..c65f49e92 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -9,13 +9,16 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "time" "go.sia.tech/core/consensus" + "go.sia.tech/core/gateway" rhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/syncer" "go.sia.tech/coreutils/wallet" "go.sia.tech/jape" "go.sia.tech/renterd/alerts" @@ -82,6 +85,15 @@ type ( TriggerUpdate() } + Syncer interface { + io.Closer + Addr() string + BroadcastHeader(h gateway.BlockHeader) + BroadcastTransactionSet([]types.Transaction) + Connect(ctx context.Context, addr string) (*syncer.Peer, error) + Peers() []*syncer.Peer + } + Wallet interface { Address() types.Address Balance() (wallet.Balance, error) diff --git a/bus/node.go b/bus/node.go index 2fbaf9c96..a46eb111d 100644 --- a/bus/node.go +++ b/bus/node.go @@ -4,15 +4,18 @@ import ( "context" "errors" "fmt" + "net" "net/http" "os" "path/filepath" "time" "go.sia.tech/core/consensus" + "go.sia.tech/core/gateway" "go.sia.tech/core/types" "go.sia.tech/coreutils" "go.sia.tech/coreutils/chain" + "go.sia.tech/coreutils/syncer" "go.sia.tech/coreutils/wallet" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/config" @@ -186,11 +189,79 @@ func NewNode(cfg NodeConfig, dir string, seed types.PrivateKey, logger *zap.Logg return nil, nil, nil, err } - // create syncer - s, err := NewSyncer(cfg, cm, sqlStore, logger) + // bootstrap the peer store + if cfg.Bootstrap { + if cfg.Network == nil { + return nil, nil, nil, errors.New("cannot bootstrap without a network") + } + var peers []string + switch cfg.Network.Name { + case "mainnet": + peers = syncer.MainnetBootstrapPeers + case "zen": + peers = syncer.ZenBootstrapPeers + case "anagami": + peers = syncer.AnagamiBootstrapPeers + default: + return nil, nil, nil, fmt.Errorf("no available bootstrap peers for unknown network '%s'", cfg.Network.Name) + } + for _, addr := range peers { + if err := sqlStore.AddPeer(addr); err != nil { + return nil, nil, nil, fmt.Errorf("%w: failed to add bootstrap peer '%s'", err, addr) + } + } + } + + // create syncer, peers will reject us if our hostname is empty or + // unspecified, so use loopback + l, err := net.Listen("tcp", cfg.GatewayAddr) if err != nil { return nil, nil, nil, err } + syncerAddr := l.Addr().String() + host, port, _ := net.SplitHostPort(syncerAddr) + if ip := net.ParseIP(host); ip == nil || ip.IsUnspecified() { + syncerAddr = net.JoinHostPort("127.0.0.1", port) + } + + // create header + header := gateway.Header{ + GenesisID: cfg.Genesis.ID(), + UniqueID: gateway.GenerateUniqueID(), + NetAddress: syncerAddr, + } + + // create syncer options + opts := []syncer.Option{ + syncer.WithLogger(logger.Named("syncer")), + syncer.WithSendBlocksTimeout(time.Minute), + } + if cfg.SyncerPeerDiscoveryInterval > 0 { + opts = append(opts, syncer.WithPeerDiscoveryInterval(cfg.SyncerPeerDiscoveryInterval)) + } + if cfg.SyncerSyncInterval > 0 { + opts = append(opts, syncer.WithSyncInterval(cfg.SyncerSyncInterval)) + } + + // create the syncer + s := syncer.New(l, cm, sqlStore, header, opts...) + + // start syncer + errChan := make(chan error, 1) + go func() { + errChan <- s.Run(context.Background()) + close(errChan) + }() + + // create a helper function to wait for syncer to wind down on shutdown + syncerShutdown := func(ctx context.Context) error { + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return context.Cause(ctx) + } + } // create bus announcementMaxAgeHours := time.Duration(cfg.AnnouncementMaxAgeHours) * time.Hour @@ -206,6 +277,7 @@ func NewNode(cfg NodeConfig, dir string, seed types.PrivateKey, logger *zap.Logg b.Shutdown(ctx), sqlStore.Close(), bdb.Close(), + syncerShutdown(ctx), ) } return b.Handler(), shutdownFn, cm, nil diff --git a/bus/syncer.go b/bus/syncer.go deleted file mode 100644 index c1f941cce..000000000 --- a/bus/syncer.go +++ /dev/null @@ -1,102 +0,0 @@ -package bus - -import ( - "context" - "errors" - "fmt" - "io" - "net" - "time" - - "go.sia.tech/core/gateway" - "go.sia.tech/core/types" - "go.sia.tech/coreutils/syncer" - "go.uber.org/zap" -) - -type Syncer interface { - io.Closer - Addr() string - BroadcastHeader(h gateway.BlockHeader) - BroadcastTransactionSet([]types.Transaction) - Connect(ctx context.Context, addr string) (*syncer.Peer, error) - Peers() []*syncer.Peer -} - -// NewSyncer creates a syncer using the given configuration. The syncer that is -// returned is already running, closing it will close the underlying listener -// causing the syncer to stop. -func NewSyncer(cfg NodeConfig, cm syncer.ChainManager, ps syncer.PeerStore, logger *zap.Logger) (Syncer, error) { - // validate config - if cfg.Bootstrap && cfg.Network == nil { - return nil, errors.New("cannot bootstrap without a network") - } - - // bootstrap the syncer - if cfg.Bootstrap { - peers, err := peers(cfg.Network.Name) - if err != nil { - return nil, err - } - for _, addr := range peers { - if err := ps.AddPeer(addr); err != nil { - return nil, fmt.Errorf("%w: failed to add bootstrap peer '%s'", err, addr) - } - } - } - - // create syncer - l, err := net.Listen("tcp", cfg.GatewayAddr) - if err != nil { - return nil, err - } - syncerAddr := l.Addr().String() - - // peers will reject us if our hostname is empty or unspecified, so use loopback - host, port, _ := net.SplitHostPort(syncerAddr) - if ip := net.ParseIP(host); ip == nil || ip.IsUnspecified() { - syncerAddr = net.JoinHostPort("127.0.0.1", port) - } - - // create header - header := gateway.Header{ - GenesisID: cfg.Genesis.ID(), - UniqueID: gateway.GenerateUniqueID(), - NetAddress: syncerAddr, - } - - // start the syncer - s := syncer.New(l, cm, ps, header, options(cfg, logger)...) - go s.Run(context.Background()) - - return s, nil -} - -func options(cfg NodeConfig, logger *zap.Logger) (opts []syncer.Option) { - opts = append(opts, - syncer.WithLogger(logger.Named("syncer")), - syncer.WithSendBlocksTimeout(time.Minute), - ) - - if cfg.SyncerPeerDiscoveryInterval > 0 { - opts = append(opts, syncer.WithPeerDiscoveryInterval(cfg.SyncerPeerDiscoveryInterval)) - } - if cfg.SyncerSyncInterval > 0 { - opts = append(opts, syncer.WithSyncInterval(cfg.SyncerSyncInterval)) - } - - return -} - -func peers(network string) ([]string, error) { - switch network { - case "mainnet": - return syncer.MainnetBootstrapPeers, nil - case "zen": - return syncer.ZenBootstrapPeers, nil - case "anagami": - return syncer.AnagamiBootstrapPeers, nil - default: - return nil, fmt.Errorf("no available bootstrap peers for unknown network '%s'", network) - } -}