Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-service: Various robustness improvements #821

Merged
merged 2 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ export enum IndexerErrorCode {
IE072 = 'IE072',
IE073 = 'IE073',
IE074 = 'IE074',
IE075 = 'IE075',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -163,6 +164,7 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE072: 'Failed to execute batch tx (contract: staking)',
IE073: 'Failed to query subgraph features from indexing statuses endpoint',
IE074: 'Failed to deploy subgraph: network not supported',
IE075: 'Failed to connect to network contracts',
}

export type IndexerErrorCause = unknown
Expand Down
15 changes: 11 additions & 4 deletions packages/indexer-common/src/network-subgraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export interface NetworkSubgraphCreateOptions {
graphNode: GraphNode
deployment: SubgraphDeploymentID
}
subgraphFreshnessChecker: SubgraphFreshnessChecker
subgraphFreshnessChecker?: SubgraphFreshnessChecker
}

interface DeploymentStatus {
Expand All @@ -32,7 +32,7 @@ interface NetworkSubgraphOptions {
status: Eventual<DeploymentStatus>
graphNode: GraphNode
}
subgraphFreshnessChecker: SubgraphFreshnessChecker
subgraphFreshnessChecker?: SubgraphFreshnessChecker
}

export type QueryResult<Data> = Pick<
Expand All @@ -42,7 +42,7 @@ export type QueryResult<Data> = Pick<

export class NetworkSubgraph {
logger: Logger
freshnessChecker: SubgraphFreshnessChecker
freshnessChecker: SubgraphFreshnessChecker | undefined
endpointClient?: AxiosInstance

public readonly deployment?: {
Expand Down Expand Up @@ -172,7 +172,14 @@ export class NetworkSubgraph {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
variables?: Record<string, any>,
): Promise<QueryResult<Data>> {
return this.freshnessChecker.checkedQuery(this, query, variables)
if (this.freshnessChecker) {
return this.freshnessChecker.checkedQuery(this, query, variables)
} else {
this.logger.warn(
`Cannot perform 'checkedQuery' as no freshnessChecker has been configured, falling back to standard 'query'`,
)
return this.query(query, variables)
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
8 changes: 5 additions & 3 deletions packages/indexer-service/src/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ export const monitorEligibleAllocations = ({
const refreshAllocations = async (
currentAllocations: Allocation[],
): Promise<Allocation[]> => {
logger.debug('Refresh eligible allocations')
logger.debug('Refresh eligible allocations', {
protocolNetwork,
})

try {
const currentEpochResult = await networkSubgraph.checkedQuery(gql`
const currentEpochResult = await networkSubgraph.query(gql`
query {
graphNetwork(id: "1") {
currentEpoch
Expand All @@ -59,7 +61,7 @@ export const monitorEligibleAllocations = ({

const currentEpoch = currentEpochResult.data.graphNetwork.currentEpoch

const result = await networkSubgraph.checkedQuery(
const result = await networkSubgraph.query(
gql`
query allocations($indexer: String!, $closedAtEpochThreshold: Int!) {
indexer(id: $indexer) {
Expand Down
105 changes: 42 additions & 63 deletions packages/indexer-service/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ import { BigNumber, Wallet } from 'ethers'
import fs from 'fs'
import { parse as yaml_parse } from 'yaml'

const DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE = 0
const SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2 =
50 + DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE
const DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS = 5_000

import {
connectContracts,
connectDatabase,
createLogger,
createMetrics,
createMetricsServer,
NetworkContracts,
SubgraphDeploymentID,
toAddress,
} from '@graphprotocol/common-ts'
Expand All @@ -31,13 +27,13 @@ import {
registerIndexerErrorMetrics,
resolveChainId,
validateProviderNetworkIdentifier,
SubgraphFreshnessChecker,
} from '@graphprotocol/indexer-common'

import { createServer } from '../server'
import { QueryProcessor } from '../queries'
import { ensureAttestationSigners, monitorEligibleAllocations } from '../allocations'
import { AllocationReceiptManager } from '../query-fees'
import pRetry from 'p-retry'

export default {
command: 'start',
Expand Down Expand Up @@ -182,18 +178,6 @@ export default {
type: 'string',
required: false,
})
.option('subgraph-max-block-distance', {
description: 'How many blocks subgraphs are allowed to stay behind chain head',
type: 'number',
default: DEFAULT_SUBGRAPH_MAX_BLOCK_DISTANCE,
group: 'Protocol',
})
.option('subgraph-freshness-sleep-milliseconds', {
description: 'How long to wait before retrying subgraph query if it is not fresh',
type: 'number',
default: DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS,
group: 'Protocol',
})

.check(argv => {
if (!argv['network-subgraph-endpoint'] && !argv['network-subgraph-deployment']) {
Expand Down Expand Up @@ -317,45 +301,6 @@ export default {
const networkIdentifier = await networkProvider.getNetwork()
const protocolNetwork = resolveChainId(networkIdentifier.chainId)

// Warn about inappropriate max block distance for subgraph threshold checks for given networks.
if (protocolNetwork.startsWith('eip155:42161')) {
// Arbitrum-One and Arbitrum-Goerli
if (argv.subgraphMaxBlockDistance <= SUGGESTED_SUBGRAPH_MAX_BLOCK_DISTANCE_ON_L2) {
logger.warn(
`Consider increasing 'subgraph-max-block-distance' for Arbitrum networks`,
{
problem:
'A low subgraph freshness threshold might cause the Agent to discard too many subgraph queries in fast-paced networks.',
hint: `Increase the 'subgraph-max-block-distance' parameter to a value that accomodates for block and indexing speeds.`,
configuredValue: argv.subgraphMaxBlockDistance,
},
)
}
if (
argv.subgraphFreshnessSleepMilliseconds <=
DEFAULT_SUBGRAPH_FRESHNESS_SLEEP_MILLISECONDS
) {
logger.warn(
`Consider increasing 'subgraph-freshness-sleep-milliseconds' for Arbitrum networks`,
{
problem:
'A short subgraph freshness wait time might be insufficient for the subgraph to sync with fast-paced networks.',
hint: `Increase the 'subgraph-freshness-sleep-milliseconds' parameter to a value that accomodates for block and indexing speeds.`,
configuredValue: argv.subgraphFreshnessSleepMilliseconds,
},
)
}
}

const subgraphFreshnessChecker = new SubgraphFreshnessChecker(
'Network Subgraph',
networkProvider,
argv.subgraphMaxBlockDistance,
argv.subgraphFreshnessSleepMilliseconds,
logger.child({ component: 'FreshnessChecker' }),
Infinity,
)

const networkSubgraph = await NetworkSubgraph.create({
logger,
endpoint: argv.networkSubgraphEndpoint,
Expand All @@ -365,7 +310,6 @@ export default {
deployment: new SubgraphDeploymentID(argv.networkSubgraphDeployment),
}
: undefined,
subgraphFreshnessChecker,
})
logger.info(`Successfully connected to network subgraph`)

Expand All @@ -391,7 +335,7 @@ export default {
chainId: networkIdentifier.chainId,
})

let contracts = undefined
let contracts: NetworkContracts | undefined = undefined
try {
contracts = await connectContracts(
networkProvider,
Expand All @@ -401,11 +345,8 @@ export default {
} catch (error) {
logger.error(
`Failed to connect to contracts, please ensure you are using the intended Ethereum Network`,
{
error,
},
)
throw error
throw indexerError(IndexerErrorCode.IE075, error)
}

logger.info('Successfully connected to contracts', {
Expand Down Expand Up @@ -435,6 +376,44 @@ export default {
operator: address.toString(),
})

// Validate the operator wallet matches the operator set for the indexer
const isOperator = await pRetry(
async () =>
contracts!.staking.isOperator(
wallet.address.toString(),
indexerAddress.toString(),
),
{
retries: 10,
maxTimeout: 10000,
onFailedAttempt: err => {
logger.warn(
`contracts.staking.isOperator(${wallet.address.toString()}, ${indexerAddress.toString()}) failed`,
{
attempt: err.attemptNumber,
retriesLeft: err.retriesLeft,
err: err.message,
},
)
},
} as pRetry.Options,
)

if (isOperator == false) {
logger.error(
'Operator wallet is not allowed for indexer, please see attached debug suggestions',
{
debugSuggestion1: 'verify that operator wallet is set for indexer account',
debugSuggestion2:
'verify that service and agent are both using correct operator wallet mnemonic',
},
)
throw indexerError(
IndexerErrorCode.IE034,
`contracts.staking.isOperator returned 'False'`,
)
}

// Monitor indexer allocations that we may receive traffic for
const allocations = monitorEligibleAllocations({
indexer: indexerAddress,
Expand Down
Loading