From 0215e27d38969bd9412222c4144117526d9a31f8 Mon Sep 17 00:00:00 2001 From: Ford Date: Fri, 2 Aug 2024 11:34:08 -0700 Subject: [PATCH] agent: Allow configuring polling interval for data collection - Also make polling intervals more consistent (Eventuals timers). --- packages/indexer-agent/src/agent.ts | 62 ++++++++------------ packages/indexer-agent/src/commands/start.ts | 24 ++++++-- packages/indexer-agent/src/types.ts | 29 +++++---- 3 files changed, 62 insertions(+), 53 deletions(-) diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 7086a0e9f..abf1c5dde 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -44,6 +44,7 @@ import pMap from 'p-map' import pFilter from 'p-filter' import mapValues from 'lodash.mapvalues' import zip from 'lodash.zip' +import { AgentConfigs, NetworkAndOperator } from './types' type ActionReconciliationContext = [AllocationDecision[], number, number] @@ -126,13 +127,6 @@ export const convertSubgraphBasedRulesToDeploymentBased = ( return rules } -// Represents a pair of Network and Operator instances belonging to the same protocol -// network. Used when mapping over multiple protocol networks. -type NetworkAndOperator = { - network: Network - operator: Operator -} - // Extracts the network identifier from a pair of matching Network and Operator objects. function networkAndOperatorIdentity({ network, @@ -196,26 +190,21 @@ export class Agent { offchainSubgraphs: SubgraphDeploymentID[] autoMigrationSupport: boolean deploymentManagement: DeploymentManagementMode - - constructor( - logger: Logger, - metrics: Metrics, - graphNode: GraphNode, - operators: Operator[], - indexerManagement: IndexerManagementClient, - networks: Network[], - offchainSubgraphs: SubgraphDeploymentID[], - autoMigrationSupport: boolean, - deploymentManagement: DeploymentManagementMode, - ) { - this.logger = logger.child({ component: 'Agent' }) - this.metrics = metrics - this.graphNode = graphNode - this.indexerManagement = indexerManagement - this.multiNetworks = createMultiNetworks(networks, operators) - this.offchainSubgraphs = offchainSubgraphs - this.autoMigrationSupport = !!autoMigrationSupport - this.deploymentManagement = deploymentManagement + pollingInterval: number + + constructor(configs: AgentConfigs) { + this.logger = configs.logger.child({ component: 'Agent' }) + this.metrics = configs.metrics + this.graphNode = configs.graphNode + this.indexerManagement = configs.indexerManagement + this.multiNetworks = createMultiNetworks( + configs.networks, + configs.operators, + ) + this.offchainSubgraphs = configs.offchainSubgraphs + this.autoMigrationSupport = !!configs.autoMigrationSupport + this.deploymentManagement = configs.deploymentManagement + this.pollingInterval = configs.pollingInterval } async start(): Promise { @@ -261,9 +250,11 @@ export class Agent { } reconciliationLoop() { + const requestIntervalSmall = this.pollingInterval + const requestIntervalLarge = this.pollingInterval * 5 const logger = this.logger.child({ component: 'ReconciliationLoop' }) const currentEpochNumber: Eventual> = timer( - 600_000, + requestIntervalLarge, ).tryMap( async () => await this.multiNetworks.map(({ network }) => { @@ -279,7 +270,7 @@ export class Agent { ) const maxAllocationEpochs: Eventual> = timer( - 600_000, + requestIntervalLarge, ).tryMap( () => this.multiNetworks.map(({ network }) => { @@ -295,7 +286,7 @@ export class Agent { ) const indexingRules: Eventual> = - timer(20_000).tryMap( + timer(requestIntervalSmall).tryMap( async () => { return this.multiNetworks.map(async ({ network, operator }) => { logger.trace('Fetching indexing rules', { @@ -332,7 +323,7 @@ export class Agent { ) const activeDeployments: Eventual = timer( - 60_000, + requestIntervalSmall, ).tryMap( () => { logger.trace('Fetching active deployments') @@ -348,7 +339,7 @@ export class Agent { ) const networkDeployments: Eventual> = - timer(240_000).tryMap( + timer(requestIntervalSmall).tryMap( async () => await this.multiNetworks.map(({ network }) => { logger.trace('Fetching network deployments', { @@ -367,7 +358,7 @@ export class Agent { const eligibleTransferDeployments: Eventual< NetworkMapped - > = timer(300_000).tryMap( + > = timer(requestIntervalLarge).tryMap( async () => { // Return early if the auto migration feature is disabled. if (!this.autoMigrationSupport) { @@ -535,7 +526,6 @@ export class Agent { // let targetDeployments be an union of targetAllocations // and offchain subgraphs. const targetDeployments: Eventual = join({ - ticker: timer(120_000), indexingRules, networkDeploymentAllocationDecisions, }).tryMap( @@ -569,7 +559,7 @@ export class Agent { ) const activeAllocations: Eventual> = timer( - 120_000, + requestIntervalSmall, ).tryMap( () => this.multiNetworks.map(({ network }) => { @@ -646,7 +636,7 @@ export class Agent { ) join({ - ticker: timer(240_000), + ticker: timer(requestIntervalLarge), currentEpochNumber, maxAllocationEpochs, activeDeployments, diff --git a/packages/indexer-agent/src/commands/start.ts b/packages/indexer-agent/src/commands/start.ts index e4b0f03b4..fd9db785f 100644 --- a/packages/indexer-agent/src/commands/start.ts +++ b/packages/indexer-agent/src/commands/start.ts @@ -34,6 +34,7 @@ import { NetworkSpecification } from '@graphprotocol/indexer-common/dist/network import { BigNumber } from 'ethers' import { displayZodParsingError } from '@graphprotocol/indexer-common' import { readFileSync } from 'fs' +import { AgentConfigs } from '../types' // eslint-disable-next-line @typescript-eslint/no-explicit-any export type AgentOptions = { [key: string]: any } & Argv['argv'] @@ -298,6 +299,13 @@ export const start = { default: 'auto', group: 'Indexer Infrastructure', }) + .option('polling-interval', { + description: 'Polling interval for data collection', + type: 'number', + required: false, + default: 120_000, + group: 'Indexer Infrastructure', + }) .option('auto-allocation-min-batch-size', { description: `Minimum number of allocation transactions inside a batch for auto allocation management. No obvious upperbound, with default of 1`, type: 'number', @@ -667,17 +675,21 @@ export async function run( // -------------------------------------------------------------------------------- // * The Agent itself // -------------------------------------------------------------------------------- - const agent = new Agent( + const agentConfigs: AgentConfigs = { logger, metrics, graphNode, operators, - indexerManagementClient, + indexerManagement: indexerManagementClient, networks, - argv.offchainSubgraphs.map((s: string) => new SubgraphDeploymentID(s)), - argv.enableAutoMigrationSupport, - argv.deploymentManagement, - ) + deploymentManagement: argv.deploymentManagement, + autoMigrationSupport: argv.enableAutoMigrationSupport, + offchainSubgraphs: argv.offchainSubgraphs.map( + (s: string) => new SubgraphDeploymentID(s), + ), + pollingInterval: argv.pollingInterval, + } + const agent = new Agent(agentConfigs) await agent.start() } diff --git a/packages/indexer-agent/src/types.ts b/packages/indexer-agent/src/types.ts index be21b3327..f71f9d295 100644 --- a/packages/indexer-agent/src/types.ts +++ b/packages/indexer-agent/src/types.ts @@ -1,21 +1,28 @@ import { Logger, Metrics, SubgraphDeploymentID } from '@graphprotocol/common-ts' import { Network, - NetworkSubgraph, - ReceiptCollector, GraphNode, + DeploymentManagementMode, + IndexerManagementClient, + Operator, } from '@graphprotocol/indexer-common' -import { NetworkMonitor } from '@graphprotocol/indexer-common' -export interface AgentConfig { +// Represents a pair of Network and Operator instances belonging to the same protocol +// network. Used when mapping over multiple protocol networks. +export type NetworkAndOperator = { + network: Network + operator: Operator +} + +export interface AgentConfigs { logger: Logger metrics: Metrics - indexer: GraphNode - network: Network - networkMonitor: NetworkMonitor - networkSubgraph: NetworkSubgraph - allocateOnNetworkSubgraph: boolean - registerIndexer: boolean + graphNode: GraphNode + operators: Operator[] + indexerManagement: IndexerManagementClient + networks: Network[] + deploymentManagement: DeploymentManagementMode + autoMigrationSupport: boolean offchainSubgraphs: SubgraphDeploymentID[] - receiptCollector: ReceiptCollector + pollingInterval: number }