Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[R4R] Pipecommit enable trie prefetcher #992

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions core/state/snapshot/difflayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,8 @@ type diffLayer struct {
storageList map[common.Hash][]common.Hash // List of storage slots for iterated retrievals, one per account. Any existing lists are sorted if non-nil
storageData map[common.Hash]map[common.Hash][]byte // Keyed storage slots for direct retrieval. one per account (nil means deleted)

verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.
accountCorrected bool // mark the accountData has been corrected ort not
verifiedCh chan struct{} // the difflayer is verified when verifiedCh is nil or closed
valid bool // mark the difflayer is valid or not.

diffed *bloomfilter.Filter // Bloom filter tracking all the diffed items up to the disk layer

Expand Down Expand Up @@ -294,14 +293,6 @@ func (dl *diffLayer) CorrectAccounts(accounts map[common.Hash][]byte) {
defer dl.lock.Unlock()

dl.accountData = accounts
dl.accountCorrected = true
}

func (dl *diffLayer) AccountsCorrected() bool {
dl.lock.RLock()
defer dl.lock.RUnlock()

return dl.accountCorrected
}

// Parent returns the subsequent layer of a diff layer.
Expand Down
4 changes: 0 additions & 4 deletions core/state/snapshot/disklayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ func (dl *diskLayer) Verified() bool {
func (dl *diskLayer) CorrectAccounts(map[common.Hash][]byte) {
}

func (dl *diskLayer) AccountsCorrected() bool {
return true
}

// Parent always returns nil as there's no layer below the disk.
func (dl *diskLayer) Parent() snapshot {
return nil
Expand Down
14 changes: 4 additions & 10 deletions core/state/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ type Snapshot interface {
// CorrectAccounts updates account data for storing the correct data during pipecommit
CorrectAccounts(map[common.Hash][]byte)

// AccountsCorrected checks whether the account data has been corrected during pipecommit
AccountsCorrected() bool

// Account directly retrieves the account associated with a particular hash in
// the snapshot slim data format.
Account(hash common.Hash) (*Account, error)
Expand All @@ -131,20 +128,17 @@ type Snapshot interface {
// Storage directly retrieves the storage data associated with a particular hash,
// within a particular account.
Storage(accountHash, storageHash common.Hash) ([]byte, error)

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
Parent() snapshot
}

// snapshot is the internal version of the snapshot data layer that supports some
// additional methods compared to the public API.
type snapshot interface {
Snapshot

// Parent returns the subsequent layer of a snapshot, or nil if the base was
// reached.
//
// Note, the method is an internal helper to avoid type switching between the
// disk and diff layers. There is no locking involved.
Parent() snapshot

// Update creates a new layer on top of the existing snapshot diff tree with
// the specified data items.
//
Expand Down
12 changes: 1 addition & 11 deletions core/state/state_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,17 +365,7 @@ func (s *StateObject) finalise(prefetch bool) {
}
}

// The account root need to be updated before prefetch, otherwise the account root is empty
if s.db.pipeCommit && s.data.Root == dummyRoot && !s.rootCorrected && s.db.snap.AccountsCorrected() {
if acc, err := s.db.snap.Account(crypto.HashData(s.db.hasher, s.address.Bytes())); err == nil {
if acc != nil && len(acc.Root) != 0 {
s.data.Root = common.BytesToHash(acc.Root)
s.rootCorrected = true
}
}
}

if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot && s.data.Root != dummyRoot {
if s.db.prefetcher != nil && prefetch && len(slotsToPrefetch) > 0 && s.data.Root != emptyRoot {
s.db.prefetcher.prefetch(s.data.Root, slotsToPrefetch, s.addrHash)
}
if len(s.dirtyStorage) > 0 {
Expand Down
33 changes: 19 additions & 14 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,6 @@ var (
// emptyRoot is the known root hash of an empty trie.
emptyRoot = common.HexToHash("56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421")

// dummyRoot is the dummy account root before corrected in pipecommit sync mode,
// the value is 542e5fc2709de84248e9bce43a9c0c8943a608029001360f8ab55bf113b23d28
dummyRoot = crypto.Keccak256Hash([]byte("dummy_account_root"))

emptyAddr = crypto.Keccak256Hash(common.Address{}.Bytes())
)

Expand Down Expand Up @@ -218,7 +214,12 @@ func (s *StateDB) StartPrefetcher(namespace string) {
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
parent := s.snap.Parent()
if parent != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, parent.Root(), namespace)
} else {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, common.Hash{}, namespace)
}
}
}

Expand Down Expand Up @@ -1001,7 +1002,11 @@ func (s *StateDB) Finalise(deleteEmptyObjects bool) {
}
}
if s.prefetcher != nil && len(addressesToPrefetch) > 0 {
s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
if s.snap.Verified() {
s.prefetcher.prefetch(s.originalRoot, addressesToPrefetch, emptyAddr)
} else if s.prefetcher.rootParent != (common.Hash{}) {
s.prefetcher.prefetch(s.prefetcher.rootParent, addressesToPrefetch, emptyAddr)
}
}
// Invalidate journal because reverting across transactions is not allowed.
s.clearJournalAndRefund()
Expand Down Expand Up @@ -1036,9 +1041,13 @@ func (s *StateDB) CorrectAccountsRoot(blockRoot common.Hash) {
}
if accounts, err := snapshot.Accounts(); err == nil && accounts != nil {
for _, obj := range s.stateObjects {
if !obj.deleted && !obj.rootCorrected && obj.data.Root == dummyRoot {
if !obj.deleted {
if account, exist := accounts[crypto.Keccak256Hash(obj.address[:])]; exist {
obj.data.Root = common.BytesToHash(account.Root)
if len(account.Root) == 0 {
obj.data.Root = emptyRoot
} else {
obj.data.Root = common.BytesToHash(account.Root)
}
obj.rootCorrected = true
}
}
Expand All @@ -1051,12 +1060,8 @@ func (s *StateDB) PopulateSnapAccountAndStorage() {
for addr := range s.stateObjectsPending {
if obj := s.stateObjects[addr]; !obj.deleted {
if s.snap != nil {
root := obj.data.Root
storageChanged := s.populateSnapStorage(obj)
if storageChanged {
root = dummyRoot
}
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, root, obj.data.CodeHash)
s.populateSnapStorage(obj)
s.snapAccounts[obj.address] = snapshot.SlimAccountRLP(obj.data.Nonce, obj.data.Balance, obj.data.Root, obj.data.CodeHash)
}
}
}
Expand Down
48 changes: 37 additions & 11 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,11 @@ type prefetchMsg struct {
//
// Note, the prefetcher's API is not thread safe.
type triePrefetcher struct {
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of theaccount trie for metrics
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie
db Database // Database to fetch trie nodes through
root common.Hash // Root hash of theaccount trie for metrics
rootParent common.Hash //Root has of the account trie from block before the prvious one, designed for pipecommit mode
fetches map[common.Hash]Trie // Partially or fully fetcher tries
fetchers map[common.Hash]*subfetcher // Subfetchers for each trie

abortChan chan *subfetcher // to abort a single subfetcher and its children
closed int32
Expand All @@ -70,16 +71,22 @@ type triePrefetcher struct {
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter

accountStaleLoadMeter metrics.Meter
accountStaleDupMeter metrics.Meter
accountStaleSkipMeter metrics.Meter
accountStaleWasteMeter metrics.Meter
}

// newTriePrefetcher
func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root, rootParent common.Hash, namespace string) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
root: root,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),
db: db,
root: root,
rootParent: rootParent,
fetchers: make(map[common.Hash]*subfetcher), // Active prefetchers use the fetchers map
abortChan: make(chan *subfetcher, abortChanSize),

closeMainChan: make(chan struct{}),
closeMainDoneChan: make(chan struct{}),
Expand All @@ -94,6 +101,11 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

accountStaleLoadMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/load", nil),
accountStaleDupMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/dup", nil),
accountStaleSkipMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/skip", nil),
accountStaleWasteMeter: metrics.GetOrRegisterMeter(prefix+"/accountst/waste", nil),
}
go p.mainLoop()
return p
Expand Down Expand Up @@ -144,7 +156,8 @@ func (p *triePrefetcher) mainLoop() {
}

if metrics.EnabledExpensive {
if fetcher.root == p.root {
switch fetcher.root {
case p.root:
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountDupMeter.Mark(int64(fetcher.dups))
p.accountSkipMeter.Mark(int64(len(fetcher.tasks)))
Expand All @@ -154,7 +167,19 @@ func (p *triePrefetcher) mainLoop() {
}
fetcher.lock.Unlock()
p.accountWasteMeter.Mark(int64(len(fetcher.seen)))
} else {

case p.rootParent:
p.accountStaleLoadMeter.Mark(int64(len(fetcher.seen)))
p.accountStaleDupMeter.Mark(int64(fetcher.dups))
p.accountStaleSkipMeter.Mark(int64(len(fetcher.tasks)))
fetcher.lock.Lock()
for _, key := range fetcher.used {
delete(fetcher.seen, string(key))
}
fetcher.lock.Unlock()
p.accountStaleWasteMeter.Mark(int64(len(fetcher.seen)))

default:
p.storageLoadMeter.Mark(int64(len(fetcher.seen)))
p.storageDupMeter.Mark(int64(fetcher.dups))
p.storageSkipMeter.Mark(int64(len(fetcher.tasks)))
Expand All @@ -165,6 +190,7 @@ func (p *triePrefetcher) mainLoop() {
}
fetcher.lock.Unlock()
p.storageWasteMeter.Mark(int64(len(fetcher.seen)))

}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/state/trie_prefetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func prefetchGuaranteed(prefetcher *triePrefetcher, root common.Hash, keys [][]b

func TestCopyAndClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
Expand All @@ -80,7 +80,7 @@ func TestCopyAndClose(t *testing.T) {

func TestUseAfterClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
a := prefetcher.trie(db.originalRoot)
Expand All @@ -96,7 +96,7 @@ func TestUseAfterClose(t *testing.T) {

func TestCopyClose(t *testing.T) {
db := filledStateDB()
prefetcher := newTriePrefetcher(db.db, db.originalRoot, "")
prefetcher := newTriePrefetcher(db.db, db.originalRoot, common.Hash{}, "")
skey := common.HexToHash("aaa")
prefetchGuaranteed(prefetcher, db.originalRoot, [][]byte{skey.Bytes()}, common.Hash{})
cpy := prefetcher.copy()
Expand Down