Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-service detailed query logs #428

Merged
merged 2 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/indexer-service/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ export default {
default: 'debug',
group: 'Indexer Infrastructure',
})
.option('query-timing-logs', {
description: 'Log time spent on each query received',
type: 'boolean',
default: false,
required: false,
group: 'Indexer Infrastructure',
})
.option('vector-node', {
description: 'URL of a vector node',
type: 'string',
Expand Down Expand Up @@ -436,6 +443,7 @@ export default {
metrics,
receiptManager,
signers,
queryTimingLogs: argv.queryTimingLogs,
})

const indexerManagementClient = await createIndexerManagementClient({
Expand Down
56 changes: 53 additions & 3 deletions packages/indexer-service/src/queries.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import axios, { AxiosInstance, AxiosResponse } from 'axios'
import axios, { AxiosInstance, AxiosResponse, AxiosRequestConfig } from 'axios'

import { Logger, Metrics, Eventual } from '@graphprotocol/common-ts'
import {
Expand All @@ -18,6 +18,16 @@ export interface PaidQueryProcessorOptions {
graphNode: string
signers: Eventual<AttestationSignerMap>
receiptManager: ReceiptManager
queryTimingLogs: boolean
}

interface AxiosRequestConfigWithTime extends AxiosRequestConfig {
meta?: { requestStartedAt?: number }
}

interface AxiosResponseWithTime extends AxiosResponse {
responseTime?: number
config: AxiosRequestConfigWithTime
}

export class QueryProcessor implements QueryProcessorInterface {
Expand All @@ -26,15 +36,18 @@ export class QueryProcessor implements QueryProcessorInterface {
graphNode: AxiosInstance
signers: Eventual<AttestationSignerMap>
receiptManager: ReceiptManager
queryTimingLogs: boolean

constructor({
logger,
metrics,
graphNode,
receiptManager,
signers,
queryTimingLogs,
}: PaidQueryProcessorOptions) {
this.logger = logger
this.queryTimingLogs = queryTimingLogs
this.metrics = metrics
this.signers = signers
this.graphNode = axios.create({
Expand All @@ -51,6 +64,34 @@ export class QueryProcessor implements QueryProcessorInterface {
// Don't throw on bad responses
validateStatus: () => true,
})

if (this.queryTimingLogs) {
// Set up Axios for request response time measurement
// https://sabljakovich.medium.com/axios-response-time-capture-and-log-8ff54a02275d
this.graphNode.interceptors.request.use(function (x: AxiosRequestConfigWithTime) {
// to avoid overwriting if another interceptor
// already defined the same object (meta)
x.meta = x.meta || {}
x.meta.requestStartedAt = new Date().getTime()
return x
})
this.graphNode.interceptors.response.use(
function (x: AxiosResponseWithTime) {
if (x.config.meta?.requestStartedAt !== undefined) {
x.responseTime = new Date().getTime() - x.config.meta?.requestStartedAt
}
return x
},
// Handle 4xx & 5xx responses
function (x: AxiosResponseWithTime) {
if (x.config.meta?.requestStartedAt !== undefined) {
x.responseTime = new Date().getTime() - x.config.meta.requestStartedAt
}
throw x
},
)
}

this.receiptManager = receiptManager
}

Expand Down Expand Up @@ -79,10 +120,10 @@ export class QueryProcessor implements QueryProcessorInterface {
receipt,
})

const allocationID = await this.receiptManager.add(receipt)
const parsedReceipt = await this.receiptManager.add(receipt)

// Look up or derive a signer for the attestation for this query
const signer = (await this.signers.value()).get(allocationID)
const signer = (await this.signers.value()).get(parsedReceipt.allocation)

// Fail query outright if we have no signer for this attestation
if (signer === undefined) {
Expand All @@ -108,6 +149,15 @@ export class QueryProcessor implements QueryProcessorInterface {
attestation = await signer.createAttestation(query, response.data)
}

if (this.queryTimingLogs) {
this.logger.info('Done executing paid query', {
deployment: subgraphDeploymentID.ipfsHash,
fees: parsedReceipt.fees.toBigInt().toString(),
query: query,
responseTime: (response as AxiosResponseWithTime).responseTime ?? null,
})
}

return {
status: 200,
result: {
Expand Down
10 changes: 7 additions & 3 deletions packages/indexer-service/src/query-fees/allocations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,11 @@ export class AllocationReceiptManager implements ReceiptManager {
}

// Saves the receipt and returns the allocation for signing
async add(receiptData: string): Promise<Address> {
async add(receiptData: string): Promise<{
id: string
allocation: Address
fees: BigNumber
}> {
// Security: Input validation
if (!allocationReceiptValidator.test(receiptData)) {
throw indexerError(IndexerErrorCode.IE031, 'Expecting 264 hex characters')
Expand All @@ -93,7 +97,7 @@ export class AllocationReceiptManager implements ReceiptManager {

// If the fee is 0, validate verifier and return allocation ID for the signer
if (receipt.fees.isZero()) {
return receipt.allocation
return receipt
}

this._queue({
Expand All @@ -103,7 +107,7 @@ export class AllocationReceiptManager implements ReceiptManager {
signature,
})

return receipt.allocation
return receipt
}

/// Flushes all receipts that have been registered by this moment in time.
Expand Down
7 changes: 6 additions & 1 deletion packages/indexer-service/src/query-fees/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
import { Address } from '@graphprotocol/common-ts'
import { BigNumber } from 'ethers'

export * from './allocations'

export interface ReceiptManager {
// Saves the query fees and returns the allocation for signing
add(receiptData: string): Promise<Address>
add(receiptData: string): Promise<{
id: string
allocation: Address
fees: BigNumber
}>
}