Skip to content

Commit

Permalink
indexer-agent,indexer-common,indexer-cli: Indexing rules by subgraph id
Browse files Browse the repository at this point in the history
- Update indexing-rule model: deployment --> identifier & identifierType
- CLI interface unchanged but now supports providing subgraph ids
where deployment ids were the only supported ids previously.
- Rules by subgraph id are processed into deployment based rules, so
the reconciliation logic doesn't need to change.
  • Loading branch information
fordN committed Dec 1, 2021
1 parent c248591 commit 373fb09
Show file tree
Hide file tree
Showing 30 changed files with 734 additions and 2,281 deletions.
86 changes: 77 additions & 9 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import {
toAddress,
} from '@graphprotocol/common-ts'
import {
allocationRewardsPool,
Allocation,
allocationRewardsPool,
AllocationStatus,
INDEXING_RULE_GLOBAL,
IndexingRuleAttributes,
RewardsPool,
BlockPointer,
indexerError,
IndexerErrorCode,
INDEXING_RULE_GLOBAL,
IndexingRuleAttributes,
NetworkSubgraph,
POIDisputeAttributes,
RewardsPool,
Subgraph,
SubgraphIdentifierType,
} from '@graphprotocol/indexer-common'
import { BlockPointer, NetworkSubgraph } from '@graphprotocol/indexer-common'
import { Indexer } from './indexer'
import { AgentConfig } from './types'
import { Network } from './network'
Expand Down Expand Up @@ -50,6 +53,48 @@ const uniqueDeployments = (
deployments: SubgraphDeploymentID[],
): SubgraphDeploymentID[] => deployments.filter(uniqueDeploymentsOnly)

const convertSubgraphBasedRulesToDeploymentBased = (
rules: IndexingRuleAttributes[],
subgraphs: Subgraph[],
previousVersionBuffer: number,
): IndexingRuleAttributes[] => {
const toAdd: IndexingRuleAttributes[] = []
rules.map(rule => {
const ruleSubgraph = subgraphs.find(
subgraph => subgraph.id == rule.identifier,
)
if (ruleSubgraph) {
const latestVersion = ruleSubgraph.versionCount
const latestDeploymentVersion = ruleSubgraph.versions.find(
version => version.version == latestVersion - 1,
)
if (latestDeploymentVersion) {
rule.identifier = latestDeploymentVersion!.deployment.toString()
rule.identifierType = SubgraphIdentifierType.DEPLOYMENT
const currentTimestamp = Math.floor(Date.now() / 1000)
if (
latestDeploymentVersion.createdAt >
currentTimestamp - previousVersionBuffer
) {
const previousDeploymentVersion = ruleSubgraph.versions.find(
version => version.version == latestVersion - 2,
)
if (previousDeploymentVersion) {
const previousDeploymentRule = { ...rule }
previousDeploymentRule.identifier =
previousDeploymentVersion!.deployment.toString()
previousDeploymentRule.identifierType =
SubgraphIdentifierType.DEPLOYMENT
toAdd.push(previousDeploymentRule)
}
}
}
}
})
rules.push(...toAdd)
return rules
}

class Agent {
logger: Logger
metrics: Metrics
Expand Down Expand Up @@ -137,11 +182,34 @@ class Agent {
)

const indexingRules = timer(60_000).tryMap(
() => this.indexer.indexingRules(true),
async () => {
let rules = await this.indexer.indexingRules(true)
const subgraphRuleIds = rules
.filter(
rule => rule.identifierType == SubgraphIdentifierType.SUBGRAPH,
)
.map(rule => rule.identifier!)
const subgraphsMatchingRules = await this.network.subgraphs(
subgraphRuleIds,
)
if (subgraphsMatchingRules.length >= 1) {
const epochLength =
await this.network.contracts.epochManager.epochLength()
const blockPeriod = 15
const bufferPeriod = epochLength.toNumber() * blockPeriod * 100 // 100 epochs
rules = convertSubgraphBasedRulesToDeploymentBased(
rules,
subgraphsMatchingRules,
bufferPeriod,
)
}
return rules
},
{
onError: () =>
onError: error =>
this.logger.warn(
`Failed to obtain indexing rules, trying again later`,
{ error },
),
},
)
Expand Down Expand Up @@ -602,8 +670,8 @@ class Agent {
) !== undefined,

// Indexing rule for the deployment (if any)
rules.find(rule => rule.deployment === deployment.bytes32) ||
rules.find(rule => rule.deployment === INDEXING_RULE_GLOBAL),
rules.find(rule => rule.identifier === deployment.bytes32) ||
rules.find(rule => rule.identifier === INDEXING_RULE_GLOBAL),

currentEpoch,
currentEpochStartBlock,
Expand Down
19 changes: 12 additions & 7 deletions packages/indexer-agent/src/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
IndexingStatusResolver,
BlockPointer,
IndexingStatus,
SubgraphIdentifierType,
} from '@graphprotocol/indexer-common'
import pRetry from 'p-retry'

Expand Down Expand Up @@ -215,7 +216,8 @@ export class Indexer {
gql`
query indexingRules($merged: Boolean!) {
indexingRules(merged: $merged) {
deployment
identifier
identifierType
allocationAmount
parallelAllocations
maxAllocationPercentage
Expand Down Expand Up @@ -249,23 +251,25 @@ export class Indexer {
const globalRule = await this.indexerManagement
.query(
gql`
query indexingRule($deployment: String!) {
indexingRule(deployment: $deployment, merged: false) {
deployment
query indexingRule($identifier: String!) {
indexingRule(identifier: $identifier, merged: false) {
identifier
identifierType
allocationAmount
decisionBasis
}
}
`,
{ deployment: INDEXING_RULE_GLOBAL },
{ identifier: INDEXING_RULE_GLOBAL },
)
.toPromise()

if (!globalRule.data.indexingRule) {
this.logger.info(`Creating default "global" indexing rule`)

const defaults = {
deployment: INDEXING_RULE_GLOBAL,
identifier: INDEXING_RULE_GLOBAL,
identifierType: SubgraphIdentifierType.GROUP,
allocationAmount: this.defaultAllocationAmount.toString(),
parallelAllocations: 1,
decisionBasis: 'rules',
Expand All @@ -276,7 +280,8 @@ export class Indexer {
gql`
mutation setIndexingRule($rule: IndexingRuleInput!) {
setIndexingRule(rule: $rule) {
deployment
identifier
identifierType
allocationAmount
parallelAllocations
maxAllocationPercentage
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import { Logger } from '@graphprotocol/common-ts'
import { DataTypes, QueryInterface } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
}

interface Context {
context: MigrationContext
}

export async function up({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.info(
`Rename indexing rule identifier column and add identifier type column`,
)

logger.info(`Checking if indexing rules table exists`)
const tables = await queryInterface.showAllTables()
if (!tables.includes('IndexingRules')) {
logger.info(`Indexing rules table does not exist, migration not necessary`)
return
}

logger.info(`Checking if indexing rules table needs to be migrated`)
const table = await queryInterface.describeTable('IndexingRules')
const subgraphIdentifierTypeColumn = table.identifierType
const subgraphIdentifierColumn = table.identifier
if (subgraphIdentifierTypeColumn && subgraphIdentifierColumn) {
logger.info(
`Identifier and identifierType columns already exist, migration not necessary`,
)
return
}

logger.info(`Adding identifierType column to IndexingRules table`)
await queryInterface.addColumn('IndexingRules', 'identifierType', {
type: DataTypes.ENUM('deployment', 'subgraph', 'group'),
primaryKey: true,
defaultValue: 'group',
})

await queryInterface.renameColumn('IndexingRules', 'deployment', 'identifier')
}

export async function down({ context }: Context): Promise<void> {
const { queryInterface, logger } = context

logger.info(
`Revert renaming indexing rule identifier column and adding identifierType column`,
)

return await queryInterface.sequelize.transaction({}, async transaction => {
const tables = await queryInterface.showAllTables()

if (tables.includes('IndexingRules')) {
await context.queryInterface.removeColumn(
'IndexingRules',
'identifierType',
{ transaction },
)

await queryInterface.renameColumn(
'IndexingRules',
'identifier',
'deployment',
{ transaction },
)
}
})
}
110 changes: 107 additions & 3 deletions packages/indexer-agent/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import {
INDEXER_ERROR_MESSAGES,
IndexerError,
NetworkSubgraph,
Subgraph,
SubgraphVersion,
} from '@graphprotocol/indexer-common'
import {
ContractTransaction,
Expand Down Expand Up @@ -468,11 +470,113 @@ export class Network {
)
}

async subgraphs(ids: string[]): Promise<Subgraph[]> {
if (ids.length == 0) {
return []
}
let subgraphs: Subgraph[] = []
const queryProgress = {
lastId: '',
first: 20,
fetched: 0,
exhausted: false,
retriesRemaining: 10,
}
this.logger.info(`Query subgraphs in batches of ${queryProgress.first}`)

while (!queryProgress.exhausted) {
this.logger.debug(`Query subgraphs by id`, {
queryProgress: queryProgress,
subgraphIds: ids,
})
try {
const result = await this.networkSubgraph.query(
gql`
query subgraphs(
$first: Int!
$lastId: String!
$subgraphs: [String!]!
) {
subgraphs(
where: { id_gt: $lastId, id_in: $subgraphs }
orderBy: id
orderDirection: asc
first: $first
) {
id
versionCount
versions {
version
createdAt
subgraphDeployment {
id
}
}
}
}
`,
{
first: queryProgress.first,
lastId: queryProgress.lastId,
subgraphs: ids,
},
)

if (result.error) {
throw result.error
}

// Convert return object to Subgraph interface
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const results = result.data.subgraphs.map((subgraph: any) => {
subgraph.versions = subgraph.versions.map(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(versionItem: any) => {
return {
version: versionItem.version,
createdAt: versionItem.createdAt,
deployment: new SubgraphDeploymentID(
versionItem.subgraphDeployment.id,
),
} as SubgraphVersion
},
)
return subgraph
})

// In the case of a fresh graph network there will be no published subgraphs, handle gracefully
if (results.length == 0) {
return []
}

queryProgress.exhausted = results.length < queryProgress.first
queryProgress.fetched += results.length
queryProgress.lastId = results[results.length - 1].id

subgraphs = subgraphs.concat(results)
} catch (error) {
queryProgress.retriesRemaining--
this.logger.error(`Failed to query subgraphs by id`, {
retriesRemaining: queryProgress.retriesRemaining,
error: error,
})
if (queryProgress.retriesRemaining <= 0) {
const err = indexerError(IndexerErrorCode.IE009, error)
this.logger.error(`Failed to query subgraphs`, {
err,
})
throw err
}
}
}
this.logger.debug(`Found ${subgraphs.length} matching subgraphs`)
return subgraphs
}
async subgraphDeploymentsWorthIndexing(
rules: IndexingRuleAttributes[],
): Promise<SubgraphDeploymentID[]> {
const globalRule = rules.find(
rule => rule.deployment === INDEXING_RULE_GLOBAL,
rule => rule.identifier === INDEXING_RULE_GLOBAL,
)

const deployments = []
Expand Down Expand Up @@ -536,7 +640,7 @@ export class Network {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.filter((deployment: any) => {
const deploymentRule =
rules.find(rule => rule.deployment === deployment.id) ||
rules.find(rule => rule.identifier === deployment.id) ||
globalRule

// The deployment is not eligible for deployment if it doesn't have an allocation amount
Expand Down Expand Up @@ -580,7 +684,7 @@ export class Network {
avgQueryFees: avgQueryFees.toString(),
},
indexingRule: {
deployment: deploymentRule.deployment,
deployment: deploymentRule.identifier,
minStake: deploymentRule.minStake
? BigNumber.from(deploymentRule.minStake).toString()
: null,
Expand Down
Loading

0 comments on commit 373fb09

Please sign in to comment.