diff --git a/packages/neuron-wallet/src/services/sync/block-listener.ts b/packages/neuron-wallet/src/services/sync/block-listener.ts index 32d322ffbb..9c83896aac 100644 --- a/packages/neuron-wallet/src/services/sync/block-listener.ts +++ b/packages/neuron-wallet/src/services/sync/block-listener.ts @@ -60,13 +60,6 @@ export default class BlockListener { await this.queue.kill() } - public drain = async () => { - if (this.queue) { - return this.queue.drain() - } - return undefined - } - public regenerate = async (): Promise => { if (this.queue && this.queue.length() > 0) { return diff --git a/packages/neuron-wallet/src/services/sync/queue.ts b/packages/neuron-wallet/src/services/sync/queue.ts index 6ac7072560..b1237d0c47 100644 --- a/packages/neuron-wallet/src/services/sync/queue.ts +++ b/packages/neuron-wallet/src/services/sync/queue.ts @@ -3,12 +3,11 @@ import { Block, BlockHeader } from '../../types/cell-types' import RangeForCheck from './range-for-check' import BlockNumber from './block-number' import Utils from './utils' -import QueueAdapter from './queue-adapter' +import SimpleQueue from './simple-queue' import { TransactionPersistor } from '../tx' export default class Queue { - private q: QueueAdapter - private concurrent: number = 1 + private q: SimpleQueue private lockHashes: string[] private getBlocksService: GetBlocks private startBlockNumber: bigint @@ -26,7 +25,7 @@ export default class Queue { currentBlockNumber: BlockNumber = new BlockNumber(), rangeForCheck: RangeForCheck = new RangeForCheck() ) { - this.q = new QueueAdapter(this.getWorker(), this.concurrent) + this.q = new SimpleQueue(this.getWorker()) this.lockHashes = lockHashes this.getBlocksService = new GetBlocks() this.startBlockNumber = BigInt(startBlockNumber) @@ -40,12 +39,11 @@ export default class Queue { } private getWorker = () => { - const worker = async (task: any, callback: any) => { + const worker = async (task: any) => { try { await Utils.retry(this.retryTime, 0, async () => { await this.pipeline(task.blockNumbers) }) - await callback() } catch { this.clear() } @@ -69,10 +67,6 @@ export default class Queue { this.q.kill() } - public drain = async () => { - return this.q.drain() - } - public pipeline = async (blockNumbers: string[]) => { // 1. get blocks const blocks: Block[] = await this.getBlocksService.getRangeBlocks(blockNumbers) diff --git a/packages/neuron-wallet/src/services/sync/simple-queue.ts b/packages/neuron-wallet/src/services/sync/simple-queue.ts new file mode 100644 index 0000000000..249ca4c145 --- /dev/null +++ b/packages/neuron-wallet/src/services/sync/simple-queue.ts @@ -0,0 +1,60 @@ +import Utils from './utils' + +export default class SimpleQueue { + private q: any[] = [] + private worker: any + private stopped = false + + constructor(worker: any, start: boolean = true) { + this.worker = worker + if (start) { + this.start() + } + } + + /* eslint no-await-in-loop: "off" */ + public start = async () => { + while (!this.stopped) { + const nextValue = this.shift() + if (nextValue) { + await this.worker(nextValue) + await this.yield() + } else { + await this.yield(50) + } + } + } + + private shift = () => { + return this.q.shift() + } + + public yield = async (millisecond: number = 1) => { + return Utils.sleep(millisecond) + } + + public push = (value: any) => { + this.q.push(value) + } + + public stop = () => { + this.stopped = true + + this.push = (value: any) => value + this.clear() + } + + public kill = () => { + this.stop() + } + + public clear = () => { + while (this.q.length) { + this.q.pop() + } + } + + public length = (): number => { + return this.q.length + } +} diff --git a/packages/neuron-wallet/src/startup/sync-block-task/task.ts b/packages/neuron-wallet/src/startup/sync-block-task/task.ts index a8353af5b3..5a23dd7323 100644 --- a/packages/neuron-wallet/src/startup/sync-block-task/task.ts +++ b/packages/neuron-wallet/src/startup/sync-block-task/task.ts @@ -7,6 +7,7 @@ import AddressesUsedSubject from '../../models/subjects/addresses-used-subject' import BlockListener from '../../services/sync/block-listener' import { NetworkWithID } from '../../services/networks' import { initDatabase } from './init-database' +import Utils from '../../services/sync/utils' import { register as registerTxStatusListener } from '../../listeners/tx-status' import { register as registerAddressListener } from '../../listeners/address' @@ -58,7 +59,7 @@ export const switchNetwork = async () => { const regenerateListener = async () => { await blockListener.stop() // wait former queue to be drained - await blockListener.drain() + await Utils.sleep(3000) const hashes: string[] = await loadAddressesAndConvert() blockListener = new BlockListener(hashes, nodeService.tipNumberSubject) await blockListener.start(true)