Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

agent,common: Migrate subgraph deployment assignments to use explicit 'paused' mechanism #868

Merged
merged 5 commits into from
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions packages/indexer-agent/src/__tests__/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,11 @@ const setup = async () => {
queryFeeModels = defineQueryFeeModels(sequelize)
sequelize = await sequelize.sync({ force: true })

const indexNodeIDs = ['node_1']

graphNode = new GraphNode(
logger,
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
'https://test-status-endpoint.xyz',
indexNodeIDs,
)

const yamlObj = loadTestYamlConfig()
Expand All @@ -163,7 +160,6 @@ const setup = async () => {
indexerManagementClient = await createIndexerManagementClient({
models,
graphNode,
indexNodeIDs,
logger,
defaults: {
globalIndexingRule: {
Expand Down
20 changes: 10 additions & 10 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
networkIsL2,
networkIsL1,
DeploymentManagementMode,
SubgraphStatus,
} from '@graphprotocol/indexer-common'

import PQueue from 'p-queue'
Expand Down Expand Up @@ -939,17 +940,17 @@ export class Agent {
eligibleAllocations.map(allocation => allocation.subgraphDeployment.id),
)

// Identify which subgraphs to deploy and which to remove
// Identify which subgraphs to deploy and which to pause
const deploy = targetDeployments.filter(
deployment => !deploymentInList(activeDeployments, deployment),
)
const remove = activeDeployments.filter(
const pause = activeDeployments.filter(
deployment =>
!deploymentInList(targetDeployments, deployment) &&
!deploymentInList(eligibleAllocationDeployments, deployment),
)

if (deploy.length + remove.length !== 0) {
if (deploy.length + pause.length !== 0) {
logger.info('Deployment changes', {
indexingNetworkSubgraph,
syncing: activeDeployments.map(id => id.display),
Expand All @@ -958,19 +959,20 @@ export class Agent {
id => id.display,
),
deploy: deploy.map(id => id.display),
remove: remove.map(id => id.display),
pause: pause.map(id => id.display),
})
} else {
logger.debug('No deployment changes are necessary')
}

// ----------------------------------------------------------------------------------------
// Execute Deployments (Add, Remove)
// Execute Deployments (Add, Pause)
// ----------------------------------------------------------------------------------------

// Deploy/remove up to 10 subgraphs in parallel
const queue = new PQueue({ concurrency: 10 })

const currentAssignments =
await this.graphNode.subgraphDeploymentsAssignments(SubgraphStatus.ALL)
// Index all new deployments worth indexing
await queue.addAll(
deploy.map(deployment => async () => {
Expand All @@ -982,15 +984,13 @@ export class Agent {
})

// Ensure the deployment is deployed to the indexer
// Note: we're not waiting here, as sometimes indexing a subgraph
// will block if the IPFS files cannot be retrieved
await this.graphNode.ensure(name, deployment)
await this.graphNode.ensure(name, deployment, currentAssignments)
}),
)

// Stop indexing deployments that are no longer worth indexing
await queue.addAll(
remove.map(deployment => async () => this.graphNode.remove(deployment)),
pause.map(deployment => async () => this.graphNode.pause(deployment)),
)

await queue.onIdle()
Expand Down
15 changes: 0 additions & 15 deletions packages/indexer-agent/src/commands/common-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,6 @@ import { parseDeploymentManagementMode } from '@graphprotocol/indexer-common'
// Injects all CLI options shared between this module's commands into a `yargs.Argv` object.
export function injectCommonStartupOptions(argv: Argv): Argv {
argv
.option('index-node-ids', {
description:
'Node IDs of Graph nodes to use for indexing (separated by commas)',
type: 'string',
array: true,
required: true,
coerce: (
arg, // TODO: we shouldn't need to coerce because yargs already separates values by space
) =>
arg.reduce(
(acc: string[], value: string) => [...acc, ...value.split(',')],
[],
),
group: 'Indexer Infrastructure',
})
.option('indexer-management-port', {
description: 'Port to serve the indexer management API at',
type: 'number',
Expand Down
3 changes: 1 addition & 2 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ export async function run(
argv.graphNodeAdminEndpoint,
argv.graphNodeQueryEndpoint,
argv.graphNodeStatusEndpoint,
argv.indexNodeIds,
)

// --------------------------------------------------------------------------------
Expand Down Expand Up @@ -558,6 +557,7 @@ export async function run(
logger,
graphNodeAdminEndpoint: argv.graphNodeAdminEndpoint,
networkSpecifications,
graphNode: graphNode,
},
storage: new SequelizeStorage({ sequelize }),
logger: console,
Expand Down Expand Up @@ -614,7 +614,6 @@ export async function run(
const indexerManagementClient = await createIndexerManagementClient({
models: managementModels,
graphNode,
indexNodeIDs: argv.indexNodeIds,
logger,
defaults: {
globalIndexingRule: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Logger } from '@graphprotocol/common-ts'
import {
GraphNode,
specification,
SubgraphStatus,
} from '@graphprotocol/indexer-common'
import { QueryInterface } from 'sequelize'

interface MigrationContext {
queryInterface: QueryInterface
logger: Logger
networkSpecifications: specification.NetworkSpecification[]
graphNode: GraphNode
nodeIds: string[]
}

interface Context {
context: MigrationContext
}

export async function up({ context }: Context): Promise<void> {
const { logger, graphNode } = context
logger.info(
'Begin up migration: migrate subgraph deployment assignments to new pause mechanism',
)

const indexNodes = (await graphNode.indexNodes()).filter(
(node: { id: string; deployments: Array<string> }) => {
return node.id && node.id !== 'removed'
},
)
logger.info('Index nodes', {
indexNodes,
})

const targetNode =
indexNodes.sort((nodeA, nodeB) => {
return nodeA.deployments.length - nodeB.deployments.length
})[0]?.id || 'default'

const virtuallyPausedDeploymentAssignments =
await graphNode.subgraphDeploymentsAssignments(SubgraphStatus.PAUSED)

logger.info(
'Reassigning paused subgraphs to valid node_id (targetNode), then pausing',
{
pausedSubgraphs: virtuallyPausedDeploymentAssignments.map(
details => details.id,
),
targetNode,
},
)

for (const deploymentAssignment of virtuallyPausedDeploymentAssignments) {
await graphNode.reassign(deploymentAssignment.id, targetNode)
await graphNode.pause(deploymentAssignment.id)
logger.debug('Successfully reassigned and paused deployment', {
deployment: deploymentAssignment.id.ipfsHash,
})
}
}

export async function down({ context }: Context): Promise<void> {
const { logger, graphNode } = context
logger.info(
'Begin down migration: revert to using virtual subgraph deployment pause mechanism',
)

const pausedDeploymentAssignments =
await graphNode.subgraphDeploymentsAssignments(SubgraphStatus.PAUSED)

logger.info(`Reassigning paused subgraphs to node_id = 'removed'`, {
pausedSubgraphs: pausedDeploymentAssignments.map(details => details.id),
})

for (const deploymentAssignment of pausedDeploymentAssignments) {
await graphNode.reassign(deploymentAssignment.id, 'removed')
logger.debug(`Successfully reassigned deployment to node_id = 'removed'`, {
deployment: deploymentAssignment.id.ipfsHash,
})
}
}
3 changes: 0 additions & 3 deletions packages/indexer-cli/src/__tests__/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,11 @@ export const setup = async (multiNetworksEnabled: boolean) => {
sequelize = await sequelize.sync({ force: true })

const statusEndpoint = 'http://127.0.0.1:8030/graphql'
const indexNodeIDs = ['node_1']
const graphNode = new GraphNode(
logger,
'http://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
statusEndpoint,
indexNodeIDs,
)

const network = await Network.create(
Expand Down Expand Up @@ -122,7 +120,6 @@ export const setup = async (multiNetworksEnabled: boolean) => {
indexerManagementClient = await createIndexerManagementClient({
models,
graphNode,
indexNodeIDs,
logger,
defaults,
multiNetworks,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const setup = async () => {
'https://test-admin-endpoint.xyz',
'https://test-query-endpoint.xyz',
'https://test-status-endpoint.xyz',
[],
)

const network = await Network.create(
Expand Down
4 changes: 3 additions & 1 deletion packages/indexer-common/src/allocations/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ export const monitorEligibleAllocations = ({
const allocations = [...activeAllocations, ...recentlyClosedAllocations]

if (allocations.length == 0) {
throw new Error(`No data / indexer not found on chain`)
logger.warn(`No data / indexer not found on chain`, {
allocations: [],
})
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand Down
4 changes: 4 additions & 0 deletions packages/indexer-common/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ export enum IndexerErrorCode {
IE073 = 'IE073',
IE074 = 'IE074',
IE075 = 'IE075',
IE076 = 'IE076',
IE077 = 'IE077',
}

export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
Expand Down Expand Up @@ -165,6 +167,8 @@ export const INDEXER_ERROR_MESSAGES: Record<IndexerErrorCode, string> = {
IE073: 'Failed to query subgraph features from indexing statuses endpoint',
IE074: 'Failed to deploy subgraph: network not supported',
IE075: 'Failed to connect to network contracts',
IE076: 'Failed to resume subgraph deployment',
IE077: 'Failed to allocate: subgraph not healthily syncing',
}

export type IndexerErrorCause = unknown
Expand Down
Loading
Loading