Skip to content

Commit

Permalink
[actpool] Simplify action store (#4515)
Browse files Browse the repository at this point in the history
  • Loading branch information
envestcc authored Jan 7, 2025
1 parent 360260f commit 4157ba3
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 78 deletions.
32 changes: 0 additions & 32 deletions actpool/actionstore.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package actpool

import (
"encoding/hex"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -56,7 +55,6 @@ type (
}
actionStoreConfig struct {
Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs
Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead)
}

onAction func(selp *action.SealedEnvelope) error
Expand Down Expand Up @@ -188,21 +186,12 @@ func (s *actionStore) Put(act *action.SealedEnvelope) error {
if err != nil {
return errors.Wrap(err, "failed to encode action")
}
toDelete, hasBlobToDelete := s.evict()
id, err := s.store.Put(blob)
if err != nil {
return errors.Wrap(err, "failed to put blob into store")
}
s.stored += uint64(s.store.Size(id))
s.lookup[h] = id
// if the datacap is exceeded, remove old data
if s.stored > s.config.Datacap {
if !hasBlobToDelete {
log.L().Debug("no worst action found")
} else {
s.drop(toDelete)
}
}
actionStoreMtc.WithLabelValues("size").Set(float64(s.stored))
return nil
}
Expand Down Expand Up @@ -242,27 +231,6 @@ func (s *actionStore) Range(fn func(hash.Hash256) bool) {
}
}

func (s *actionStore) drop(h hash.Hash256) {
id, ok := s.lookup[h]
if !ok {
log.L().Warn("worst action not found in lookup", zap.String("hash", hex.EncodeToString(h[:])))
return
}
if err := s.store.Delete(id); err != nil {
log.L().Error("failed to delete worst action", zap.Error(err))
}
s.stored -= uint64(s.store.Size(id))
delete(s.lookup, h)
}

// TODO: implement a proper eviction policy
func (s *actionStore) evict() (hash.Hash256, bool) {
for h := range s.lookup {
return h, true
}
return hash.ZeroHash256, false
}

// newSlotter creates a helper method for the Billy datastore that returns the
// individual shelf sizes used to store transactions in.
//
Expand Down
7 changes: 2 additions & 5 deletions actpool/actionstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func TestBlobStore(t *testing.T) {
t.Run("open", func(t *testing.T) {
cfg := actionStoreConfig{
Datadir: t.TempDir(),
Datacap: 10 * 1024 * 1024,
}
store, err := newActionStore(cfg, encode, decode)
r.NoError(err)
Expand All @@ -68,7 +67,6 @@ func TestBlobStore(t *testing.T) {
t.Run("put", func(t *testing.T) {
cfg := actionStoreConfig{
Datadir: t.TempDir(),
Datacap: 10240,
}
store, err := newActionStore(cfg, encode, decode)
r.NoError(err)
Expand All @@ -92,8 +90,8 @@ func TestBlobStore(t *testing.T) {
act, err = action.SignedExecution("", identityset.PrivateKey(1), 3, big.NewInt(1), 100, big.NewInt(100), body)
r.NoError(err)
r.NoError(store.Put(act))
r.Equal(uint64(8192), store.stored)
r.Equal(2, len(store.lookup))
r.Equal(uint64(12288), store.stored)
r.Equal(3, len(store.lookup))
assertions.MustNoErrorV(store.Get(assertions.MustNoErrorV(act.Hash())))
})
}
Expand All @@ -118,7 +116,6 @@ func BenchmarkDatabase(b *testing.B) {
b.Run("billy-put", func(b *testing.B) {
cfg := actionStoreConfig{
Datadir: b.TempDir(),
Datacap: 1024,
}
store, err := newActionStore(cfg, nil, nil)
r.NoError(err)
Expand Down
31 changes: 2 additions & 29 deletions actpool/actpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"sort"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/iotexproject/iotex-core/v2/pkg/lifecycle"
"github.com/iotexproject/iotex-core/v2/pkg/log"
"github.com/iotexproject/iotex-core/v2/pkg/prometheustimer"
"github.com/iotexproject/iotex-core/v2/pkg/routine"
"github.com/iotexproject/iotex-core/v2/pkg/tracer"
)

Expand Down Expand Up @@ -120,9 +118,7 @@ type actPool struct {
jobQueue []chan workerJob
worker []*queueWorker
subs []Subscriber

store *actionStore // store is the persistent cache for actpool
storeSync *routine.RecurringTask // storeSync is the recurring task to sync actions from store to memory
store *actionStore // store is the persistent cache for actpool
}

// NewActPool constructs a new actpool
Expand Down Expand Up @@ -201,7 +197,7 @@ func (ap *actPool) Start(ctx context.Context) error {
log.L().Info("Failed to load action from store", zap.Error(err))
}
}
return ap.storeSync.Start(ctx)
return nil
}

func (ap *actPool) Stop(ctx context.Context) error {
Expand All @@ -211,7 +207,6 @@ func (ap *actPool) Stop(ctx context.Context) error {
}
}
if ap.store != nil {
ap.storeSync.Stop(ctx)
return ap.store.Close()
}
return nil
Expand Down Expand Up @@ -548,28 +543,6 @@ func (ap *actPool) allocatedWorker(senderAddr address.Address) int {
return int(lastByte) % _numWorker
}

func (ap *actPool) syncFromStore() {
if ap.store == nil {
return
}
ap.store.Range(func(h hash.Hash256) bool {
if _, exist := ap.allActions.Get(h); !exist {
act, err := ap.store.Get(h)
if err != nil {
log.L().Warn("Failed to get action from store", zap.Error(err))
return true
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
if err := ap.add(ctx, act); err != nil {
log.L().Warn("Failed to add action to pool", zap.Error(err))
return true
}
}
return true
})
}

func (ap *actPool) AddSubscriber(sub Subscriber) {
ap.subs = append(ap.subs, sub)
}
Expand Down
8 changes: 2 additions & 6 deletions actpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ var (
BlackList: []string{},
MaxNumBlobsPerAcct: 16,
Store: &StoreConfig{
Datadir: "/var/data/actpool.cache",
Datacap: 1024 * 1024 * 100, // 100MB
ReadInterval: 10 * time.Minute,
Datadir: "/var/data/actpool.cache",
},
}
)
Expand Down Expand Up @@ -60,7 +58,5 @@ func (ap Config) MinGasPrice() *big.Int {

// StoreConfig is the configuration for the blob store
type StoreConfig struct {
Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs
Datacap uint64 `yaml:"datacap"` // Soft-cap of database storage (hard cap is larger due to overhead)
ReadInterval time.Duration `yaml:"readInterval"` // Interval to read from store to actpool memory
Datadir string `yaml:"datadir"` // Data directory containing the currently executable blobs
}
6 changes: 0 additions & 6 deletions actpool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"time"

"github.com/facebookgo/clock"

"github.com/iotexproject/iotex-core/v2/pkg/routine"
)

// ActQueueOption is the option for actQueue.
Expand Down Expand Up @@ -45,15 +43,11 @@ func WithStore(cfg StoreConfig, encode encodeAction, decode decodeAction) func(*
}
store, err := newActionStore(actionStoreConfig{
Datadir: cfg.Datadir,
Datacap: cfg.Datacap,
}, encode, decode)
if err != nil {
return err
}
a.store = store
a.storeSync = routine.NewRecurringTask(func() {
a.syncFromStore()
}, cfg.ReadInterval)
return nil
}
}

0 comments on commit 4157ba3

Please sign in to comment.