diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go
index 1ac1808a43..5a2b8d53a3 100644
--- a/nodebuilder/pruner/module.go
+++ b/nodebuilder/pruner/module.go
@@ -3,7 +3,6 @@ package pruner
 import (
 	"context"
 
-	"github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log/v2"
 	"go.uber.org/fx"
 
@@ -15,6 +14,7 @@ import (
 	fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
 	"github.com/celestiaorg/celestia-node/share/availability/light"
 	"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
+	"github.com/celestiaorg/celestia-node/store"
 )
 
 var log = logging.Logger("module/pruner")
@@ -33,9 +33,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
 		// This is necessary to invoke the pruner service as independent thanks to a
 		// quirk in FX.
 		fx.Invoke(func(_ *pruner.Service) {}),
-		fx.Invoke(func(ctx context.Context, ds datastore.Batching, p pruner.Pruner) error {
-			return pruner.DetectPreviousRun(ctx, ds, p.Kind())
-		}),
 	)
 
 	baseComponents := fx.Options(
@@ -67,6 +64,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
 			baseComponents,
 			fx.Supply(fullAvailOpts),
 			fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
+			checkPreviousRun(),
 		)
 	case node.Bridge:
 		coreOpts := make([]core.Option, 0)
@@ -83,6 +81,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
 			fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
 			fx.Supply(coreOpts),
 			fx.Supply(fullAvailOpts),
+			checkPreviousRun(),
 		)
 	default:
 		panic("unknown node type")
@@ -98,3 +97,23 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
 		return opt
 	})
 }
+
+func checkPreviousRun() fx.Option {
+	return fx.Invoke(func(
+		ctx context.Context,
+		fa *fullavail.ShareAvailability,
+		p *pruner.Service,
+		s *store.Store,
+	) error {
+		convert, err := fa.ConvertToPruned(ctx)
+		if err != nil {
+			return err
+		}
+
+		if convert {
+			return p.ClearCheckpoint(ctx)
+		}
+
+		return nil
+	})
+}
diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go
index 900dcd1211..f6aac79521 100644
--- a/nodebuilder/share/module.go
+++ b/nodebuilder/share/module.go
@@ -239,8 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
 		)
 	case node.Bridge, node.Full:
 		return fx.Options(
-			fx.Provide(func(s *store.Store, getter shwap.Getter, opts []full.Option) *full.ShareAvailability {
-				return full.NewShareAvailability(s, getter, opts...)
+			fx.Provide(func(
+				s *store.Store,
+				getter shwap.Getter,
+				ds datastore.Batching,
+				opts []full.Option,
+			) *full.ShareAvailability {
+				return full.NewShareAvailability(s, getter, ds, opts...)
 			}),
 			fx.Provide(func(avail *full.ShareAvailability) share.Availability {
 				return avail
diff --git a/nodebuilder/tests/prune_test.go b/nodebuilder/tests/prune_test.go
index 9deecd6e93..d4091b5757 100644
--- a/nodebuilder/tests/prune_test.go
+++ b/nodebuilder/tests/prune_test.go
@@ -19,8 +19,8 @@ import (
 	"github.com/celestiaorg/celestia-node/nodebuilder/das"
 	"github.com/celestiaorg/celestia-node/nodebuilder/node"
 	"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
-	"github.com/celestiaorg/celestia-node/pruner"
 	"github.com/celestiaorg/celestia-node/share"
+	full_avail "github.com/celestiaorg/celestia-node/share/availability/full"
 	"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
 	"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
 	"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
@@ -189,6 +189,7 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
 		err = store.PutConfig(archivalCfg)
 		require.NoError(t, err)
 		_, err = sw.NewNodeWithStore(nt, store)
-		require.ErrorIs(t, err, pruner.ErrDisallowRevertToArchival, nt.String())
+		assert.Error(t, err)
+		assert.ErrorIs(t, err, full_avail.ErrDisallowRevertToArchival)
 	}
 }
diff --git a/pruner/checkpoint.go b/pruner/checkpoint.go
index 6811d42fbf..7e8195ca57 100644
--- a/pruner/checkpoint.go
+++ b/pruner/checkpoint.go
@@ -7,18 +7,11 @@ import (
 	"fmt"
 
 	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-datastore/namespace"
 
 	"github.com/celestiaorg/celestia-node/header"
 )
 
 var (
-	// ErrDisallowRevertToArchival is returned when a node has been run with pruner enabled before and
-	// launching it with archival mode.
-	ErrDisallowRevertToArchival = errors.New(
-		"node has been run with pruner enabled before, it is not safe to convert to an archival" +
-			"Run with --experimental-pruning enabled or consider re-initializing the store")
-
 	storePrefix           = datastore.NewKey("pruner")
 	checkpointKey         = datastore.NewKey("checkpoint")
 	errCheckpointNotFound = errors.New("checkpoint not found")
@@ -27,46 +20,17 @@
 // checkpoint contains information related to the state of the
 // pruner service that is periodically persisted to disk.
 type checkpoint struct {
-	PrunerKind       string              `json:"pruner_kind"`
 	LastPrunedHeight uint64              `json:"last_pruned_height"`
 	FailedHeaders    map[uint64]struct{} `json:"failed"`
 }
 
-func newCheckpoint(prunerKind string) *checkpoint {
+func newCheckpoint() *checkpoint {
 	return &checkpoint{
-		PrunerKind:       prunerKind,
 		LastPrunedHeight: 1,
 		FailedHeaders:    map[uint64]struct{}{},
 	}
 }
 
-// DetectPreviousRun ensures that a node that has been run with "full" pruning
-// mode previously cannot revert back to an "archival" one. This check should
-// only be performed when a node is either a Full or Bridge node.
-func DetectPreviousRun(ctx context.Context, ds datastore.Datastore, expectedKind string) error {
-	wrappedDs := namespace.Wrap(ds, storePrefix)
-
-	cp, err := getCheckpoint(ctx, wrappedDs)
-	if err != nil {
-		if errors.Is(err, errCheckpointNotFound) {
-			return nil
-		}
-		return fmt.Errorf("failed to load checkpoint: %w", err)
-	}
-
-	if cp.PrunerKind != expectedKind {
-		// do not allow reversion back to archival mode
-		if cp.PrunerKind == "full" {
-			return ErrDisallowRevertToArchival
-		}
-		// allow conversion from archival to full by overriding previous checkpoint
-		log.Infow("overriding checkpoint to enable full pruning mode...")
-		cp = newCheckpoint(expectedKind)
-		return storeCheckpoint(ctx, wrappedDs, cp)
-	}
-	return nil
-}
-
 // storeCheckpoint persists the checkpoint to disk.
 func storeCheckpoint(ctx context.Context, ds datastore.Datastore, c *checkpoint) error {
 	bin, err := json.Marshal(c)
@@ -101,7 +65,7 @@ func (s *Service) loadCheckpoint(ctx context.Context) error {
 	cp, err := getCheckpoint(ctx, s.ds)
 	if err != nil {
 		if errors.Is(err, errCheckpointNotFound) {
-			s.checkpoint = newCheckpoint(s.pruner.Kind())
+			s.checkpoint = newCheckpoint()
 			return storeCheckpoint(ctx, s.ds, s.checkpoint)
 		}
 		return err
diff --git a/pruner/checkpoint_test.go b/pruner/checkpoint_test.go
index ea65fb48e1..7723e2dcfc 100644
--- a/pruner/checkpoint_test.go
+++ b/pruner/checkpoint_test.go
@@ -5,9 +5,7 @@ import (
 	"testing"
 
 	"github.com/ipfs/go-datastore"
-	"github.com/ipfs/go-datastore/namespace"
 	ds_sync "github.com/ipfs/go-datastore/sync"
-	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -15,7 +13,6 @@ func TestStoreCheckpoint(t *testing.T) {
 	ctx := context.Background()
 	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
 	c := &checkpoint{
-		PrunerKind:       "test",
 		LastPrunedHeight: 1,
 		FailedHeaders:    map[uint64]struct{}{1: {}},
 	}
@@ -27,63 +24,3 @@ func TestStoreCheckpoint(t *testing.T) {
 	require.NoError(t, err)
 	require.Equal(t, c, c2)
 }
-
-// TestDisallowRevertArchival tests that a node that has been previously run
-// with full pruning cannot convert back into an "archival" node.
-func TestDisallowRevertArchival(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
-
-	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
-	namespaceWrapped := namespace.Wrap(ds, storePrefix)
-	c := &checkpoint{
-		PrunerKind:       "full",
-		LastPrunedHeight: 1,
-		FailedHeaders:    map[uint64]struct{}{1: {}},
-	}
-
-	err := storeCheckpoint(ctx, namespaceWrapped, c)
-	require.NoError(t, err)
-
-	// give the unwrapped ds here as this is expected to run
-	// before pruner service is constructed
-	err = DetectPreviousRun(ctx, ds, "archival")
-	assert.Error(t, err)
-	assert.ErrorIs(t, err, ErrDisallowRevertToArchival)
-
-	// ensure no false positives
-	err = DetectPreviousRun(ctx, ds, "full")
-	assert.NoError(t, err)
-
-	// ensure checkpoint is retrievable after
-	cp, err := getCheckpoint(ctx, namespaceWrapped)
-	require.NoError(t, err)
-	require.NotNil(t, cp)
-	assert.Equal(t, cp.LastPrunedHeight, c.LastPrunedHeight)
-}
-
-func TestCheckpointOverride(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	t.Cleanup(cancel)
-
-	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
-	namespaceWrapped := namespace.Wrap(ds, storePrefix)
-	c := &checkpoint{
-		PrunerKind:       "archival",
-		LastPrunedHeight: 600,
-		FailedHeaders:    map[uint64]struct{}{1: {}},
-	}
-
-	err := storeCheckpoint(ctx, namespaceWrapped, c)
-	require.NoError(t, err)
-
-	// give the unwrapped ds here as this is expected to run
-	// before pruner service is constructed
-	err = DetectPreviousRun(ctx, ds, "full")
-	assert.NoError(t, err)
-
-	cp, err := getCheckpoint(ctx, namespaceWrapped)
-	require.NoError(t, err)
-	assert.Equal(t, "full", cp.PrunerKind)
-	assert.Equal(t, uint64(1), cp.LastPrunedHeight)
-}
diff --git a/pruner/pruner.go b/pruner/pruner.go
index 02f777910f..a591a65392 100644
--- a/pruner/pruner.go
+++ b/pruner/pruner.go
@@ -10,5 +10,4 @@ import (
 // from the node's datastore.
 type Pruner interface {
 	Prune(context.Context, *header.ExtendedHeader) error
-	Kind() string
 }
diff --git a/pruner/service.go b/pruner/service.go
index 9a617a486c..48f6be428d 100644
--- a/pruner/service.go
+++ b/pruner/service.go
@@ -94,6 +94,19 @@ func (s *Service) Stop(ctx context.Context) error {
 	}
 }
 
+func (s *Service) LastPruned(ctx context.Context) (uint64, error) {
+	err := s.loadCheckpoint(ctx)
+	if err != nil {
+		return 0, err
+	}
+	return s.checkpoint.LastPrunedHeight, nil
+}
+
+func (s *Service) ClearCheckpoint(ctx context.Context) error {
+	s.checkpoint = newCheckpoint()
+	return storeCheckpoint(ctx, s.ds, s.checkpoint)
+}
+
 // run prunes blocks older than the availability wiindow periodically until the
 // pruner service is stopped.
 func (s *Service) run() {
diff --git a/pruner/service_test.go b/pruner/service_test.go
index 25f0ef0472..0540ec887b 100644
--- a/pruner/service_test.go
+++ b/pruner/service_test.go
@@ -291,6 +291,37 @@ func TestFindPruneableHeaders(t *testing.T) {
 	}
 }
 
+func TestService_ClearCheckpoint(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	serv, err := NewService(
+		&mockPruner{},
+		time.Second, // doesn't matter
+		headertest.NewStore(t),
+		sync.MutexWrap(datastore.NewMapDatastore()),
+		time.Second, // doesn't matter
+	)
+	require.NoError(t, err)
+	serv.ctx = ctx
+
+	oldCp := &checkpoint{LastPrunedHeight: 500, FailedHeaders: map[uint64]struct{}{4: {}}}
+	err = storeCheckpoint(ctx, serv.ds, oldCp)
+	require.NoError(t, err)
+
+	err = serv.loadCheckpoint(ctx)
+	require.NoError(t, err)
+	assert.Equal(t, oldCp, serv.checkpoint)
+
+	err = serv.ClearCheckpoint(ctx)
+	require.NoError(t, err)
+
+	err = serv.loadCheckpoint(ctx)
+	require.NoError(t, err)
+
+	assert.Equal(t, newCheckpoint(), serv.checkpoint)
+}
+
 type mockPruner struct {
 	deletedHeaderHashes []pruned
 
@@ -318,10 +349,6 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
 	return nil
 }
 
-func (mp *mockPruner) Kind() string {
-	return "mock"
-}
-
 // TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
 // https://github.com/celestiaorg/celestia-node/issues/3278.
 type SpacedHeaderGenerator struct {
diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go
index c81bde51db..4b18cb7196 100644
--- a/share/availability/full/availability.go
+++ b/share/availability/full/availability.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/namespace"
 	logging "github.com/ipfs/go-log/v2"
 
 	"github.com/celestiaorg/celestia-node/header"
@@ -16,7 +18,16 @@ import (
 	"github.com/celestiaorg/celestia-node/store"
 )
 
-var log = logging.Logger("share/full")
+// ErrDisallowRevertToArchival is returned when a node that has previously run with the
+// pruner enabled is launched in archival mode.
+var ErrDisallowRevertToArchival = errors.New(
+	"node has been run with pruner enabled before, it is not safe to convert to an archival node. " +
+		"Run with --experimental-pruning enabled or consider re-initializing the store")
+
+var (
+	log         = logging.Logger("share/full")
+	storePrefix = datastore.NewKey("full_avail")
+)
 
 // ShareAvailability implements share.Availability using the full data square
 // recovery technique. It is considered "full" because it is required
@@ -25,6 +36,8 @@ type ShareAvailability struct {
 	store  *store.Store
 	getter shwap.Getter
 
+	ds datastore.Datastore
+
 	storageWindow time.Duration
 	archival      bool
 }
@@ -33,6 +46,7 @@ type ShareAvailability struct {
 func NewShareAvailability(
 	store *store.Store,
 	getter shwap.Getter,
+	ds datastore.Batching,
 	opts ...Option,
 ) *ShareAvailability {
 	p := defaultParams()
@@ -43,6 +57,7 @@ func NewShareAvailability(
 	return &ShareAvailability{
 		store:         store,
 		getter:        getter,
+		ds:            namespace.Wrap(ds, storePrefix),
 		storageWindow: availability.StorageWindow,
 		archival:      p.archival,
 	}
@@ -112,9 +127,35 @@ func (fa *ShareAvailability) Prune(ctx context.Context, eh *header.ExtendedHeade
 	return fa.store.RemoveODSQ4(ctx, eh.Height(), eh.DAH.Hash())
 }
 
-func (fa *ShareAvailability) Kind() string {
-	if fa.archival {
-		return "archival"
+var previousPrunedRunKey = datastore.NewKey("previous_run")
+
+// ConvertToPruned ensures that a node that has been run with pruning enabled before
+// cannot revert to archival mode. It returns true only if the node is converting to
+// pruned mode for the first time.
+func (fa *ShareAvailability) ConvertToPruned(ctx context.Context) (bool, error) {
+	prevPruned, err := fa.ds.Has(ctx, previousPrunedRunKey)
+	if err != nil {
+		return false, fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
+			"datastore: %w", err)
 	}
-	return "full"
+
+	// node has been run with pruning enabled previously and
+	// is attempting to revert to archival, do not allow
+	if prevPruned && fa.archival {
+		return false, ErrDisallowRevertToArchival
+	}
+
+	// if no previous pruned run has been recorded, record
+	// it for the first time
+	if !prevPruned && !fa.archival {
+		err = fa.ds.Put(ctx, previousPrunedRunKey, []byte{})
+		if err != nil {
+			return false, fmt.Errorf("share/availability/full: failed to update pruning mode in "+
+				"datastore: %w", err)
+		}
+		return true, nil
+	}
+
+	// no changes in pruning mode
+	return false, nil
 }
diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go
index 24435ee0d8..ba3a73778a 100644
--- a/share/availability/full/availability_test.go
+++ b/share/availability/full/availability_test.go
@@ -6,6 +6,8 @@ import (
 	"time"
 
 	"github.com/golang/mock/gomock"
+	"github.com/ipfs/go-datastore"
+	ds_sync "github.com/ipfs/go-datastore/sync"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -33,7 +35,7 @@ func TestSharesAvailable(t *testing.T) {
 
 	store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
 	require.NoError(t, err)
-	avail := NewShareAvailability(store, getter)
+	avail := NewShareAvailability(store, getter, datastore.NewMapDatastore())
 
 	err = avail.SharesAvailable(ctx, eh)
 	require.NoError(t, err)
@@ -60,7 +62,7 @@ func TestSharesAvailable_StoredEds(t *testing.T) {
 
 	store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
 	require.NoError(t, err)
-	avail := NewShareAvailability(store, nil)
+	avail := NewShareAvailability(store, nil, datastore.NewMapDatastore())
 
 	err = store.PutODSQ4(ctx, roots, eh.Height(), eds)
 	require.NoError(t, err)
@@ -90,7 +92,7 @@ func TestSharesAvailable_ErrNotAvailable(t *testing.T) {
 
 	store, err := store.NewStore(store.DefaultParameters(), t.TempDir())
 	require.NoError(t, err)
-	avail := NewShareAvailability(store, getter)
+	avail := NewShareAvailability(store, getter, datastore.NewMapDatastore())
 
 	errors := []error{shwap.ErrNotFound, context.DeadlineExceeded}
 	for _, getterErr := range errors {
@@ -114,7 +116,7 @@ func TestSharesAvailable_OutsideSamplingWindow_NonArchival(t *testing.T) {
 	suite := headertest.NewTestSuite(t, 3, time.Nanosecond)
 	headers := suite.GenExtendedHeaders(10)
 
-	avail := NewShareAvailability(store, getter)
+	avail := NewShareAvailability(store, getter, datastore.NewMapDatastore())
 	avail.storageWindow = time.Nanosecond // make all headers outside sampling window
 
 	for _, h := range headers {
@@ -140,7 +142,7 @@ func TestSharesAvailable_OutsideSamplingWindow_Archival(t *testing.T) {
 
 	getter.EXPECT().GetEDS(gomock.Any(), gomock.Any()).Times(1).Return(eds, nil)
 
-	avail := NewShareAvailability(store, getter, WithArchivalMode())
+	avail := NewShareAvailability(store, getter, datastore.NewMapDatastore(), WithArchivalMode())
 	avail.storageWindow = time.Nanosecond // make all headers outside sampling window
 
 	err = avail.SharesAvailable(ctx, eh)
@@ -149,3 +151,67 @@ func TestSharesAvailable_OutsideSamplingWindow_Archival(t *testing.T) {
 	require.NoError(t, err)
 	assert.True(t, has)
 }
+
+// TestDisallowRevertArchival tests that a node that has been previously run
+// with full pruning cannot convert back into an "archival" node
+func TestDisallowRevertArchival(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+
+	// create a pruned node instance (non-archival) for the first time
+	fa := NewShareAvailability(nil, nil, ds)
+
+	convert, err := fa.ConvertToPruned(ctx)
+	assert.NoError(t, err)
+	assert.True(t, convert)
+	// ensure availability impl recorded the pruned run
+	has, err := fa.ds.Has(ctx, previousPrunedRunKey)
+	require.NoError(t, err)
+	assert.True(t, has)
+
+	// now change to archival mode
+	fa = NewShareAvailability(nil, nil, ds, WithArchivalMode())
+
+	// ensure failure
+	convert, err = fa.ConvertToPruned(ctx)
+	assert.Error(t, err)
+	assert.ErrorIs(t, err, ErrDisallowRevertToArchival)
+	assert.False(t, convert)
+
+	// ensure the node can still run in pruned mode
+	fa = NewShareAvailability(nil, nil, ds)
+	convert, err = fa.ConvertToPruned(ctx)
+	assert.NoError(t, err)
+	assert.False(t, convert)
+}
+
+// TestAllowConversionFromArchivalToPruned tests that a node that has been previously run
+// in archival mode can convert to a pruned node
+func TestAllowConversionFromArchivalToPruned(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+
+	fa := NewShareAvailability(nil, nil, ds, WithArchivalMode())
+
+	convert, err := fa.ConvertToPruned(ctx)
+	assert.NoError(t, err)
+	assert.False(t, convert)
+
+	has, err := fa.ds.Has(ctx, previousPrunedRunKey)
+	require.NoError(t, err)
+	assert.False(t, has)
+
+	fa = NewShareAvailability(nil, nil, ds)
+
+	convert, err = fa.ConvertToPruned(ctx)
+	assert.NoError(t, err)
+	assert.True(t, convert)
+
+	has, err = fa.ds.Has(ctx, previousPrunedRunKey)
+	require.NoError(t, err)
+	assert.True(t, has)
+}
diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go
index 15281bb7c9..f3afbb26fd 100644
--- a/share/availability/light/availability.go
+++ b/share/availability/light/availability.go
@@ -240,10 +240,6 @@ func (la *ShareAvailability) Prune(ctx context.Context, h *header.ExtendedHeader
 	return nil
 }
 
-func (la *ShareAvailability) Kind() string {
-	return "light"
-}
-
 func datastoreKeyForRoot(root *share.AxisRoots) datastore.Key {
 	return datastore.NewKey(root.String())
 }
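
For reference, below is a minimal sketch of the conversion guard this change wires up through checkPreviousRun() in nodebuilder/pruner/module.go, written as a plain helper instead of an fx invoke. It is not part of the diff: the package name pruningguard and the function GuardPruningMode are hypothetical, while ConvertToPruned, ClearCheckpoint, LastPruned, and ErrDisallowRevertToArchival are the identifiers introduced above.

// A minimal sketch (not part of the change): the same guard that
// checkPreviousRun() performs through fx, as a standalone helper.
// fa and p are assumed to be constructed the same way the node builder does.
package pruningguard

import (
	"context"
	"fmt"

	"github.com/celestiaorg/celestia-node/pruner"
	fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

// GuardPruningMode refuses to revert a previously pruned node to archival mode
// and clears the pruner checkpoint the first time a node converts from
// archival to pruned, so pruning restarts from a clean state.
func GuardPruningMode(ctx context.Context, fa *fullavail.ShareAvailability, p *pruner.Service) error {
	converted, err := fa.ConvertToPruned(ctx)
	if err != nil {
		// Surfaces fullavail.ErrDisallowRevertToArchival when the datastore
		// already records a pruned run and the node is now archival.
		return fmt.Errorf("checking pruning mode: %w", err)
	}

	if converted {
		// First archival -> pruned conversion: drop stale pruning progress.
		return p.ClearCheckpoint(ctx)
	}

	// No mode change; the existing checkpoint (see Service.LastPruned) is kept.
	return nil
}

Compared to the removed pruner.DetectPreviousRun, the check now lives with the full availability implementation and is keyed on a single previous_run flag under the full_avail datastore prefix, which is why the Pruner interface no longer needs Kind().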