Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve reindex command, optional block, also check if thread is running #747

Merged
merged 2 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ export interface AdminCollectFeesCommand extends AdminCommand {

/**
 * Admin command asking the node to reindex a single chain.
 */
export interface AdminReindexChainCommand extends AdminCommand {
// id of the chain to reindex
chainId: number
// optional block number for the reindex; may be omitted
block?: number
}

export interface ICommandHandler {
Expand Down
30 changes: 27 additions & 3 deletions src/components/Indexer/crawlerThread.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,9 @@ export async function processNetworkData(
await sleep(interval)
// reindex chain command called
if (REINDEX_BLOCK && !lockProccessing) {
const networkHeight = await getNetworkHeight(provider)
// either "true" for success or "false" otherwise
const result = await reindexChain(currentBlock)
const result = await reindexChain(currentBlock, networkHeight)
// get all reindex commands
// TODO (check that we do not receive multiple commands for same reindex before previous finishes)
parentPort.postMessage({
Expand All @@ -238,7 +239,17 @@ export async function processNetworkData(
}
}

async function reindexChain(currentBlock: number): Promise<boolean> {
async function reindexChain(
currentBlock: number,
networkHeight: number
): Promise<boolean> {
if (REINDEX_BLOCK > networkHeight) {
INDEXER_LOGGER.error(
`Invalid reindex block! ${REINDEX_BLOCK} is bigger than network height: ${networkHeight}. Continue indexing normally...`
)
REINDEX_BLOCK = null
return false
}
// for reindex command we don't care about last known/saved block
const block = await updateLastIndexedBlockNumber(REINDEX_BLOCK)
if (block !== -1) {
Expand Down Expand Up @@ -328,11 +339,24 @@ parentPort.on('message', (message) => {
REINDEX_QUEUE.push(message.data.reindexTask)
} else if (message.method === INDEXER_MESSAGES.REINDEX_CHAIN) {
// reindex a specific chain

// get the deploy block number
const deployBlock = getDeployedContractBlock(rpcDetails.chainId)
REINDEX_BLOCK =
// first option
let possibleBlock =
rpcDetails.startBlock && rpcDetails.startBlock >= deployBlock
? rpcDetails.startBlock
: deployBlock

// do we have a specific block number?
const { block } = message.data
if (block && !isNaN(block)) {
// we still need to check network height
if (block > deployBlock) {
possibleBlock = block
}
}
REINDEX_BLOCK = possibleBlock
} else if (message.method === INDEXER_MESSAGES.STOP_CRAWLING) {
// stop indexing the chain
stoppedCrawling = true
Expand Down
174 changes: 102 additions & 72 deletions src/components/Indexer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const JOBS_QUEUE: JobStatus[] = []

const MAX_CRAWL_RETRIES = 10
let numCrawlAttempts = 0

// chain id -> flag telling whether the worker thread of that chain is marked as running
const runningThreads = new Map<number, boolean>()
export class OceanIndexer {
private db: Database
private networks: RPCS
Expand Down Expand Up @@ -72,7 +74,7 @@ export class OceanIndexer {
public stopAllThreads(): boolean {
let count = 0
for (const chainID of this.supportedChains) {
if (this.stopThread(parseInt(chainID))) {
if (this.stopThread(Number(chainID))) {
count++
}
}
Expand All @@ -84,6 +86,7 @@ export class OceanIndexer {
const worker = this.workers[chainID]
if (worker) {
worker.postMessage({ method: 'stop-crawling' })
runningThreads.set(chainID, false)
return true
}
INDEXER_LOGGER.error('Unable to find running worker thread for chain ' + chainID)
Expand Down Expand Up @@ -187,6 +190,7 @@ export class OceanIndexer {
)}`,
true
)
runningThreads.set(chainID, true)
return worker
}

Expand All @@ -200,79 +204,83 @@ export class OceanIndexer {
// track if we were able to start them all
count++
this.workers[chainId] = worker
worker.on('message', (event: any) => {
if (event.data) {
if (
[
EVENTS.METADATA_CREATED,
EVENTS.METADATA_UPDATED,
EVENTS.METADATA_STATE,
EVENTS.ORDER_STARTED,
EVENTS.ORDER_REUSED
].includes(event.method)
) {
// will emit the metadata created/updated event and advertise it to the other peers (on create only)
INDEXER_LOGGER.logMessage(
`Emiting "${event.method}" for DDO : ${event.data.id} from network: ${network} `
)
INDEXER_DDO_EVENT_EMITTER.emit(event.method, event.data.id)
// remove from indexing list
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_QUEUE_POP) {
// remove this one from the queue (means we processed the reindex for this tx)
INDEXING_QUEUE = INDEXING_QUEUE.filter(
(task) =>
task.txId !== event.data.txId && task.chainId !== event.data.chainId
)
// reindex tx successfully done
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_TX, // explicitly set constant value for readability
event.data
)
this.updateJobStatus(
PROTOCOL_COMMANDS.REINDEX_TX,
create256Hash([event.data.chainId, event.data.txId].join('')),
CommandStatus.SUCCESS
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN) {
// we should listen to this on the dashboard for instance
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN,
event.data
)
this.updateJobStatus(
PROTOCOL_COMMANDS.REINDEX_CHAIN,
create256Hash([event.data.chainId].join('')),
event.data.result ? CommandStatus.SUCCESS : CommandStatus.FAILURE
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.CRAWLING_STARTED) {
INDEXER_CRAWLING_EVENT_EMITTER.emit(event.method, event.data)
}
} else {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
'Missing event data (ddo) on postMessage. Something is wrong!',
true
)
}
})

worker.on('error', (err: Error) => {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error in worker for network ${network}: ${err.message}`,
true
)
})
this.setupEventListeners(worker, chainId)
}
}
return count === this.supportedChains.length
}

worker.on('exit', (code: number) => {
/**
 * Attaches the 'message', 'error' and 'exit' handlers to the worker thread of a chain.
 *
 * - 'message': messages carrying `data` are dispatched on `event.method` (DDO events,
 *   reindex queue pop, reindex chain result, crawling started); messages without
 *   `data` are logged as an error.
 * - 'error': the error message is logged together with the chain id.
 * - 'exit': the exit code is logged and the chain is flagged as not running.
 *
 * @param worker - worker thread to listen on
 * @param chainId - chain handled by that worker
 */
private setupEventListeners(worker: Worker, chainId: number) {
worker.on('message', (event: any) => {
if (event.data) {
if (
[
EVENTS.METADATA_CREATED,
EVENTS.METADATA_UPDATED,
EVENTS.METADATA_STATE,
EVENTS.ORDER_STARTED,
EVENTS.ORDER_REUSED
].includes(event.method)
) {
// will emit the metadata created/updated event and advertise it to the other peers (on create only)
INDEXER_LOGGER.logMessage(
`Worker for network ${network} exited with code: ${code}`,
true
`Emiting "${event.method}" for DDO : ${event.data.id} from network: ${chainId} `
)
INDEXER_DDO_EVENT_EMITTER.emit(event.method, event.data.id)
// remove from indexing list
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_QUEUE_POP) {
// remove this one from the queue (means we processed the reindex for this tx)
// NOTE(review): with `&&` a task is kept only when BOTH its txId and its chainId differ
// from the processed one, so tasks sharing just one of the two are removed as well.
// Confirm whether `||` was intended here.
INDEXING_QUEUE = INDEXING_QUEUE.filter(
(task) => task.txId !== event.data.txId && task.chainId !== event.data.chainId
)
})
// reindex tx successfully done
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_TX, // explicitly set constant value for readability
event.data
)
// the job hash is built from chainId + txId
this.updateJobStatus(
PROTOCOL_COMMANDS.REINDEX_TX,
create256Hash([event.data.chainId, event.data.txId].join('')),
CommandStatus.SUCCESS
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN) {
// we should listen to this on the dashboard for instance
INDEXER_CRAWLING_EVENT_EMITTER.emit(
INDEXER_CRAWLING_EVENTS.REINDEX_CHAIN,
event.data
)
// job outcome follows the boolean `result` reported by the worker
this.updateJobStatus(
PROTOCOL_COMMANDS.REINDEX_CHAIN,
create256Hash([event.data.chainId].join('')),
event.data.result ? CommandStatus.SUCCESS : CommandStatus.FAILURE
)
} else if (event.method === INDEXER_CRAWLING_EVENTS.CRAWLING_STARTED) {
// forwarded unchanged on the crawling event emitter
INDEXER_CRAWLING_EVENT_EMITTER.emit(event.method, event.data)
}
} else {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
'Missing event data (ddo) on postMessage. Something is wrong!',
true
)
}
}
return count === this.supportedChains.length
})

worker.on('error', (err: Error) => {
INDEXER_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Error in worker for network ${chainId}: ${err.message}`,
true
)
})

worker.on('exit', (code: number) => {
INDEXER_LOGGER.logMessage(
`Worker for network ${chainId} exited with code: ${code}`,
true
)
// flag the chain as not running so a later reset can restart its thread
runningThreads.set(chainId, false)
})
}

public addReindexTask(reindexTask: ReindexTask): JobStatus | null {
Expand All @@ -293,18 +301,40 @@ export class OceanIndexer {
return null
}

/**
 * Asks the worker thread of a chain to reindex that chain, optionally from a given block.
 *
 * When the chain is flagged as not running but a worker entry still exists, the stale
 * entry is removed and a new thread is started (with its event listeners attached)
 * before the reindex message is posted.
 *
 * @param chainId - chain whose crawler should be reset
 * @param blockNumber - optional block number, forwarded to the worker as `block`
 * @returns the job created for the reindex command, or null when the thread could not be
 * restarted or no worker exists for the chain
 */
public resetCrawling(chainId: number): JobStatus | null {
public async resetCrawling(
chainId: number,
blockNumber?: number
): Promise<JobStatus | null> {
const isRunning = runningThreads.get(chainId)
// not running, but a worker entry is still registered for this chain
if (!isRunning && this.workers[chainId]) {
INDEXER_LOGGER.warn(
'Thread for chain: ' + chainId + ' is not running, restarting first...'
)
// drop the stale entry before starting a replacement thread
delete this.workers[chainId]
const worker = await this.startThread(chainId)
if (!worker) {
INDEXER_LOGGER.error('Could not restart worker thread, aborting...')
return null
}
this.workers[chainId] = worker
this.setupEventListeners(worker, chainId)
}
const worker = this.workers[chainId]
if (worker) {
// the job identifier is derived from the command name and the chain id
const job = buildJobIdentifier(PROTOCOL_COMMANDS.REINDEX_CHAIN, [
chainId.toString()
])
worker.postMessage({
method: INDEXER_MESSAGES.REINDEX_CHAIN,
data: { msgId: job.jobId }
data: { msgId: job.jobId, block: blockNumber }
})
this.addJob(job)
return job
} else {
INDEXER_LOGGER.error(
`Could not find a worker thread for chain ${chainId}, aborting...`
)
}
return null
}
Expand Down
4 changes: 1 addition & 3 deletions src/components/core/admin/reindexChainHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ export class ReindexChainHandler extends AdminHandler {
async handle(task: AdminReindexChainCommand): Promise<P2PCommandResponse> {
const validation = this.validate(task)
if (!validation.valid) {
console.log('bad request:', validation)
return buildInvalidParametersResponse(validation)
}
CORE_LOGGER.logMessage(`Reindexing chain command called`)
const checkChainId = await checkSupportedChainId(task.chainId)
if (!checkChainId.validation) {
console.log('bad request 2:', checkChainId)
return buildErrorResponse(
`Chain ID ${task.chainId} is not supported in the node's config`
)
Expand All @@ -42,7 +40,7 @@ export class ReindexChainHandler extends AdminHandler {
return buildErrorResponse('Node is not running an indexer instance!')
}

const job = indexer.resetCrawling(task.chainId)
const job = await indexer.resetCrawling(Number(task.chainId), task.block)
if (job) {
return {
status: { httpStatus: 200 },
Expand Down
Loading