Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

trie: remove inconsistent trie nodes during sync in path mode #28595

Merged
merged 7 commits into from
Dec 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ethdb/dbtest/testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,13 @@ func TestDatabaseSuite(t *testing.T, New func() ethdb.KeyValueStore) {
b.Put([]byte("5"), nil)
b.Delete([]byte("1"))
b.Put([]byte("6"), nil)
b.Delete([]byte("3"))

b.Delete([]byte("3")) // delete then put
b.Put([]byte("3"), nil)

b.Put([]byte("7"), nil) // put then delete
b.Delete([]byte("7"))

if err := b.Write(); err != nil {
t.Fatal(err)
}
Expand Down
223 changes: 143 additions & 80 deletions trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,9 @@ type LeafCallback func(keys [][]byte, path []byte, leaf []byte, parent common.Ha

// nodeRequest represents a scheduled or already in-flight trie node retrieval request.
type nodeRequest struct {
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete
deletes [][]byte // List of internal path segments for trie nodes to delete
hash common.Hash // Hash of the trie node to retrieve
path []byte // Merkle path leading to this node for prioritization
data []byte // Data content of the node, cached until all subtrees complete

parent *nodeRequest // Parent state node referencing this entry
deps int // Number of dependencies before allowed to commit this node
Expand All @@ -146,38 +145,85 @@ type CodeSyncResult struct {
Data []byte // Data content of the retrieved bytecode
}

// nodeOp represents an operation upon the trie node. It can either represent a
// deletion to the specific node or a node write for persisting retrieved node.
type nodeOp struct {
owner common.Hash // identifier of the trie (empty for account trie)
path []byte // path from the root to the specified node.
blob []byte // the content of the node (nil for deletion)
hash common.Hash // hash of the node content (empty for node deletion)
}

// isDelete indicates if the operation is a database deletion.
func (op *nodeOp) isDelete() bool {
return len(op.blob) == 0
}

// syncMemBatch is an in-memory buffer of successfully downloaded but not yet
// persisted data items.
type syncMemBatch struct {
nodes map[string][]byte // In-memory membatch of recently completed nodes
hashes map[string]common.Hash // Hashes of recently completed nodes
deletes map[string]struct{} // List of paths for trie node to delete
codes map[common.Hash][]byte // In-memory membatch of recently completed codes
size uint64 // Estimated batch-size of in-memory data.
scheme string // State scheme identifier
codes map[common.Hash][]byte // In-memory batch of recently completed codes
nodes []nodeOp // In-memory batch of recently completed/deleted nodes
size uint64 // Estimated batch-size of in-memory data.
}

// newSyncMemBatch allocates a new memory-buffer for not-yet persisted trie nodes.
func newSyncMemBatch() *syncMemBatch {
func newSyncMemBatch(scheme string) *syncMemBatch {
return &syncMemBatch{
nodes: make(map[string][]byte),
hashes: make(map[string]common.Hash),
deletes: make(map[string]struct{}),
codes: make(map[common.Hash][]byte),
scheme: scheme,
codes: make(map[common.Hash][]byte),
}
}

// hasNode reports the trie node with specific path is already cached.
func (batch *syncMemBatch) hasNode(path []byte) bool {
_, ok := batch.nodes[string(path)]
return ok
}

// hasCode reports the contract code with specific hash is already cached.
func (batch *syncMemBatch) hasCode(hash common.Hash) bool {
_, ok := batch.codes[hash]
return ok
}

// addCode caches a contract code database write operation.
func (batch *syncMemBatch) addCode(hash common.Hash, code []byte) {
batch.codes[hash] = code
batch.size += common.HashLength + uint64(len(code))
}

// addNode caches a node database write operation.
func (batch *syncMemBatch) addNode(owner common.Hash, path []byte, blob []byte, hash common.Hash) {
if batch.scheme == rawdb.PathScheme {
if owner == (common.Hash{}) {
batch.size += uint64(len(path) + len(blob))
} else {
batch.size += common.HashLength + uint64(len(path)+len(blob))
}
} else {
batch.size += common.HashLength + uint64(len(blob))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
blob: blob,
hash: hash,
})
}

// delNode caches a node database delete operation.
func (batch *syncMemBatch) delNode(owner common.Hash, path []byte) {
if batch.scheme != rawdb.PathScheme {
log.Error("Unexpected node deletion", "owner", owner, "path", path, "scheme", batch.scheme)
return // deletion is not supported in hash mode.
}
if owner == (common.Hash{}) {
batch.size += uint64(len(path))
} else {
batch.size += common.HashLength + uint64(len(path))
}
batch.nodes = append(batch.nodes, nodeOp{
owner: owner,
path: path,
})
}

// Sync is the main state trie synchronisation scheduler, which provides yet
// unknown trie hashes to retrieve, accepts node data associated with said hashes
// and reconstructs the trie step by step until all is done.
Expand All @@ -196,7 +242,7 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
ts := &Sync{
scheme: scheme,
database: database,
membatch: newSyncMemBatch(),
membatch: newSyncMemBatch(scheme),
nodeReqs: make(map[string]*nodeRequest),
codeReqs: make(map[common.Hash]*codeRequest),
queue: prque.New[int64, any](nil), // Ugh, can contain both string and hash, whyyy
Expand All @@ -210,16 +256,17 @@ func NewSync(root common.Hash, database ethdb.KeyValueReader, callback LeafCallb
// parent for completion tracking. The given path is a unique node path in
// hex format and contain all the parent path if it's layered trie node.
func (s *Sync) AddSubTrie(root common.Hash, path []byte, parent common.Hash, parentPath []byte, callback LeafCallback) {
// Short circuit if the trie is empty or already known
if root == types.EmptyRootHash {
return
}
if s.membatch.hasNode(path) {
Copy link
Member Author

@rjl493456442 rjl493456442 Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This checking is meaningless and no longer required.

Originally in hash scheme, it's possible that a sub trie(storage trie) is shared by different contracts and the check here can serve as the de-duplication mechanism. However it's totally different in path scheme, which should still keep the duplication for different contracts. Therefore, path is used as the identifier here for checking duplication(originally it's node hash).

Due the fact the path can uniquely identify a trie node, so it's not really possible that the node with specific path is already cached in local batch, even for hash mode. It turns out a completely useless operation.

Therefore, we can remove it.

return
}
owner, inner := ResolvePath(path)
if rawdb.HasTrieNode(s.database, owner, inner, root, s.scheme) {
exist, inconsistent := s.hasNode(owner, inner, root)
if exist {
// The entire subtrie is already present in the database.
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
s.membatch.delNode(owner, inner)
}
// Assemble the new sub-trie sync request
req := &nodeRequest{
Expand Down Expand Up @@ -371,39 +418,42 @@ func (s *Sync) ProcessNode(result NodeSyncResult) error {
}

// Commit flushes the data stored in the internal membatch out to persistent
// storage, returning any occurred error.
// storage, returning any occurred error. The whole data set will be flushed
// in an atomic database batch.
func (s *Sync) Commit(dbw ethdb.Batch) error {
// Flush the pending node writes into database batch.
var (
account int
storage int
)
for path, value := range s.membatch.nodes {
owner, inner := ResolvePath([]byte(path))
if owner == (common.Hash{}) {
account += 1
for _, op := range s.membatch.nodes {
if op.isDelete() {
// node deletion is only supported in path mode.
if op.owner == (common.Hash{}) {
rawdb.DeleteAccountTrieNode(dbw, op.path)
} else {
rawdb.DeleteStorageTrieNode(dbw, op.owner, op.path)
}
deletionGauge.Inc(1)
} else {
storage += 1
if op.owner == (common.Hash{}) {
account += 1
} else {
storage += 1
}
rawdb.WriteTrieNode(dbw, op.owner, op.path, op.hash, op.blob, s.scheme)
}
rawdb.WriteTrieNode(dbw, owner, inner, s.membatch.hashes[path], value, s.scheme)
}
accountNodeSyncedGauge.Inc(int64(account))
storageNodeSyncedGauge.Inc(int64(storage))

// Flush the pending node deletes into the database batch.
// Please note that each written and deleted node has a
// unique path, ensuring no duplication occurs.
for path := range s.membatch.deletes {
owner, inner := ResolvePath([]byte(path))
rawdb.DeleteTrieNode(dbw, owner, inner, common.Hash{} /* unused */, s.scheme)
}
// Flush the pending code writes into database batch.
for hash, value := range s.membatch.codes {
rawdb.WriteCode(dbw, hash, value)
}
codeSyncedGauge.Inc(int64(len(s.membatch.codes)))

s.membatch = newSyncMemBatch() // reset the batch
s.membatch = newSyncMemBatch(s.scheme) // reset the batch
return nil
}

Expand Down Expand Up @@ -476,12 +526,15 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// child as invalid. This is essential in the case of path mode
// scheme; otherwise, state healing might overwrite existing child
// nodes silently while leaving a dangling parent node within the
// range of this internal path on disk. This would break the
// guarantee for state healing.
// range of this internal path on disk and the persistent state
// ends up with a very weird situation that nodes on the same path
// are not inconsistent while they all present in disk. This property
// would break the guarantee for state healing.
//
// While it's possible for this shortNode to overwrite a previously
// existing full node, the other branches of the fullNode can be
// retained as they remain untouched and complete.
// retained as they are not accessible with the new shortNode, and
// also the whole sub-trie is still untouched and complete.
//
// This step is only necessary for path mode, as there is no deletion
// in hash mode at all.
Expand All @@ -498,8 +551,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
exists = rawdb.ExistsStorageTrieNode(s.database, owner, append(inner, key[:i]...))
}
if exists {
req.deletes = append(req.deletes, key[:i])
deletionGauge.Inc(1)
s.membatch.delNode(owner, append(inner, key[:i]...))
log.Debug("Detected dangling node", "owner", owner, "path", append(inner, key[:i]...))
}
}
Expand All @@ -521,6 +573,7 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
var (
missing = make(chan *nodeRequest, len(children))
pending sync.WaitGroup
batchMu sync.Mutex
)
for _, child := range children {
// Notify any external watcher of a new key/value node
Expand All @@ -538,34 +591,32 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
}
}
}
// If the child references another node, resolve or schedule
// If the child references another node, resolve or schedule.
// We check all children concurrently.
if node, ok := (child.node).(hashNode); ok {
// Try to resolve the node from the local database
if s.membatch.hasNode(child.path) {
continue
}
// Check the presence of children concurrently
path := child.path
hash := common.BytesToHash(node)
pending.Add(1)
go func(child childNode) {
go func() {
defer pending.Done()

// If database says duplicate, then at least the trie node is present
// and we hold the assumption that it's NOT legacy contract code.
var (
chash = common.BytesToHash(node)
owner, inner = ResolvePath(child.path)
)
if rawdb.HasTrieNode(s.database, owner, inner, chash, s.scheme) {
owner, inner := ResolvePath(path)
exist, inconsistent := s.hasNode(owner, inner, hash)
if exist {
return
} else if inconsistent {
// There is a pre-existing node with the wrong hash in DB, remove it.
batchMu.Lock()
s.membatch.delNode(owner, inner)
batchMu.Unlock()
}
// Locally unknown node, schedule for retrieval
missing <- &nodeRequest{
path: child.path,
hash: chash,
path: path,
hash: hash,
parent: req,
callback: req.callback,
}
}(child)
}()
}
}
pending.Wait()
Expand All @@ -587,21 +638,10 @@ func (s *Sync) children(req *nodeRequest, object node) ([]*nodeRequest, error) {
// committed themselves.
func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// Write the node content to the membatch
s.membatch.nodes[string(req.path)] = req.data
s.membatch.hashes[string(req.path)] = req.hash
owner, path := ResolvePath(req.path)
s.membatch.addNode(owner, path, req.data, req.hash)

// The size tracking refers to the db-batch, not the in-memory data.
if s.scheme == rawdb.PathScheme {
s.membatch.size += uint64(len(req.path) + len(req.data))
} else {
s.membatch.size += common.HashLength + uint64(len(req.data))
}
// Delete the internal nodes which are marked as invalid
for _, segment := range req.deletes {
path := append(req.path, segment...)
s.membatch.deletes[string(path)] = struct{}{}
s.membatch.size += uint64(len(path))
}
// Removed the completed node request
delete(s.nodeReqs, string(req.path))
s.fetches[len(req.path)]--

Expand All @@ -622,8 +662,9 @@ func (s *Sync) commitNodeRequest(req *nodeRequest) error {
// committed themselves.
func (s *Sync) commitCodeRequest(req *codeRequest) error {
// Write the node content to the membatch
s.membatch.codes[req.hash] = req.data
s.membatch.size += common.HashLength + uint64(len(req.data))
s.membatch.addCode(req.hash, req.data)

// Removed the completed code request
delete(s.codeReqs, req.hash)
s.fetches[len(req.path)]--

Expand All @@ -639,6 +680,28 @@ func (s *Sync) commitCodeRequest(req *codeRequest) error {
return nil
}

// hasNode reports whether the specified trie node is present in the database.
// 'exists' is true when the node exists in the database and matches the given root
// hash. The 'inconsistent' return value is true when the node exists but does not
// match the expected hash.
func (s *Sync) hasNode(owner common.Hash, path []byte, hash common.Hash) (exists bool, inconsistent bool) {
// If node is running with hash scheme, check the presence with node hash.
if s.scheme == rawdb.HashScheme {
return rawdb.HasLegacyTrieNode(s.database, hash), false
}
// If node is running with path scheme, check the presence with node path.
var blob []byte
var dbHash common.Hash
if owner == (common.Hash{}) {
blob, dbHash = rawdb.ReadAccountTrieNode(s.database, path)
} else {
blob, dbHash = rawdb.ReadStorageTrieNode(s.database, owner, path)
}
exists = hash == dbHash
inconsistent = !exists && len(blob) != 0
return exists, inconsistent
}

// ResolvePath resolves the provided composite node path by separating the
// path in account trie if it's existent.
func ResolvePath(path []byte) (common.Hash, []byte) {
Expand Down
Loading
Loading