Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add beacon deposit event #142

Merged
merged 10 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions common/data/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {

let type: glue.Type = glue.Schema[typeKey]

if (name.endsWith('at')) type = glue.Schema.TIMESTAMP
if (name.endsWith('_at')) type = glue.Schema.TIMESTAMP

if (name.endsWith('_list')) type = glue.Schema.array(glue.Schema.STRING)

Expand All @@ -30,24 +30,45 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {
})
}

export type EventTableColumn = {
export type EventTableSchema = {
/** Name of the chain (e.g. iotex, ethereum) */
chain: string
/** Name of the network (e.g. mainnet, testnet) */
network: string
/** "Name of the provider (e.g. casimir, infura, alchemy) */
provider: string
/** The type of event (e.g. block, transaction, deposit) */
type: string
/** The block height */
height: number
/** The block hash */
block: string
/** The transaction hash */
transaction: string
/** The date timestamp of the event in ISO 8601 format (e.g. 2015-03-04T22:44:30.652Z) */
created_at: string
/** The address which initiated the event, a miner in case of block and a caller in case of other events */
address: string
height: number
/** The recipient's address */
to_address: string
candidate: string
candidate_list: string[]
/** The amount value associated with the transaction */
amount: string
/** The total amount of gas used */
gasUsed: number
/** The gas limit provided by transactions in the block */
gasLimit: number
/** Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block */
baseFee: number
/** Post-London Upgrade, this represents the part of the tx fee that is burnt */
burntFee: number
/** The validator's address */
validator: string
/** The list of validators' addresses */
validator_list: string[]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thoughts making most of these optional and then including them only when needed? When trying to make sense of the data for querying, it may be clearer to make columns don't apply to a particular row empty (as in the column is not used for that particular event type). For the specific example of validator_list, there would be a conventional difference between an empty array and a null value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, I updated the schema for both.

/** The duration of the event */
duration: number
/** Is auto staking enabled */
auto_stake: boolean
// payload: Record<string, unknown>
}

// export type EventTableColumn = {
// [key in keyof typeof eventSchema.properties]: // what goes here?

export { eventSchema, aggSchema }
24 changes: 20 additions & 4 deletions common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@
"type": "string",
"description": "The recipient's address"
},
"amount": {
"type": "string",
"description": "The amount of currency associated with the event"
},
"gas_used": {
"type": "bigint",
"description": "The total amount of gas used"
},
"gas_limit": {
"type": "bigint",
"description": "The gas limit provided by transactions in the block"
},
"base_fee": {
"type": "bigint",
"description": "Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block"
},
"burnt_fee": {
"type": "float",
"description": "Post-London Upgrade, this represents the part of the tx fee that is burnt"
},
"validator": {
"type": "string",
"description": "The validator's address"
Expand All @@ -52,10 +72,6 @@
"type": "array",
"description": "The list of validators in stake action"
},
"amount": {
"type": "string",
"description": "The amount of currency associated with the event"
},
"duration":{
"type": "string",
"description": "The duration of the event"
Expand Down
6 changes: 3 additions & 3 deletions common/helpers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { S3Client, S3ClientConfig, PutObjectCommand, GetObjectCommand } from '@a
import { AthenaClient, AthenaClientConfig } from '@aws-sdk/client-athena'
import { defaultProvider } from '@aws-sdk/credential-provider-node'
import { StartQueryExecutionCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena'
import { EventTableColumn } from '@casimir/data'
import { EventTableSchema } from '@casimir/data'

const defaultQueryOutputBucket = 'casimir-etl-output-bucket-dev'

Expand Down Expand Up @@ -186,7 +186,7 @@ async function pollAthenaQueryOutput(queryId: string): Promise<void> {
* @param query - SQL query to run (make sure the correct permissions are set)
* @return string - Query result
*/
export async function queryAthena(query: string): Promise<EventTableColumn[] | null> {
export async function queryAthena(query: string): Promise<EventTableSchema[] | null> {

if (!athena) {
athena = await newAthenaClient()
Expand Down Expand Up @@ -225,7 +225,7 @@ export async function queryAthena(query: string): Promise<EventTableColumn[] | n

const header = rows.splice(0, 1)[0].split(',').map((h: string) => h.trim().replace(/"/g, ''))

const events: EventTableColumn[] = []
const events: EventTableSchema[] = []

rows.forEach((curr, i) => {
const row = curr.split(',')
Expand Down
3 changes: 2 additions & 1 deletion scripts/ethereum/dev
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
# Example:
#
# scripts/ethereum/dev -f <fork-name>
# scripts/ethereum/dev -f <fork-name> -n <network-name>
#
# Further information:
# See https://hardhat.org/hardhat-network/docs/overview
Expand All @@ -28,6 +28,7 @@ while getopts :f: flag
do
case "${flag}" in
f) fork=${OPTARG};;
n) network=${OPTARG};;
esac
done

Expand Down
44 changes: 25 additions & 19 deletions services/crawler/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { EventTableColumn } from '@casimir/data'
import {IotexNetworkType, IotexService, newIotexService} from './providers/Iotex'
import { EthereumService, newEthereumService } from './providers/Ethereum'
import { EventTableSchema } from '@casimir/data'
import {IotexNetworkType, IotexService, IotexServiceOptions, newIotexService} from './providers/Iotex'
import {EthereumService, EthereumServiceOptions, newEthereumService} from './providers/Ethereum'
import { queryAthena, uploadToS3 } from '@casimir/helpers'
import * as fs from "fs";

export enum Chain {
Iotex = 'iotex',
Ethereum = 'ethereum'
Ethereum = 'ethereum',
Iotex = 'iotex'
}

export enum Provider {
Casimir = 'casimir',
}

export const defaultEventBucket = 'casimir-etl-event-bucket-dev'
export const eventOutputBucket = 'casimir-etl-event-bucket-dev'

export interface CrawlerConfig {
chain: Chain
options?: IotexServiceOptions | EthereumServiceOptions
output?: `s3://${string}`
verbose?: boolean
}
Expand All @@ -30,7 +32,11 @@ class Crawler {

async setup(): Promise<void> {
if (this.config.chain === Chain.Ethereum) {
this.service = await newEthereumService({ url: 'http://localhost:8545'})
try {
this.service = await newEthereumService({ url: this.config?.options?.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' })
} catch (err) {
throw new Error(`failed to setup ethereum service: ${err}`)
}
return
}

Expand All @@ -41,7 +47,7 @@ class Crawler {
throw new Error('InvalidChain: chain is not supported')
}

async getLastProcessedEvent(): Promise<EventTableColumn | null> {
async getLastProcessedEvent(): Promise<EventTableSchema | null> {
const event = await queryAthena(`SELECT * FROM "casimir_etl_database_dev"."casimir_etl_event_table_dev" where chain = '${this.config.chain}' ORDER BY height DESC limit 1`)

if (event !== null && event.length === 1) {
Expand All @@ -53,7 +59,6 @@ class Crawler {
async start(): Promise<void> {
if (this.service instanceof EthereumService) {
const lastEvent = await this.getLastProcessedEvent()

const last = lastEvent !== null ? lastEvent.height : 0
const start = parseInt(last.toString()) + 1

Expand All @@ -63,16 +68,16 @@ class Crawler {

const current = await this.service.getCurrentBlock()

for (let i = start as number; i < current.number; i++) {
for (let i = start; i < current.number; i++) {
const { events, blockHash } = await this.service.getEvents(i)
const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n')
const ndjson = events.map((e: Partial<EventTableSchema>) => JSON.stringify(e)).join('\n')
await uploadToS3({
bucket: defaultEventBucket,
bucket: eventOutputBucket,
key: `${blockHash}-event.json`,
data: ndjson
}).finally(() => {
if (this.config.verbose) {
console.log(`uploaded ${events.length} event at height ${i}`)
console.log(`uploaded events for block ${blockHash}`)
}
})
}
Expand All @@ -94,15 +99,15 @@ class Crawler {

for (let i = start; i < currentHeight; i++) {
const { hash, events } = await this.service.getEvents(i)
const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n')
const ndjson = events.map((e: Partial<EventTableSchema>) => JSON.stringify(e)).join('\n')

await uploadToS3({
bucket: defaultEventBucket,
bucket: eventOutputBucket,
key: `${hash}-event.json`,
data: ndjson
}).finally(() => {
if (this.config.verbose) {
console.log(`uploaded ${events.length} event at height ${i}`)
console.log(`uploaded events for block ${hash}`)
}
})
}
Expand All @@ -113,9 +118,10 @@ class Crawler {

export async function crawler (config: CrawlerConfig): Promise<Crawler> {
const chainCrawler = new Crawler({
chain: config.chain,
output: config?.output ?? `s3://${defaultEventBucket}`,
verbose: config?.verbose ?? false
chain: config.chain,
options: config.options,
output: config?.output ?? `s3://${eventOutputBucket}`,
verbose: config?.verbose ?? false
})

await chainCrawler.setup()
Expand Down
Loading