Skip to content

Commit

Permalink
feat: using simple queue
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Aug 2, 2019
1 parent fd34cb0 commit 3595368
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 18 deletions.
7 changes: 0 additions & 7 deletions packages/neuron-wallet/src/services/sync/block-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ export default class BlockListener {
await this.queue.kill()
}

public drain = async () => {
if (this.queue) {
return this.queue.drain()
}
return undefined
}

public regenerate = async (): Promise<void> => {
if (this.queue && this.queue.length() > 0) {
return
Expand Down
14 changes: 4 additions & 10 deletions packages/neuron-wallet/src/services/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ import { Block, BlockHeader } from '../../types/cell-types'
import RangeForCheck from './range-for-check'
import BlockNumber from './block-number'
import Utils from './utils'
import QueueAdapter from './queue-adapter'
import SimpleQueue from './simple-queue'
import { TransactionPersistor } from '../tx'

export default class Queue {
private q: QueueAdapter
private concurrent: number = 1
private q: SimpleQueue
private lockHashes: string[]
private getBlocksService: GetBlocks
private startBlockNumber: bigint
Expand All @@ -26,7 +25,7 @@ export default class Queue {
currentBlockNumber: BlockNumber = new BlockNumber(),
rangeForCheck: RangeForCheck = new RangeForCheck()
) {
this.q = new QueueAdapter(this.getWorker(), this.concurrent)
this.q = new SimpleQueue(this.getWorker())
this.lockHashes = lockHashes
this.getBlocksService = new GetBlocks()
this.startBlockNumber = BigInt(startBlockNumber)
Expand All @@ -40,12 +39,11 @@ export default class Queue {
}

private getWorker = () => {
const worker = async (task: any, callback: any) => {
const worker = async (task: any) => {
try {
await Utils.retry(this.retryTime, 0, async () => {
await this.pipeline(task.blockNumbers)
})
await callback()
} catch {
this.clear()
}
Expand All @@ -69,10 +67,6 @@ export default class Queue {
this.q.kill()
}

public drain = async () => {
return this.q.drain()
}

public pipeline = async (blockNumbers: string[]) => {
// 1. get blocks
const blocks: Block[] = await this.getBlocksService.getRangeBlocks(blockNumbers)
Expand Down
60 changes: 60 additions & 0 deletions packages/neuron-wallet/src/services/sync/simple-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import Utils from './utils'

export default class SimpleQueue {
private q: any[] = []
private worker: any
private stopped = false

constructor(worker: any, start: boolean = true) {
this.worker = worker
if (start) {
this.start()
}
}

/* eslint no-await-in-loop: "off" */
public start = async () => {
while (!this.stopped) {
const nextValue = this.shift()
if (nextValue) {
await this.worker(nextValue)
await this.yield()
} else {
await this.yield(50)
}
}
}

private shift = () => {
return this.q.shift()
}

public yield = async (millisecond: number = 1) => {
return Utils.sleep(millisecond)
}

public push = (value: any) => {
this.q.push(value)
}

public stop = () => {
this.stopped = true

this.push = (value: any) => value
this.clear()
}

public kill = () => {
this.stop()
}

public clear = () => {
while (this.q.length) {
this.q.pop()
}
}

public length = (): number => {
return this.q.length
}
}
3 changes: 2 additions & 1 deletion packages/neuron-wallet/src/startup/sync-block-task/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import AddressesUsedSubject from '../../models/subjects/addresses-used-subject'
import BlockListener from '../../services/sync/block-listener'
import { NetworkWithID } from '../../services/networks'
import { initDatabase } from './init-database'
import Utils from '../../services/sync/utils'
import { register as registerTxStatusListener } from '../../listeners/tx-status'

import { register as registerAddressListener } from '../../listeners/address'
Expand Down Expand Up @@ -58,7 +59,7 @@ export const switchNetwork = async () => {
const regenerateListener = async () => {
await blockListener.stop()
// wait former queue to be drained
await blockListener.drain()
await Utils.sleep(3000)
const hashes: string[] = await loadAddressesAndConvert()
blockListener = new BlockListener(hashes, nodeService.tipNumberSubject)
await blockListener.start(true)
Expand Down

0 comments on commit 3595368

Please sign in to comment.