Skip to content

Commit

Permalink
Merge pull request #142 from consensusnetworks/feature/beacon-deposit…
Browse files Browse the repository at this point in the history
…-event

Add beacon deposit event
  • Loading branch information
hawyar authored Oct 11, 2022
2 parents 9a39605 + 467fcc2 commit 100f421
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 171 deletions.
41 changes: 31 additions & 10 deletions common/data/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {
return Object.keys(jsonSchema.properties).map((name: string) => {
const property = jsonSchema.properties[name]

// 'STRING' | 'INTEGER' | 'BOOLEAN' | 'DOUBLE' | 'DECIMAL' | 'BIGINT' | 'TIMESTAMP' | 'JSON' | 'DATE'
// 'STRING' | 'INTEGER' | 'BOOLEAN' | 'DOUBLE' | 'DECIMAL' | 'BIG_INT' | 'TIMESTAMP' | 'JSON' | 'DATE'
const typeKey = property.type.toUpperCase() as keyof glue.Schema

let type: glue.Type = glue.Schema[typeKey]

if (name.endsWith('at')) type = glue.Schema.TIMESTAMP
if (name.endsWith('_at')) type = glue.Schema.TIMESTAMP

if (name.endsWith('_list')) type = glue.Schema.array(glue.Schema.STRING)

Expand All @@ -30,24 +30,45 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {
})
}

export type EventTableColumn = {
export type EventTableSchema = {
/** Name of the chain (e.g. iotex, ethereum) */
chain: string
/** Name of the network (e.g. mainnet, testnet) */
network: string
/** "Name of the provider (e.g. casimir, infura, alchemy) */
provider: string
/** The type of event (e.g. block, transaction, deposit) */
type: string
/** The block height */
height: number
/** The block hash */
block: string
/** The transaction hash */
transaction: string
/** The date timestamp of the event in ISO 8601 format (e.g. 2015-03-04T22:44:30.652Z) */
created_at: string
/** The address which initiated the event, a miner in case of block and a caller in case of other events */
address: string
height: number
/** The recipient's address */
to_address: string
candidate: string
candidate_list: string[]
/** The amount value associated with the transaction */
amount: string
/** The total amount of gas used */
gasUsed: number
/** The gas limit provided by transactions in the block */
gasLimit: number
/** Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block */
baseFee: number
/** Post-London Upgrade, this represents the part of the tx fee that is burnt */
burntFee: number
/** The validator's address */
validator: string
/** The list of validators' addresses */
validator_list: string[]
/** The duration of the event */
duration: number
/** Is auto staking enabled */
auto_stake: boolean
// payload: Record<string, unknown>
}

// export type EventTableColumn = {
// [key in keyof typeof eventSchema.properties]: // what goes here?

export { eventSchema, aggSchema }
24 changes: 20 additions & 4 deletions common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,26 @@
"type": "string",
"description": "The recipient's address"
},
"amount": {
"type": "string",
"description": "The amount of currency associated with the event"
},
"gas_used": {
"type": "big_int",
"description": "The total amount of gas used"
},
"gas_limit": {
"type": "big_int",
"description": "The gas limit provided by transactions in the block"
},
"base_fee": {
"type": "big_int",
"description": "Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block"
},
"burnt_fee": {
"type": "float",
"description": "Post-London Upgrade, this represents the part of the tx fee that is burnt"
},
"validator": {
"type": "string",
"description": "The validator's address"
Expand All @@ -52,10 +72,6 @@
"type": "array",
"description": "The list of validators in stake action"
},
"amount": {
"type": "string",
"description": "The amount of currency associated with the event"
},
"duration":{
"type": "string",
"description": "The duration of the event"
Expand Down
6 changes: 3 additions & 3 deletions common/helpers/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { S3Client, S3ClientConfig, PutObjectCommand, GetObjectCommand } from '@a
import { AthenaClient, AthenaClientConfig } from '@aws-sdk/client-athena'
import { defaultProvider } from '@aws-sdk/credential-provider-node'
import { StartQueryExecutionCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena'
import { EventTableColumn } from '@casimir/data'
import { EventTableSchema } from '@casimir/data'

const defaultQueryOutputBucket = 'casimir-etl-output-bucket-dev'

Expand Down Expand Up @@ -186,7 +186,7 @@ async function pollAthenaQueryOutput(queryId: string): Promise<void> {
* @param query - SQL query to run (make sure the correct permissions are set)
* @return string - Query result
*/
export async function queryAthena(query: string): Promise<EventTableColumn[] | null> {
export async function queryAthena(query: string): Promise<EventTableSchema[] | null> {

if (!athena) {
athena = await newAthenaClient()
Expand Down Expand Up @@ -225,7 +225,7 @@ export async function queryAthena(query: string): Promise<EventTableColumn[] | n

const header = rows.splice(0, 1)[0].split(',').map((h: string) => h.trim().replace(/"/g, ''))

const events: EventTableColumn[] = []
const events: EventTableSchema[] = []

rows.forEach((curr, i) => {
const row = curr.split(',')
Expand Down
3 changes: 2 additions & 1 deletion scripts/ethereum/dev
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
# Example:
#
# scripts/ethereum/dev -f <fork-name>
# scripts/ethereum/dev -f <fork-name> -n <network-name>
#
# Further information:
# See https://hardhat.org/hardhat-network/docs/overview
Expand All @@ -28,6 +28,7 @@ while getopts :f: flag
do
case "${flag}" in
f) fork=${OPTARG};;
n) network=${OPTARG};;
esac
done

Expand Down
44 changes: 25 additions & 19 deletions services/crawler/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import { EventTableColumn } from '@casimir/data'
import {IotexNetworkType, IotexService, newIotexService} from './providers/Iotex'
import { EthereumService, newEthereumService } from './providers/Ethereum'
import { EventTableSchema } from '@casimir/data'
import {IotexNetworkType, IotexService, IotexServiceOptions, newIotexService} from './providers/Iotex'
import {EthereumService, EthereumServiceOptions, newEthereumService} from './providers/Ethereum'
import { queryAthena, uploadToS3 } from '@casimir/helpers'
import * as fs from "fs";

export enum Chain {
Iotex = 'iotex',
Ethereum = 'ethereum'
Ethereum = 'ethereum',
Iotex = 'iotex'
}

export enum Provider {
Casimir = 'casimir',
}

export const defaultEventBucket = 'casimir-etl-event-bucket-dev'
export const eventOutputBucket = 'casimir-etl-event-bucket-dev'

export interface CrawlerConfig {
chain: Chain
options?: IotexServiceOptions | EthereumServiceOptions
output?: `s3://${string}`
verbose?: boolean
}
Expand All @@ -30,7 +32,11 @@ class Crawler {

async setup(): Promise<void> {
if (this.config.chain === Chain.Ethereum) {
this.service = await newEthereumService({ url: 'http://localhost:8545'})
try {
this.service = await newEthereumService({ url: this.config?.options?.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' })
} catch (err) {
throw new Error(`failed to setup ethereum service: ${err}`)
}
return
}

Expand All @@ -41,7 +47,7 @@ class Crawler {
throw new Error('InvalidChain: chain is not supported')
}

async getLastProcessedEvent(): Promise<EventTableColumn | null> {
async getLastProcessedEvent(): Promise<EventTableSchema | null> {
const event = await queryAthena(`SELECT * FROM "casimir_etl_database_dev"."casimir_etl_event_table_dev" where chain = '${this.config.chain}' ORDER BY height DESC limit 1`)

if (event !== null && event.length === 1) {
Expand All @@ -53,7 +59,6 @@ class Crawler {
async start(): Promise<void> {
if (this.service instanceof EthereumService) {
const lastEvent = await this.getLastProcessedEvent()

const last = lastEvent !== null ? lastEvent.height : 0
const start = parseInt(last.toString()) + 1

Expand All @@ -63,16 +68,16 @@ class Crawler {

const current = await this.service.getCurrentBlock()

for (let i = start as number; i < current.number; i++) {
for (let i = start; i < current.number; i++) {
const { events, blockHash } = await this.service.getEvents(i)
const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n')
const ndjson = events.map((e: Partial<EventTableSchema>) => JSON.stringify(e)).join('\n')
await uploadToS3({
bucket: defaultEventBucket,
bucket: eventOutputBucket,
key: `${blockHash}-event.json`,
data: ndjson
}).finally(() => {
if (this.config.verbose) {
console.log(`uploaded ${events.length} event at height ${i}`)
console.log(`uploaded events for block ${blockHash}`)
}
})
}
Expand All @@ -94,15 +99,15 @@ class Crawler {

for (let i = start; i < currentHeight; i++) {
const { hash, events } = await this.service.getEvents(i)
const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n')
const ndjson = events.map((e: Partial<EventTableSchema>) => JSON.stringify(e)).join('\n')

await uploadToS3({
bucket: defaultEventBucket,
bucket: eventOutputBucket,
key: `${hash}-event.json`,
data: ndjson
}).finally(() => {
if (this.config.verbose) {
console.log(`uploaded ${events.length} event at height ${i}`)
console.log(`uploaded events for block ${hash}`)
}
})
}
Expand All @@ -113,9 +118,10 @@ class Crawler {

export async function crawler (config: CrawlerConfig): Promise<Crawler> {
const chainCrawler = new Crawler({
chain: config.chain,
output: config?.output ?? `s3://${defaultEventBucket}`,
verbose: config?.verbose ?? false
chain: config.chain,
options: config.options,
output: config?.output ?? `s3://${eventOutputBucket}`,
verbose: config?.verbose ?? false
})

await chainCrawler.setup()
Expand Down
Loading

0 comments on commit 100f421

Please sign in to comment.