Commit

try
renaynay committed Jan 14, 2025
1 parent 9bf0217 commit 6087e64
Showing 11 changed files with 196 additions and 128 deletions.
27 changes: 23 additions & 4 deletions nodebuilder/pruner/module.go
@@ -3,7 +3,6 @@ package pruner
import (
"context"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

@@ -15,6 +14,7 @@ import (
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
"github.com/celestiaorg/celestia-node/store"
)

var log = logging.Logger("module/pruner")
@@ -33,9 +33,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service independently, due to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Invoke(func(ctx context.Context, ds datastore.Batching, p pruner.Pruner) error {
return pruner.DetectPreviousRun(ctx, ds, p.Kind())
}),
)

baseComponents := fx.Options(
@@ -67,6 +64,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
checkPreviousRun(),
)
case node.Bridge:
coreOpts := make([]core.Option, 0)
@@ -83,6 +81,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
checkPreviousRun(),
)
default:
panic("unknown node type")
@@ -98,3 +97,23 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
return opt
})
}

func checkPreviousRun() fx.Option {
return fx.Invoke(func(
ctx context.Context,
fa *fullavail.ShareAvailability,
p *pruner.Service,
s *store.Store,

Check failure on line 106 in nodebuilder/pruner/module.go (GitHub Actions / go-ci / Lint): unused-parameter: parameter 's' seems to be unused, consider removing or renaming it as _ (revive)
) error {
convert, err := fa.ConvertToPruned(ctx)
if err != nil {
return err
}

if convert {
return p.ClearCheckpoint(ctx)
}

return nil
})
}
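
For readers unfamiliar with go.uber.org/fx (which module.go already imports), the new checkPreviousRun helper relies on fx.Invoke: a registered function that fx runs eagerly once the dependency graph is built, so it can gate node startup on the conversion check. Below is a minimal, self-contained sketch of the same pattern; it is not celestia-node code, and the checkpointState type, its fields, and the converted flag are made up for illustration.

package main

import (
	"context"
	"fmt"

	"go.uber.org/fx"
)

// checkpointState is a stand-in for the pruner's persisted checkpoint;
// the type and field names are hypothetical.
type checkpointState struct {
	lastPrunedHeight uint64
}

func (c *checkpointState) clear() { c.lastPrunedHeight = 1 }

func main() {
	app := fx.New(
		// Provide a dependency, the same way ConstructModule provides
		// *fullavail.ShareAvailability, *pruner.Service and *store.Store.
		fx.Provide(func() *checkpointState {
			return &checkpointState{lastPrunedHeight: 500}
		}),
		// fx.Invoke registers a function that fx runs eagerly once the
		// graph is built, before app.Start returns; an error here
		// aborts startup, which is how checkPreviousRun can block the node.
		fx.Invoke(func(cs *checkpointState) error {
			converted := true // stand-in for fa.ConvertToPruned(ctx)
			if converted {
				cs.clear()
			}
			fmt.Println("last pruned height reset to", cs.lastPrunedHeight)
			return nil
		}),
	)
	if err := app.Start(context.Background()); err != nil {
		panic(err)
	}
	_ = app.Stop(context.Background())
}
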
9 changes: 7 additions & 2 deletions nodebuilder/share/module.go
@@ -239,8 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(func(s *store.Store, getter shwap.Getter, opts []full.Option) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
ds datastore.Batching,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, ds, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
5 changes: 3 additions & 2 deletions nodebuilder/tests/prune_test.go
@@ -19,8 +19,8 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
full_avail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
@@ -189,6 +189,7 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
err = store.PutConfig(archivalCfg)
require.NoError(t, err)
_, err = sw.NewNodeWithStore(nt, store)
require.ErrorIs(t, err, pruner.ErrDisallowRevertToArchival, nt.String())
assert.Error(t, err)
assert.ErrorIs(t, full_avail.ErrDisallowRevertToArchival, err)
}
}
40 changes: 2 additions & 38 deletions pruner/checkpoint.go
@@ -7,18 +7,11 @@ import (
"fmt"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

"github.com/celestiaorg/celestia-node/header"
)

var (
// ErrDisallowRevertToArchival is returned when a node that has previously been
// run with the pruner enabled is later launched in archival mode.
ErrDisallowRevertToArchival = errors.New(
"node has been run with pruner enabled before, it is not safe to convert to an archival" +
"Run with --experimental-pruning enabled or consider re-initializing the store")

storePrefix = datastore.NewKey("pruner")
checkpointKey = datastore.NewKey("checkpoint")
errCheckpointNotFound = errors.New("checkpoint not found")
@@ -27,46 +20,17 @@ var (
// checkpoint contains information related to the state of the
// pruner service that is periodically persisted to disk.
type checkpoint struct {
PrunerKind string `json:"pruner_kind"`
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

func newCheckpoint(prunerKind string) *checkpoint {
func newCheckpoint() *checkpoint {
return &checkpoint{
PrunerKind: prunerKind,
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
}

// DetectPreviousRun ensures that a node that has been run with "full" pruning
// mode previously cannot revert back to an "archival" one. This check should
// only be performed when a node is either a Full or Bridge node.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore, expectedKind string) error {
wrappedDs := namespace.Wrap(ds, storePrefix)

cp, err := getCheckpoint(ctx, wrappedDs)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
return nil
}
return fmt.Errorf("failed to load checkpoint: %w", err)
}

if cp.PrunerKind != expectedKind {
// do not allow reversion back to archival mode
if cp.PrunerKind == "full" {
return ErrDisallowRevertToArchival
}
// allow conversion from archival to full by overriding previous checkpoint
log.Infow("overriding checkpoint to enable full pruning mode...")
cp = newCheckpoint(expectedKind)
return storeCheckpoint(ctx, wrappedDs, cp)
}
return nil
}

// storeCheckpoint persists the checkpoint to disk.
func storeCheckpoint(ctx context.Context, ds datastore.Datastore, c *checkpoint) error {
bin, err := json.Marshal(c)
@@ -101,7 +65,7 @@ func (s *Service) loadCheckpoint(ctx context.Context) error {
cp, err := getCheckpoint(ctx, s.ds)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
s.checkpoint = newCheckpoint(s.pruner.Kind())
s.checkpoint = newCheckpoint()
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}
return err
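
With the PrunerKind field and DetectPreviousRun removed, the persisted checkpoint carries only pruning progress. The following standalone sketch (not part of the repository; it mirrors the unexported struct and its JSON tags shown above) illustrates what the stored value round-trips to.

package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint mirrors pruner.checkpoint after this change: the
// pruner_kind field is gone and only pruning progress is persisted.
type checkpoint struct {
	LastPrunedHeight uint64              `json:"last_pruned_height"`
	FailedHeaders    map[uint64]struct{} `json:"failed"`
}

func main() {
	c := checkpoint{LastPrunedHeight: 1, FailedHeaders: map[uint64]struct{}{}}

	bin, err := json.Marshal(c)
	if err != nil {
		panic(err)
	}
	// Prints: {"last_pruned_height":1,"failed":{}}
	fmt.Println(string(bin))

	var back checkpoint
	if err := json.Unmarshal(bin, &back); err != nil {
		panic(err)
	}
	fmt.Println(back.LastPrunedHeight) // 1
}
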
63 changes: 0 additions & 63 deletions pruner/checkpoint_test.go
@@ -5,17 +5,14 @@ import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStoreCheckpoint(t *testing.T) {
ctx := context.Background()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
c := &checkpoint{
PrunerKind: "test",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}
@@ -27,63 +24,3 @@ func TestStoreCheckpoint(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c, c2)
}

// TestDisallowRevertArchival tests that a node that has been previously run
// with full pruning cannot convert back into an "archival" node.
func TestDisallowRevertArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "full",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "archival")
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)

// ensure no false positives
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

// ensure checkpoint is retrievable after
cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
require.NotNil(t, cp)
assert.Equal(t, cp.LastPrunedHeight, c.LastPrunedHeight)
}

func TestCheckpointOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "archival",
LastPrunedHeight: 600,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
assert.Equal(t, "full", cp.PrunerKind)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
1 change: 0 additions & 1 deletion pruner/pruner.go
@@ -10,5 +10,4 @@ import (
// from the node's datastore.
type Pruner interface {
Prune(context.Context, *header.ExtendedHeader) error
Kind() string
}
13 changes: 13 additions & 0 deletions pruner/service.go
@@ -94,6 +94,19 @@ func (s *Service) Stop(ctx context.Context) error {
}
}

func (s *Service) LastPruned(ctx context.Context) (uint64, error) {
err := s.loadCheckpoint(ctx)
if err != nil {
return 0, err
}
return s.checkpoint.LastPrunedHeight, nil
}

func (s *Service) ClearCheckpoint(ctx context.Context) error {
s.checkpoint = newCheckpoint()
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}

// run prunes blocks older than the availability window periodically until the
// pruner service is stopped.
func (s *Service) run() {
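
The two new Service methods give callers, here nodebuilder's checkPreviousRun, a way to inspect and reset pruning progress. A minimal sketch of how they compose, assuming a constructed *pruner.Service; the resetIfConverted helper and the converted flag are hypothetical and not part of this commit.

package example

import (
	"context"
	"fmt"

	"github.com/celestiaorg/celestia-node/pruner"
)

// resetIfConverted is a hypothetical helper showing how the new methods
// compose: inspect progress, then wipe it when the store was just
// converted from archival to pruned.
func resetIfConverted(ctx context.Context, serv *pruner.Service, converted bool) error {
	last, err := serv.LastPruned(ctx) // loads the checkpoint from the datastore
	if err != nil {
		return err
	}
	fmt.Println("last pruned height before reset:", last)

	if !converted {
		return nil
	}
	// ClearCheckpoint replaces the stored checkpoint with a fresh one,
	// so the pruner restarts from height 1 on its next run.
	return serv.ClearCheckpoint(ctx)
}
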
35 changes: 31 additions & 4 deletions pruner/service_test.go
@@ -291,6 +291,37 @@ func TestFindPruneableHeaders(t *testing.T) {
}
}

func TestService_ClearCheckpoint(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

serv, err := NewService(
&mockPruner{},
time.Second, // doesn't matter
headertest.NewStore(t),
sync.MutexWrap(datastore.NewMapDatastore()),
time.Second, // doesn't matter
)
require.NoError(t, err)
serv.ctx = ctx

oldCp := &checkpoint{LastPrunedHeight: 500, FailedHeaders: map[uint64]struct{}{4: {}}}
err = storeCheckpoint(ctx, serv.ds, oldCp)
require.NoError(t, err)

err = serv.loadCheckpoint(ctx)
require.NoError(t, err)
assert.Equal(t, oldCp, serv.checkpoint)

err = serv.ClearCheckpoint(ctx)
require.NoError(t, err)

err = serv.loadCheckpoint(ctx)
require.NoError(t, err)

assert.Equal(t, newCheckpoint(), serv.checkpoint)
}

type mockPruner struct {
deletedHeaderHashes []pruned

@@ -318,10 +349,6 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
return nil
}

func (mp *mockPruner) Kind() string {
return "mock"
}

// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
// https://github.com/celestiaorg/celestia-node/issues/3278.
type SpacedHeaderGenerator struct {