Skip to content

Commit

Permalink
indexer-agent,-common,-cli: Actions queue
Browse files Browse the repository at this point in the history
- Add action queue models and resolvers to indexer-common
- Add actions module to indexer-cli
- Add actions worker that polls and executes approved action queue
items
  • Loading branch information
fordN committed May 9, 2022
1 parent d109af8 commit 7086518
Show file tree
Hide file tree
Showing 29 changed files with 3,852 additions and 1,096 deletions.
187 changes: 162 additions & 25 deletions packages/indexer-agent/src/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,25 @@ import {
toAddress,
} from '@graphprotocol/common-ts'
import {
Action,
ActionInput,
ActionItem,
ActionStatus,
ActionType,
Allocation,
allocationRewardsPool,
AllocationStatus,
RewardsPool,
BlockPointer,
IndexingDecisionBasis,
indexerError,
IndexerErrorCode,
INDEXING_RULE_GLOBAL,
IndexingDecisionBasis,
IndexingRuleAttributes,
Network,
NetworkSubgraph,
POIDisputeAttributes,
ReceiptCollector,
RewardsPool,
Subgraph,
SubgraphIdentifierType,
} from '@graphprotocol/indexer-common'
Expand All @@ -32,6 +37,8 @@ import { BigNumber, utils } from 'ethers'
import PQueue from 'p-queue'
import pMap from 'p-map'
import pFilter from 'p-filter'
import gql from 'graphql-tag'
import { NetworkMonitor } from '@graphprotocol/indexer-common'

const allocationInList = (
list: Allocation[],
Expand Down Expand Up @@ -125,6 +132,7 @@ class Agent {
metrics: Metrics
indexer: Indexer
network: Network
networkMonitor: NetworkMonitor
networkSubgraph: NetworkSubgraph
allocateOnNetworkSubgraph: boolean
registerIndexer: boolean
Expand All @@ -137,6 +145,7 @@ class Agent {
metrics: Metrics,
indexer: Indexer,
network: Network,
networkMonitor: NetworkMonitor,
networkSubgraph: NetworkSubgraph,
allocateOnNetworkSubgraph: boolean,
registerIndexer: boolean,
Expand All @@ -148,6 +157,7 @@ class Agent {
this.metrics = metrics
this.indexer = indexer
this.network = network
this.networkMonitor = networkMonitor
this.networkSubgraph = networkSubgraph
this.allocateOnNetworkSubgraph = allocateOnNetworkSubgraph
this.registerIndexer = registerIndexer
Expand Down Expand Up @@ -220,7 +230,7 @@ class Agent {
rule => rule.identifierType == SubgraphIdentifierType.SUBGRAPH,
)
.map(rule => rule.identifier!)
const subgraphsMatchingRules = await this.network.subgraphs(
const subgraphsMatchingRules = await this.networkMonitor.subgraphs(
subgraphRuleIds,
)
if (subgraphsMatchingRules.length >= 1) {
Expand Down Expand Up @@ -308,7 +318,7 @@ class Agent {
)

const activeAllocations = timer(120_000).tryMap(
() => this.network.allocations(AllocationStatus.Active),
() => this.networkMonitor.allocations(AllocationStatus.Active),
{
onError: () =>
this.logger.warn(
Expand All @@ -323,7 +333,7 @@ class Agent {
}).tryMap(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
({ activeAllocations, currentEpoch }) =>
this.network.recentlyClosedAllocations(
this.networkMonitor.recentlyClosedAllocations(
currentEpoch.toNumber(),
1, //TODO: Parameterize with a user provided value
),
Expand Down Expand Up @@ -845,6 +855,17 @@ class Agent {
allocation => allocation.createdAtEpoch < epoch,
),
async allocation => {
// Send UnallocateAction to the queue
await this.queueActions([
{
params: {
allocationID: allocation.id,
poi: undefined,
force: false,
},
type: ActionType.UNALLOCATE,
} as ActionItem,
])
await this.closeAllocation(epochStartBlock, allocation)
},
{ concurrency: 1 },
Expand All @@ -861,7 +882,7 @@ class Agent {

// Skip allocating if the previous allocation for this deployment was closed with a null or 0x00 POI
const closedAllocation = (
await this.network.closedAllocations(deployment)
await this.networkMonitor.closedAllocations(deployment)
)[0]
if (
closedAllocation &&
Expand All @@ -877,14 +898,42 @@ class Agent {
return
}

const allocationsCreated = await this.network.allocate(
deployment,
desiredAllocationAmount,
activeAllocations,
)
if (allocationsCreated) {
await this.receiptCollector.rememberAllocations([allocationsCreated])
}
// Send AllocateAction to the queue
const queuedAllocations = await this.queueActions([
{
params: {
deploymentID: deployment.ipfsHash,
amount: desiredAllocationAmount.toString(),
},
type: ActionType.ALLOCATE,
},
])

logger.info('Queued allocate actions', {
queuedActions: queuedAllocations,
})

const created = await this.queueActions([
{
params: {
deploymentID: deployment.ipfsHash,
amount: desiredAllocationAmount.toString(),
poi: undefined,
force: false,
},
type: ActionType.ALLOCATE,
},
])
// const allocationsCreated = await this.network.allocate(
// deployment,
// desiredAllocationAmount,
// activeAllocations,
// )
// // TODO: MOVE TO WORKER
// if (allocationsCreated) {
// await this.receiptCollector.rememberAllocations([allocationsCreated])
// }

return
}

Expand All @@ -909,7 +958,18 @@ class Agent {
.sort((a, b) => a.createdAtEpoch - b.closedAtEpoch)
.splice(0, allocationsToRemove),
async allocation => {
await this.closeAllocation(epochStartBlock, allocation)
// Send UnallocateAction to the queue
await this.queueActions([
{
params: {
allocationID: allocation.id,
poi: undefined,
force: false,
},
type: ActionType.UNALLOCATE,
},
])
// await this.closeAllocation(epochStartBlock, allocation)
},
{ concurrency: 1 },
)
Expand Down Expand Up @@ -954,6 +1014,19 @@ class Agent {
),
})

// Queue reallocate actions to be picked up by the worker
await pMap(expiredAllocations, async allocation => {
await this.queueActions([
{
params: {
allocationID: allocation.id,
amount: desiredAllocationAmount.toString(),
},
type: ActionType.REALLOCATE,
},
])
})

// We do a synchronous for-loop and await each iteration so that we can patch the contents
// of activeAllocations with new allocations as they are made. This is important so that each
// iteration gets an up to date copy of activeAllocations
Expand Down Expand Up @@ -986,6 +1059,47 @@ class Agent {
}
}

private async queueActions(actions: ActionItem[]): Promise<Action> {
const actionInputs = actions.map(action => {
return {
...action.params,
type: action.type,
source: 'indexerAgent',
reason: 'indexingRule',
status: ActionStatus.QUEUED,
priority: 0,
} as ActionInput
})

const actionResult = await this.indexer.indexerManagement
.mutation(
gql`
mutation queueActions($actions: [ActionInput!]!) {
queueActions(actions: $actions) {
id
type
source
reason
priority
status
}
}
`,
{ actions: actionInputs },
)
.toPromise()

if (actionResult.error) {
throw actionResult.error
}

this.logger.info(`Added action to the queue`, {
action: actionResult.data.queueActions,
})

return actionResult.data.queueActions
}

private async closeAllocation(
epochStartBlock: BlockPointer,
allocation: Allocation,
Expand Down Expand Up @@ -1033,14 +1147,24 @@ class Agent {
}

// Close the allocation
const closed = await this.network.close(allocation, poi)
const closed = await this.queueActions([
{
params: {
allocationID: allocation.id,
poi: undefined,
force: false,
},
type: ActionType.UNALLOCATE,
} as ActionItem,
])
// const closed = await this.network.close(allocation, poi)

// Collect query fees for this allocation
const collectingQueryFees = await this.receiptCollector.collectReceipts(
allocation,
)

return { closed, collectingQueryFees }
return { closed: closed !== undefined, collectingQueryFees }
}

private async reallocate(
Expand Down Expand Up @@ -1079,13 +1203,25 @@ class Agent {
}

// closeAndAllocate for the deployment
const newAllocation = await this.network.closeAndAllocate(
existingAllocation,
poi,
existingAllocation.subgraphDeployment.id,
allocationAmount,
activeAllocations,
)
const newAllocation = await this.queueActions([
{
params: {
allocationID: existingAllocation.id,
amount: allocationAmount.toString(),
poi: undefined,
force: false,
},
type: ActionType.REALLOCATE,
} as ActionItem,
])

// const newAllocation = await this.network.closeAndAllocate(
// existingAllocation,
// poi,
// existingAllocation.subgraphDeployment.id,
// allocationAmount,
// activeAllocations,
// )

// Collect query fees for the old allocation
const collectingQueryFees = await this.receiptCollector.collectReceipts(
Expand All @@ -1095,7 +1231,7 @@ class Agent {
return {
reallocated: newAllocation !== undefined,
collectingQueryFees,
newAllocation,
newAllocation: undefined,
}
}
}
Expand All @@ -1106,6 +1242,7 @@ export const startAgent = async (config: AgentConfig): Promise<Agent> => {
config.metrics,
config.indexer,
config.network,
config.networkMonitor,
config.networkSubgraph,
config.allocateOnNetworkSubgraph,
config.registerIndexer,
Expand Down
Loading

0 comments on commit 7086518

Please sign in to comment.