Skip to content

Commit

Permalink
bus: close syncer and wait for wind down
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Aug 15, 2024
1 parent 46dc438 commit 5e6e08f
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 104 deletions.
12 changes: 12 additions & 0 deletions bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
rhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/jape"
"go.sia.tech/renterd/alerts"
Expand Down Expand Up @@ -82,6 +85,15 @@ type (
TriggerUpdate()
}

Syncer interface {
io.Closer
Addr() string
BroadcastHeader(h gateway.BlockHeader)
BroadcastTransactionSet([]types.Transaction)
Connect(ctx context.Context, addr string) (*syncer.Peer, error)
Peers() []*syncer.Peer
}

Wallet interface {
Address() types.Address
Balance() (wallet.Balance, error)
Expand Down
76 changes: 74 additions & 2 deletions bus/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"path/filepath"
"time"

"go.sia.tech/core/consensus"
"go.sia.tech/core/gateway"
"go.sia.tech/core/types"
"go.sia.tech/coreutils"
"go.sia.tech/coreutils/chain"
"go.sia.tech/coreutils/syncer"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/alerts"
"go.sia.tech/renterd/config"
Expand Down Expand Up @@ -186,11 +189,79 @@ func NewNode(cfg NodeConfig, dir string, seed types.PrivateKey, logger *zap.Logg
return nil, nil, nil, err
}

// create syncer
s, err := NewSyncer(cfg, cm, sqlStore, logger)
// bootstrap the peer store
if cfg.Bootstrap {
if cfg.Network == nil {
return nil, nil, nil, errors.New("cannot bootstrap without a network")
}
var peers []string
switch cfg.Network.Name {
case "mainnet":
peers = syncer.MainnetBootstrapPeers
case "zen":
peers = syncer.ZenBootstrapPeers
case "anagami":
peers = syncer.AnagamiBootstrapPeers
default:
return nil, nil, nil, fmt.Errorf("no available bootstrap peers for unknown network '%s'", cfg.Network.Name)
}
for _, addr := range peers {
if err := sqlStore.AddPeer(addr); err != nil {
return nil, nil, nil, fmt.Errorf("%w: failed to add bootstrap peer '%s'", err, addr)
}
}
}

// create syncer, peers will reject us if our hostname is empty or
// unspecified, so use loopback
l, err := net.Listen("tcp", cfg.GatewayAddr)
if err != nil {
return nil, nil, nil, err
}
syncerAddr := l.Addr().String()
host, port, _ := net.SplitHostPort(syncerAddr)
if ip := net.ParseIP(host); ip == nil || ip.IsUnspecified() {
syncerAddr = net.JoinHostPort("127.0.0.1", port)
}

// create header
header := gateway.Header{
GenesisID: cfg.Genesis.ID(),
UniqueID: gateway.GenerateUniqueID(),
NetAddress: syncerAddr,
}

// create syncer options
opts := []syncer.Option{
syncer.WithLogger(logger.Named("syncer")),
syncer.WithSendBlocksTimeout(time.Minute),
}
if cfg.SyncerPeerDiscoveryInterval > 0 {
opts = append(opts, syncer.WithPeerDiscoveryInterval(cfg.SyncerPeerDiscoveryInterval))
}
if cfg.SyncerSyncInterval > 0 {
opts = append(opts, syncer.WithSyncInterval(cfg.SyncerSyncInterval))
}

// create the syncer
s := syncer.New(l, cm, sqlStore, header, opts...)

// start syncer
errChan := make(chan error, 1)
go func() {
errChan <- s.Run(context.Background())
close(errChan)
}()

// create a helper function to wait for syncer to wind down on shutdown
syncerShutdown := func(ctx context.Context) error {
select {
case err := <-errChan:
return err
case <-ctx.Done():
return context.Cause(ctx)
}
}

// create bus
announcementMaxAgeHours := time.Duration(cfg.AnnouncementMaxAgeHours) * time.Hour
Expand All @@ -206,6 +277,7 @@ func NewNode(cfg NodeConfig, dir string, seed types.PrivateKey, logger *zap.Logg
b.Shutdown(ctx),
sqlStore.Close(),
bdb.Close(),
syncerShutdown(ctx),
)
}
return b.Handler(), shutdownFn, cm, nil
Expand Down
102 changes: 0 additions & 102 deletions bus/syncer.go

This file was deleted.

0 comments on commit 5e6e08f

Please sign in to comment.