Skip to content

Commit

Permalink
Readable stream analysis refactor (#3231)
Browse files Browse the repository at this point in the history
* Experiment with importing webstreams

* Suppress implicit-import eslint error

* Replace readable-stream with web stream api

* Remove unnecessary null check

* Update package-lock and dependencies

* Fix test

* Make nonbreaking

* Use async stream in statemanager

* Update package-lock file and dependencies

* Include comment to detail deviation from import policy

* Revert "Update package-lock file and dependencies"

This reverts commit baf67f4.

* Revert "Update package-lock and dependencies"

This reverts commit 02b8845.

* Update package-lock
  • Loading branch information
scorbajio authored Jan 17, 2024
1 parent 51fc6da commit a8f326a
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 79 deletions.
82 changes: 33 additions & 49 deletions packages/statemanager/src/stateManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -962,21 +962,14 @@ export class DefaultStateManager implements EVMStateManagerInterface {
throw new Error(`dumpStorage f() can only be called for an existing account`)
}
const trie = this._getStorageTrie(address, account)
const storage: StorageDump = {}
const stream = trie.createAsyncReadStream()

return new Promise((resolve, reject) => {
const storage: StorageDump = {}
const stream = trie.createReadStream()
for await (const chunk of stream) {
storage[bytesToHex(chunk.key)] = bytesToHex(chunk.value)
}

stream.on('data', (val: any) => {
storage[bytesToHex(val.key)] = bytesToHex(val.value)
})
stream.on('end', () => {
resolve(storage)
})
stream.on('error', (e) => {
reject(e)
})
})
return storage
}

/**
Expand All @@ -999,44 +992,35 @@ export class DefaultStateManager implements EVMStateManagerInterface {
throw new Error(`Account does not exist.`)
}
const trie = this._getStorageTrie(address, account)

return new Promise((resolve, reject) => {
let inRange = false
let i = 0

/** Object conforming to {@link StorageRange.storage}. */
const storageMap: StorageRange['storage'] = {}
const stream = trie.createReadStream()

stream.on('data', (val: any) => {
if (!inRange) {
// Check if the key is already in the correct range.
if (bytesToBigInt(val.key) >= startKey) {
inRange = true
} else {
return
}
}

if (i < limit) {
storageMap[bytesToHex(val.key)] = { key: null, value: bytesToHex(val.value) }
i++
} else if (i === limit) {
resolve({
storage: storageMap,
nextKey: bytesToHex(val.key),
})
let inRange = false
let i = 0

/** Object conforming to {@link StorageRange.storage}. */
const storageMap: StorageRange['storage'] = {}
const stream = trie.createAsyncReadStream()
for await (const chunk of stream) {
if (!inRange) {
// Check if the key is already in the correct range.
if (bytesToBigInt(chunk.key) >= startKey) {
inRange = true
} else {
continue
}
})

stream.on('end', () => {
resolve({
}
if (i < limit) {
storageMap[bytesToHex(chunk.key)] = { key: null, value: bytesToHex(chunk.value) }
i++
} else if (i === limit) {
return {
storage: storageMap,
nextKey: null,
})
})
stream.on('error', (e) => reject(e))
})
nextKey: bytesToHex(chunk.key),
}
}
}
return {
storage: storageMap,
nextKey: null,
}
}

/**
Expand Down
20 changes: 19 additions & 1 deletion packages/trie/src/trie.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ import { verifyRangeProof } from './proof/range.js'
import { ROOT_DB_KEY } from './types.js'
import { _walkTrie } from './util/asyncWalk.js'
import { bytesToNibbles, matchingNibbleLength } from './util/nibbles.js'
import { TrieReadStream as ReadStream } from './util/readStream.js'
import {
TrieReadStream as ReadStream,
asyncTrieReadStream as asyncReadStream,
} from './util/readStream.js'
import { WalkController } from './util/walkController.js'

import type {
Expand All @@ -43,6 +46,12 @@ import type {
import type { OnFound } from './util/asyncWalk.js'
import type { BatchDBOp, DB, PutBatch } from '@ethereumjs/util'
import type { Debugger } from 'debug'
// Since ReadableStream is from a Web API, the following type import
// is not needed in and should be ignored by the browser, so an exeption
// is made here to deviate from our policy to not add Node.js specific
// package imports. -- 16/01/24
// eslint-disable-next-line implicit-dependencies/no-implicit
import type { ReadableStream } from 'node:stream/web'

interface Path {
node: TrieNode | null
Expand Down Expand Up @@ -1078,12 +1087,21 @@ export class Trie {

/**
* The `data` event is given an `Object` that has two properties; the `key` and the `value`. Both should be Uint8Arrays.
* @deprecated Use `createAsyncReadStream`
* @return Returns a [stream](https://nodejs.org/dist/latest-v12.x/docs/api/stream.html#stream_class_stream_readable) of the contents of the `trie`
*/
createReadStream(): ReadStream {
return new ReadStream(this)
}

/**
* Use asynchronous iteration over the chunks in a web stream using the for await...of syntax.
* @return Returns a [web stream](https://nodejs.org/api/webstreams.html#example-readablestream) of the contents of the `trie`
*/
createAsyncReadStream(): ReadableStream {
return asyncReadStream(this)
}

/**
* Returns a copy of the underlying trie.
*
Expand Down
69 changes: 45 additions & 24 deletions packages/trie/src/util/readStream.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// eslint-disable-next-line implicit-dependencies/no-implicit
import { ReadableStream } from 'node:stream/web'
import { Readable } from 'readable-stream'

import { BranchNode, LeafNode } from '../node/index.js'
Expand All @@ -7,6 +9,26 @@ import { nibblestoBytes } from './nibbles.js'
import type { Trie } from '../trie.js'
import type { FoundNodeFunction } from '../types.js'

const _findValueNodes = async (trie: Trie, onFound: FoundNodeFunction): Promise<void> => {
const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => {
let fullKey = key
if (node instanceof LeafNode) {
fullKey = key.concat(node.key())
// found leaf node!
onFound(nodeRef, node, fullKey, walkController)
} else if (node instanceof BranchNode && node.value()) {
// found branch with value
onFound(nodeRef, node, fullKey, walkController)
} else {
// keep looking for value nodes
if (node !== null) {
walkController.allChildren(node, key)
}
}
}
await trie.walkTrie(trie.root(), outerOnFound)
}

export class TrieReadStream extends Readable {
private trie: Trie
private _started: boolean
Expand All @@ -24,7 +46,7 @@ export class TrieReadStream extends Readable {
}
this._started = true
try {
await this._findValueNodes(async (_, node, key, walkController) => {
await _findValueNodes(this.trie, async (_, node, key, walkController) => {
if (node !== null) {
this.push({
key: nibblestoBytes(key),
Expand All @@ -42,30 +64,29 @@ export class TrieReadStream extends Readable {
}
this.push(null)
}
}

/**
* Finds all nodes that store k,v values
* called by {@link TrieReadStream}
* @private
*/
async _findValueNodes(onFound: FoundNodeFunction): Promise<void> {
const outerOnFound: FoundNodeFunction = async (nodeRef, node, key, walkController) => {
let fullKey = key

if (node instanceof LeafNode) {
fullKey = key.concat(node.key())
// found leaf node!
onFound(nodeRef, node, fullKey, walkController)
} else if (node instanceof BranchNode && node.value()) {
// found branch with value
onFound(nodeRef, node, fullKey, walkController)
} else {
// keep looking for value nodes
if (node !== null) {
walkController.allChildren(node, key)
export function asyncTrieReadStream(trie: Trie) {
return new ReadableStream({
async start(controller) {
try {
await _findValueNodes(trie, async (_, node, key, walkController) => {
if (node !== null) {
controller.enqueue({
key: nibblestoBytes(key),
value: node.value(),
})
walkController.allChildren(node, key)
}
})
} catch (error: any) {
if (error.message === 'Missing node in DB') {
// pass
} else {
throw error
}
}
}
await this.trie.walkTrie(this.trie.root(), outerOnFound)
}
controller.close()
},
})
}
25 changes: 20 additions & 5 deletions packages/trie/test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,12 @@ describe('kv stream test', () => {
},
] as BatchDBOp[]

const valObj = {} as any
const valObj1 = {} as any
const valObj2 = {} as any
for (const op of ops) {
if (op.type === 'put') {
valObj[op.key.toString()] = op.value.toString()
valObj1[op.key.toString()] = op.value.toString()
valObj2[op.key.toString()] = op.value.toString()
}
}

Expand All @@ -110,14 +112,27 @@ describe('kv stream test', () => {
stream.on('data', (d: any) => {
const key = d.key.toString()
const value = d.value.toString()
assert.equal(valObj[key], value)
delete valObj[key]
assert.equal(valObj1[key], value)
delete valObj1[key]
})
stream.on('end', () => {
const keys = Object.keys(valObj)
const keys = Object.keys(valObj1)
assert.equal(keys.length, 0)
})
})

it('should fetch all of the nodes from async stream', async () => {
const stream = trie.createAsyncReadStream()
for await (const chunk of stream) {
const key = chunk.key.toString()
const value = chunk.value.toString()
assert.equal(valObj2[key], value)
delete valObj2[key]
}

const keys = Object.keys(valObj2)
assert.equal(keys.length, 0)
})
})

describe('db stream test', () => {
Expand Down

0 comments on commit a8f326a

Please sign in to comment.