From 72e0906fa95a23a4d33179abe86698fe7d9c063a Mon Sep 17 00:00:00 2001 From: Stoyan Dimitrov Date: Wed, 8 Jan 2025 11:28:01 -0500 Subject: [PATCH] Ed 4570 retry unfeasable (#76) * move bigint conversion * moving watch services into own module. adding watch fulfill intent service * creating watch fulfillment event * refactoring watch create intent to use same abstract parent * adding tests for watch fulfillment adding inbox processor adding intent utils method for update - > needs more work and tests * adding default intervals, intentconfigs, and liquiditymanager to config defaults.ts removing eventemitter from app modules adding interval modeule and service adding interval processor and queue adding retry_intent to source intent queue * adding skeleton of retry infeasable * disabling storage prover from being accepted by solver * updating routes-ts to ~0.2.10-beta update only on patch * fixing merge * adding retry infeasable intents tests --- config/default.ts | 44 ++++ src/app.module.ts | 10 +- src/bullmq/processors/inbox.processor.ts | 55 +++++ src/bullmq/processors/interval.processor.ts | 54 +++++ src/bullmq/processors/processor.module.ts | 3 +- .../processors/solve-intent.processor.ts | 15 +- src/bullmq/utils/queue.ts | 11 + src/chain-monitor/chain-monitor.module.ts | 3 +- src/chain-monitor/chain-sync.service.ts | 4 +- .../tests/chain-sync.service.spec.ts | 8 +- src/common/errors/eco-error.ts | 8 +- src/common/redis/constants.ts | 15 ++ src/contracts/inbox.ts | 8 + src/contracts/prover.ts | 2 +- src/eco-configs/eco-config.service.ts | 14 ++ src/eco-configs/eco-config.types.ts | 34 ++- src/intent/feasable-intent.service.ts | 2 +- src/intent/fulfill-intent.service.ts | 2 +- src/intent/intent.controller.ts | 4 +- src/intent/intent.module.ts | 4 +- .../tests/feasable-intent.service.spec.ts | 1 - .../tests/fulfill-intent.service.spec.ts | 6 +- src/intent/tests/utils-intent.service.spec.ts | 54 ++++- .../tests/validate-intent.service.spec.ts | 2 +- src/intent/utils-intent.service.ts | 41 +++- src/intent/validate-intent.service.ts | 4 +- src/intent/watch-intent.service.ts | 201 ------------------ src/intervals/interval.module.ts | 18 ++ .../retry-infeasable-intents.service.ts | 89 ++++++++ .../retry-infeasable-intents.service.spec.ts | 167 +++++++++++++++ src/prover/proof.service.ts | 43 ++-- src/prover/tests/proof.service.spec.ts | 19 +- .../watch-create-intent.service.spec.ts} | 43 ++-- .../tests/watch-fulfillment.service.spec.ts | 141 ++++++++++++ .../intent/watch-create-intent.service.ts | 117 ++++++++++ src/watch/intent/watch-event.service.ts | 140 ++++++++++++ src/watch/intent/watch-fulfillment.service.ts | 114 ++++++++++ src/watch/watch.module.ts | 13 ++ yarn.lock | 31 +-- 39 files changed, 1216 insertions(+), 328 deletions(-) create mode 100644 src/bullmq/processors/inbox.processor.ts create mode 100644 src/bullmq/processors/interval.processor.ts create mode 100644 src/contracts/inbox.ts delete mode 100644 src/intent/watch-intent.service.ts create mode 100644 src/intervals/interval.module.ts create mode 100644 src/intervals/retry-infeasable-intents.service.ts create mode 100644 src/intervals/tests/retry-infeasable-intents.service.spec.ts rename src/{intent/tests/watch-intent.service.spec.ts => watch/intent/tests/watch-create-intent.service.spec.ts} (87%) create mode 100644 src/watch/intent/tests/watch-fulfillment.service.spec.ts create mode 100644 src/watch/intent/watch-create-intent.service.ts create mode 100644 src/watch/intent/watch-event.service.ts create mode 100644 src/watch/intent/watch-fulfillment.service.ts create mode 100644 src/watch/watch.module.ts diff --git a/config/default.ts b/config/default.ts index d6c15b2..a0c3965 100644 --- a/config/default.ts +++ b/config/default.ts @@ -45,6 +45,50 @@ export default { }, }, }, + intervals: { + retryInfeasableIntents: { + repeatOpts: { + every: 10000, + }, + jobTemplate: { + name: 'retry-infeasable-intents', + data: {}, + }, + }, + defaults: { + repeatOpts: { + every: 10000, + }, + jobTemplate: { + name: 'default-interval-job', + data: {}, + opts: { + removeOnComplete: true, + removeOnFail: true, + + attempts: 3, + backoff: { + type: 'exponential', + delay: 2_000, + }, + }, + }, + }, + }, + intentConfigs: { + proofs: { + storage_duration_seconds: 604800, + hyperlane_duration_seconds: 3600, + }, + }, + liquidityManager: { + intervalDuration: 300000, + targetSlippage: 0.02, + thresholds: { + surplus: 0.1, + deficit: 0.2, + }, + }, externalAPIs: {}, logger: { usePino: true, diff --git a/src/app.module.ts b/src/app.module.ts index 5562fc1..6b5a18d 100644 --- a/src/app.module.ts +++ b/src/app.module.ts @@ -4,7 +4,6 @@ import { ChainMonitorModule } from './chain-monitor/chain-monitor.module' import { EcoConfigService } from './eco-configs/eco-config.service' import { LoggerModule } from 'nestjs-pino' import { MongooseModule } from '@nestjs/mongoose' -import { EventEmitterModule } from '@nestjs/event-emitter' import { IntentModule } from './intent/intent.module' import { IntentSourceModel } from './intent/schemas/intent-source.schema' import { BalanceModule } from './balance/balance.module' @@ -15,21 +14,21 @@ import { ProcessorModule } from './bullmq/processors/processor.module' import { SolverModule } from './solver/solver.module' import { FlagsModule } from './flags/flags.module' import { LiquidityManagerModule } from '@/liquidity-manager/liquidity-manager.module' +import { WatchModule } from '@/watch/watch.module' +import { IntervalModule } from '@/intervals/interval.module' @Module({ imports: [ BalanceModule, ChainMonitorModule, EcoConfigModule.withAWS(), - EventEmitterModule.forRoot({ - // the delimiter used to segment namespaces - delimiter: '.', - }), + FlagsModule, HealthModule, IntentModule, SignModule, IntentSourceModel, + IntervalModule, ProcessorModule, MongooseModule.forRootAsync({ inject: [EcoConfigService], @@ -43,6 +42,7 @@ import { LiquidityManagerModule } from '@/liquidity-manager/liquidity-manager.mo ProverModule, SolverModule, LiquidityManagerModule, + WatchModule, ...getPino(), ], controllers: [], diff --git a/src/bullmq/processors/inbox.processor.ts b/src/bullmq/processors/inbox.processor.ts new file mode 100644 index 0000000..e0a0784 --- /dev/null +++ b/src/bullmq/processors/inbox.processor.ts @@ -0,0 +1,55 @@ +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq' +import { QUEUES } from '@/common/redis/constants' +import { Injectable, Logger } from '@nestjs/common' +import { Job } from 'bullmq' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { UtilsIntentService } from '@/intent/utils-intent.service' +import { FulfillmentLog } from '@/contracts/inbox' + +@Injectable() +@Processor(QUEUES.INBOX.queue) +export class InboxProcessor extends WorkerHost { + private logger = new Logger(InboxProcessor.name) + constructor(private readonly utilsIntentService: UtilsIntentService) { + super() + } + + async process( + job: Job, + processToken?: string | undefined, // eslint-disable-line @typescript-eslint/no-unused-vars + ): Promise { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `InboxProcessor: process`, + properties: { + job: job.name, + }, + }), + ) + + switch (job.name) { + case QUEUES.INBOX.jobs.fulfillment: + return await this.utilsIntentService.updateOnFulfillment(job.data as FulfillmentLog) + default: + this.logger.error( + EcoLogMessage.fromDefault({ + message: `InboxProcessor: Invalid job type ${job.name}`, + }), + ) + return Promise.reject('Invalid job type') + } + } + + @OnWorkerEvent('failed') + onJobFailed(job: Job, error: Error) { + this.logger.error( + EcoLogMessage.fromDefault({ + message: `InboxProcessor: Error processing job`, + properties: { + job, + error, + }, + }), + ) + } +} diff --git a/src/bullmq/processors/interval.processor.ts b/src/bullmq/processors/interval.processor.ts new file mode 100644 index 0000000..d4b81eb --- /dev/null +++ b/src/bullmq/processors/interval.processor.ts @@ -0,0 +1,54 @@ +import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq' +import { QUEUES } from '@/common/redis/constants' +import { Injectable, Logger } from '@nestjs/common' +import { Job } from 'bullmq' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { RetryInfeasableIntentsService } from '@/intervals/retry-infeasable-intents.service' + +@Injectable() +@Processor(QUEUES.INTERVAL.queue) +export class IntervalProcessor extends WorkerHost { + private logger = new Logger(IntervalProcessor.name) + constructor(private readonly retryInfeasableIntentsService: RetryInfeasableIntentsService) { + super() + } + + async process( + job: Job, + processToken?: string | undefined, // eslint-disable-line @typescript-eslint/no-unused-vars + ): Promise { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `IntervalProcessor: process`, + properties: { + job: job.name, + }, + }), + ) + + switch (job.name) { + case QUEUES.INTERVAL.jobs.retry_infeasable_intents: + return await this.retryInfeasableIntentsService.retryInfeasableIntents() + default: + this.logger.error( + EcoLogMessage.fromDefault({ + message: `IntervalProcessor: Invalid job type ${job.name}`, + }), + ) + return Promise.reject('Invalid job type') + } + } + + @OnWorkerEvent('failed') + onJobFailed(job: Job, error: Error) { + this.logger.error( + EcoLogMessage.fromDefault({ + message: `IntervalProcessor: Error processing job`, + properties: { + job, + error, + }, + }), + ) + } +} diff --git a/src/bullmq/processors/processor.module.ts b/src/bullmq/processors/processor.module.ts index f620ae8..7dc30fc 100644 --- a/src/bullmq/processors/processor.module.ts +++ b/src/bullmq/processors/processor.module.ts @@ -5,9 +5,10 @@ import { SolveIntentProcessor } from './solve-intent.processor' import { BalanceModule } from '../../balance/balance.module' import { IntentModule } from '../../intent/intent.module' import { SignModule } from '../../sign/sign.module' +import { InboxProcessor } from '@/bullmq/processors/inbox.processor' @Module({ imports: [BalanceModule, IntentModule, SignModule], - providers: [EthWebsocketProcessor, SignerProcessor, SolveIntentProcessor], + providers: [EthWebsocketProcessor, SignerProcessor, SolveIntentProcessor, InboxProcessor], }) export class ProcessorModule {} diff --git a/src/bullmq/processors/solve-intent.processor.ts b/src/bullmq/processors/solve-intent.processor.ts index 2b08fd1..039f05e 100644 --- a/src/bullmq/processors/solve-intent.processor.ts +++ b/src/bullmq/processors/solve-intent.processor.ts @@ -1,14 +1,14 @@ import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq' -import { QUEUES } from '../../common/redis/constants' +import { QUEUES } from '@/common/redis/constants' import { Injectable, Logger } from '@nestjs/common' import { Job } from 'bullmq' -import { EcoLogMessage } from '../../common/logging/eco-log-message' -import { FeasableIntentService } from '../../intent/feasable-intent.service' -import { ValidateIntentService } from '../../intent/validate-intent.service' -import { CreateIntentService } from '../../intent/create-intent.service' -import { FulfillIntentService } from '../../intent/fulfill-intent.service' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { FeasableIntentService } from '@/intent/feasable-intent.service' +import { ValidateIntentService } from '@/intent/validate-intent.service' +import { CreateIntentService } from '@/intent/create-intent.service' +import { FulfillIntentService } from '@/intent/fulfill-intent.service' import { Hex } from 'viem' -import { IntentCreatedLog } from '../../contracts' +import { IntentCreatedLog } from '@/contracts' @Injectable() @Processor(QUEUES.SOURCE_INTENT.queue) @@ -40,6 +40,7 @@ export class SolveIntentProcessor extends WorkerHost { case QUEUES.SOURCE_INTENT.jobs.create_intent: return await this.createIntentService.createIntent(job.data as IntentCreatedLog) case QUEUES.SOURCE_INTENT.jobs.validate_intent: + case QUEUES.SOURCE_INTENT.jobs.retry_intent: return await this.validateIntentService.validateIntent(job.data as Hex) case QUEUES.SOURCE_INTENT.jobs.feasable_intent: return await this.feasableIntentService.feasableIntent(job.data as Hex) diff --git a/src/bullmq/utils/queue.ts b/src/bullmq/utils/queue.ts index ca91202..8a96819 100644 --- a/src/bullmq/utils/queue.ts +++ b/src/bullmq/utils/queue.ts @@ -9,3 +9,14 @@ export async function removeJobSchedulers(queue: Queue, jobName: string) { } } } + +/** + * Checks to see if there is a scheduled job of a given name in the queue. + * @param queue the queue to check + * @param jobName the name of the job to check + * @returns + */ +export async function isJobScheduled(queue: Queue, jobName: string): Promise { + const repeatableJobs = await queue.getJobSchedulers() + return !!repeatableJobs.find((job) => job.name === jobName) +} diff --git a/src/chain-monitor/chain-monitor.module.ts b/src/chain-monitor/chain-monitor.module.ts index 4359c4c..5114fff 100644 --- a/src/chain-monitor/chain-monitor.module.ts +++ b/src/chain-monitor/chain-monitor.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common' import { IntentModule } from '../intent/intent.module' import { ChainSyncService } from './chain-sync.service' import { TransactionModule } from '../transaction/transaction.module' +import { WatchModule } from '@/watch/watch.module' @Module({ - imports: [IntentModule, TransactionModule], + imports: [IntentModule, TransactionModule, WatchModule], providers: [ChainSyncService], exports: [ChainSyncService], }) diff --git a/src/chain-monitor/chain-sync.service.ts b/src/chain-monitor/chain-sync.service.ts index 28622e7..2460fc6 100644 --- a/src/chain-monitor/chain-sync.service.ts +++ b/src/chain-monitor/chain-sync.service.ts @@ -8,7 +8,7 @@ import { IntentSource } from '../eco-configs/eco-config.types' import { IntentCreatedLog } from '../contracts' import { entries } from 'lodash' import { BlockTag } from 'viem' -import { WatchIntentService } from '../intent/watch-intent.service' +import { WatchCreateIntentService } from '../watch/intent/watch-create-intent.service' import { KernelAccountClientService } from '../transaction/smart-wallets/kernel/kernel-account-client.service' import { IntentSourceAbi } from '@eco-foundation/routes-ts' @@ -25,7 +25,7 @@ export class ChainSyncService implements OnApplicationBootstrap { constructor( @InjectModel(IntentSourceModel.name) private intentModel: Model, private readonly kernelAccountClientService: KernelAccountClientService, - private readonly watchIntentService: WatchIntentService, + private readonly watchIntentService: WatchCreateIntentService, private ecoConfigService: EcoConfigService, ) {} diff --git a/src/chain-monitor/tests/chain-sync.service.spec.ts b/src/chain-monitor/tests/chain-sync.service.spec.ts index 31ea10a..d0bd43b 100644 --- a/src/chain-monitor/tests/chain-sync.service.spec.ts +++ b/src/chain-monitor/tests/chain-sync.service.spec.ts @@ -1,6 +1,6 @@ import { createMock, DeepMocked } from '@golevelup/ts-jest' import { ChainSyncService } from '../chain-sync.service' -import { WatchIntentService } from '../../intent/watch-intent.service' +import { WatchCreateIntentService } from '../../watch/intent/watch-create-intent.service' import { EcoConfigService } from '../../eco-configs/eco-config.service' import { Test, TestingModule } from '@nestjs/testing' import { getModelToken } from '@nestjs/mongoose' @@ -14,7 +14,7 @@ import { IntentSourceAbi } from '@eco-foundation/routes-ts' describe('ChainSyncService', () => { let chainSyncService: ChainSyncService let accountService: DeepMocked - let watchIntentService: DeepMocked + let watchIntentService: DeepMocked let ecoConfigService: DeepMocked beforeEach(async () => { @@ -25,7 +25,7 @@ describe('ChainSyncService', () => { provide: KernelAccountClientService, useValue: createMock(), }, - { provide: WatchIntentService, useValue: createMock() }, + { provide: WatchCreateIntentService, useValue: createMock() }, { provide: EcoConfigService, useValue: createMock() }, { provide: getModelToken(IntentSourceModel.name), @@ -36,7 +36,7 @@ describe('ChainSyncService', () => { chainSyncService = chainMod.get(ChainSyncService) accountService = chainMod.get(KernelAccountClientService) - watchIntentService = chainMod.get(WatchIntentService) + watchIntentService = chainMod.get(WatchCreateIntentService) ecoConfigService = chainMod.get(EcoConfigService) as DeepMocked }) diff --git a/src/common/errors/eco-error.ts b/src/common/errors/eco-error.ts index 15cfe13..50be44d 100644 --- a/src/common/errors/eco-error.ts +++ b/src/common/errors/eco-error.ts @@ -57,11 +57,11 @@ export class EcoError extends Error { } // WatchIntent Service - static WatchIntentUnsubscribeError = new Error('Could not unsubscribe from watch intent') - static WatchIntentUnsubscribeFromError(chainID: number) { - return new Error(`Could not unsubscribe from watch intent for chain : ${chainID}`) + static WatchEventUnsubscribeError = new Error('Could not unsubscribe from watch event') + static WatchEventUnsubscribeFromError(chainID: number) { + return new Error(`Could not unsubscribe from watch event for chain : ${chainID}`) } - static WatchIntentNoUnsubscribeError(chainID: number) { + static WatchEventNoUnsubscribeError(chainID: number) { return new Error(`There is no unwatch for chain : ${chainID}`) } diff --git a/src/common/redis/constants.ts b/src/common/redis/constants.ts index 9b621f4..d52eefd 100644 --- a/src/common/redis/constants.ts +++ b/src/common/redis/constants.ts @@ -7,6 +7,21 @@ export const QUEUES: Record = { validate_intent: 'validate_intent', feasable_intent: 'feasable_intent', fulfill_intent: 'fulfill_intent', + retry_intent: 'retry_intent', + }, + }, + INTERVAL: { + queue: 'interval', + prefix: '{interval}', + jobs: { + retry_infeasable_intents: 'retry_infeasable_intents', + }, + }, + INBOX: { + queue: 'inbox', + prefix: '{inbox}', + jobs: { + fulfillment: 'fulfillment', }, }, ETH_SOCKET: { diff --git a/src/contracts/inbox.ts b/src/contracts/inbox.ts new file mode 100644 index 0000000..63f2c5b --- /dev/null +++ b/src/contracts/inbox.ts @@ -0,0 +1,8 @@ +import { InboxAbi } from '@eco-foundation/routes-ts' +import { ExtractAbiEvent } from 'abitype' +import { Prettify, Log } from 'viem' + +// Define the type for the Fulfillment event log +export type FulfillmentLog = Prettify< + Log, true> +> diff --git a/src/contracts/prover.ts b/src/contracts/prover.ts index 6275dde..d100478 100644 --- a/src/contracts/prover.ts +++ b/src/contracts/prover.ts @@ -3,7 +3,7 @@ export const PROOF_HYPERLANE = 1 export type ProofType = typeof PROOF_STORAGE | typeof PROOF_HYPERLANE -export const Proofs = { +export const Proofs: Record = { Storage: PROOF_STORAGE, Hyperlane: PROOF_HYPERLANE, } diff --git a/src/eco-configs/eco-config.service.ts b/src/eco-configs/eco-config.service.ts index 5b31b87..ae1beb6 100644 --- a/src/eco-configs/eco-config.service.ts +++ b/src/eco-configs/eco-config.service.ts @@ -114,6 +114,20 @@ export class EcoConfigService implements OnModuleInit { return this.get('eth') } + // Returns the intervals config, sets defaults for repeatOpts and jobTemplate if not set + getIntervals(): EcoConfigType['intervals'] { + const configs = this.get('intervals') as EcoConfigType['intervals'] + for (const [, value] of Object.entries(configs)) { + _.merge(value, configs.defaults, value) + } + return configs + } + + // Returns the intent configs + getIntentConfigs(): EcoConfigType['intentConfigs'] { + return this.get('intentConfigs') + } + // Returns the external APIs config getExternalAPIs(): EcoConfigType['externalAPIs'] { return this.get('externalAPIs') diff --git a/src/eco-configs/eco-config.types.ts b/src/eco-configs/eco-config.types.ts index ebd7364..2d893bd 100644 --- a/src/eco-configs/eco-config.types.ts +++ b/src/eco-configs/eco-config.types.ts @@ -3,7 +3,7 @@ import { ClusterNode } from 'ioredis' import { Params as PinoParams } from 'nestjs-pino' import * as Redis from 'ioredis' import { Settings } from 'redlock' -import { JobsOptions } from 'bullmq' +import { JobsOptions, RepeatOptions } from 'bullmq' import { Hex } from 'viem' import { LDOptions } from '@launchdarkly/node-server-sdk' @@ -14,6 +14,8 @@ export type EcoConfigType = { } externalAPIs: unknown redis: RedisConfig + intervals: IntervalConfig + intentConfigs: IntentConfig alchemy: AlchemyConfigType launchDarkly: LaunchDarklyConfig eth: { @@ -91,6 +93,36 @@ export type RedisConfig = { } } +/** + * The config type for the intervals section + */ +export type IntervalConfig = { + retryInfeasableIntents: { + repeatOpts: Omit + jobTemplate: { + name?: string + opts: Omit + } + } + defaults: { + repeatOpts: Omit + jobTemplate?: { + name?: string + opts?: Omit + } + } +} + +/** + * The config type for the intent section + */ +export type IntentConfig = { + proofs: { + storage_duration_seconds: number + hyperlane_duration_seconds: number + } +} + /** * The config type for the aws credentials */ diff --git a/src/intent/feasable-intent.service.ts b/src/intent/feasable-intent.service.ts index 89c3290..d489349 100644 --- a/src/intent/feasable-intent.service.ts +++ b/src/intent/feasable-intent.service.ts @@ -80,7 +80,7 @@ export class FeasableIntentService implements OnModuleInit { ...this.intentJobConfig, }) } else { - await this.utilsIntentService.updateInfeasableIntentModel(this.intentModel, model, results) + await this.utilsIntentService.updateInfeasableIntentModel(model, results) } } diff --git a/src/intent/fulfill-intent.service.ts b/src/intent/fulfill-intent.service.ts index ac64898..e305c9e 100644 --- a/src/intent/fulfill-intent.service.ts +++ b/src/intent/fulfill-intent.service.ts @@ -128,7 +128,7 @@ export class FulfillIntentService { throw e } finally { // Update the db model - await this.utilsIntentService.updateIntentModel(this.intentModel, model) + await this.utilsIntentService.updateIntentModel(model) } } diff --git a/src/intent/intent.controller.ts b/src/intent/intent.controller.ts index c129751..9d0b6fe 100644 --- a/src/intent/intent.controller.ts +++ b/src/intent/intent.controller.ts @@ -1,5 +1,5 @@ import { Controller, Get } from '@nestjs/common' -import { WatchIntentService } from './watch-intent.service' +import { WatchCreateIntentService } from '../watch/intent/watch-create-intent.service' import { Network } from 'alchemy-sdk' import { ValidateIntentService } from './validate-intent.service' import { Logger } from '@nestjs/common' @@ -11,7 +11,7 @@ import { IntentCreatedLog } from '../contracts' export class IntentSourceController { private logger = new Logger(IntentSourceController.name) constructor( - private readonly watchIntentService: WatchIntentService, + private readonly watchIntentService: WatchCreateIntentService, private readonly validateService: ValidateIntentService, ) {} diff --git a/src/intent/intent.module.ts b/src/intent/intent.module.ts index 0f204c7..1ff206a 100644 --- a/src/intent/intent.module.ts +++ b/src/intent/intent.module.ts @@ -5,7 +5,6 @@ import { IntentSourceModel, IntentSourceSchema } from './schemas/intent-source.s import { ValidateIntentService } from './validate-intent.service' import { FeasableIntentService } from './feasable-intent.service' import { CreateIntentService } from './create-intent.service' -import { WatchIntentService } from './watch-intent.service' import { UtilsIntentService } from './utils-intent.service' import { BalanceModule } from '../balance/balance.module' import { FulfillIntentService } from './fulfill-intent.service' @@ -29,17 +28,16 @@ import { FlagsModule } from '../flags/flags.module' CreateIntentService, ValidateIntentService, FeasableIntentService, - WatchIntentService, FulfillIntentService, UtilsIntentService, ], // controllers: [IntentSourceController], exports: [ - WatchIntentService, CreateIntentService, ValidateIntentService, FeasableIntentService, FulfillIntentService, + UtilsIntentService, MongooseModule, //add IntentSourceModel to the rest of the modules that import intents ], }) diff --git a/src/intent/tests/feasable-intent.service.spec.ts b/src/intent/tests/feasable-intent.service.spec.ts index a9abba8..8cf378b 100644 --- a/src/intent/tests/feasable-intent.service.spec.ts +++ b/src/intent/tests/feasable-intent.service.spec.ts @@ -109,7 +109,6 @@ describe('FeasableIntentService', () => { await feasableIntentService.feasableIntent(intentHash) expect(utilsIntentService.updateInfeasableIntentModel).toHaveBeenCalledWith( - {}, mockData.model, validateExecution.results, ) diff --git a/src/intent/tests/fulfill-intent.service.spec.ts b/src/intent/tests/fulfill-intent.service.spec.ts index 3c7bf8f..74011d6 100644 --- a/src/intent/tests/fulfill-intent.service.spec.ts +++ b/src/intent/tests/fulfill-intent.service.spec.ts @@ -191,7 +191,7 @@ describe('FulfillIntentService', () => { }) await expect(() => fulfillIntentService.executeFulfillIntent(hash)).rejects.toThrow(error) expect(utilsIntentService.updateIntentModel).toHaveBeenCalledTimes(1) - expect(utilsIntentService.updateIntentModel).toHaveBeenCalledWith(intentModel, { + expect(utilsIntentService.updateIntentModel).toHaveBeenCalledWith({ ...model, status: 'FAILED', receipt: error, @@ -209,7 +209,7 @@ describe('FulfillIntentService', () => { await expect(() => fulfillIntentService.executeFulfillIntent(hash)).rejects.toThrow(error2) expect(utilsIntentService.updateIntentModel).toHaveBeenCalledTimes(2) - expect(utilsIntentService.updateIntentModel).toHaveBeenLastCalledWith(intentModel, { + expect(utilsIntentService.updateIntentModel).toHaveBeenLastCalledWith({ ...model, status: 'FAILED', receipt: { previous: error, current: error2 }, @@ -273,7 +273,7 @@ describe('FulfillIntentService', () => { it('should update the db model with status and receipt', async () => { expect(utilsIntentService.updateIntentModel).toHaveBeenCalledTimes(1) - expect(utilsIntentService.updateIntentModel).toHaveBeenCalledWith(intentModel, { + expect(utilsIntentService.updateIntentModel).toHaveBeenCalledWith({ ...model, status: 'SOLVED', receipt: { transactionHash }, diff --git a/src/intent/tests/utils-intent.service.spec.ts b/src/intent/tests/utils-intent.service.spec.ts index 3f7595f..1cc0861 100644 --- a/src/intent/tests/utils-intent.service.spec.ts +++ b/src/intent/tests/utils-intent.service.spec.ts @@ -12,6 +12,7 @@ import { Queue } from 'bullmq' import { EcoError } from '../../common/errors/eco-error' import { getFunctionBytes } from '../../common/viem/contracts' import { address1, address2 } from './feasable-intent.service.spec' +import { FulfillmentLog } from '@/contracts/inbox' jest.mock('viem', () => { return { @@ -74,7 +75,7 @@ describe('UtilsIntentService', () => { describe('on updateIntentModel', () => { it('should updateOne model off the intent hash', async () => { - await utilsIntentService.updateIntentModel(intentModel, model) + await utilsIntentService.updateIntentModel(model) expect(mockUpdateOne).toHaveBeenCalledTimes(1) expect(mockUpdateOne).toHaveBeenCalledWith({ 'intent.hash': model.intent.hash }, model) }) @@ -89,7 +90,7 @@ describe('UtilsIntentService', () => { expiresEarly: false, sameChainFulfill: false, } - await utilsIntentService.updateInvalidIntentModel(intentModel, model, invalidCause) + await utilsIntentService.updateInvalidIntentModel(model, invalidCause) expect(mockUpdateOne).toHaveBeenCalledTimes(1) expect(mockUpdateOne).toHaveBeenCalledWith( { 'intent.hash': model.intent.hash }, @@ -106,7 +107,7 @@ describe('UtilsIntentService', () => { profitable: false, }, ] - await utilsIntentService.updateInfeasableIntentModel(intentModel, model, infeasable) + await utilsIntentService.updateInfeasableIntentModel(model, infeasable) expect(mockUpdateOne).toHaveBeenCalledTimes(1) expect(mockUpdateOne).toHaveBeenCalledWith( { 'intent.hash': model.intent.hash }, @@ -250,8 +251,8 @@ describe('UtilsIntentService', () => { } as any const solver = { targets: { address1: { contractType: 'erc20', selectors: [] } } } expect(utilsIntentService.targetsSupported(model, solver as any)).toBe(false) - expect(mockLogWarn).toHaveBeenCalledTimes(1) - expect(mockLogWarn).toHaveBeenCalledWith({ + expect(mockLogDebug).toHaveBeenCalledTimes(1) + expect(mockLogDebug).toHaveBeenCalledWith({ msg: `Targets not supported for intent ${model.intent.hash}`, intentHash: model.intent.hash, sourceNetwork: model.event.sourceNetwork, @@ -265,8 +266,8 @@ describe('UtilsIntentService', () => { } as any const solver = { targets: {} } expect(utilsIntentService.targetsSupported(model, solver as any)).toBe(false) - expect(mockLogWarn).toHaveBeenCalledTimes(1) - expect(mockLogWarn).toHaveBeenCalledWith({ + expect(mockLogDebug).toHaveBeenCalledTimes(1) + expect(mockLogDebug).toHaveBeenCalledWith({ msg: `Targets not supported for intent ${model.intent.hash}`, intentHash: model.intent.hash, sourceNetwork: model.event.sourceNetwork, @@ -280,8 +281,8 @@ describe('UtilsIntentService', () => { } as any const solver = { targets: { [target1]: targetConfig } } expect(utilsIntentService.targetsSupported(model, solver as any)).toBe(false) - expect(mockLogWarn).toHaveBeenCalledTimes(1) - expect(mockLogWarn).toHaveBeenCalledWith({ + expect(mockLogDebug).toHaveBeenCalledTimes(1) + expect(mockLogDebug).toHaveBeenCalledWith({ msg: `Targets not supported for intent ${model.intent.hash}`, intentHash: model.intent.hash, sourceNetwork: model.event.sourceNetwork, @@ -346,4 +347,39 @@ describe('UtilsIntentService', () => { expect(await utilsIntentService.getIntentProcessData(intentHash)).toEqual({ model, solver }) }) }) + + describe('on updateOnFulfillment', () => { + const fulfillment = { + args: { + _hash: '0x123', + _solver: '0x456', + _intent: '0x789', + _receipt: '0xabc', + _result: '0xdef', + }, + } as any as FulfillmentLog + it('should log a warning if no intent exists in the db for the fulfillment hash', async () => { + intentModel.findOne = jest.fn().mockReturnValue(undefined) + await utilsIntentService.updateOnFulfillment(fulfillment) + expect(mockLogWarn).toHaveBeenCalledTimes(1) + expect(mockLogWarn).toHaveBeenCalledWith({ + msg: `Intent not found for fulfillment ${fulfillment.args._hash}`, + fulfillment, + }) + }) + + it('should update the intent as solved if it exists', async () => { + const model = { + face: 1, + status: 'PENDING', + } + intentModel.findOne = jest.fn().mockReturnValue(model) + await utilsIntentService.updateOnFulfillment(fulfillment) + expect(intentModel.updateOne).toHaveBeenCalledTimes(1) + expect(intentModel.updateOne).toHaveBeenCalledWith( + { 'intent.hash': fulfillment.args._hash }, + { ...model, status: 'SOLVED' }, + ) + }) + }) }) diff --git a/src/intent/tests/validate-intent.service.spec.ts b/src/intent/tests/validate-intent.service.spec.ts index dbf7a7e..853bf14 100644 --- a/src/intent/tests/validate-intent.service.spec.ts +++ b/src/intent/tests/validate-intent.service.spec.ts @@ -268,7 +268,7 @@ describe('ValidateIntentService', () => { expect(await validateIntentService['assertValidations'](model, solver)).toBe(false) expect(updateInvalidIntentModel).toHaveBeenCalledTimes(1) expect(mockLogLog).toHaveBeenCalledTimes(1) - expect(updateInvalidIntentModel).toHaveBeenCalledWith({}, model, logObj) + expect(updateInvalidIntentModel).toHaveBeenCalledWith(model, logObj) expect(mockLogLog).toHaveBeenCalledWith({ msg: `Intent failed validation ${model.intent.hash}`, model, diff --git a/src/intent/utils-intent.service.ts b/src/intent/utils-intent.service.ts index dd58e3d..ec23cb0 100644 --- a/src/intent/utils-intent.service.ts +++ b/src/intent/utils-intent.service.ts @@ -10,6 +10,7 @@ import { difference, includes } from 'lodash' import { decodeFunctionData, DecodeFunctionDataReturnType, Hex, toFunctionSelector } from 'viem' import { getERCAbi } from '../contracts' import { getFunctionBytes } from '../common/viem/contracts' +import { FulfillmentLog } from '@/contracts/inbox' /** * Data for a transaction target @@ -59,8 +60,8 @@ export class UtilsIntentService { * @param intentModel the model factory to use * @param model the new model data */ - async updateIntentModel(intentModel: Model, model: IntentSourceModel) { - return await intentModel.updateOne({ 'intent.hash': model.intent.hash }, model) + async updateIntentModel(model: IntentSourceModel) { + return await this.intentModel.updateOne({ 'intent.hash': model.intent.hash }, model) } /** @@ -72,7 +73,6 @@ export class UtilsIntentService { * @returns */ async updateInvalidIntentModel( - intentModel: Model, model: IntentSourceModel, invalidCause: { proverUnsupported: boolean @@ -84,7 +84,7 @@ export class UtilsIntentService { ) { model.status = 'INVALID' model.receipt = invalidCause as any - return await this.updateIntentModel(intentModel, model) + return await this.updateIntentModel(model) } /** @@ -95,14 +95,35 @@ export class UtilsIntentService { * @param infeasable the infeasable result * @returns */ - async updateInfeasableIntentModel( - intentModel: Model, - model: IntentSourceModel, - infeasable: InfeasableResult, - ) { + async updateInfeasableIntentModel(model: IntentSourceModel, infeasable: InfeasableResult) { model.status = 'INFEASABLE' model.receipt = infeasable as any - return await this.updateIntentModel(intentModel, model) + return await this.updateIntentModel(model) + } + + /** + * Updates the intent model with the fulfillment status. If the intent was fulfilled by this solver, then + * the status should already be SOLVED: in that case this function does nothing. + * + * @param fulfillment the fulfillment log event + */ + async updateOnFulfillment(fulfillment: FulfillmentLog) { + const model = await this.intentModel.findOne({ + 'intent.hash': fulfillment.args._hash, + }) + if (model) { + model.status = 'SOLVED' + await this.intentModel.updateOne({ 'intent.hash': fulfillment.args._hash }, model) + } else { + this.logger.warn( + EcoLogMessage.fromDefault({ + message: `Intent not found for fulfillment ${fulfillment.args._hash}`, + properties: { + fulfillment, + }, + }), + ) + } } /** diff --git a/src/intent/validate-intent.service.ts b/src/intent/validate-intent.service.ts index 4064d28..096ab87 100644 --- a/src/intent/validate-intent.service.ts +++ b/src/intent/validate-intent.service.ts @@ -108,7 +108,7 @@ export class ValidateIntentService implements OnModuleInit { expiresEarly || sameChainFulfill ) { - await this.utilsIntentService.updateInvalidIntentModel(this.intentModel, model, { + await this.utilsIntentService.updateInvalidIntentModel(model, { proverUnsupported, targetsUnsupported, selectorsUnsupported, @@ -127,7 +127,7 @@ export class ValidateIntentService implements OnModuleInit { sameChainFulfill, ...(expiresEarly && { proofMinDurationSeconds: this.proofService - .getProofMinimumDate(model.intent.prover) + .getProofMinimumDate(this.proofService.getProverType(model.intent.prover)) .toUTCString(), }), }, diff --git a/src/intent/watch-intent.service.ts b/src/intent/watch-intent.service.ts deleted file mode 100644 index 6bc03d8..0000000 --- a/src/intent/watch-intent.service.ts +++ /dev/null @@ -1,201 +0,0 @@ -import { Injectable, Logger, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common' -import { EcoConfigService } from '../eco-configs/eco-config.service' -import { JobsOptions, Queue } from 'bullmq' -import { QUEUES } from '../common/redis/constants' -import { InjectQueue } from '@nestjs/bullmq' -import { getIntentJobId } from '../common/utils/strings' -import { IntentSource } from '../eco-configs/eco-config.types' -import { EcoLogMessage } from '../common/logging/eco-log-message' -import { MultichainPublicClientService } from '../transaction/multichain-public-client.service' -import { IntentCreatedLog } from '../contracts' -import { PublicClient, WatchContractEventReturnType, zeroHash } from 'viem' -import { convertBigIntsToStrings } from '../common/viem/utils' -import { entries } from 'lodash' -import { IntentSourceAbi } from '@eco-foundation/routes-ts' -import { EcoError } from '@/common/errors/eco-error' - -/** - * This service subscribes to IntentSource contracts for IntentCreated events. It subscribes on all - * supported chains and prover addresses. When an event is emitted, it mutates the event log, and then - * adds it intent queue for processing. - */ -@Injectable() -export class WatchIntentService implements OnApplicationBootstrap, OnModuleDestroy { - private logger = new Logger(WatchIntentService.name) - private intentJobConfig: JobsOptions - private unwatch: Record = {} - - constructor( - @InjectQueue(QUEUES.SOURCE_INTENT.queue) private readonly intentQueue: Queue, - private readonly publicClientService: MultichainPublicClientService, - private readonly ecoConfigService: EcoConfigService, - ) {} - - async onModuleInit() { - this.intentJobConfig = this.ecoConfigService.getRedis().jobs.intentJobConfig - } - - async onApplicationBootstrap() { - await this.subscribe() - } - - async onModuleDestroy() { - // close all clients - this.unsubscribe() - } - - /** - * Subscribes to all IntentSource contracts for IntentCreated events. It subscribes on all supported chains - * filtering on the prover addresses and destination chain ids. It loads a mapping of the unsubscribe events to - * call {@link onModuleDestroy} to close the clients. - */ - async subscribe() { - const solverSupportedChains = entries(this.ecoConfigService.getSolvers()).map(([, solver]) => - BigInt(solver.chainID), - ) - const subscribeTasks = this.ecoConfigService.getIntentSources().map(async (source) => { - const client = await this.publicClientService.getClient(source.chainID) - await this.subscribeToSource(client, source, solverSupportedChains) - }) - - await Promise.all(subscribeTasks) - } - - async subscribeToSource( - client: PublicClient, - source: IntentSource, - solverSupportedChains: bigint[], - ) { - this.logger.debug( - EcoLogMessage.fromDefault({ - message: `watch intent: subscribeToSource`, - properties: { - source, - }, - }), - ) - this.unwatch[source.chainID] = client.watchContractEvent({ - onError: async (error) => { - this.logger.error( - EcoLogMessage.fromDefault({ - message: `rpc client error`, - properties: { - error, - }, - }), - ) - //reset the filters as they might have expired or we might have been moved to a new node - //https://support.quicknode.com/hc/en-us/articles/10838914856977-Error-code-32000-message-filter-not-found - await this.unsubscribeFrom(source.chainID) - await this.subscribeToSource(client, source, solverSupportedChains) - }, - address: source.sourceAddress, - abi: IntentSourceAbi, - eventName: 'IntentCreated', - args: { - // restrict by acceptable chains, chain ids must be bigints - _destinationChain: solverSupportedChains, - _prover: source.provers, - }, - onLogs: this.addJob(source), - }) - } - - /** - * Unsubscribes from all IntentSource contracts. It closes all clients in {@link onModuleDestroy} - */ - async unsubscribe() { - this.logger.debug( - EcoLogMessage.fromDefault({ - message: `watch-intent: unsubscribe`, - }), - ) - - Object.values(this.unwatch).forEach((unwatch) => { - try { - unwatch() - } catch (e) { - this.logger.error( - EcoLogMessage.withError({ - message: `watch-intent: unsubscribe`, - error: EcoError.WatchIntentUnsubscribeError, - properties: { - errorPassed: e, - }, - }), - ) - } - }) - } - - /** - * Unsubscribes from a specific chain - * @param chainID the chain id to unsubscribe from - */ - async unsubscribeFrom(chainID: number) { - if (this.unwatch[chainID]) { - this.logger.debug( - EcoLogMessage.fromDefault({ - message: `watch intent: unsubscribeFrom`, - properties: { - chainID, - }, - }), - ) - try { - this.unwatch[chainID]() - } catch (e) { - this.logger.error( - EcoLogMessage.withError({ - message: `watch-intent: unsubscribeFrom`, - error: EcoError.WatchIntentUnsubscribeFromError(chainID), - properties: { - chainID, - errorPassed: e, - }, - }), - ) - } - } else { - this.logger.error( - EcoLogMessage.withError({ - message: `watch intent: unsubscribeFrom`, - error: EcoError.WatchIntentNoUnsubscribeError(chainID), - properties: { - chainID, - }, - }), - ) - } - } - - addJob(source: IntentSource) { - return async (logs: IntentCreatedLog[]) => { - for (const log of logs) { - // bigint as it can't serialize to JSON - const createIntent = convertBigIntsToStrings(log) - createIntent.sourceChainID = source.chainID - createIntent.sourceNetwork = source.network - const jobId = getIntentJobId( - 'watch', - createIntent.args._hash ?? zeroHash, - createIntent.logIndex ?? 0, - ) - this.logger.debug( - EcoLogMessage.fromDefault({ - message: `watch intent`, - properties: { - createIntent, - jobId, - }, - }), - ) - // add to processing queue - await this.intentQueue.add(QUEUES.SOURCE_INTENT.jobs.create_intent, createIntent, { - jobId, - ...this.intentJobConfig, - }) - } - } - } -} diff --git a/src/intervals/interval.module.ts b/src/intervals/interval.module.ts new file mode 100644 index 0000000..3cf134c --- /dev/null +++ b/src/intervals/interval.module.ts @@ -0,0 +1,18 @@ +import { Module } from '@nestjs/common' +import { RetryInfeasableIntentsService } from '@/intervals/retry-infeasable-intents.service' +import { initBullMQ } from '@/bullmq/bullmq.helper' +import { QUEUES } from '@/common/redis/constants' +import { IntentModule } from '@/intent/intent.module' +import { ProverModule } from '@/prover/prover.module' +import { IntervalProcessor } from '@/bullmq/processors/interval.processor' + +@Module({ + imports: [ + initBullMQ(QUEUES.INTERVAL), + initBullMQ(QUEUES.SOURCE_INTENT), + IntentModule, + ProverModule, + ], + providers: [RetryInfeasableIntentsService, IntervalProcessor], +}) +export class IntervalModule {} diff --git a/src/intervals/retry-infeasable-intents.service.ts b/src/intervals/retry-infeasable-intents.service.ts new file mode 100644 index 0000000..f6bb316 --- /dev/null +++ b/src/intervals/retry-infeasable-intents.service.ts @@ -0,0 +1,89 @@ +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { QUEUES } from '@/common/redis/constants' +import { getIntentJobId } from '@/common/utils/strings' +import { Proofs } from '@/contracts' +import { EcoConfigService } from '@/eco-configs/eco-config.service' +import { IntentSourceModel } from '@/intent/schemas/intent-source.schema' +import { ProofService } from '@/prover/proof.service' +import { InjectQueue } from '@nestjs/bullmq' +import { Injectable, Logger, OnApplicationBootstrap } from '@nestjs/common' +import { InjectModel } from '@nestjs/mongoose' +import { JobsOptions, Queue } from 'bullmq' +import { Model } from 'mongoose' + +@Injectable() +export class RetryInfeasableIntentsService implements OnApplicationBootstrap { + private logger = new Logger(RetryInfeasableIntentsService.name) + private intentJobConfig: JobsOptions + + constructor( + @InjectQueue(QUEUES.INTERVAL.queue) private readonly intervalQueue: Queue, + @InjectQueue(QUEUES.SOURCE_INTENT.queue) private readonly intentQueue: Queue, + @InjectModel(IntentSourceModel.name) private intentModel: Model, + private readonly proofService: ProofService, + private readonly ecoConfigService: EcoConfigService, + ) {} + + async onModuleInit() { + this.intentJobConfig = this.ecoConfigService.getIntervals().retryInfeasableIntents.jobTemplate + .opts as JobsOptions + } + + async onApplicationBootstrap() { + const config = this.ecoConfigService.getIntervals().retryInfeasableIntents + config.repeatOpts = { ...config.repeatOpts, immediately: true } + config.jobTemplate = { + ...config.jobTemplate, + name: QUEUES.INTERVAL.jobs.retry_infeasable_intents, + } + this.intervalQueue.upsertJobScheduler( + QUEUES.INTERVAL.jobs.RETRY_INFEASABLE_INTENTS, + config.repeatOpts, + config.jobTemplate, + ) + } + + /** + * Retries intents that are infeasable but still within the proof expiration window. + * Sends them on the {@link QUEUES.SOURCE_INTENT.jobs.retry_intent} queue to validate + */ + async retryInfeasableIntents() { + const models = await this.getInfeasableIntents() + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `retryInfeasableIntents`, + properties: { + models, + }, + }), + ) + + const retryTasks = models.map(async (model) => { + const jobId = getIntentJobId('retry', model.intent.hash, model.intent.logIndex) + + //add to processing queue + await this.intentQueue.add(QUEUES.SOURCE_INTENT.jobs.retry_intent, model.intent.hash, { + jobId, + ...this.intentJobConfig, + }) + }) + + await Promise.all(retryTasks) + } + + private async getInfeasableIntents() { + return await this.intentModel.find({ + status: 'INFEASABLE', + $or: [ + { + 'intent.expiration': { $gt: this.proofService.getProofMinimumDate(Proofs.Hyperlane) }, + 'intent.prover': { $in: this.proofService.getProvers(Proofs.Hyperlane) }, + }, + { + 'intent.expiration': { $gt: this.proofService.getProofMinimumDate(Proofs.Storage) }, + 'intent.prover': { $in: this.proofService.getProvers(Proofs.Storage) }, + }, + ], + }) + } +} diff --git a/src/intervals/tests/retry-infeasable-intents.service.spec.ts b/src/intervals/tests/retry-infeasable-intents.service.spec.ts new file mode 100644 index 0000000..ff9a17e --- /dev/null +++ b/src/intervals/tests/retry-infeasable-intents.service.spec.ts @@ -0,0 +1,167 @@ +import { createMock, DeepMocked } from '@golevelup/ts-jest' +import { EcoConfigService } from '../../eco-configs/eco-config.service' +import { Test, TestingModule } from '@nestjs/testing' +import { ProofService } from '../../prover/proof.service' +import { RetryInfeasableIntentsService } from '@/intervals/retry-infeasable-intents.service' +import { Queue } from 'bullmq' +import { getModelToken } from '@nestjs/mongoose' +import { Model } from 'mongoose' +import { IntentSourceModel } from '@/intent/schemas/intent-source.schema' +import { BullModule, getQueueToken } from '@nestjs/bullmq' +import { QUEUES } from '@/common/redis/constants' +import { Proofs } from '@/contracts' +import { Hex } from 'viem' + +describe('RetryInfeasableIntentsService', () => { + let infeasableService: RetryInfeasableIntentsService + let proofService: DeepMocked + let ecoConfigService: DeepMocked + let intervalQueue: DeepMocked + let intentQueue: DeepMocked + let intentSourceModel: DeepMocked> + + const mockLogDebug = jest.fn() + const mockLogLog = jest.fn() + + beforeEach(async () => { + const chainMod: TestingModule = await Test.createTestingModule({ + providers: [ + RetryInfeasableIntentsService, + { + provide: getModelToken(IntentSourceModel.name), + useValue: createMock>(), + }, + { provide: ProofService, useValue: createMock() }, + { provide: EcoConfigService, useValue: createMock() }, + ], + imports: [ + BullModule.registerQueue({ + name: QUEUES.INTERVAL.queue, + }), + BullModule.registerQueue({ + name: QUEUES.SOURCE_INTENT.queue, + }), + ], + }) + .overrideProvider(getQueueToken(QUEUES.INTERVAL.queue)) + .useValue(createMock()) + .overrideProvider(getQueueToken(QUEUES.SOURCE_INTENT.queue)) + .useValue(createMock()) + .compile() + + //turn off the services from logging durring testing + chainMod.useLogger(false) + infeasableService = chainMod.get(RetryInfeasableIntentsService) + proofService = chainMod.get(ProofService) + ecoConfigService = chainMod.get(EcoConfigService) + intentSourceModel = chainMod.get(getModelToken(IntentSourceModel.name)) + intervalQueue = chainMod.get(getQueueToken(QUEUES.INTERVAL.queue)) + intentQueue = chainMod.get(getQueueToken(QUEUES.SOURCE_INTENT.queue)) + + infeasableService['logger'].debug = mockLogDebug + infeasableService['logger'].log = mockLogLog + }) + + afterEach(async () => { + jest.restoreAllMocks() + mockLogDebug.mockClear() + mockLogLog.mockClear() + }) + + describe('on startup', () => { + const mockInternals = { + retryInfeasableIntents: { + repeatOpts: { + every: 10000, + }, + jobTemplate: { + name: 'retry-infeasable-intents', + data: {}, + }, + }, + } + beforeEach(async () => { + ecoConfigService.getIntervals.mockReturnValue(mockInternals as any) + }) + + it('should set intentJobConfig', async () => { + await infeasableService.onModuleInit() + expect(ecoConfigService.getIntervals).toHaveBeenCalledTimes(1) + }) + + it('should set upsertJobScheduler', async () => { + await infeasableService.onApplicationBootstrap() + expect(ecoConfigService.getIntervals).toHaveBeenCalledTimes(1) + expect(intervalQueue.upsertJobScheduler).toHaveBeenCalledTimes(1) + expect(intervalQueue.upsertJobScheduler).toHaveBeenCalledWith( + QUEUES.INTERVAL.jobs.RETRY_INFEASABLE_INTENTS, + { ...mockInternals.retryInfeasableIntents.repeatOpts, immediately: true }, + { + ...mockInternals.retryInfeasableIntents.jobTemplate, + name: QUEUES.INTERVAL.jobs.retry_infeasable_intents, + }, + ) + }) + }) + + describe('on retryInfeasableIntents', () => { + let mockGetInfeasableIntents = jest.fn() + const mockModels = [ + { intent: { hash: 'hash1', logIndex: 1 } }, + { intent: { hash: 'hash2', logIndex: 2 } }, + ] + beforeEach(async () => { + infeasableService['getInfeasableIntents'] = mockGetInfeasableIntents + mockGetInfeasableIntents.mockResolvedValue(mockModels) + }) + + it('should log models retrieved', async () => { + await infeasableService.retryInfeasableIntents() + expect(mockLogDebug).toHaveBeenCalledTimes(1) + expect(mockLogDebug).toHaveBeenCalledWith({ + msg: 'retryInfeasableIntents', + models: mockModels, + }) + }) + + it('should add every model to the queue', async () => { + const addSpy = jest.spyOn(intentQueue, 'add') + await infeasableService.retryInfeasableIntents() + expect(addSpy).toHaveBeenCalledTimes(2) + }) + }) + + describe('getInfeasableIntents', () => { + it('should fetch intents with status INFEASABLE and valid expiration for Hyperlane proofs', async () => { + const minDateHyper = new Date('2022-01-01') + const minDateStorage = new Date('2022-01-02') + const proverHyper: Hex[] = ['0x1a', '0x2a'] + const proverStorage: Hex[] = ['0x3b', '0x4b'] + const mockGetProofMinimumDate = jest + .spyOn(proofService, 'getProofMinimumDate') + .mockImplementation((proof) => (proof == Proofs.Hyperlane ? minDateHyper : minDateStorage)) + const mockGetProvers = jest + .spyOn(proofService, 'getProvers') + .mockImplementation((proof) => (proof == Proofs.Hyperlane ? proverHyper : proverStorage)) + + await infeasableService['getInfeasableIntents']() + + expect(intentSourceModel.find).toHaveBeenCalledWith({ + status: 'INFEASABLE', + $or: [ + { + 'intent.expiration': { $gt: minDateHyper }, + 'intent.prover': { $in: proverHyper }, + }, + { + 'intent.expiration': { $gt: minDateStorage }, + 'intent.prover': { $in: proverStorage }, + }, + ], + }) + + expect(mockGetProofMinimumDate).toHaveBeenCalledTimes(2) + expect(mockGetProvers).toHaveBeenCalledTimes(2) + }) + }) +}) diff --git a/src/prover/proof.service.ts b/src/prover/proof.service.ts index a779833..7558edd 100644 --- a/src/prover/proof.service.ts +++ b/src/prover/proof.service.ts @@ -26,12 +26,6 @@ export class ProofService implements OnModuleInit { */ private proofContracts: Record = {} - // set the minimum duration for a proof to be valid as a constant to 14 days - // TODO: update prover interace contract to return the duration here - public static readonly PROOF_STORAGE_MINIMUM_DURATION_SECONDS = 60 * 60 * 24 * 14 - - // the minimum duration for a hyperprover to be valid, 1 hour - public static readonly PROOF_HYPERPROVER_MINIMUM_DURATION_SECONDS = 60 * 15 constructor( private readonly publicClient: MultichainPublicClientService, private readonly ecoConfigService: EcoConfigService, @@ -69,6 +63,26 @@ export class ProofService implements OnModuleInit { return this.getProofType(proverAddress) === PROOF_STORAGE } + /** + * Returns all the prover addresses for a given proof type + * @param proofType the proof type + * @returns + */ + getProvers(proofType: ProofType): Hex[] { + return entries(this.proofContracts) + .filter(([, type]) => type === proofType) + .map(([address]) => address as Hex) + } + + /** + * Returns the prover type for a given prover address + * @param prover the prover address + * @returns + */ + getProverType(prover: Hex): ProofType { + return this.proofContracts[prover] + } + /** * Loads the proof types for each prover address into memory. * Assume all provers must have the same proof type if their @@ -139,7 +153,7 @@ export class ProofService implements OnModuleInit { * @returns true if the intent can be proven before the minimum proof time, false otherwise */ isIntentExpirationWithinProofMinimumDate(prover: Hex, expirationDate: Date): boolean { - return compareAsc(expirationDate, this.getProofMinimumDate(prover)) == 1 + return compareAsc(expirationDate, this.getProofMinimumDate(this.proofContracts[prover])) == 1 } /** @@ -147,7 +161,7 @@ export class ProofService implements OnModuleInit { * @param chainID the chain id * @returns */ - getProofMinimumDate(prover: Hex): Date { + getProofMinimumDate(prover: ProofType): Date { return addSeconds(new Date(), this.getProofMinimumDurationSeconds(prover)) } @@ -157,9 +171,14 @@ export class ProofService implements OnModuleInit { * @param prover the address of the prover * @returns */ - private getProofMinimumDurationSeconds(prover: Hex): number { - return this.isHyperlaneProver(prover) - ? ProofService.PROOF_HYPERPROVER_MINIMUM_DURATION_SECONDS - : ProofService.PROOF_STORAGE_MINIMUM_DURATION_SECONDS + private getProofMinimumDurationSeconds(prover: ProofType): number { + const proofs = this.ecoConfigService.getIntentConfigs().proofs + switch (prover) { + case PROOF_HYPERLANE: + return proofs.hyperlane_duration_seconds + case PROOF_STORAGE: + default: + return proofs.storage_duration_seconds + } } } diff --git a/src/prover/tests/proof.service.spec.ts b/src/prover/tests/proof.service.spec.ts index 7a61958..f866dcb 100644 --- a/src/prover/tests/proof.service.spec.ts +++ b/src/prover/tests/proof.service.spec.ts @@ -110,6 +110,15 @@ describe('ProofService', () => { }) describe('on utility methods', () => { + const intentConfigs = { + proofs: { + storage_duration_seconds: 10, + hyperlane_duration_seconds: 20, + }, + } + beforeEach(async () => { + ecoConfigService.getIntentConfigs = jest.fn().mockReturnValue(intentConfigs) + }) it('should correctly check if its a hyperlane prover', async () => { jest.spyOn(proofService, 'getProofType').mockReturnValue(PROOF_HYPERLANE) expect(proofService.isHyperlaneProver('0x123')).toBe(true) @@ -123,14 +132,12 @@ describe('ProofService', () => { }) it('should return the correct minimum proof time', async () => { - jest.spyOn(proofService, 'isHyperlaneProver').mockReturnValue(true) - expect(proofService['getProofMinimumDurationSeconds']('0x123')).toBe( - ProofService.PROOF_HYPERPROVER_MINIMUM_DURATION_SECONDS, + expect(proofService['getProofMinimumDurationSeconds'](PROOF_HYPERLANE)).toBe( + intentConfigs.proofs.hyperlane_duration_seconds, ) - jest.spyOn(proofService, 'isHyperlaneProver').mockReturnValue(false) - expect(proofService['getProofMinimumDurationSeconds']('0x123')).toBe( - ProofService.PROOF_STORAGE_MINIMUM_DURATION_SECONDS, + expect(proofService['getProofMinimumDurationSeconds'](PROOF_STORAGE)).toBe( + intentConfigs.proofs.storage_duration_seconds, ) }) diff --git a/src/intent/tests/watch-intent.service.spec.ts b/src/watch/intent/tests/watch-create-intent.service.spec.ts similarity index 87% rename from src/intent/tests/watch-intent.service.spec.ts rename to src/watch/intent/tests/watch-create-intent.service.spec.ts index d99d7c7..28107ce 100644 --- a/src/intent/tests/watch-intent.service.spec.ts +++ b/src/watch/intent/tests/watch-create-intent.service.spec.ts @@ -1,15 +1,15 @@ +import { QUEUES } from '@/common/redis/constants' +import { EcoConfigService } from '@/eco-configs/eco-config.service' +import { WatchCreateIntentService } from '@/watch/intent/watch-create-intent.service' import { createMock, DeepMocked } from '@golevelup/ts-jest' -import { EcoConfigService } from '../../eco-configs/eco-config.service' -import { Test, TestingModule } from '@nestjs/testing' import { BullModule, getQueueToken } from '@nestjs/bullmq' -import { QUEUES } from '../../common/redis/constants' +import { Test, TestingModule } from '@nestjs/testing' import { Job, Queue } from 'bullmq' -import { WatchIntentService } from '../watch-intent.service' -import { MultichainPublicClientService } from '../../transaction/multichain-public-client.service' import { EcoError } from '@/common/errors/eco-error' +import { MultichainPublicClientService } from '@/transaction/multichain-public-client.service' describe('WatchIntentService', () => { - let watchIntentService: WatchIntentService + let watchIntentService: WatchCreateIntentService let publicClientService: DeepMocked let ecoConfigService: DeepMocked let queue: DeepMocked @@ -26,7 +26,7 @@ describe('WatchIntentService', () => { beforeEach(async () => { const chainMod: TestingModule = await Test.createTestingModule({ providers: [ - WatchIntentService, + WatchCreateIntentService, { provide: MultichainPublicClientService, useValue: createMock(), @@ -43,7 +43,7 @@ describe('WatchIntentService', () => { .useValue(createMock()) .compile() - watchIntentService = chainMod.get(WatchIntentService) + watchIntentService = chainMod.get(WatchCreateIntentService) publicClientService = chainMod.get(MultichainPublicClientService) ecoConfigService = chainMod.get(EcoConfigService) queue = chainMod.get(getQueueToken(QUEUES.SOURCE_INTENT.queue)) @@ -63,7 +63,6 @@ describe('WatchIntentService', () => { describe('on lifecycle', () => { describe('on startup', () => { it('should subscribe to nothing if no source intents', async () => { - ecoConfigService.getSolvers.mockReturnValue(sources) const mock = jest.spyOn(watchIntentService, 'subscribe') await watchIntentService.onApplicationBootstrap() expect(mock).toHaveBeenCalledTimes(1) @@ -95,7 +94,7 @@ describe('WatchIntentService', () => { it('should unsubscribe to nothing if no source intents', async () => { const mock = jest.spyOn(watchIntentService, 'unsubscribe') await watchIntentService.onModuleDestroy() - expect(mock).toHaveBeenCalledTimes + expect(mock).toHaveBeenCalledTimes(1) }) it('should unsubscribe to all source intents', async () => { @@ -117,15 +116,15 @@ describe('WatchIntentService', () => { const log = { args: { _hash: BigInt(1), logIndex: BigInt(2) } } as any let mockQueueAdd: jest.SpyInstance>> - beforeEach(() => { + beforeEach(async () => { mockQueueAdd = jest.spyOn(queue, 'add') - watchIntentService.addJob(s)([log]) + await watchIntentService.addJob(s)([log]) expect(mockLogDebug).toHaveBeenCalledTimes(1) }) it('should convert all bigints to strings', async () => { expect(mockLogDebug.mock.calls[0][0].createIntent).toEqual( expect.objectContaining({ - args: { _hash: '1', logIndex: '2' }, + args: { _hash: log.args._hash.toString(), logIndex: log.args.logIndex.toString() }, }), ) }) @@ -144,7 +143,7 @@ describe('WatchIntentService', () => { expect(mockQueueAdd).toHaveBeenCalledWith( QUEUES.SOURCE_INTENT.jobs.create_intent, expect.any(Object), - { jobId: 'watch-1-0' }, + { jobId: 'watch-create-intent-1-0' }, ) }) }) @@ -175,8 +174,8 @@ describe('WatchIntentService', () => { expect(mockUnwatch2).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledWith({ - msg: 'watch-intent: unsubscribe', - error: EcoError.WatchIntentUnsubscribeError.toString(), + msg: 'watch-event: unsubscribe', + error: EcoError.WatchEventUnsubscribeError.toString(), errorPassed: e, }) }) @@ -188,7 +187,7 @@ describe('WatchIntentService', () => { expect(mockLogError).toHaveBeenCalledTimes(0) expect(mockLogDebug).toHaveBeenCalledTimes(1) expect(mockLogDebug).toHaveBeenCalledWith({ - msg: 'watch-intent: unsubscribe', + msg: 'watch-event: unsubscribe', }) }) }) @@ -214,7 +213,7 @@ describe('WatchIntentService', () => { expect(mockLogDebug).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledTimes(0) expect(mockLogDebug).toHaveBeenCalledWith({ - msg: 'watch intent: unsubscribeFrom', + msg: 'watch-event: unsubscribeFrom', chainID, }) }) @@ -228,8 +227,8 @@ describe('WatchIntentService', () => { expect(mockUnwatch1).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledWith({ - msg: 'watch-intent: unsubscribeFrom', - error: EcoError.WatchIntentUnsubscribeFromError(chainID).toString(), + msg: 'watch-event: unsubscribeFrom', + error: EcoError.WatchEventUnsubscribeFromError(chainID).toString(), errorPassed: e, chainID, }) @@ -246,8 +245,8 @@ describe('WatchIntentService', () => { expect(mockUnwatch1).toHaveBeenCalledTimes(0) expect(mockLogError).toHaveBeenCalledTimes(1) expect(mockLogError).toHaveBeenCalledWith({ - msg: 'watch intent: unsubscribeFrom', - error: EcoError.WatchIntentNoUnsubscribeError(chainID).toString(), + msg: 'watch event: unsubscribeFrom', + error: EcoError.WatchEventNoUnsubscribeError(chainID).toString(), chainID, }) }) diff --git a/src/watch/intent/tests/watch-fulfillment.service.spec.ts b/src/watch/intent/tests/watch-fulfillment.service.spec.ts new file mode 100644 index 0000000..29de5cb --- /dev/null +++ b/src/watch/intent/tests/watch-fulfillment.service.spec.ts @@ -0,0 +1,141 @@ +import { QUEUES } from '@/common/redis/constants' +import { EcoConfigService } from '@/eco-configs/eco-config.service' +import { MultichainPublicClientService } from '@/transaction/multichain-public-client.service' +import { WatchFulfillmentService } from '@/watch/intent/watch-fulfillment.service' +import { createMock, DeepMocked } from '@golevelup/ts-jest' +import { BullModule, getQueueToken } from '@nestjs/bullmq' +import { Test, TestingModule } from '@nestjs/testing' +import { Job, Queue } from 'bullmq' + +describe('WatchFulfillmentService', () => { + let watchFulfillmentService: WatchFulfillmentService + let publicClientService: DeepMocked + let ecoConfigService: DeepMocked + let queue: DeepMocked + const mockLogDebug = jest.fn() + const mockLogLog = jest.fn() + + const inboxes = [ + { chainID: 1, solverAddress: '0x1234' }, + { chainID: 2, solverAddress: '0x5678' }, + ] as any + const inboxRecord = inboxes.reduce((acc, solver) => { + acc[solver.chainID] = solver + return acc + }, {}) + const supportedChains = inboxes.map((s) => BigInt(s.chainID)) + + beforeEach(async () => { + const chainMod: TestingModule = await Test.createTestingModule({ + providers: [ + WatchFulfillmentService, + { + provide: MultichainPublicClientService, + useValue: createMock(), + }, + { provide: EcoConfigService, useValue: createMock() }, + ], + imports: [ + BullModule.registerQueue({ + name: QUEUES.INBOX.queue, + }), + ], + }) + .overrideProvider(getQueueToken(QUEUES.INBOX.queue)) + .useValue(createMock()) + .compile() + + watchFulfillmentService = chainMod.get(WatchFulfillmentService) + publicClientService = chainMod.get(MultichainPublicClientService) + ecoConfigService = chainMod.get(EcoConfigService) + queue = chainMod.get(getQueueToken(QUEUES.INBOX.queue)) + + watchFulfillmentService['logger'].debug = mockLogDebug + watchFulfillmentService['logger'].log = mockLogLog + }) + + afterEach(async () => { + // restore the spy created with spyOn + jest.restoreAllMocks() + mockLogDebug.mockClear() + mockLogLog.mockClear() + }) + + describe('on lifecycle', () => { + describe('on startup', () => { + it('should subscribe to nothing if no solvers', async () => { + const mock = jest.spyOn(watchFulfillmentService, 'subscribe') + await watchFulfillmentService.onApplicationBootstrap() + expect(mock).toHaveBeenCalledTimes(1) + }) + + it('should subscribe to all solvers', async () => { + const mockWatch = jest.fn() + publicClientService.getClient.mockResolvedValue({ + watchContractEvent: mockWatch, + } as any) + ecoConfigService.getSolvers.mockReturnValue(inboxRecord) + watchFulfillmentService.getSupportedChains = jest.fn().mockReturnValue(supportedChains) + await watchFulfillmentService.onApplicationBootstrap() + expect(mockWatch).toHaveBeenCalledTimes(2) + + for (const [index, s] of inboxes.entries()) { + const { address, eventName, args } = mockWatch.mock.calls[index][0] + const partial = { address, eventName, args } + expect(partial).toEqual({ + address: s.solverAddress, + eventName: 'Fulfillment', + args: { _sourceChainID: supportedChains }, + }) + } + }) + }) + + describe('on destroy', () => { + it('should unsubscribe to nothing if no solvers', async () => { + const mock = jest.spyOn(watchFulfillmentService, 'unsubscribe') + await watchFulfillmentService.onModuleDestroy() + expect(mock).toHaveBeenCalledTimes(1) + }) + + it('should unsubscribe to all solvers', async () => { + const mockUnwatch = jest.fn() + publicClientService.getClient.mockResolvedValue({ + watchContractEvent: () => mockUnwatch, + } as any) + ecoConfigService.getSolvers.mockReturnValue(inboxRecord) + await watchFulfillmentService.onApplicationBootstrap() + await watchFulfillmentService.onModuleDestroy() + expect(mockUnwatch).toHaveBeenCalledTimes(2) + }) + }) + + describe('on fulfillment', () => { + const log = { args: { _hash: BigInt(1), logIndex: BigInt(2) } } as any + let mockQueueAdd: jest.SpyInstance>> + + beforeEach(async () => { + mockQueueAdd = jest.spyOn(queue, 'add') + await watchFulfillmentService.addJob()([log]) + expect(mockLogDebug).toHaveBeenCalledTimes(1) + }) + + it('should convert all bigints to strings', async () => { + expect(mockLogDebug.mock.calls[0][0].fulfillment).toEqual( + expect.objectContaining({ + args: { _hash: log.args._hash.toString(), logIndex: log.args.logIndex.toString() }, + }), + ) + }) + + it('should should enque a job for every intent', async () => { + expect(mockQueueAdd).toHaveBeenCalledTimes(1) + expect(mockQueueAdd).toHaveBeenCalledWith( + QUEUES.INBOX.jobs.fulfillment, + expect.any(Object), + { jobId: 'watch-fulfillement-1-0' }, + ) + }) + }) + }) +}) diff --git a/src/watch/intent/watch-create-intent.service.ts b/src/watch/intent/watch-create-intent.service.ts new file mode 100644 index 0000000..647eeca --- /dev/null +++ b/src/watch/intent/watch-create-intent.service.ts @@ -0,0 +1,117 @@ +import { Injectable, Logger } from '@nestjs/common' +import { EcoConfigService } from '@/eco-configs/eco-config.service' +import { Queue } from 'bullmq' +import { QUEUES } from '@/common/redis/constants' +import { InjectQueue } from '@nestjs/bullmq' +import { getIntentJobId } from '@/common/utils/strings' +import { IntentSource } from '@/eco-configs/eco-config.types' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { MultichainPublicClientService } from '@/transaction/multichain-public-client.service' +import { IntentCreatedLog } from '@/contracts' +import { PublicClient, zeroHash } from 'viem' +import { convertBigIntsToStrings } from '@/common/viem/utils' +import { entries } from 'lodash' +import { IntentSourceAbi } from '@eco-foundation/routes-ts' +import { WatchEventService } from '@/watch/intent/watch-event.service' + +/** + * This service subscribes to IntentSource contracts for IntentCreated events. It subscribes on all + * supported chains and prover addresses. When an event is emitted, it mutates the event log, and then + * adds it intent queue for processing. + */ +@Injectable() +export class WatchCreateIntentService extends WatchEventService { + protected logger = new Logger(WatchCreateIntentService.name) + + constructor( + @InjectQueue(QUEUES.SOURCE_INTENT.queue) protected readonly intentQueue: Queue, + protected readonly publicClientService: MultichainPublicClientService, + protected readonly ecoConfigService: EcoConfigService, + ) { + super(intentQueue, publicClientService, ecoConfigService) + } + + /** + * Subscribes to all IntentSource contracts for IntentCreated events. It subscribes on all supported chains + * filtering on the prover addresses and destination chain ids. It loads a mapping of the unsubscribe events to + * call {@link onModuleDestroy} to close the clients. + */ + async subscribe(): Promise { + const subscribeTasks = this.ecoConfigService.getIntentSources().map(async (source) => { + const client = await this.publicClientService.getClient(source.chainID) + await this.subscribeTo(client, source, this.getSupportedChains()) + }) + + await Promise.all(subscribeTasks) + } + + /** + * Unsubscribes from all IntentSource contracts. It closes all clients in {@link onModuleDestroy} + */ + async unsubscribe() { + super.unsubscribe() + } + + /** + * Checks to see what networks we have inbox contracts for + * @returns the supported chains for the event + */ + getSupportedChains(): bigint[] { + return entries(this.ecoConfigService.getSolvers()).map(([, solver]) => BigInt(solver.chainID)) + } + + async subscribeTo(client: PublicClient, source: IntentSource, solverSupportedChains: bigint[]) { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch create intent: subscribeToSource`, + properties: { + source, + }, + }), + ) + this.unwatch[source.chainID] = client.watchContractEvent({ + onError: async (error) => { + await this.onError(error, client, source) + }, + address: source.sourceAddress, + abi: IntentSourceAbi, + eventName: 'IntentCreated', + args: { + // restrict by acceptable chains, chain ids must be bigints + _destinationChain: solverSupportedChains, + _prover: source.provers, + }, + onLogs: this.addJob(source), + }) + } + + addJob(source: IntentSource) { + return async (logs: IntentCreatedLog[]) => { + for (const log of logs) { + // bigint as it can't serialize to JSON + const createIntent = convertBigIntsToStrings(log) + createIntent.sourceChainID = source.chainID + createIntent.sourceNetwork = source.network + const jobId = getIntentJobId( + 'watch-create-intent', + createIntent.args._hash ?? zeroHash, + createIntent.logIndex ?? 0, + ) + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch intent`, + properties: { + createIntent, + jobId, + }, + }), + ) + // add to processing queue + await this.intentQueue.add(QUEUES.SOURCE_INTENT.jobs.create_intent, createIntent, { + jobId, + ...this.intentJobConfig, + }) + } + } + } +} diff --git a/src/watch/intent/watch-event.service.ts b/src/watch/intent/watch-event.service.ts new file mode 100644 index 0000000..5b854cb --- /dev/null +++ b/src/watch/intent/watch-event.service.ts @@ -0,0 +1,140 @@ +import { Injectable, Logger, OnApplicationBootstrap, OnModuleDestroy } from '@nestjs/common' +import { EcoConfigService } from '../../eco-configs/eco-config.service' +import { JobsOptions, Queue } from 'bullmq' +import { MultichainPublicClientService } from '../../transaction/multichain-public-client.service' +import { PublicClient, WatchContractEventReturnType } from 'viem' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { EcoError } from '@/common/errors/eco-error' + +/** + * This service subscribes has hooks for subscribing and unsubscribing to a contract event. + */ +@Injectable() +export abstract class WatchEventService + implements OnApplicationBootstrap, OnModuleDestroy +{ + protected logger: Logger + protected intentJobConfig: JobsOptions + protected unwatch: Record = {} + + constructor( + protected readonly queue: Queue, + protected readonly publicClientService: MultichainPublicClientService, + protected readonly ecoConfigService: EcoConfigService, + ) {} + + async onModuleInit() { + this.intentJobConfig = this.ecoConfigService.getRedis().jobs.intentJobConfig + } + + async onApplicationBootstrap() { + await this.subscribe() + } + + async onModuleDestroy() { + // close all clients + this.unsubscribe() + } + + /** + * Subscribes to the events. It loads a mapping of the unsubscribe events to + * call {@link onModuleDestroy} to close the clients. + */ + abstract subscribe(): Promise + + /** + * Subscribes to a contract on a specific chain + * @param client the client to subscribe to + * @param contract the contract to subscribe to + * @param supportedChains the chains to subscribe on + */ + abstract subscribeTo(client: PublicClient, contract: T, supportedChains: bigint[]): Promise + + /** + * Unsubscribes from all events. It closes all clients in {@link onModuleDestroy} + */ + async unsubscribe(): Promise { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch-event: unsubscribe`, + }), + ) + Object.values(this.unwatch).forEach((unwatch) => { + try { + unwatch() + } catch (e) { + this.logger.error( + EcoLogMessage.withError({ + message: `watch-event: unsubscribe`, + error: EcoError.WatchEventUnsubscribeError, + properties: { + errorPassed: e, + }, + }), + ) + } + }) + } + + /** + * Checks to see what networks + * @returns the supported chains for the event + */ + abstract getSupportedChains(): bigint[] + + async onError(error: any, client: PublicClient, contract: T) { + this.logger.error( + EcoLogMessage.fromDefault({ + message: `rpc client error`, + properties: { + error, + }, + }), + ) + //reset the filters as they might have expired or we might have been moved to a new node + //https://support.quicknode.com/hc/en-us/articles/10838914856977-Error-code-32000-message-filter-not-found + await this.unsubscribeFrom(contract.chainID) + await this.subscribeTo(client, contract, this.getSupportedChains()) + } + + /** + * Unsubscribes from a specific chain + * @param chainID the chain id to unsubscribe from + */ + async unsubscribeFrom(chainID: number) { + if (this.unwatch[chainID]) { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch-event: unsubscribeFrom`, + properties: { + chainID, + }, + }), + ) + try { + this.unwatch[chainID]() + } catch (e) { + this.logger.error( + EcoLogMessage.withError({ + message: `watch-event: unsubscribeFrom`, + error: EcoError.WatchEventUnsubscribeFromError(chainID), + properties: { + chainID, + errorPassed: e, + }, + }), + ) + } + } else { + this.logger.error( + EcoLogMessage.withError({ + message: `watch event: unsubscribeFrom`, + error: EcoError.WatchEventNoUnsubscribeError(chainID), + properties: { + chainID, + }, + }), + ) + } + } +} diff --git a/src/watch/intent/watch-fulfillment.service.ts b/src/watch/intent/watch-fulfillment.service.ts new file mode 100644 index 0000000..4d545c6 --- /dev/null +++ b/src/watch/intent/watch-fulfillment.service.ts @@ -0,0 +1,114 @@ +import { Injectable, Logger } from '@nestjs/common' +import { EcoConfigService } from '@/eco-configs/eco-config.service' +import { Queue } from 'bullmq' +import { QUEUES } from '@/common/redis/constants' +import { InjectQueue } from '@nestjs/bullmq' +import { getIntentJobId } from '@/common/utils/strings' +import { Solver } from '@/eco-configs/eco-config.types' +import { EcoLogMessage } from '@/common/logging/eco-log-message' +import { MultichainPublicClientService } from '@/transaction/multichain-public-client.service' +import { PublicClient, zeroHash } from 'viem' +import { convertBigIntsToStrings } from '@/common/viem/utils' +import { entries } from 'lodash' +import { InboxAbi } from '@eco-foundation/routes-ts' +import { WatchEventService } from '@/watch/intent/watch-event.service' +import { FulfillmentLog } from '@/contracts/inbox' + +/** + * This service subscribes to Inbox contracts for Fulfillment events. It subscribes on all + * supported chains and prover addresses. When an event is emitted, adds the event + * to the queue to update the intent in the database. + */ +@Injectable() +export class WatchFulfillmentService extends WatchEventService { + protected logger = new Logger(WatchFulfillmentService.name) + + constructor( + @InjectQueue(QUEUES.INBOX.queue) protected readonly inboxQueue: Queue, + protected readonly publicClientService: MultichainPublicClientService, + protected readonly ecoConfigService: EcoConfigService, + ) { + super(inboxQueue, publicClientService, ecoConfigService) + } + + /** + * Subscribes to all Inbox constacts for Fulfillment events. It loads a mapping of the unsubscribe events to + * call {@link onModuleDestroy} to close the clients. + */ + async subscribe(): Promise { + const subscribeTasks = entries(this.ecoConfigService.getSolvers()).map(async ([, solver]) => { + const client = await this.publicClientService.getClient(solver.chainID) + await this.subscribeTo(client, solver, this.getSupportedChains()) + }) + await Promise.all(subscribeTasks) + } + + async unsubscribe() { + super.unsubscribe() + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch fulfillment: unsubscribe`, + }), + ) + } + + /** + * Checks to see what networks we have intent sources for + * @returns the supported chains for the event + */ + getSupportedChains(): bigint[] { + return this.ecoConfigService.getIntentSources().map((source) => BigInt(source.chainID)) + } + + async subscribeTo(client: PublicClient, solver: Solver, souceChains: bigint[]) { + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch fulfillment event: subscribeToFulfillment`, + properties: { + solver, + }, + }), + ) + this.unwatch[solver.chainID] = client.watchContractEvent({ + onError: async (error) => { + await this.onError(error, client, solver) + }, + address: solver.solverAddress, + abi: InboxAbi, + eventName: 'Fulfillment', + args: { + // restrict by acceptable chains, chain ids must be bigints + _sourceChainID: souceChains, + }, + onLogs: this.addJob(), + }) + } + + addJob() { + return async (logs: FulfillmentLog[]) => { + for (const log of logs) { + // bigint as it can't serialize to JSON + const fulfillment = convertBigIntsToStrings(log) + const jobId = getIntentJobId( + 'watch-fulfillement', + fulfillment.args._hash ?? zeroHash, + fulfillment.logIndex ?? 0, + ) + this.logger.debug( + EcoLogMessage.fromDefault({ + message: `watch fulfillment`, + properties: { + fulfillment, + jobId, + }, + }), + ) + // add to processing queue + await this.inboxQueue.add(QUEUES.INBOX.jobs.fulfillment, fulfillment, { + jobId, + ...this.intentJobConfig, + }) + } + } + } +} diff --git a/src/watch/watch.module.ts b/src/watch/watch.module.ts new file mode 100644 index 0000000..1c4903a --- /dev/null +++ b/src/watch/watch.module.ts @@ -0,0 +1,13 @@ +import { Module } from '@nestjs/common' +import { initBullMQ } from '@/bullmq/bullmq.helper' +import { QUEUES } from '@/common/redis/constants' +import { WatchCreateIntentService } from '@/watch/intent/watch-create-intent.service' +import { TransactionModule } from '@/transaction/transaction.module' +import { WatchFulfillmentService } from '@/watch/intent/watch-fulfillment.service' + +@Module({ + imports: [initBullMQ(QUEUES.SOURCE_INTENT), initBullMQ(QUEUES.INBOX), TransactionModule], + providers: [WatchCreateIntentService, WatchFulfillmentService], + exports: [WatchCreateIntentService, WatchFulfillmentService], +}) +export class WatchModule {} diff --git a/yarn.lock b/yarn.lock index 74afc94..9d09b1a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6640,16 +6640,7 @@ string-length@^4.0.1: char-regex "^1.0.2" strip-ansi "^6.0.0" -"string-width-cjs@npm:string-width@^4.2.0": - version "4.2.3" - resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" - integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== - dependencies: - emoji-regex "^8.0.0" - is-fullwidth-code-point "^3.0.0" - strip-ansi "^6.0.1" - -string-width@^4.0.0, string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3: +"string-width-cjs@npm:string-width@^4.2.0", string-width@^4.0.0, string-width@^4.1.0, string-width@^4.2.0, string-width@^4.2.2, string-width@^4.2.3: version "4.2.3" resolved "https://registry.yarnpkg.com/string-width/-/string-width-4.2.3.tgz#269c7117d27b05ad2e536830a8ec895ef9c6d010" integrity sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g== @@ -6681,14 +6672,7 @@ string_decoder@~1.1.1: dependencies: safe-buffer "~5.1.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1": - version "6.0.1" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" - integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== - dependencies: - ansi-regex "^5.0.1" - -strip-ansi@^6.0.0, strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== @@ -7289,7 +7273,7 @@ word-wrap@^1.2.5: resolved "https://registry.yarnpkg.com/word-wrap/-/word-wrap-1.2.5.tgz#d2c45c6dd4fbce621a66f136cbe328afd0410b34" integrity sha512-BN22B5eaMMI9UMtjrGd5g5eCYPpCPDUy0FJXbYsaT5zYxjFOckS53SQDE3pWkVoWpHXVb3BrYcEN4Twa55B5cA== -"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0": +"wrap-ansi-cjs@npm:wrap-ansi@^7.0.0", wrap-ansi@^7.0.0: version "7.0.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== @@ -7307,15 +7291,6 @@ wrap-ansi@^6.0.1, wrap-ansi@^6.2.0: string-width "^4.1.0" strip-ansi "^6.0.0" -wrap-ansi@^7.0.0: - version "7.0.0" - resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-7.0.0.tgz#67e145cff510a6a6984bdf1152911d69d2eb9e43" - integrity sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q== - dependencies: - ansi-styles "^4.0.0" - string-width "^4.1.0" - strip-ansi "^6.0.0" - wrap-ansi@^8.1.0: version "8.1.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-8.1.0.tgz#56dc22368ee570face1b49819975d9b9a5ead214"