Skip to content

Commit

Permalink
adding process event
Browse files Browse the repository at this point in the history
  • Loading branch information
StoyanD committed Jul 24, 2024
1 parent 773294e commit e3fab3a
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 42 deletions.
1 change: 0 additions & 1 deletion src/bullmq/processors/dtos/SourceIntentTx.dto.ts

This file was deleted.

9 changes: 5 additions & 4 deletions src/bullmq/processors/solve-intent.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { Injectable, Logger } from '@nestjs/common'
import { Job } from 'bullmq'
import { EcoLogMessage } from '../../common/logging/eco-log-message'
import { SourceIntentService } from '../../source-intent/source-intent.service'
import { EventLogWS } from '../../source-intent/dtos/EventLogWS'
import { EventLogWS, SourceIntentTxHash } from '../../source-intent/dtos/EventLogWS'

@Injectable()
@Processor(QUEUES.CREATE_INTENT.queue)
Expand All @@ -25,9 +25,10 @@ export class SolveIntentProcessor extends WorkerHost {
)

switch (job.name) {
case QUEUES.CREATE_INTENT.jobs.create_intent:
const intentData = job.data as EventLogWS
return await this.sourceIntentService.createIntent(intentData)
case QUEUES.SOURCE_INTENT.jobs.create_intent:
return await this.sourceIntentService.createIntent(job.data as EventLogWS)
case QUEUES.SOURCE_INTENT.jobs.process_intent:
return await this.sourceIntentService.processIntent(job.data as SourceIntentTxHash)
default:
this.logger.error(
EcoLogMessage.fromDefault({
Expand Down
7 changes: 4 additions & 3 deletions src/common/redis/constants.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
export const QUEUES = {
CREATE_INTENT: {
queue: 'create_intent',
prefix: '{solver-create-intent}',
SOURCE_INTENT: {
queue: 'source_intent',
prefix: '{solver-source-intent}',
jobs: {
create_intent: 'create_intent',
process_intent: 'process_intent',
},
},
}
Expand Down
3 changes: 3 additions & 0 deletions src/source-intent/dtos/EventLogWS.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ import { Log as EthersLog } from 'alchemy-sdk'
export interface EventLogWS extends EthersLog {
network: Network
}

// The hash of the log event
export type SourceIntentTxHash = string
45 changes: 21 additions & 24 deletions src/source-intent/source-intent.service.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
import { Injectable, Logger, OnModuleInit } from '@nestjs/common'
import { AlchemyService } from '../alchemy/alchemy.service'
import { RedlockService } from '../nest-redlock/nest-redlock.service'
import { EventLogWS } from './dtos/EventLogWS'
import { EventLogWS, SourceIntentTxHash } from './dtos/EventLogWS'
import { InjectModel } from '@nestjs/mongoose'
import { SourceIntentModel } from './schemas/source-intent.schema'
import { Model } from 'mongoose'
import { EcoLogMessage } from '../common/logging/eco-log-message'
import { decodeCreateIntentLog } from '../ws/ws.helpers'
import { InjectQueue } from '@nestjs/bullmq'
import { QUEUES } from '../common/redis/constants'
import { JobsOptions, Queue } from 'bullmq'
import { EcoConfigService } from '../eco-configs/eco-config.service'
import { SourceIntentTx } from '../bullmq/processors/dtos/SourceIntentTx.dto'

Check failure on line 14 in src/source-intent/source-intent.service.ts

View workflow job for this annotation

GitHub Actions / lint

'SourceIntentTx' is defined but never used

/**
* Service class for solving an intent on chain
*/
@Injectable()
export class SourceIntentService implements OnModuleInit {
private logger = new Logger(SourceIntentService.name)

private intentJobConfig: JobsOptions
constructor(
private readonly alchemyService: AlchemyService,
@InjectQueue(QUEUES.SOURCE_INTENT.queue) private readonly intentQueue: Queue,
@InjectModel(SourceIntentModel.name) private intentModel: Model<SourceIntentModel>,
private redlockService: RedlockService,
private readonly ecoConfigService: EcoConfigService,
) {}

onModuleInit() {}
onModuleInit() {
this.intentJobConfig = this.ecoConfigService.getRedis().jobs.intentJobConfig
}

async createIntent(intentWs: EventLogWS) {
const intent = decodeCreateIntentLog(intentWs.data, intentWs.topics)
this.logger.log(`Creating intent: `)
const intent = decodeCreateIntentLog(intentWs.data, intentWs.topics)
const lock = await this.redlockService.acquireLock([intent.hash as string], 5000)
//this instance didn`t get the lock, so just break out here
if (!lock) {
Expand Down Expand Up @@ -58,6 +67,13 @@ export class SourceIntentService implements OnModuleInit {
receipt: null,
status: 'PENDING',
})

//add to processing queue
await this.intentQueue.add(QUEUES.SOURCE_INTENT.jobs.process_intent, intent.hash, {
jobId: intent.hash as string,
...this.intentJobConfig,
})

this.logger.debug(
EcoLogMessage.fromDefault({
message: `Recorded intent ${record.intent.hash}`,
Expand All @@ -74,24 +90,5 @@ export class SourceIntentService implements OnModuleInit {
}
}

// @OnEvent(EVENTS.SOURCE_INTENT_CREATED)
// async handleSourceIntentCreatedEvent(payload: any) {
// let lock: Lock
// try {
// lock = await this.redlockService.acquire([`test:${1}`], 5000)
// await this.delay(2000)
// this.logger.log(`Received event: ${payload}`)
// lock.release()
// } catch (e) {
// this.logger.error(e)
// }
// }

// async acquireLock(hash: string): Promise<Lock | null> {
// try {
// return await this.redlockService.acquire([hash], 5000)
// } catch {
// return null
// }
// }
async processIntent(intentHash: SourceIntentTxHash) {}

Check failure on line 93 in src/source-intent/source-intent.service.ts

View workflow job for this annotation

GitHub Actions / lint

'intentHash' is defined but never used
}
15 changes: 5 additions & 10 deletions src/source-intent/source-intent.ws.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import { AlchemyEventType, Network } from 'alchemy-sdk'
import { JobsOptions, Queue } from 'bullmq'
import { QUEUES } from '../common/redis/constants'
import { InjectQueue } from '@nestjs/bullmq'
import { SourceIntentTx } from '../bullmq/processors/dtos/SourceIntentTx.dto'
import { EventLogWS } from './dtos/EventLogWS'
import { EcoLogMessage } from '../common/logging/eco-log-message'

Expand All @@ -22,7 +21,7 @@ export class SourceIntentWsService implements OnModuleInit {
private intentJobConfig: JobsOptions

constructor(
@InjectQueue(QUEUES.CREATE_INTENT.queue) private readonly solveIntentQueue: Queue,
@InjectQueue(QUEUES.SOURCE_INTENT.queue) private readonly intentQueue: Queue,
private readonly alchemyService: AlchemyService,
private readonly ecoConfigService: EcoConfigService,
) {}
Expand Down Expand Up @@ -52,14 +51,10 @@ export class SourceIntentWsService implements OnModuleInit {
}),
)
//add to processing queue
await this.solveIntentQueue.add(
QUEUES.CREATE_INTENT.jobs.create_intent,
event as SourceIntentTx,
{
jobId: event.transactionHash,
...this.intentJobConfig,
},
)
await this.intentQueue.add(QUEUES.SOURCE_INTENT.jobs.create_intent, event as EventLogWS, {
jobId: event.transactionHash,
...this.intentJobConfig,
})
}
}
}

0 comments on commit e3fab3a

Please sign in to comment.