diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 0afb15c11c4..410cc50df0c 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -115,6 +115,23 @@ type Config struct {
 	// A positive value is the number of compactions before a full GC is performed;
 	// a value of 1 will perform full GC in every compaction.
 	HotStoreFullGCFrequency uint64
+
+	// HotstoreMaxSpaceTarget suggests the max allowed space the hotstore can take.
+	// This is not a hard limit; it is possible for the hotstore to exceed the target,
+	// for example if state grows massively between compactions. The splitstore
+	// will make a best effort to avoid overflowing the target and in practice should
+	// never overflow. This field is used when doing GC at the end of a compaction to
+	// adaptively choose moving GC.
+	HotstoreMaxSpaceTarget uint64
+
+	// Moving GC will be triggered when total moving size exceeds
+	// HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
+	HotstoreMaxSpaceThreshold uint64
+
+	// Safety buffer to prevent moving GC from overflowing disk.
+	// Moving GC will not occur when total moving size exceeds
+	// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
+	HotstoreMaxSpaceSafetyBuffer uint64
 }
 
 // ChainAccessor allows the Splitstore to access the chain. It will most likely
@@ -165,6 +182,7 @@ type SplitStore struct {
 	compactionIndex int64
 	pruneIndex      int64
+	onlineGCCnt     int64
 
 	ctx    context.Context
 	cancel func()
@@ -195,6 +213,17 @@ type SplitStore struct {
 
 	// registered protectors
 	protectors []func(func(cid.Cid) error) error
+
+	// DAG sizes measured during the latest compaction,
+	// logged and used for GC strategy
+
+	// protected by compaction lock
+	szWalk          int64
+	szProtectedTxns int64
+	szKeys          int64 // approximate, not counting keys protected when entering critical section
+
+	// protected by txnLk
+	szMarkedLiveRefs int64
 }
 
 var _ bstore.Blockstore = (*SplitStore)(nil)
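The three knobs above drive a three-way GC decision implemented in `gcHotAfterCompaction` (see the splitstore_gc.go hunk further down). A condensed sketch of the intended regimes with a worked example — `gcKind` and the sample sizes are illustrative, not part of the patch:

```go
package main

import "fmt"

// gcKind condenses the decision logic from gcHotAfterCompaction (hypothetical helper).
func gcKind(hotSize, copySize int64, target, threshold, buffer uint64, compactionIndex, fullGCFreq int64) string {
	shouldTarget := target > 0 && hotSize+copySize > int64(target)-int64(threshold)
	shouldFreq := fullGCFreq > 0 && compactionIndex%fullGCFreq == 0
	canDoFull := target == 0 || hotSize+copySize < int64(target)-int64(buffer)

	switch {
	case (shouldTarget || shouldFreq) && canDoFull:
		return "moving GC" // copy live data into a fresh hotstore, reclaim everything else
	case (shouldTarget || shouldFreq) && !canDoFull:
		return "aggressive online GC" // not enough headroom to hold two stores during a move
	default:
		return "online GC" // regular in-place badger vlog GC
	}
}

func main() {
	const GB = 1_000_000_000
	// 650 GB target with the default 150 GB threshold and 50 GB safety buffer;
	// the hotstore measures 450 GB with ~120 GB of live data to move:
	// 570 GB > 650-150 GB triggers full GC, and 570 GB < 650-50 GB allows it.
	fmt.Println(gcKind(450*GB, 120*GB, 650*GB, 150*GB, 50*GB, 3, 20)) // moving GC
}
```

With `HotstoreMaxSpaceTarget` left at its default of 0, `shouldTarget` is always false and `canDoFull` always true, so the decision collapses to the pre-existing `HotStoreFullGCFrequency` rule; the defaults are backward compatible.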
diff --git a/blockstore/splitstore/splitstore_check.go b/blockstore/splitstore/splitstore_check.go
index 33651598020..2645c78c5b5 100644
--- a/blockstore/splitstore/splitstore_check.go
+++ b/blockstore/splitstore/splitstore_check.go
@@ -95,7 +95,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
 	}
 	defer visitor.Close() //nolint
 
-	err = s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
+	size := s.walkChain(curTs, boundaryEpoch, boundaryEpoch, visitor,
 		func(c cid.Cid) error {
 			if isUnitaryObject(c) {
 				return errStopWalk
@@ -133,7 +133,7 @@ func (s *SplitStore) doCheck(curTs *types.TipSet) error {
 		return err
 	}
 
-	log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt)
+	log.Infow("check done", "cold", *coldCnt, "missing", *missingCnt, "walk size", size)
 	write("--")
 	write("cold: %d missing: %d", *coldCnt, *missingCnt)
 	write("DONE")
diff --git a/blockstore/splitstore/splitstore_compact.go b/blockstore/splitstore/splitstore_compact.go
index 59bdd515d5e..12d0ff899b5 100644
--- a/blockstore/splitstore/splitstore_compact.go
+++ b/blockstore/splitstore/splitstore_compact.go
@@ -66,7 +66,8 @@ var (
 )
 
 const (
-	batchSize = 16384
+	batchSize  = 16384
+	cidKeySize = 128
 )
 
 func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
@@ -199,9 +200,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
 	log.Debugf("marking %d live refs", len(cids))
 	startMark := time.Now()
 
+	szMarked := new(int64)
+
 	count := new(int32)
 	visitor := newConcurrentVisitor()
-	walkObject := func(c cid.Cid) error {
+	walkObject := func(c cid.Cid) (int64, error) {
 		return s.walkObjectIncomplete(c, visitor,
 			func(c cid.Cid) error {
 				if isUnitaryObject(c) {
@@ -228,10 +231,12 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
 
 	// optimize the common case of single put
 	if len(cids) == 1 {
-		if err := walkObject(cids[0]); err != nil {
+		sz, err := walkObject(cids[0])
+		if err != nil {
 			log.Errorf("error marking tipset refs: %s", err)
 		}
 		log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
+		atomic.AddInt64(szMarked, sz)
 		return
 	}
@@ -243,9 +248,11 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
 
 	worker := func() error {
 		for c := range workch {
-			if err := walkObject(c); err != nil {
+			sz, err := walkObject(c)
+			if err != nil {
 				return err
 			}
+			atomic.AddInt64(szMarked, sz)
 		}
 
 		return nil
@@ -268,7 +275,8 @@ func (s *SplitStore) markLiveRefs(cids []cid.Cid) {
 		log.Errorf("error marking tipset refs: %s", err)
 	}
 
-	log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count)
+	log.Debugw("marking live refs done", "took", time.Since(startMark), "marked", *count, "size marked", *szMarked)
+	s.szMarkedLiveRefs += atomic.LoadInt64(szMarked)
 }
 
 // transactionally protect a view
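A note on the accounting pattern above: per-walk sizes flow into a shared `*int64` through `atomic.AddInt64` because several marking workers run concurrently, and the total is folded into `s.szMarkedLiveRefs` only once at the end (that field is guarded by `txnLk`, per the struct comment earlier). A minimal self-contained sketch of the same pattern, with hypothetical sizes standing in for the real walks:

```go
package main

import (
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	szMarked := new(int64)
	workch := make(chan int64, 8)
	for _, sz := range []int64{100, 250, 75} { // stand-ins for per-object walk sizes
		workch <- sz
	}
	close(workch)

	g := new(errgroup.Group)
	for i := 0; i < 4; i++ {
		g.Go(func() error {
			for sz := range workch {
				atomic.AddInt64(szMarked, sz) // many workers, one counter
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("size marked:", atomic.LoadInt64(szMarked)) // 425
}
```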
@@ -361,6 +369,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 
 		log.Infow("protecting transactional references", "refs", len(txnRefs))
 		count := 0
+		sz := new(int64)
 		workch := make(chan cid.Cid, len(txnRefs))
 		startProtect := time.Now()
 
@@ -393,10 +402,11 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 
 		worker := func() error {
 			for c := range workch {
-				err := s.doTxnProtect(c, markSet)
+				szTxn, err := s.doTxnProtect(c, markSet)
 				if err != nil {
 					return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
 				}
+				atomic.AddInt64(sz, szTxn)
 			}
 			return nil
 		}
@@ -409,16 +419,16 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 		if err := g.Wait(); err != nil {
 			return err
 		}
-
-		log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
+		s.szProtectedTxns += atomic.LoadInt64(sz)
+		log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count, "protected size", sz)
 	}
 }
 
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
-func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
+func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
 	if err := s.checkClosing(); err != nil {
-		return err
+		return 0, err
 	}
 
 	// Note: cold objects are deleted heaviest first, so the constituents of an object
@@ -509,6 +519,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		// might be potentially inconsistent; abort compaction and notify the user to intervene.
 		return xerrors.Errorf("checkpoint exists; aborting compaction")
 	}
+	s.clearSizeMeasurements()
 
 	currentEpoch := curTs.Height()
 	boundaryEpoch := currentEpoch - CompactionBoundary
@@ -598,7 +609,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}
 
 	err = s.walkChain(curTs, boundaryEpoch, inclMsgsEpoch, &noopVisitor{}, fHot, fCold)
-
 	if err != nil {
 		return xerrors.Errorf("error marking: %w", err)
 	}
@@ -638,7 +648,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	defer purgew.Close() //nolint:errcheck
 
 	// some stats for logging
-	var hotCnt, coldCnt, purgeCnt int
+	var hotCnt, coldCnt, purgeCnt int64
 	err = s.hot.ForEachKey(func(c cid.Cid) error {
 		// was it marked?
 		mark, err := markSet.Has(c)
@@ -690,8 +700,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	log.Infow("cold collection done", "took", time.Since(startCollect))
 
 	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt, "purge", purgeCnt)
-	stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
-	stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
+	s.szKeys = hotCnt * cidKeySize
+	stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
+	stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))
 
 	if err := s.checkClosing(); err != nil {
 		return err
@@ -773,8 +784,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return xerrors.Errorf("error purging cold objects: %w", err)
 	}
 	log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))
-
 	s.endCriticalSection()
+	log.Infow("critical section done", "total protected size", s.szProtectedTxns, "total marked live size", s.szMarkedLiveRefs)
 
 	if err := checkpoint.Close(); err != nil {
 		log.Warnf("error closing checkpoint: %s", err)
@@ -907,6 +918,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 	copy(toWalk, ts.Cids())
 	walkCnt := new(int64)
 	scanCnt := new(int64)
+	szWalk := new(int64)
 
 	tsRef := func(blkCids []cid.Cid) (cid.Cid, error) {
 		return types.NewTipSetKey(blkCids...).Cid()
@@ -942,48 +954,64 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 		if err != nil {
 			return xerrors.Errorf("error computing cid reference to parent tipset")
 		}
-		if err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk); err != nil {
+		sz, err := s.walkObjectIncomplete(pRef, visitor, fHot, stopWalk)
+		if err != nil {
 			return xerrors.Errorf("error walking parent tipset cid reference")
 		}
+		atomic.AddInt64(szWalk, sz)
 
 		// messages are retained if within the inclMsgs boundary
 		if hdr.Height >= inclMsgs && hdr.Height > 0 {
 			if inclMsgs < inclState {
 				// we need to use walkObjectIncomplete here, as messages/receipts may be missing early on if we
 				// synced from snapshot and have a long HotStoreMessageRetentionPolicy.
-				if err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk); err != nil {
+				sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fHot, stopWalk)
+				if err != nil {
 					return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
 				}
+				atomic.AddInt64(szWalk, sz)
 
-				if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk); err != nil {
+				sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fHot, stopWalk)
+				if err != nil {
 					return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
 				}
+				atomic.AddInt64(szWalk, sz)
 			} else {
-				if err := s.walkObject(hdr.Messages, visitor, fHot); err != nil {
+				sz, err = s.walkObject(hdr.Messages, visitor, fHot)
+				if err != nil {
 					return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
 				}
+				atomic.AddInt64(szWalk, sz)
 
-				if err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot); err != nil {
+				sz, err := s.walkObject(hdr.ParentMessageReceipts, visitor, fHot)
+				if err != nil {
 					return xerrors.Errorf("error walking message receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
 				}
+				atomic.AddInt64(szWalk, sz)
 			}
 		}
 
 		// messages and receipts outside of inclMsgs are included in the cold store
 		if hdr.Height < inclMsgs && hdr.Height > 0 {
-			if err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk); err != nil {
+			sz, err := s.walkObjectIncomplete(hdr.Messages, visitor, fCold, stopWalk)
+			if err != nil {
 				return xerrors.Errorf("error walking messages (cid: %s): %w", hdr.Messages, err)
 			}
-			if err := s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk); err != nil {
+			atomic.AddInt64(szWalk, sz)
+			sz, err = s.walkObjectIncomplete(hdr.ParentMessageReceipts, visitor, fCold, stopWalk)
+			if err != nil {
 				return xerrors.Errorf("error walking messages receipts (cid: %s): %w", hdr.ParentMessageReceipts, err)
 			}
+			atomic.AddInt64(szWalk, sz)
 		}
 
 		// state is only retained if within the inclState boundary, with the exception of genesis
 		if hdr.Height >= inclState || hdr.Height == 0 {
-			if err := s.walkObject(hdr.ParentStateRoot, visitor, fHot); err != nil {
+			sz, err := s.walkObject(hdr.ParentStateRoot, visitor, fHot)
+			if err != nil {
 				return xerrors.Errorf("error walking state root (cid: %s): %w", hdr.ParentStateRoot, err)
 			}
+			atomic.AddInt64(szWalk, sz)
 			atomic.AddInt64(scanCnt, 1)
 		}
@@ -1001,9 +1029,11 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 	if err != nil {
 		return xerrors.Errorf("error computing cid reference to parent tipset")
 	}
-	if err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk); err != nil {
+	sz, err := s.walkObjectIncomplete(hRef, visitor, fHot, stopWalk)
+	if err != nil {
 		return xerrors.Errorf("error walking parent tipset cid reference")
 	}
+	atomic.AddInt64(szWalk, sz)
 
 	for len(toWalk) > 0 {
 		// walking can take a while, so check this with every opportunity
@@ -1047,123 +1077,129 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp
 		}
 	}
 
-	log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt)
-
+	log.Infow("chain walk done", "walked", *walkCnt, "scanned", *scanCnt, "walk size", szWalk)
+	s.szWalk = atomic.LoadInt64(szWalk)
 	return nil
 }
 
-func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) error {
+func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid) error) (int64, error) {
+	var sz int64
 	visit, err := visitor.Visit(c)
 	if err != nil {
-		return xerrors.Errorf("error visiting object: %w", err)
+		return 0, xerrors.Errorf("error visiting object: %w", err)
 	}
 
 	if !visit {
-		return nil
+		return sz, nil
 	}
 
 	if err := f(c); err != nil {
 		if err == errStopWalk {
-			return nil
+			return sz, nil
 		}
 
-		return err
+		return 0, err
 	}
 
 	if c.Prefix().Codec != cid.DagCBOR {
-		return nil
+		return sz, nil
 	}
 
 	// check this before recursing
 	if err := s.checkClosing(); err != nil {
-		return err
+		return 0, err
 	}
 
 	var links []cid.Cid
 	err = s.view(c, func(data []byte) error {
+		sz += int64(len(data))
 		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
 			links = append(links, c)
 		})
 	})
 
 	if err != nil {
-		return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
+		return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
 	}
 
 	for _, c := range links {
-		err := s.walkObject(c, visitor, f)
+		szLink, err := s.walkObject(c, visitor, f)
 		if err != nil {
-			return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
+			return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
 		}
+		sz += szLink
 	}
 
-	return nil
+	return sz, nil
 }
 
 // like walkObject, but the object may be potentially incomplete (references missing)
-func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) error {
+func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, missing func(cid.Cid) error) (int64, error) {
+	var sz int64
 	visit, err := visitor.Visit(c)
 	if err != nil {
-		return xerrors.Errorf("error visiting object: %w", err)
+		return 0, xerrors.Errorf("error visiting object: %w", err)
 	}
 
 	if !visit {
-		return nil
+		return sz, nil
 	}
 
 	// occurs check -- only for DAGs
 	if c.Prefix().Codec == cid.DagCBOR {
 		has, err := s.has(c)
 		if err != nil {
-			return xerrors.Errorf("error occur checking %s: %w", c, err)
+			return 0, xerrors.Errorf("error occur checking %s: %w", c, err)
 		}
 
 		if !has {
 			err = missing(c)
 			if err == errStopWalk {
-				return nil
+				return sz, nil
 			}
 
-			return err
+			return 0, err
 		}
 	}
 
 	if err := f(c); err != nil {
 		if err == errStopWalk {
-			return nil
+			return sz, nil
 		}
 
-		return err
+		return 0, err
 	}
 
 	if c.Prefix().Codec != cid.DagCBOR {
-		return nil
+		return sz, nil
 	}
 
 	// check this before recursing
 	if err := s.checkClosing(); err != nil {
-		return err
+		return sz, err
 	}
 
 	var links []cid.Cid
 	err = s.view(c, func(data []byte) error {
+		sz += int64(len(data))
 		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
 			links = append(links, c)
 		})
 	})
 
 	if err != nil {
-		return xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
+		return 0, xerrors.Errorf("error scanning linked block (cid: %s): %w", c, err)
 	}
 
 	for _, c := range links {
-		err := s.walkObjectIncomplete(c, visitor, f, missing)
+		szLink, err := s.walkObjectIncomplete(c, visitor, f, missing)
 		if err != nil {
-			return xerrors.Errorf("error walking link (cid: %s): %w", c, err)
+			return 0, xerrors.Errorf("error walking link (cid: %s): %w", c, err)
 		}
+		sz += szLink
 	}
 
-	return nil
+	return sz, nil
 }
 
 // internal version used during compaction and related operations
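Both walkers now return the byte size of the subtree they visited: a DagCBOR block's payload length is counted when it is scanned for links, child sizes bubble up the recursion, and the visitor deduplicates shared subtrees so each block is counted once. Note that non-DagCBOR leaves contribute zero bytes, so the measurement is a mild undercount — one reason the threshold and safety buffer leave slack. A distilled sketch under those assumptions (`getBlock` is a hypothetical stand-in for the store's `view`):

```go
// Package example distills the size walk: sum the payload bytes of each
// DagCBOR block the first time it is visited, recursing through its links.
package example

import (
	"bytes"

	"github.com/ipfs/go-cid"
	cbg "github.com/whyrusleeping/cbor-gen"
)

func walkSize(c cid.Cid, seen map[cid.Cid]struct{}, getBlock func(cid.Cid) ([]byte, error)) (int64, error) {
	if _, visited := seen[c]; visited {
		return 0, nil // shared subtrees are counted once
	}
	seen[c] = struct{}{}

	if c.Prefix().Codec != cid.DagCBOR {
		return 0, nil // only DagCBOR blocks are scanned (and therefore measured)
	}

	data, err := getBlock(c)
	if err != nil {
		return 0, err
	}
	sz := int64(len(data))

	var links []cid.Cid
	if err := cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
		links = append(links, c)
	}); err != nil {
		return 0, err
	}

	for _, l := range links {
		szLink, err := walkSize(l, seen, getBlock)
		if err != nil {
			return 0, err
		}
		sz += szLink
	}
	return sz, nil
}
```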
@@ -1429,8 +1465,9 @@ func (s *SplitStore) completeCompaction() error {
 	}
 	s.compactType = none
 
-	// Note: at this point we can start the splitstore; a compaction should run on
-	// the first head change, which will trigger gc on the hotstore.
+	// Note: at this point we can start the splitstore; base epoch is not
+	// incremented here so a compaction should run on the first head
+	// change, which will trigger gc on the hotstore.
 	// We don't mind the second (back-to-back) compaction as the head will
 	// have advanced during marking and coldset accumulation.
 	return nil
@@ -1488,6 +1525,13 @@ func (s *SplitStore) completePurge(coldr *ColdSetReader, checkpoint *Checkpoint,
 	return nil
 }
 
+func (s *SplitStore) clearSizeMeasurements() {
+	s.szKeys = 0
+	s.szMarkedLiveRefs = 0
+	s.szProtectedTxns = 0
+	s.szWalk = 0
+}
+
 // I really don't like having this code, but we seem to have some occasional DAG references with
 // missing constituents. During testing in mainnet *some* of these references *sometimes* appeared
 // after a little bit.
@@ -1528,7 +1572,7 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
 		missing = make(map[cid.Cid]struct{})
 
 		for c := range towalk {
-			err := s.walkObjectIncomplete(c, visitor,
+			_, err := s.walkObjectIncomplete(c, visitor,
 				func(c cid.Cid) error {
 					if isUnitaryObject(c) {
 						return errStopWalk
diff --git a/blockstore/splitstore/splitstore_gc.go b/blockstore/splitstore/splitstore_gc.go
index 3b53b8042cd..2ddb7d404e9 100644
--- a/blockstore/splitstore/splitstore_gc.go
+++ b/blockstore/splitstore/splitstore_gc.go
@@ -7,15 +7,61 @@ import (
 	bstore "github.com/filecoin-project/lotus/blockstore"
 )
 
+const (
+	// Fraction of garbage in badger vlog for online GC traversal to collect garbage
+	AggressiveOnlineGCThreshold = 0.0001
+)
+
 func (s *SplitStore) gcHotAfterCompaction() {
+	// Measure hotstore size, determine if we should do full GC, determine if we can do full GC.
+	// We should do full GC if
+	//   FullGCFrequency is specified and compaction index matches frequency
+	//   OR HotstoreMaxSpaceTarget is specified and total moving space is within 150 GB of target
+	// We can do full GC if
+	//   HotstoreMaxSpaceTarget is not specified
+	//   OR total moving space would stay more than 50 GB below the target
+	//
+	// a) If we should not do full GC => online GC
+	// b) If we should do full GC and can => moving GC
+	// c) If we should do full GC and can't => aggressive online GC
+	getSize := func() int64 {
+		sizer, ok := s.hot.(bstore.BlockstoreSize)
+		if ok {
+			size, err := sizer.Size()
+			if err != nil {
+				log.Warnf("error getting hotstore size: %s, estimating empty hot store for targeting", err)
+				return 0
+			}
+			return size
+		}
+		log.Errorf("Could not measure hotstore size, assuming it is 0 bytes, which it is not")
+		return 0
+	}
+	hotSize := getSize()
+
+	copySizeApprox := s.szKeys + s.szMarkedLiveRefs + s.szProtectedTxns + s.szWalk
+	shouldTarget := s.cfg.HotstoreMaxSpaceTarget > 0 && hotSize+copySizeApprox > int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceThreshold)
+	shouldFreq := s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0
+	shouldDoFull := shouldTarget || shouldFreq
+	canDoFull := s.cfg.HotstoreMaxSpaceTarget == 0 || hotSize+copySizeApprox < int64(s.cfg.HotstoreMaxSpaceTarget)-int64(s.cfg.HotstoreMaxSpaceSafetyBuffer)
+	log.Debugw("approximating new hot store size", "key size", s.szKeys, "marked live refs", s.szMarkedLiveRefs, "protected txns", s.szProtectedTxns, "walked DAG", s.szWalk)
+	log.Infof("measured hot store size: %d, approximate new size: %d, should do full %t, can do full %t", hotSize, copySizeApprox, shouldDoFull, canDoFull)
+
 	var opts []bstore.BlockstoreGCOption
-	if s.cfg.HotStoreFullGCFrequency > 0 && s.compactionIndex%int64(s.cfg.HotStoreFullGCFrequency) == 0 {
+	if shouldDoFull && canDoFull {
 		opts = append(opts, bstore.WithFullGC(true))
+	} else if shouldDoFull && !canDoFull {
+		log.Warnf("Attention! Estimated moving GC size %d is not within safety buffer %d of target max %d, performing aggressive online GC to attempt to bring hotstore size down safely", copySizeApprox, s.cfg.HotstoreMaxSpaceSafetyBuffer, s.cfg.HotstoreMaxSpaceTarget)
+		log.Warn("If problem continues you can 1) temporarily allocate more disk space to hotstore and 2) reflect in HotstoreMaxSpaceTarget OR trigger manual move with `lotus chain prune hot-moving`")
+		log.Warn("If problem continues and you do not have any more disk space you can continue to manually trigger online GC at aggressive thresholds (< 0.01) with `lotus chain prune hot`")
+
+		opts = append(opts, bstore.WithThreshold(AggressiveOnlineGCThreshold))
 	}
 
 	if err := s.gcBlockstore(s.hot, opts); err != nil {
 		log.Warnf("error garbage collecting hotstore: %s", err)
 	}
+	log.Infof("measured hot store size after GC: %d", getSize())
 }
 
 func (s *SplitStore) gcBlockstore(b bstore.Blockstore, opts []bstore.BlockstoreGCOption) error {
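`getSize` above degrades gracefully when the hot blockstore cannot report its disk footprint: it type-asserts the optional `bstore.BlockstoreSize` capability and falls back to zero with a loud log. The same optional-interface pattern in isolation, with illustrative local types rather than the lotus ones:

```go
package main

import "fmt"

type Blockstore interface {
	Has(key string) (bool, error)
}

// BlockstoreSize is an optional capability a store may implement.
type BlockstoreSize interface {
	Size() (int64, error)
}

type badgerStore struct{ bytes int64 }

func (b *badgerStore) Has(string) (bool, error) { return true, nil }
func (b *badgerStore) Size() (int64, error)     { return b.bytes, nil }

func getSize(bs Blockstore) int64 {
	if sizer, ok := bs.(BlockstoreSize); ok { // runtime capability check
		if size, err := sizer.Size(); err == nil {
			return size
		}
	}
	return 0 // unknown; caller treats it as an empty store, as the patch does
}

func main() {
	fmt.Println(getSize(&badgerStore{bytes: 42})) // 42
}
```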
diff --git a/blockstore/splitstore/splitstore_reify.go b/blockstore/splitstore/splitstore_reify.go
index aa14f090af6..07efedead38 100644
--- a/blockstore/splitstore/splitstore_reify.go
+++ b/blockstore/splitstore/splitstore_reify.go
@@ -101,7 +101,7 @@ func (s *SplitStore) doReify(c cid.Cid) {
 	defer s.txnLk.RUnlock()
 
 	count := 0
-	err := s.walkObjectIncomplete(c, newTmpVisitor(),
+	_, err := s.walkObjectIncomplete(c, newTmpVisitor(),
 		func(c cid.Cid) error {
 			if isUnitaryObject(c) {
 				return errStopWalk
diff --git a/documentation/en/default-lotus-config.toml b/documentation/en/default-lotus-config.toml
index 903573ed0fd..c5132171442 100644
--- a/documentation/en/default-lotus-config.toml
+++ b/documentation/en/default-lotus-config.toml
@@ -230,6 +230,35 @@
   # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREFULLGCFREQUENCY
   #HotStoreFullGCFrequency = 20
 
+  # HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
+  # will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
+  # Splitstore GC will NOT run moving GC if the total size of the move would get
+  # within 50 GB of the target, and instead will run a more aggressive online GC.
+  # If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
+  # GC will trigger moving GC if either configuration condition is met.
+  # A reasonable minimum is 2x fully GCed hotstore size + 50 GB buffer.
+  # At this minimum size moving GC happens every time; any smaller and moving GC won't
+  # be able to run. In spring 2023 this minimum is ~550 GB.
+  #
+  # type: uint64
+  # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETARGET
+  #HotStoreMaxSpaceTarget = 0
+
+  # When HotStoreMaxSpaceTarget is set, moving GC will be triggered when total moving size
+  # exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
+  #
+  # type: uint64
+  # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACETHRESHOLD
+  #HotStoreMaxSpaceThreshold = 150000000000
+
+  # Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
+  # is set. Moving GC will not occur when total moving size exceeds
+  # HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
+  #
+  # type: uint64
+  # env var: LOTUS_CHAINSTORE_SPLITSTORE_HOTSTOREMAXSPACESAFETYBUFFER
+  #HotstoreMaxSpaceSafetyBuffer = 50000000000
+
 
 [Cluster]
   # EXPERIMENTAL. config to enable node cluster with raft consensus
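The "~550 GB in spring 2023" figure follows from the stated rule of thumb if a fully GCed mainnet hotstore was roughly 250 GB at the time (an assumption inferred from the comment, not stated in the patch): moving GC keeps the old and new stores on disk simultaneously, so the target must cover two copies plus the safety buffer.

```go
package main

import "fmt"

func main() {
	const GB = int64(1_000_000_000)
	fullyGCed := 250 * GB // assumed fully GCed hotstore size, spring 2023
	buffer := 50 * GB     // HotstoreMaxSpaceSafetyBuffer default

	// During a move both copies coexist, so the workable floor is:
	minTarget := 2*fullyGCed + buffer
	fmt.Printf("minimum workable HotStoreMaxSpaceTarget: %d GB\n", minTarget/GB) // 550 GB
}
```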
diff --git a/node/config/def.go b/node/config/def.go
index fc8451ee3eb..2617ec5ba2f 100644
--- a/node/config/def.go
+++ b/node/config/def.go
@@ -95,7 +95,9 @@ func DefaultFullNode() *FullNode {
 				HotStoreType: "badger",
 				MarkSetType:  "badger",
 
-				HotStoreFullGCFrequency: 20,
+				HotStoreFullGCFrequency:      20,
+				HotStoreMaxSpaceThreshold:    150_000_000_000,
+				HotstoreMaxSpaceSafetyBuffer: 50_000_000_000,
 			},
 		},
 		Cluster: *DefaultUserRaftConfig(),
diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go
index 8b79bed4fec..c620847084e 100644
--- a/node/config/doc_gen.go
+++ b/node/config/doc_gen.go
@@ -1286,6 +1286,35 @@ the compaction boundary; default is 0.`,
 A value of 0 disables, while a value 1 will do full GC in every compaction.
 Default is 20 (about once a week).`,
 		},
+		{
+			Name: "HotStoreMaxSpaceTarget",
+			Type: "uint64",
+
+			Comment: `HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
+will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
+Splitstore GC will NOT run moving GC if the total size of the move would get
+within 50 GB of the target, and instead will run a more aggressive online GC.
+If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
+GC will trigger moving GC if either configuration condition is met.
+A reasonable minimum is 2x fully GCed hotstore size + 50 GB buffer.
+At this minimum size moving GC happens every time; any smaller and moving GC won't
+be able to run. In spring 2023 this minimum is ~550 GB.`,
+		},
+		{
+			Name: "HotStoreMaxSpaceThreshold",
+			Type: "uint64",
+
+			Comment: `When HotStoreMaxSpaceTarget is set, moving GC will be triggered when total moving size
+exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold`,
+		},
+		{
+			Name: "HotstoreMaxSpaceSafetyBuffer",
+			Type: "uint64",
+
+			Comment: `Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
+is set. Moving GC will not occur when total moving size exceeds
+HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer`,
+		},
 	},
 	"StorageMiner": []DocField{
 		{
diff --git a/node/config/types.go b/node/config/types.go
index 690e8caee11..5b952d35e46 100644
--- a/node/config/types.go
+++ b/node/config/types.go
@@ -601,6 +601,25 @@ type Splitstore struct {
 	// A value of 0 disables, while a value 1 will do full GC in every compaction.
 	// Default is 20 (about once a week).
 	HotStoreFullGCFrequency uint64
+	// HotStoreMaxSpaceTarget sets a target max disk size for the hotstore. Splitstore GC
+	// will run moving GC if disk utilization gets within a threshold (150 GB) of the target.
+	// Splitstore GC will NOT run moving GC if the total size of the move would get
+	// within 50 GB of the target, and instead will run a more aggressive online GC.
+	// If both HotStoreFullGCFrequency and HotStoreMaxSpaceTarget are set then splitstore
+	// GC will trigger moving GC if either configuration condition is met.
+	// A reasonable minimum is 2x fully GCed hotstore size + 50 GB buffer.
+	// At this minimum size moving GC happens every time; any smaller and moving GC won't
+	// be able to run. In spring 2023 this minimum is ~550 GB.
+	HotStoreMaxSpaceTarget uint64
+
+	// When HotStoreMaxSpaceTarget is set, moving GC will be triggered when total moving size
+	// exceeds HotstoreMaxSpaceTarget - HotstoreMaxSpaceThreshold
+	HotStoreMaxSpaceThreshold uint64
+
+	// Safety buffer to prevent moving GC from overflowing disk when HotStoreMaxSpaceTarget
+	// is set. Moving GC will not occur when total moving size exceeds
+	// HotstoreMaxSpaceTarget - HotstoreMaxSpaceSafetyBuffer
+	HotstoreMaxSpaceSafetyBuffer uint64
 }
 
 // // Full Node
diff --git a/node/modules/blockstore.go b/node/modules/blockstore.go
index 90b7b6183b9..f96fd0db499 100644
--- a/node/modules/blockstore.go
+++ b/node/modules/blockstore.go
@@ -82,11 +82,14 @@ func SplitBlockstore(cfg *config.Chainstore) func(lc fx.Lifecycle, r repo.Locked
 		}
 
 		cfg := &splitstore.Config{
-			MarkSetType:              cfg.Splitstore.MarkSetType,
-			DiscardColdBlocks:        cfg.Splitstore.ColdStoreType == "discard",
-			UniversalColdBlocks:      cfg.Splitstore.ColdStoreType == "universal",
-			HotStoreMessageRetention: cfg.Splitstore.HotStoreMessageRetention,
-			HotStoreFullGCFrequency:  cfg.Splitstore.HotStoreFullGCFrequency,
+			MarkSetType:                  cfg.Splitstore.MarkSetType,
+			DiscardColdBlocks:            cfg.Splitstore.ColdStoreType == "discard",
+			UniversalColdBlocks:          cfg.Splitstore.ColdStoreType == "universal",
+			HotStoreMessageRetention:     cfg.Splitstore.HotStoreMessageRetention,
+			HotStoreFullGCFrequency:      cfg.Splitstore.HotStoreFullGCFrequency,
+			HotstoreMaxSpaceTarget:       cfg.Splitstore.HotStoreMaxSpaceTarget,
+			HotstoreMaxSpaceThreshold:    cfg.Splitstore.HotStoreMaxSpaceThreshold,
+			HotstoreMaxSpaceSafetyBuffer: cfg.Splitstore.HotstoreMaxSpaceSafetyBuffer,
 		}
 		ss, err := splitstore.Open(path, ds, hot, cold, cfg)
 		if err != nil {
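One relationship these fields imply but the patch does not enforce: moving GC triggers once the projected size exceeds `target - threshold` yet is only permitted below `target - buffer`, so a usable moving-GC window requires the threshold to exceed the safety buffer (and the target to exceed both). A hypothetical validation helper, not code from this change:

```go
package main

import "fmt"

// validateSplitstoreSpace is a sketch of a sanity check the config could run;
// the moving-GC window is (target-threshold, target-buffer) and must be non-empty.
func validateSplitstoreSpace(target, threshold, buffer uint64) error {
	if target == 0 {
		return nil // feature disabled; frequency-based full GC only
	}
	if threshold <= buffer {
		return fmt.Errorf("HotStoreMaxSpaceThreshold (%d) must exceed HotstoreMaxSpaceSafetyBuffer (%d) or moving GC can never run", threshold, buffer)
	}
	if target <= threshold {
		return fmt.Errorf("HotStoreMaxSpaceTarget (%d) must exceed HotStoreMaxSpaceThreshold (%d)", target, threshold)
	}
	return nil
}

func main() {
	fmt.Println(validateSplitstoreSpace(650_000_000_000, 150_000_000_000, 50_000_000_000)) // <nil>
}
```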