Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(splitstore): merry christmas lotus! Remove ~120 G from lotus datastore #12803

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# UNRELEASED

- Sync snapshots directly to hotstore. Save 120 G of lotus disk space. ([filecoin-project/lotus#12800](https://github.com/filecoin-project/lotus/pull/12803))
- Add json output of tipsets to `louts chain list`. ([filecoin-project/lotus#12691](https://github.com/filecoin-project/lotus/pull/12691))
- Remove IPNI advertisement relay over pubsub via Lotus node as it now has been deprecated. ([filecoin-project/lotus#12768](https://github.com/filecoin-project/lotus/pull/12768)
- During a network upgrade, log migration progress every 2 seconds so they are more helpful and informative. The `LOTUS_MIGRATE_PROGRESS_LOG_SECONDS` environment variable can be used to change this if needed. ([filecoin-project/lotus#12732](https://github.com/filecoin-project/lotus/pull/12732))
Expand Down
5 changes: 5 additions & 0 deletions blockstore/splitstore/markset_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func NewMapMarkSetEnv(path string) (*MapMarkSetEnv, error) {

func (e *MapMarkSetEnv) New(name string, sizeHint int64) (MarkSet, error) {
path := filepath.Join(e.path, name)
// Limit map size to 1k if sizeHint exceeds this value
// This prevents accidental hanging when sizeHint is too large
if sizeHint > 1000 {
sizeHint = 1000
}
return &MapMarkSet{
set: make(map[string]struct{}, sizeHint),
path: path,
Expand Down
4 changes: 4 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ type Config struct {
// from the hotstore should be written to the cold store
UniversalColdBlocks bool

// FullWarmup indicates to do a chain traversal upon splitstore init to copy
// from cold store to hot store
FullWarmup bool

// HotstoreMessageRetention indicates the hotstore retention policy for messages.
// It has the following semantics:
// - a value of 0 will only retain messages within the compaction boundary (4 finalities)
Expand Down
42 changes: 38 additions & 4 deletions blockstore/splitstore/splitstore_warmup.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
var (
// WarmupBoundary is the number of epochs to load state during warmup.
WarmupBoundary = policy.ChainFinality
// Empirically taken from December 2024
MarkSetEstimate int64 = 10_000_000_000
)

// warmup acquires the compaction lock and spawns a goroutine to warm up the hotstore;
Expand All @@ -35,7 +37,12 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
log.Info("warming up hotstore")
start := time.Now()

err := s.doWarmup(curTs)
var err error
if s.cfg.FullWarmup {
err = s.doWarmup(curTs)
} else {
err = s.doWarmup2(curTs)
}
if err != nil {
log.Errorf("error warming up hotstore: %s", err)
return
Expand All @@ -47,9 +54,36 @@ func (s *SplitStore) warmup(curTs *types.TipSet) error {
return nil
}

// the actual warmup procedure; it walks the chain loading all state roots at the boundary
// and headers all the way up to genesis.
// objects are written in batches so as to minimize overhead.
// Warmup2
func (s *SplitStore) doWarmup2(curTs *types.TipSet) error {
log.Infow("warmup starting")

epoch := curTs.Height()
s.markSetSize = MarkSetEstimate
err := s.ds.Put(s.ctx, markSetSizeKey, int64ToBytes(s.markSetSize))
if err != nil {
log.Warnf("error saving mark set size: %s", err)
}

// save the warmup epoch
err = s.ds.Put(s.ctx, warmupEpochKey, epochToBytes(epoch))
if err != nil {
return xerrors.Errorf("error saving warm up epoch: %w", err)
}
s.warmupEpoch.Store(int64(epoch))

// also save the compactionIndex, as this is used as an indicator of warmup for upgraded nodes
err = s.ds.Put(s.ctx, compactionIndexKey, int64ToBytes(s.compactionIndex))
if err != nil {
return xerrors.Errorf("error saving compaction index: %w", err)
}
return nil
}

// the full warmup procedure
// this was standard warmup before we wrote snapshots directly to the hotstore
// now this is used only if explicitly configured. A good use case for this is
// when starting splitstore off of an unpruned full node.
func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
var boundaryEpoch abi.ChainEpoch
epoch := curTs.Height()
Expand Down
2 changes: 1 addition & 1 deletion chain/gen/gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func NewGeneratorWithSectorsAndUpgradeSchedule(numSectors int, us stmgr.UpgradeS
return nil, xerrors.Errorf("failed to get metadata datastore: %w", err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion chain/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func BenchmarkGetRandomness(b *testing.B) {
b.Fatal(err)
}

bs, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(context.TODO(), repo.UniversalBlockstore)
if err != nil {
b.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ var chainBalanceStateCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -721,7 +721,7 @@ var chainPledgeCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return xerrors.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/deal-label.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ var dealLabelCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var diffMinerStates = &cli.Command{
_ = lkrepo.Close()
}(lkrepo)

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var exportCarCmd = &cli.Command{

lr, err := r.Lock(repo.FullNode)
if err == nil {
bs, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err = lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ var exportChainCmd = &cli.Command{

defer fi.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/gas-estimation.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var gasTraceCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down Expand Up @@ -177,7 +177,7 @@ var replayOfflineCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/lotus-shed/import-car.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var importCarCmd = &cli.Command{
return xerrors.Errorf("opening the car file: %w", err)
}

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return err
}
Expand Down Expand Up @@ -118,7 +118,7 @@ var importObjectCmd = &cli.Command{
}
defer lr.Close() //nolint:errcheck

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/invariants.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var invariantsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/lotus-shed/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,15 @@ var migrationsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, closer, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open universal blockstore %w", err)
}
defer func() {
if err := closer(); err != nil {
log.Warnf("failed to close universal blockstore: %s", err)
}
}()

path, err := lkrepo.SplitstorePath()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-peerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var minerPeeridCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/miner-types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ var minerTypesCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/msig.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ var multisigGetAllCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ var stateTreePruneCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
69 changes: 69 additions & 0 deletions cmd/lotus-shed/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"context"
"encoding/binary"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -31,6 +32,7 @@ var splitstoreCmd = &cli.Command{
splitstoreClearCmd,
splitstoreCheckCmd,
splitstoreInfoCmd,
splitstoreRepoInfoCmd,
},
}

Expand Down Expand Up @@ -353,6 +355,40 @@ func deleteSplitstoreKeys(lr repo.LockedRepo) error {
return nil
}

func printSplitstoreKeys(lr repo.LockedRepo) error {
ds, err := lr.Datastore(context.TODO(), "/metadata")
if err != nil {
return xerrors.Errorf("error opening datastore: %w", err)
}
if closer, ok := ds.(io.Closer); ok {
defer closer.Close() //nolint
}

res, err := ds.Query(context.Background(), query.Query{Prefix: "/splitstore"})
if err != nil {
return xerrors.Errorf("error querying datastore for splitstore keys: %w", err)
}

fmt.Println("Splitstore keys and values:")
for r := range res.Next() {
if r.Error != nil {
return xerrors.Errorf("datastore query error: %w", r.Error)
}

// Get the value for this key
value, err := ds.Get(context.Background(), datastore.NewKey(r.Key))
if err != nil {
return xerrors.Errorf("error getting value for key %s: %w", r.Key, err)
}

// Decode the value as a uvarint
decoded, _ := binary.Uvarint(value)
fmt.Printf(" %s: %d\n", r.Key, decoded)
}

return nil
}

// badger logging through go-log
type badgerLogger struct {
*zap.SugaredLogger
Expand All @@ -378,6 +414,39 @@ var splitstoreCheckCmd = &cli.Command{
},
}

var splitstoreRepoInfoCmd = &cli.Command{
Name: "splitstore-info",
Usage: "Display splitstore metadata information",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "repo",
Value: "~/.lotus",
Usage: "lotus repo path",
},
},
Action: func(cctx *cli.Context) error {
r, err := repo.NewFS(cctx.String("repo"))
if err != nil {
return xerrors.Errorf("error opening fs repo: %w", err)
}

exists, err := r.Exists()
if err != nil {
return err
}
if !exists {
return xerrors.Errorf("lotus repo doesn't exist")
}

lr, err := r.Lock(repo.FullNode)
if err != nil {
return xerrors.Errorf("error locking repo: %w", err)
}

return printSplitstoreKeys(lr)
},
}

var splitstoreInfoCmd = &cli.Command{
Name: "info",
Description: "prints some basic splitstore information",
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/state-stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func loadChainStore(ctx context.Context, repoPath string) (*StoreHandle, error)
return nil, err
}

cold, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
cold, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, xerrors.Errorf("failed to open universal blockstore %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-shed/terminations.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var terminationsCmd = &cli.Command{

defer lkrepo.Close() //nolint:errcheck

bs, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lkrepo.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return fmt.Errorf("failed to open blockstore: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/lotus-sim/simulation/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewNode(ctx context.Context, r repo.Repo) (nd *Node, _err error) {
}
}()

bs, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
bs, _, err := lr.Blockstore(ctx, repo.UniversalBlockstore)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading