Skip to content

Commit

Permalink
Ed 4570 retry unfeasable (#76)
Browse files Browse the repository at this point in the history
* move bigint conversion

* moving watch services into own module. adding watch fulfill intent service

* creating watch fulfillment event

* refactoring watch create intent to use same abstract parent

* adding tests for watch fulfillment
adding inbox processor
adding intent utils method for update - > needs more work and tests

* adding default intervals, intentconfigs, and liquiditymanager to config defaults.ts
removing eventemitter from app modules
adding interval modeule and service
adding interval processor and queue
adding retry_intent to source intent queue

* adding skeleton of retry infeasable

* disabling storage prover from being accepted by solver

* updating routes-ts to ~0.2.10-beta update only on patch

* fixing merge

* adding retry infeasable intents tests
  • Loading branch information
StoyanD authored Jan 8, 2025
1 parent e966ce0 commit 72e0906
Show file tree
Hide file tree
Showing 39 changed files with 1,216 additions and 328 deletions.
44 changes: 44 additions & 0 deletions config/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,50 @@ export default {
},
},
},
intervals: {
retryInfeasableIntents: {
repeatOpts: {
every: 10000,
},
jobTemplate: {
name: 'retry-infeasable-intents',
data: {},
},
},
defaults: {
repeatOpts: {
every: 10000,
},
jobTemplate: {
name: 'default-interval-job',
data: {},
opts: {
removeOnComplete: true,
removeOnFail: true,

attempts: 3,
backoff: {
type: 'exponential',
delay: 2_000,
},
},
},
},
},
intentConfigs: {
proofs: {
storage_duration_seconds: 604800,
hyperlane_duration_seconds: 3600,
},
},
liquidityManager: {
intervalDuration: 300000,
targetSlippage: 0.02,
thresholds: {
surplus: 0.1,
deficit: 0.2,
},
},
externalAPIs: {},
logger: {
usePino: true,
Expand Down
10 changes: 5 additions & 5 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { ChainMonitorModule } from './chain-monitor/chain-monitor.module'
import { EcoConfigService } from './eco-configs/eco-config.service'
import { LoggerModule } from 'nestjs-pino'
import { MongooseModule } from '@nestjs/mongoose'
import { EventEmitterModule } from '@nestjs/event-emitter'
import { IntentModule } from './intent/intent.module'
import { IntentSourceModel } from './intent/schemas/intent-source.schema'
import { BalanceModule } from './balance/balance.module'
Expand All @@ -15,21 +14,21 @@ import { ProcessorModule } from './bullmq/processors/processor.module'
import { SolverModule } from './solver/solver.module'
import { FlagsModule } from './flags/flags.module'
import { LiquidityManagerModule } from '@/liquidity-manager/liquidity-manager.module'
import { WatchModule } from '@/watch/watch.module'
import { IntervalModule } from '@/intervals/interval.module'

@Module({
imports: [
BalanceModule,
ChainMonitorModule,
EcoConfigModule.withAWS(),
EventEmitterModule.forRoot({
// the delimiter used to segment namespaces
delimiter: '.',
}),

FlagsModule,
HealthModule,
IntentModule,
SignModule,
IntentSourceModel,
IntervalModule,
ProcessorModule,
MongooseModule.forRootAsync({
inject: [EcoConfigService],
Expand All @@ -43,6 +42,7 @@ import { LiquidityManagerModule } from '@/liquidity-manager/liquidity-manager.mo
ProverModule,
SolverModule,
LiquidityManagerModule,
WatchModule,
...getPino(),
],
controllers: [],
Expand Down
55 changes: 55 additions & 0 deletions src/bullmq/processors/inbox.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'
import { QUEUES } from '@/common/redis/constants'
import { Injectable, Logger } from '@nestjs/common'
import { Job } from 'bullmq'
import { EcoLogMessage } from '@/common/logging/eco-log-message'
import { UtilsIntentService } from '@/intent/utils-intent.service'
import { FulfillmentLog } from '@/contracts/inbox'

@Injectable()
@Processor(QUEUES.INBOX.queue)
export class InboxProcessor extends WorkerHost {
private logger = new Logger(InboxProcessor.name)
constructor(private readonly utilsIntentService: UtilsIntentService) {
super()
}

async process(
job: Job<any, any, string>,
processToken?: string | undefined, // eslint-disable-line @typescript-eslint/no-unused-vars
): Promise<any> {
this.logger.debug(
EcoLogMessage.fromDefault({
message: `InboxProcessor: process`,
properties: {
job: job.name,
},
}),
)

switch (job.name) {
case QUEUES.INBOX.jobs.fulfillment:
return await this.utilsIntentService.updateOnFulfillment(job.data as FulfillmentLog)
default:
this.logger.error(
EcoLogMessage.fromDefault({
message: `InboxProcessor: Invalid job type ${job.name}`,
}),
)
return Promise.reject('Invalid job type')
}
}

@OnWorkerEvent('failed')
onJobFailed(job: Job<any, any, string>, error: Error) {
this.logger.error(
EcoLogMessage.fromDefault({
message: `InboxProcessor: Error processing job`,
properties: {
job,
error,
},
}),
)
}
}
54 changes: 54 additions & 0 deletions src/bullmq/processors/interval.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'
import { QUEUES } from '@/common/redis/constants'
import { Injectable, Logger } from '@nestjs/common'
import { Job } from 'bullmq'
import { EcoLogMessage } from '@/common/logging/eco-log-message'
import { RetryInfeasableIntentsService } from '@/intervals/retry-infeasable-intents.service'

@Injectable()
@Processor(QUEUES.INTERVAL.queue)
export class IntervalProcessor extends WorkerHost {
private logger = new Logger(IntervalProcessor.name)
constructor(private readonly retryInfeasableIntentsService: RetryInfeasableIntentsService) {
super()
}

async process(
job: Job<any, any, string>,
processToken?: string | undefined, // eslint-disable-line @typescript-eslint/no-unused-vars
): Promise<any> {
this.logger.debug(
EcoLogMessage.fromDefault({
message: `IntervalProcessor: process`,
properties: {
job: job.name,
},
}),
)

switch (job.name) {
case QUEUES.INTERVAL.jobs.retry_infeasable_intents:
return await this.retryInfeasableIntentsService.retryInfeasableIntents()
default:
this.logger.error(
EcoLogMessage.fromDefault({
message: `IntervalProcessor: Invalid job type ${job.name}`,
}),
)
return Promise.reject('Invalid job type')
}
}

@OnWorkerEvent('failed')
onJobFailed(job: Job<any, any, string>, error: Error) {
this.logger.error(
EcoLogMessage.fromDefault({
message: `IntervalProcessor: Error processing job`,
properties: {
job,
error,
},
}),
)
}
}
3 changes: 2 additions & 1 deletion src/bullmq/processors/processor.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import { SolveIntentProcessor } from './solve-intent.processor'
import { BalanceModule } from '../../balance/balance.module'
import { IntentModule } from '../../intent/intent.module'
import { SignModule } from '../../sign/sign.module'
import { InboxProcessor } from '@/bullmq/processors/inbox.processor'

@Module({
imports: [BalanceModule, IntentModule, SignModule],
providers: [EthWebsocketProcessor, SignerProcessor, SolveIntentProcessor],
providers: [EthWebsocketProcessor, SignerProcessor, SolveIntentProcessor, InboxProcessor],
})
export class ProcessorModule {}
15 changes: 8 additions & 7 deletions src/bullmq/processors/solve-intent.processor.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq'
import { QUEUES } from '../../common/redis/constants'
import { QUEUES } from '@/common/redis/constants'
import { Injectable, Logger } from '@nestjs/common'
import { Job } from 'bullmq'
import { EcoLogMessage } from '../../common/logging/eco-log-message'
import { FeasableIntentService } from '../../intent/feasable-intent.service'
import { ValidateIntentService } from '../../intent/validate-intent.service'
import { CreateIntentService } from '../../intent/create-intent.service'
import { FulfillIntentService } from '../../intent/fulfill-intent.service'
import { EcoLogMessage } from '@/common/logging/eco-log-message'
import { FeasableIntentService } from '@/intent/feasable-intent.service'
import { ValidateIntentService } from '@/intent/validate-intent.service'
import { CreateIntentService } from '@/intent/create-intent.service'
import { FulfillIntentService } from '@/intent/fulfill-intent.service'
import { Hex } from 'viem'
import { IntentCreatedLog } from '../../contracts'
import { IntentCreatedLog } from '@/contracts'

@Injectable()
@Processor(QUEUES.SOURCE_INTENT.queue)
Expand Down Expand Up @@ -40,6 +40,7 @@ export class SolveIntentProcessor extends WorkerHost {
case QUEUES.SOURCE_INTENT.jobs.create_intent:
return await this.createIntentService.createIntent(job.data as IntentCreatedLog)
case QUEUES.SOURCE_INTENT.jobs.validate_intent:
case QUEUES.SOURCE_INTENT.jobs.retry_intent:
return await this.validateIntentService.validateIntent(job.data as Hex)
case QUEUES.SOURCE_INTENT.jobs.feasable_intent:
return await this.feasableIntentService.feasableIntent(job.data as Hex)
Expand Down
11 changes: 11 additions & 0 deletions src/bullmq/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,14 @@ export async function removeJobSchedulers(queue: Queue, jobName: string) {
}
}
}

/**
* Checks to see if there is a scheduled job of a given name in the queue.
* @param queue the queue to check
* @param jobName the name of the job to check
* @returns
*/
export async function isJobScheduled(queue: Queue, jobName: string): Promise<boolean> {
const repeatableJobs = await queue.getJobSchedulers()
return !!repeatableJobs.find((job) => job.name === jobName)
}
3 changes: 2 additions & 1 deletion src/chain-monitor/chain-monitor.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'
import { IntentModule } from '../intent/intent.module'
import { ChainSyncService } from './chain-sync.service'
import { TransactionModule } from '../transaction/transaction.module'
import { WatchModule } from '@/watch/watch.module'

@Module({
imports: [IntentModule, TransactionModule],
imports: [IntentModule, TransactionModule, WatchModule],
providers: [ChainSyncService],
exports: [ChainSyncService],
})
Expand Down
4 changes: 2 additions & 2 deletions src/chain-monitor/chain-sync.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { IntentSource } from '../eco-configs/eco-config.types'
import { IntentCreatedLog } from '../contracts'
import { entries } from 'lodash'
import { BlockTag } from 'viem'
import { WatchIntentService } from '../intent/watch-intent.service'
import { WatchCreateIntentService } from '../watch/intent/watch-create-intent.service'
import { KernelAccountClientService } from '../transaction/smart-wallets/kernel/kernel-account-client.service'
import { IntentSourceAbi } from '@eco-foundation/routes-ts'

Expand All @@ -25,7 +25,7 @@ export class ChainSyncService implements OnApplicationBootstrap {
constructor(
@InjectModel(IntentSourceModel.name) private intentModel: Model<IntentSourceModel>,
private readonly kernelAccountClientService: KernelAccountClientService,
private readonly watchIntentService: WatchIntentService,
private readonly watchIntentService: WatchCreateIntentService,
private ecoConfigService: EcoConfigService,
) {}

Expand Down
8 changes: 4 additions & 4 deletions src/chain-monitor/tests/chain-sync.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { createMock, DeepMocked } from '@golevelup/ts-jest'
import { ChainSyncService } from '../chain-sync.service'
import { WatchIntentService } from '../../intent/watch-intent.service'
import { WatchCreateIntentService } from '../../watch/intent/watch-create-intent.service'
import { EcoConfigService } from '../../eco-configs/eco-config.service'
import { Test, TestingModule } from '@nestjs/testing'
import { getModelToken } from '@nestjs/mongoose'
Expand All @@ -14,7 +14,7 @@ import { IntentSourceAbi } from '@eco-foundation/routes-ts'
describe('ChainSyncService', () => {
let chainSyncService: ChainSyncService
let accountService: DeepMocked<KernelAccountClientService>
let watchIntentService: DeepMocked<WatchIntentService>
let watchIntentService: DeepMocked<WatchCreateIntentService>
let ecoConfigService: DeepMocked<EcoConfigService>

beforeEach(async () => {
Expand All @@ -25,7 +25,7 @@ describe('ChainSyncService', () => {
provide: KernelAccountClientService,
useValue: createMock<KernelAccountClientService>(),
},
{ provide: WatchIntentService, useValue: createMock<WatchIntentService>() },
{ provide: WatchCreateIntentService, useValue: createMock<WatchCreateIntentService>() },
{ provide: EcoConfigService, useValue: createMock<EcoConfigService>() },
{
provide: getModelToken(IntentSourceModel.name),
Expand All @@ -36,7 +36,7 @@ describe('ChainSyncService', () => {

chainSyncService = chainMod.get(ChainSyncService)
accountService = chainMod.get(KernelAccountClientService)
watchIntentService = chainMod.get(WatchIntentService)
watchIntentService = chainMod.get(WatchCreateIntentService)
ecoConfigService = chainMod.get(EcoConfigService) as DeepMocked<EcoConfigService>
})

Expand Down
8 changes: 4 additions & 4 deletions src/common/errors/eco-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ export class EcoError extends Error {
}

// WatchIntent Service
static WatchIntentUnsubscribeError = new Error('Could not unsubscribe from watch intent')
static WatchIntentUnsubscribeFromError(chainID: number) {
return new Error(`Could not unsubscribe from watch intent for chain : ${chainID}`)
static WatchEventUnsubscribeError = new Error('Could not unsubscribe from watch event')
static WatchEventUnsubscribeFromError(chainID: number) {
return new Error(`Could not unsubscribe from watch event for chain : ${chainID}`)
}
static WatchIntentNoUnsubscribeError(chainID: number) {
static WatchEventNoUnsubscribeError(chainID: number) {
return new Error(`There is no unwatch for chain : ${chainID}`)
}

Expand Down
15 changes: 15 additions & 0 deletions src/common/redis/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,21 @@ export const QUEUES: Record<any, QueueInterface> = {
validate_intent: 'validate_intent',
feasable_intent: 'feasable_intent',
fulfill_intent: 'fulfill_intent',
retry_intent: 'retry_intent',
},
},
INTERVAL: {
queue: 'interval',
prefix: '{interval}',
jobs: {
retry_infeasable_intents: 'retry_infeasable_intents',
},
},
INBOX: {
queue: 'inbox',
prefix: '{inbox}',
jobs: {
fulfillment: 'fulfillment',
},
},
ETH_SOCKET: {
Expand Down
8 changes: 8 additions & 0 deletions src/contracts/inbox.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { InboxAbi } from '@eco-foundation/routes-ts'
import { ExtractAbiEvent } from 'abitype'
import { Prettify, Log } from 'viem'

// Define the type for the Fulfillment event log
export type FulfillmentLog = Prettify<
Log<bigint, number, false, ExtractAbiEvent<typeof InboxAbi, 'Fulfillment'>, true>
>
2 changes: 1 addition & 1 deletion src/contracts/prover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ export const PROOF_HYPERLANE = 1

export type ProofType = typeof PROOF_STORAGE | typeof PROOF_HYPERLANE

export const Proofs = {
export const Proofs: Record<string, ProofType> = {
Storage: PROOF_STORAGE,
Hyperlane: PROOF_HYPERLANE,
}
Loading

0 comments on commit 72e0906

Please sign in to comment.