From 7734d469bb95fc4a182d97da0630009ed81a5a42 Mon Sep 17 00:00:00 2001 From: hawyar Date: Wed, 5 Oct 2022 23:53:57 -0400 Subject: [PATCH 1/8] Add depoist event --- common/data/src/index.ts | 13 +-- scripts/ethereum/dev | 58 +++++------ services/crawler/src/index.ts | 52 +++++----- services/crawler/src/providers/Ethereum.ts | 80 ++++++++++----- services/crawler/src/providers/Iotex.ts | 110 ++++++++++----------- services/crawler/test/crawler.test.ts | 35 ++++--- 6 files changed, 194 insertions(+), 154 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index 44a87bc68..4f3470b63 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -35,19 +35,16 @@ export type EventTableColumn = { network: string provider: string type: string + height: number + block: string + transaction: string created_at: string address: string - height: number to_address: string - candidate: string - candidate_list: string[] + validator: string + validator_list: string[] amount: string duration: number auto_stake: boolean - // payload: Record } - -// export type EventTableColumn = { -// [key in keyof typeof eventSchema.properties]: // what goes here? - export { eventSchema, aggSchema } \ No newline at end of file diff --git a/scripts/ethereum/dev b/scripts/ethereum/dev index b1e620555..c56242327 100755 --- a/scripts/ethereum/dev +++ b/scripts/ethereum/dev @@ -3,7 +3,7 @@ # # Example: # -# scripts/ethereum/dev -f +# scripts/ethereum/dev -f -n # # Further information: # See https://hardhat.org/hardhat-network/docs/overview @@ -24,43 +24,45 @@ else fi # Get args -while getopts f: flag +while getopts :f:n: flag do case "${flag}" in f) fork=${OPTARG};; + n) network=${OPTARG};; esac done -# Secret ID is just the name or ARN -ledger_seed_secret_id=consensus-networks-ledger-seed -echo "🤫 Getting $ledger_seed_secret_id for $profile" - -# Get the secret from AWS -ledger_seed=$(aws secretsmanager get-secret-value \ ---secret-id $ledger_seed_secret_id \ ---query SecretString \ ---output text \ ---profile $profile) - -export LEDGER_SEED="$ledger_seed" - -if [ -z "$fork" ]; then - echo "⛓ Running default localnode" - npm run dev:localnode --workspace @casimir/evm -else - # Secret ID is just the name or ARN - alchemy_secret_id=consensus-networks-alchemy-$fork - echo "🤫 Getting $alchemy_secret_id for $profile" - - # Get the secret from AWS - alchemy_key=$(aws secretsmanager get-secret-value \ - --secret-id $alchemy_secret_id \ +# Expose RPC URL directly if network is set to mainnet or testnet +if [ -n "$network" ]; then + # Get the RPC API key from AWS + rpc_secret_id=consensus-networks-ethereum-$network + rpc_key=$(aws secretsmanager get-secret-value \ + --secret-id $rpc_secret_id \ --query SecretString \ --output text \ --profile $profile) - echo "⛓ Running $fork fork localnode" - npm run dev:localnode --workspace @casimir/evm -- --fork https://eth-$fork.alchemyapi.io/v2/$alchemy_key + export PUBLIC_ETHEREUM_RPC="https://eth-$network.g.alchemy.com/v2/$rpc_key" + # export PUBLIC_SSV_ADDRESS="" # Todo get address (deterministic from deployer + tx count) +else + if [ -n "$fork" ]; then + # Get the RPC API key from AWS + rpc_secret_id=consensus-networks-ethereum-$fork + rpc_key=$(aws secretsmanager get-secret-value \ + --secret-id $rpc_secret_id \ + --query SecretString \ + --output text \ + --profile $profile) + + echo "⛓ Setting up ethereum chain with $fork fork" + npm run dev:node --workspace @casimir/evm -- --fork "https://eth-$fork.g.alchemy.com/v2/$rpc_key" + else + echo "⛓ Setting up ethereum chain without fork" + npm run dev:node --workspace @casimir/evm + fi + + # npm run deploy:ssv --workspace @casimir/evm + # export PUBLIC_SSV_ADDRESS="" # Todo get address (deterministic from deployer + tx count) fi diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index 77a715215..5691f1090 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -1,21 +1,24 @@ import { EventTableColumn } from '@casimir/data' -import {IotexNetworkType, IotexService, newIotexService} from './providers/Iotex' -import { EthereumService, newEthereumService } from './providers/Ethereum' +import {IotexNetworkType, IotexService, IotexServiceOptions, newIotexService} from './providers/Iotex' +import {EthereumService, EthereumServiceOptions, newEthereumService} from './providers/Ethereum' import { queryAthena, uploadToS3 } from '@casimir/helpers' export enum Chain { - Iotex = 'iotex', - Ethereum = 'ethereum' + Ethereum = 'ethereum', + Iotex = 'iotex' } export enum Provider { Casimir = 'casimir', } -export const defaultEventBucket = 'casimir-etl-event-bucket-dev' +export const eventOutputBucket = 'casimir-etl-event-bucket-dev' + +export type ServiceConfig = T extends Chain.Iotex ? IotexServiceOptions : T extends Chain.Ethereum ? EthereumServiceOptions : never export interface CrawlerConfig { chain: Chain + serviceOptions: ServiceConfig output?: `s3://${string}` verbose?: boolean } @@ -23,14 +26,16 @@ export interface CrawlerConfig { class Crawler { config: CrawlerConfig service: EthereumService | IotexService | null + serviceConfig: ServiceConfig constructor(config: CrawlerConfig) { this.config = config this.service = null + this.serviceConfig = config.serviceOptions } async setup(): Promise { if (this.config.chain === Chain.Ethereum) { - this.service = await newEthereumService({ url: 'http://localhost:8545'}) + this.service = await newEthereumService({ url: this.serviceConfig.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' }) return } @@ -53,7 +58,6 @@ class Crawler { async start(): Promise { if (this.service instanceof EthereumService) { const lastEvent = await this.getLastProcessedEvent() - const last = lastEvent !== null ? lastEvent.height : 0 const start = parseInt(last.toString()) + 1 @@ -63,19 +67,22 @@ class Crawler { const current = await this.service.getCurrentBlock() - for (let i = start as number; i < current.number; i++) { + for (let i = start; i < 2195; i++) { const { events, blockHash } = await this.service.getEvents(i) - const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n') - await uploadToS3({ - bucket: defaultEventBucket, - key: `${blockHash}-event.json`, - data: ndjson - }).finally(() => { - if (this.config.verbose) { - console.log(`uploaded ${events.length} event at height ${i}`) - } - }) + // const { events, blockHash } = await this.service.getEvents(15676563) } + // const { events, blockHash } = await this.service.getEvents(15676563) + // const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n') + // await uploadToS3({ + // bucket: defaultEventBucket, + // key: `${blockHash}-event.json`, + // data: ndjson + // }).finally(() => { + // if (this.config.verbose) { + // console.log(`uploaded ${events.length} event at height ${i}`) + // } + // }) + // } return } @@ -97,7 +104,7 @@ class Crawler { const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n') await uploadToS3({ - bucket: defaultEventBucket, + bucket: eventOutputBucket, key: `${hash}-event.json`, data: ndjson }).finally(() => { @@ -113,9 +120,10 @@ class Crawler { export async function crawler (config: CrawlerConfig): Promise { const chainCrawler = new Crawler({ - chain: config.chain, - output: config?.output ?? `s3://${defaultEventBucket}`, - verbose: config?.verbose ?? false + chain: config.chain, + serviceOptions: config.serviceOptions, + output: config?.output ?? `s3://${eventOutputBucket}`, + verbose: config?.verbose ?? false }) await chainCrawler.setup() diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index ba835e7ac..077a0d350 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -1,10 +1,17 @@ import { ethers } from 'ethers' import { EventTableColumn } from '@casimir/data' -import {Chain, Provider} from '../index' +import { Chain, Provider } from '../index' + +const BeaconDepositContract = { + '0x00000000219ab540356cBB839Cbe05303d7705Fa': { + abi: ['event DepositEvent (bytes pubkey, bytes withdrawal_credentials, bytes amount, bytes signature, bytes index)'] + } +} export type EthereumServiceOptions = { url: string network?: string + chainId?: number } export class EthereumService { @@ -15,8 +22,23 @@ export class EthereumService { this.chain = Chain.Ethereum this.network = opt.network || 'mainnet' this.provider = new ethers.providers.JsonRpcProvider({ - url: opt.url || 'http://localhost:8545', + url: opt.url, + }) + } + + parseLog(log: ethers.providers.Log): Record { + const abi = BeaconDepositContract[log.address as keyof typeof BeaconDepositContract].abi + const contractInterface = new ethers.utils.Interface(abi) + const parsedLog = contractInterface.parseLog(log) + const args = parsedLog.args.slice(-1 * parsedLog.eventFragment.inputs.length) + + const input: Record = {} + + parsedLog.eventFragment.inputs.forEach((key, index) => { + console.log('Key', key.name) + input[key.name] = args[index] }) + return input } async getEvents(height: number): Promise<{ blockHash: string, events: EventTableColumn[] }> { @@ -40,23 +62,25 @@ export class EthereumService { auto_stake: false, }) - if (block.transactions.length > 0) { - for (const tx of block.transactions) { - events.push({ - chain: this.chain, - network: this.network, - provider: Provider.Casimir, - type: tx.type === 0 ? 'transfer' : 'contract', - created_at: new Date(block.timestamp * 1000).toISOString(), - address: tx.from, - height: block.number, - to_address: tx.to || '', - candidate: '', - candidate_list: [], - duration: 0, - amount: tx.value.toString(), - auto_stake: false, - }) + if (block.transactions.length === 0) { + return { blockHash: block.hash, events } + } + + for await (const tx of block.transactions) { + const receipts = await this.provider.getTransactionReceipt(tx.hash) + + if (receipts.logs.length === 0) { + continue + } + + // check if its a regualr transfer + + for (const log of receipts.logs) { + if (log.address in BeaconDepositContract) { + // const contractInterface = new ethers.utils.Interface(BeaconDepositContract[log.address as keyof typeof BeaconDepositContract].abi) + const parsedLog = this.parseLog(log) + console.log(parsedLog) + } } } return { @@ -69,20 +93,26 @@ export class EthereumService { return await this.provider.getBlock(height) } - async getBlock(num: number): Promise { + async getBlockWithTx(num: number): Promise { return await this.provider.getBlockWithTransactions(num) } - async getTransaction(tx: string): Promise { - return await this.provider.getTransaction(tx) - } - on(event:string, cb: (block: ethers.providers.Block) => void): void { this.provider.on('block', async (blockNumber: number) => { - const block = await this.getBlock(blockNumber) + const block = await this.getBlockWithTx(blockNumber) cb(block) }) } + + async ping(rpcUrl: string): Promise { + const provider = new ethers.providers.JsonRpcProvider(rpcUrl) + try { + const block = await provider.getBlockNumber() + return true + } catch (e) { + return false + } + } } export function newEthereumService (opt: EthereumServiceOptions): EthereumService { diff --git a/services/crawler/src/providers/Iotex.ts b/services/crawler/src/providers/Iotex.ts index cb0f96979..eb55440b7 100644 --- a/services/crawler/src/providers/Iotex.ts +++ b/services/crawler/src/providers/Iotex.ts @@ -34,7 +34,7 @@ export enum IotexActionType { StakeChangeCandidate = 'stakeChangeCandidate', } -export type IotexOptions = Opts & { +export type IotexServiceOptions = Opts & { url: string network: IotexNetworkType } @@ -44,7 +44,7 @@ export class IotexService { network: IotexNetworkType provider: Antenna chainId: number - constructor (opt: IotexOptions) { + constructor (opt: IotexServiceOptions) { this.chain = Chain.Iotex this.network = opt.network || IotexNetworkType.Mainnet this.chainId = IotexNetworkType.Mainnet ? 4689 : 4690 @@ -63,6 +63,58 @@ export class IotexService { return type as IotexActionType } + async getBlocks(start: number, count: number): Promise { + if (start < 0 || count < 0) { + throw new Error('start and count must be greater than 0') + } + + if (start === 0) { + start = 1 + } + + if (count === 0) { + count = 100 + } + + const blocks = await this.provider.iotx.getBlockMetas({ byIndex: { start: start, count: count } }) + + return blocks + } + + async getBlockActions (index: number, count: number): Promise { + const actions = await this.provider.iotx.getActions({ + byIndex: { + start: index, + count: count + } + }) + return actions.actionInfo + } + + async getCurrentBlock(): Promise { + const { chainMeta } = await this.provider.iotx.getChainMeta({ + includePendingActions: false + }) + + const block = await this.provider.iotx.getBlockMetas({ byIndex: { start: parseInt(chainMeta.height), count: 1 } }) + return block + } + + async readableBlockStream (): Promise> { + const stream = await this.provider.iotx.streamBlocks({ + start: 1 + }) + return stream + } + + on(event: string, callback: (data: IStreamBlocksResponse) => void): void { + this.provider.iotx.streamBlocks({ + start: 1 + }).on('data', (data: IStreamBlocksResponse) => { + callback(data) + }) + } + async getEvents(height: number): Promise<{ hash: string, events: EventTableColumn[]}> { const events: EventTableColumn[] = [] @@ -194,59 +246,7 @@ export class IotexService { events } } - - async getBlocks(start: number, count: number): Promise { - if (start < 0 || count < 0) { - throw new Error('start and count must be greater than 0') - } - - if (start === 0) { - start = 1 - } - - if (count === 0) { - count = 100 - } - - const blocks = await this.provider.iotx.getBlockMetas({ byIndex: { start: start, count: count } }) - - return blocks - } - - async getBlockActions (index: number, count: number): Promise { - const actions = await this.provider.iotx.getActions({ - byIndex: { - start: index, - count: count - } - }) - return actions.actionInfo - } - - async getCurrentBlock(): Promise { - const { chainMeta } = await this.provider.iotx.getChainMeta({ - includePendingActions: false - }) - - const block = await this.provider.iotx.getBlockMetas({ byIndex: { start: parseInt(chainMeta.height), count: 1 } }) - return block - } - - async readableBlockStream (): Promise> { - const stream = await this.provider.iotx.streamBlocks({ - start: 1 - }) - return stream - } - - on(event: string, callback: (data: IStreamBlocksResponse) => void): void { - this.provider.iotx.streamBlocks({ - start: 1 - }).on('data', (data: IStreamBlocksResponse) => { - callback(data) - }) - } } -export function newIotexService (opt: IotexOptions): IotexService { +export function newIotexService (opt: IotexServiceOptions): IotexService { return new IotexService(opt) } \ No newline at end of file diff --git a/services/crawler/test/crawler.test.ts b/services/crawler/test/crawler.test.ts index 1f4c106c5..9509a939f 100644 --- a/services/crawler/test/crawler.test.ts +++ b/services/crawler/test/crawler.test.ts @@ -1,26 +1,29 @@ import { crawler } from '../src/index' import { Chain } from '../src/index' -test('init crawler for iotex', async () => { - const iotex = await crawler({ - chain: Chain.Iotex, - verbose: true - }) - await iotex.start() - expect(iotex.service).not.toBe(null) -}) - -jest.setTimeout(1000000) - -// test('init crawler for ethereum', async () => { -// const eth = await crawler({ -// chain: Chain.Ethereum, +// test('init crawler for iotex', async () => { +// const iotex = await crawler({ +// chain: Chain.Iotex, // verbose: true // }) -// await eth.start() -// expect(eth.service).not.toBe(null) +// await iotex.start() +// expect(iotex.service).not.toBe(null) // }) +jest.setTimeout(1000000) + +test('init crawler for ethereum', async () => { + const eth = await crawler({ + chain: Chain.Ethereum, + serviceOptions: { + url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05' + }, + verbose: true + }) + await eth.start() + expect(eth.service).not.toBe(null) +}) + // test('query athena thru service', async () => { // const supercrawler = await crawler({ // chain: Chain.Ethereum, From 1ff21879eeb5c8538e3477721aad39097591308f Mon Sep 17 00:00:00 2001 From: hawyar Date: Thu, 6 Oct 2022 11:35:01 -0400 Subject: [PATCH 2/8] Clean --- common/data/src/index.ts | 4 +- services/crawler/src/index.ts | 46 +++++++-------- services/crawler/src/providers/Ethereum.ts | 65 +++++++++++++++------- services/crawler/src/providers/Iotex.ts | 52 ++++++++--------- services/crawler/test/crawler.test.ts | 24 ++++---- 5 files changed, 106 insertions(+), 85 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index 4f3470b63..97c57059a 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -19,7 +19,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] { let type: glue.Type = glue.Schema[typeKey] - if (name.endsWith('at')) type = glue.Schema.TIMESTAMP + if (name.endsWith('_at')) type = glue.Schema.TIMESTAMP if (name.endsWith('_list')) type = glue.Schema.array(glue.Schema.STRING) @@ -30,7 +30,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] { }) } -export type EventTableColumn = { +export type EventTableSchema = { chain: string network: string provider: string diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index 5691f1090..6eddb0c8b 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -1,7 +1,8 @@ -import { EventTableColumn } from '@casimir/data' +import { EventTableSchema } from '@casimir/data' import {IotexNetworkType, IotexService, IotexServiceOptions, newIotexService} from './providers/Iotex' import {EthereumService, EthereumServiceOptions, newEthereumService} from './providers/Ethereum' import { queryAthena, uploadToS3 } from '@casimir/helpers' +import fs from 'fs' export enum Chain { Ethereum = 'ethereum', @@ -14,11 +15,9 @@ export enum Provider { export const eventOutputBucket = 'casimir-etl-event-bucket-dev' -export type ServiceConfig = T extends Chain.Iotex ? IotexServiceOptions : T extends Chain.Ethereum ? EthereumServiceOptions : never - export interface CrawlerConfig { chain: Chain - serviceOptions: ServiceConfig + options?: IotexServiceOptions | EthereumServiceOptions output?: `s3://${string}` verbose?: boolean } @@ -26,16 +25,14 @@ export interface CrawlerConfig { class Crawler { config: CrawlerConfig service: EthereumService | IotexService | null - serviceConfig: ServiceConfig constructor(config: CrawlerConfig) { this.config = config this.service = null - this.serviceConfig = config.serviceOptions } async setup(): Promise { if (this.config.chain === Chain.Ethereum) { - this.service = await newEthereumService({ url: this.serviceConfig.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' }) + this.service = await newEthereumService({ url: this.config?.options?.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' }) return } @@ -46,7 +43,7 @@ class Crawler { throw new Error('InvalidChain: chain is not supported') } - async getLastProcessedEvent(): Promise { + async getLastProcessedEvent(): Promise { const event = await queryAthena(`SELECT * FROM "casimir_etl_database_dev"."casimir_etl_event_table_dev" where chain = '${this.config.chain}' ORDER BY height DESC limit 1`) if (event !== null && event.length === 1) { @@ -67,22 +64,19 @@ class Crawler { const current = await this.service.getCurrentBlock() - for (let i = start; i < 2195; i++) { - const { events, blockHash } = await this.service.getEvents(i) - // const { events, blockHash } = await this.service.getEvents(15676563) + for (let i = start; i < current.number; i++) { + const {events, blockHash} = await this.service.getEvents(15000000 + i) + const ndjson = events.map((e: EventTableSchema) => JSON.stringify(e)).join('\n') + await uploadToS3({ + bucket: eventOutputBucket, + key: `${blockHash}-event.json`, + data: ndjson + }).finally(() => { + if (this.config.verbose) { + console.log(`uploaded events for block ${blockHash}`) + } + }) } - // const { events, blockHash } = await this.service.getEvents(15676563) - // const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n') - // await uploadToS3({ - // bucket: defaultEventBucket, - // key: `${blockHash}-event.json`, - // data: ndjson - // }).finally(() => { - // if (this.config.verbose) { - // console.log(`uploaded ${events.length} event at height ${i}`) - // } - // }) - // } return } @@ -101,7 +95,7 @@ class Crawler { for (let i = start; i < currentHeight; i++) { const { hash, events } = await this.service.getEvents(i) - const ndjson = events.map((e: EventTableColumn) => JSON.stringify(e)).join('\n') + const ndjson = events.map((e: EventTableSchema) => JSON.stringify(e)).join('\n') await uploadToS3({ bucket: eventOutputBucket, @@ -109,7 +103,7 @@ class Crawler { data: ndjson }).finally(() => { if (this.config.verbose) { - console.log(`uploaded ${events.length} event at height ${i}`) + console.log(`uploaded events for block ${hash}`) } }) } @@ -121,7 +115,7 @@ class Crawler { export async function crawler (config: CrawlerConfig): Promise { const chainCrawler = new Crawler({ chain: config.chain, - serviceOptions: config.serviceOptions, + options: config.options, output: config?.output ?? `s3://${eventOutputBucket}`, verbose: config?.verbose ?? false }) diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index 077a0d350..b40707a10 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -1,5 +1,5 @@ import { ethers } from 'ethers' -import { EventTableColumn } from '@casimir/data' +import { EventTableSchema } from '@casimir/data' import { Chain, Provider } from '../index' const BeaconDepositContract = { @@ -41,8 +41,8 @@ export class EthereumService { return input } - async getEvents(height: number): Promise<{ blockHash: string, events: EventTableColumn[] }> { - const events: EventTableColumn[] = [] + async getEvents(height: number): Promise<{ blockHash: string, events: EventTableSchema[] }> { + const events: EventTableSchema[] = [] const block = await this.provider.getBlockWithTransactions(height) @@ -51,15 +51,17 @@ export class EthereumService { network: this.network, provider: Provider.Casimir, type: 'block', + block: block.hash, + transaction: "", created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: block.miner, height: block.number, - to_address: '', - candidate: '', + to_address: "", + validator: '', duration: 0, - candidate_list: [], + validator_list: [], amount: '0', - auto_stake: false, + auto_stake: false }) if (block.transactions.length === 0) { @@ -67,19 +69,52 @@ export class EthereumService { } for await (const tx of block.transactions) { + events.push({ + chain: this.chain, + network: this.network, + provider: Provider.Casimir, + type: 'transaction', + block: block.hash, + transaction: tx.hash, + created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), + address: tx.from, + to_address: tx.to || '', + height: block.number, + validator: '', + validator_list: [], + duration: 0, + amount: tx.value.toString(), + auto_stake: false + }) + const receipts = await this.provider.getTransactionReceipt(tx.hash) if (receipts.logs.length === 0) { continue } - // check if its a regualr transfer - for (const log of receipts.logs) { if (log.address in BeaconDepositContract) { - // const contractInterface = new ethers.utils.Interface(BeaconDepositContract[log.address as keyof typeof BeaconDepositContract].abi) const parsedLog = this.parseLog(log) console.log(parsedLog) + const logEvent: EventTableSchema = { + chain: this.chain, + network: this.network, + provider: Provider.Casimir, + type: 'deposit', + block: block.hash, + transaction: tx.hash, + created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), + address: log.address, + height: block.number, + to_address: '', + validator: '', + duration: 0, + validator_list: [], + amount: parsedLog.amount.toString(), + auto_stake: false + } + events.push(logEvent) } } } @@ -103,16 +138,6 @@ export class EthereumService { cb(block) }) } - - async ping(rpcUrl: string): Promise { - const provider = new ethers.providers.JsonRpcProvider(rpcUrl) - try { - const block = await provider.getBlockNumber() - return true - } catch (e) { - return false - } - } } export function newEthereumService (opt: EthereumServiceOptions): EthereumService { diff --git a/services/crawler/src/providers/Iotex.ts b/services/crawler/src/providers/Iotex.ts index eb55440b7..a42b0ea11 100644 --- a/services/crawler/src/providers/Iotex.ts +++ b/services/crawler/src/providers/Iotex.ts @@ -7,7 +7,7 @@ import { IStreamBlocksResponse, } from 'iotex-antenna/lib/rpc-method/types' import { Opts } from 'iotex-antenna/lib/antenna' -import { EventTableColumn } from '@casimir/data' +import { EventTableSchema } from '@casimir/data' import {Chain, Provider} from '../index' export enum IotexNetworkType { @@ -115,14 +115,16 @@ export class IotexService { }) } - async getEvents(height: number): Promise<{ hash: string, events: EventTableColumn[]}> { - const events: EventTableColumn[] = [] + async getEvents(height: number): Promise<{ hash: string, events: EventTableSchema[]}> { + const events: EventTableSchema[] = [] const block = await this.provider.iotx.getBlockMetas({byIndex: {start: height, count: 1}}) const blockMeta = block.blkMetas[0] events.push({ + block: blockMeta.hash, + transaction: "", chain: this.chain, network: this.network, provider: Provider.Casimir, @@ -131,11 +133,11 @@ export class IotexService { address: blockMeta.producerAddress, height: blockMeta.height, to_address: '', - candidate: '', + validator: '', duration: 0, - candidate_list: [], + validator_list: [], amount: '0', - auto_stake: false, + auto_stake: false }) const numOfActions = block.blkMetas[0].numActions @@ -150,7 +152,7 @@ export class IotexService { const actionType = this.deduceActionType(action) if (actionType === null) return - const actionEvent: Partial = { + const actionEvent: Partial = { chain: this.chain, network: this.network, provider: Provider.Casimir, @@ -159,9 +161,9 @@ export class IotexService { address: blockMeta.producerAddress, height: blockMeta.height, to_address: '', - candidate: '', + validator: '', duration: 0, - candidate_list: [], + validator_list: [], amount: '0', auto_stake: false, } @@ -169,60 +171,60 @@ export class IotexService { if (actionType === IotexActionType.transfer && actionCore.transfer) { actionEvent.amount = actionCore.transfer.amount actionEvent.to_address = actionCore.transfer.recipient - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.stakeCreate && actionCore.stakeCreate) { actionEvent.amount = actionCore.stakeCreate.stakedAmount - actionEvent.candidate = actionCore.stakeCreate.candidateName + actionEvent.validator = actionCore.stakeCreate.candidateName actionEvent.auto_stake = actionCore.stakeCreate.autoStake actionEvent.duration = actionCore.stakeCreate.stakedDuration - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.stakeAddDeposit && actionCore.stakeAddDeposit) { actionEvent.amount = actionCore.stakeAddDeposit.amount - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.execution && actionCore.execution) { actionEvent.amount = actionCore.execution.amount - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.putPollResult && actionCore.putPollResult) { if (actionCore.putPollResult.candidates) { - actionEvent.candidate_list = actionCore.putPollResult.candidates.candidates.map(c => c.address) + actionEvent.validator_list = actionCore.putPollResult.candidates.candidates.map(c => c.address) } if (actionCore.putPollResult.height) { actionEvent.height = typeof actionCore.putPollResult.height === 'string' ? parseInt(actionCore.putPollResult.height) : actionCore.putPollResult.height } - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.StakeChangeCandidate && actionCore.stakeChangeCandidate) { - actionEvent.candidate = actionCore.stakeChangeCandidate.candidateName - events.push(actionEvent as EventTableColumn) + actionEvent.validator = actionCore.stakeChangeCandidate.candidateName + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.stakeRestake && actionCore.stakeRestake) { actionEvent.duration = actionCore.stakeRestake.stakedDuration actionEvent.auto_stake = actionCore.stakeRestake.autoStake - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.candidateRegister && actionCore.candidateRegister) { actionEvent.amount = actionCore.candidateRegister.stakedAmount actionEvent.duration = actionCore.candidateRegister.stakedDuration actionEvent.auto_stake = actionCore.candidateRegister.autoStake - actionEvent.candidate = actionCore.candidateRegister.candidate.name - events.push(actionEvent as EventTableColumn) + actionEvent.validator = actionCore.candidateRegister.candidate.name + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.candidateUpdate && actionCore.candidateUpdate) { - actionEvent.candidate = actionCore.candidateUpdate.name - events.push(actionEvent as EventTableColumn) + actionEvent.validator = actionCore.candidateUpdate.name + events.push(actionEvent as EventTableSchema) } if (actionType === IotexActionType.claimFromRewardingFund && actionCore.claimFromRewardingFund) { @@ -231,7 +233,7 @@ export class IotexService { if (actionType === IotexActionType.depositToRewardingFund && actionCore.depositToRewardingFund) { actionEvent.amount = actionCore.depositToRewardingFund.amount - events.push(actionEvent as EventTableColumn) + events.push(actionEvent as EventTableSchema) } // if (actionType === IotexActionType.grantReward) {} @@ -239,7 +241,7 @@ export class IotexService { // if (actionType === IotexActionType.stakeWithdraw) {} return actionEvent }) - events.push(...blockActions as EventTableColumn[]) + events.push(...blockActions as EventTableSchema[]) } return { hash: blockMeta.hash, diff --git a/services/crawler/test/crawler.test.ts b/services/crawler/test/crawler.test.ts index 9509a939f..fa4766ed4 100644 --- a/services/crawler/test/crawler.test.ts +++ b/services/crawler/test/crawler.test.ts @@ -1,22 +1,13 @@ import { crawler } from '../src/index' import { Chain } from '../src/index' -// test('init crawler for iotex', async () => { -// const iotex = await crawler({ -// chain: Chain.Iotex, -// verbose: true -// }) -// await iotex.start() -// expect(iotex.service).not.toBe(null) -// }) - jest.setTimeout(1000000) test('init crawler for ethereum', async () => { const eth = await crawler({ chain: Chain.Ethereum, - serviceOptions: { - url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05' + options: { + url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05', }, verbose: true }) @@ -24,10 +15,19 @@ test('init crawler for ethereum', async () => { expect(eth.service).not.toBe(null) }) +// test('init crawler for iotex', async () => { +// const iotex = await crawler({ +// chain: Chain.Iotex, +// verbose: true +// }) +// await iotex.start() +// expect(iotex.service).not.toBe(null) +// }) + // test('query athena thru service', async () => { // const supercrawler = await crawler({ // chain: Chain.Ethereum, -// verbose: true +// verbose: true, // }) // // const lastBlock = await supercrawler.getLastProcessedEvent() From 1470cf59692cd897b40528869c5359945f902437 Mon Sep 17 00:00:00 2001 From: hawyar Date: Fri, 7 Oct 2022 14:26:52 -0400 Subject: [PATCH 3/8] More changes --- common/data/src/index.ts | 16 +++++++++ common/helpers/src/index.ts | 6 ++-- services/crawler/src/index.ts | 20 +++++------ services/crawler/src/providers/Ethereum.ts | 42 +++++++++------------- services/crawler/test/crawler.test.ts | 2 +- 5 files changed, 46 insertions(+), 40 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index 97c57059a..7a21ced82 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -31,20 +31,36 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] { } export type EventTableSchema = { + /** The name of the chain (e.g. iotex, ethereum) */ chain: string + /** The name of the network (e.g. mainnet, testnet) */ network: string + /** The node provider (e.g. casimir, infura, alchemy), */ provider: string + /** The type of event (e.g. block, transaction, deposit) */ type: string + /** The block height */ height: number + /** The block hash */ block: string + /** The transaction hash */ transaction: string + /** The date timestamp of the event in ISO 8601 format (e.g. 2015-03-04T22:44:30.652Z) */ created_at: string + /** The address which initiated the event, a miner in case of block and a caller in case of other events */ address: string + /** The recipient's address */ to_address: string + /** The validator's address */ validator: string + /** The list of validators' addresses */ validator_list: string[] + /** The amount value associated with the transaction */ amount: string + /** The duration of the event */ duration: number + /** Is auto staking enabled */ auto_stake: boolean } + export { eventSchema, aggSchema } \ No newline at end of file diff --git a/common/helpers/src/index.ts b/common/helpers/src/index.ts index 2f7ae52b9..86a09ddc8 100644 --- a/common/helpers/src/index.ts +++ b/common/helpers/src/index.ts @@ -2,7 +2,7 @@ import { S3Client, S3ClientConfig, PutObjectCommand, GetObjectCommand } from '@a import { AthenaClient, AthenaClientConfig } from '@aws-sdk/client-athena' import { defaultProvider } from '@aws-sdk/credential-provider-node' import { StartQueryExecutionCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena' -import { EventTableColumn } from '@casimir/data' +import { EventTableSchema } from '@casimir/data' const defaultQueryOutputBucket = 'casimir-etl-output-bucket-dev' @@ -186,7 +186,7 @@ async function pollAthenaQueryOutput(queryId: string): Promise { * @param query - SQL query to run (make sure the correct permissions are set) * @return string - Query result */ -export async function queryAthena(query: string): Promise { +export async function queryAthena(query: string): Promise { if (!athena) { athena = await newAthenaClient() @@ -225,7 +225,7 @@ export async function queryAthena(query: string): Promise h.trim().replace(/"/g, '')) - const events: EventTableColumn[] = [] + const events: EventTableSchema[] = [] rows.forEach((curr, i) => { const row = curr.split(',') diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index 6eddb0c8b..b0a4f0595 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -66,16 +66,16 @@ class Crawler { for (let i = start; i < current.number; i++) { const {events, blockHash} = await this.service.getEvents(15000000 + i) - const ndjson = events.map((e: EventTableSchema) => JSON.stringify(e)).join('\n') - await uploadToS3({ - bucket: eventOutputBucket, - key: `${blockHash}-event.json`, - data: ndjson - }).finally(() => { - if (this.config.verbose) { - console.log(`uploaded events for block ${blockHash}`) - } - }) + const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') + // await uploadToS3({ + // bucket: eventOutputBucket, + // key: `${blockHash}-event.json`, + // data: ndjson + // }).finally(() => { + // if (this.config.verbose) { + // console.log(`uploaded events for block ${blockHash}`) + // } + // }) } return } diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index b40707a10..418e6acb3 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -41,35 +41,30 @@ export class EthereumService { return input } - async getEvents(height: number): Promise<{ blockHash: string, events: EventTableSchema[] }> { - const events: EventTableSchema[] = [] + async getEvents(height: number): Promise<{ blockHash: string, events: Partial[] }> { + const events: Partial[] = [] const block = await this.provider.getBlockWithTransactions(height) - events.push({ + const blockEvent = { chain: this.chain, network: this.network, provider: Provider.Casimir, type: 'block', block: block.hash, - transaction: "", created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: block.miner, height: block.number, - to_address: "", - validator: '', - duration: 0, - validator_list: [], - amount: '0', - auto_stake: false - }) + } + + events.push(blockEvent) if (block.transactions.length === 0) { return { blockHash: block.hash, events } } for await (const tx of block.transactions) { - events.push({ + const txEvent = { chain: this.chain, network: this.network, provider: Provider.Casimir, @@ -78,14 +73,12 @@ export class EthereumService { transaction: tx.hash, created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: tx.from, - to_address: tx.to || '', + to_address: tx.to, height: block.number, - validator: '', - validator_list: [], - duration: 0, amount: tx.value.toString(), - auto_stake: false - }) + } + + events.push(txEvent) const receipts = await this.provider.getTransactionReceipt(tx.hash) @@ -93,11 +86,11 @@ export class EthereumService { continue } - for (const log of receipts.logs) { + for await (const log of receipts.logs) { if (log.address in BeaconDepositContract) { const parsedLog = this.parseLog(log) - console.log(parsedLog) - const logEvent: EventTableSchema = { + + const logEvent = { chain: this.chain, network: this.network, provider: Provider.Casimir, @@ -107,13 +100,10 @@ export class EthereumService { created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: log.address, height: block.number, - to_address: '', - validator: '', - duration: 0, - validator_list: [], + to_address: tx.to || '', amount: parsedLog.amount.toString(), - auto_stake: false } + events.push(logEvent) } } diff --git a/services/crawler/test/crawler.test.ts b/services/crawler/test/crawler.test.ts index fa4766ed4..87b13436c 100644 --- a/services/crawler/test/crawler.test.ts +++ b/services/crawler/test/crawler.test.ts @@ -7,7 +7,7 @@ test('init crawler for ethereum', async () => { const eth = await crawler({ chain: Chain.Ethereum, options: { - url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05', + url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05' }, verbose: true }) From 1f98d14aae097e968e6b649bd6b4b466fb89b9c6 Mon Sep 17 00:00:00 2001 From: hawyar Date: Sat, 8 Oct 2022 12:29:31 -0400 Subject: [PATCH 4/8] Update schema --- common/data/src/index.ts | 18 ++++++++---- common/data/src/schemas/event.schema.json | 24 +++++++++++++--- services/crawler/src/index.ts | 32 ++++++++++++---------- services/crawler/src/providers/Ethereum.ts | 22 ++++++++++----- 4 files changed, 66 insertions(+), 30 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index 7a21ced82..aed5e4ff3 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -31,11 +31,11 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] { } export type EventTableSchema = { - /** The name of the chain (e.g. iotex, ethereum) */ + /** Name of the chain (e.g. iotex, ethereum) */ chain: string - /** The name of the network (e.g. mainnet, testnet) */ + /** Name of the network (e.g. mainnet, testnet) */ network: string - /** The node provider (e.g. casimir, infura, alchemy), */ + /** "Name of the provider (e.g. casimir, infura, alchemy) */ provider: string /** The type of event (e.g. block, transaction, deposit) */ type: string @@ -51,12 +51,20 @@ export type EventTableSchema = { address: string /** The recipient's address */ to_address: string + /** The amount value associated with the transaction */ + amount: string + /** The total amount of gas used */ + gasUsed: number + /** The gas limit provided by transactions in the block */ + gasLimit: string + /** Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block */ + baseFee: string + /** Post-London Upgrade, this represents the part of the tx fee that is burnt */ + burntFee: string /** The validator's address */ validator: string /** The list of validators' addresses */ validator_list: string[] - /** The amount value associated with the transaction */ - amount: string /** The duration of the event */ duration: number /** Is auto staking enabled */ diff --git a/common/data/src/schemas/event.schema.json b/common/data/src/schemas/event.schema.json index 87a458584..9ba6cad74 100644 --- a/common/data/src/schemas/event.schema.json +++ b/common/data/src/schemas/event.schema.json @@ -44,6 +44,26 @@ "type": "string", "description": "The recipient's address" }, + "amount": { + "type": "string", + "description": "The amount of currency associated with the event" + }, + "gas_used": { + "type": "string", + "description": "The total amount of gas used" + }, + "gas_limit": { + "type": "string", + "description": "The gas limit provided by transactions in the block" + }, + "base_fee": { + "type": "string", + "description": "Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block" + }, + "burnt_fee": { + "type": "string", + "description": "Post-London Upgrade, this represents the part of the tx fee that is burnt" + }, "validator": { "type": "string", "description": "The validator's address" @@ -52,10 +72,6 @@ "type": "array", "description": "The list of validators in stake action" }, - "amount": { - "type": "string", - "description": "The amount of currency associated with the event" - }, "duration":{ "type": "string", "description": "The duration of the event" diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index b0a4f0595..fbf713725 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -54,19 +54,23 @@ class Crawler { async start(): Promise { if (this.service instanceof EthereumService) { - const lastEvent = await this.getLastProcessedEvent() - const last = lastEvent !== null ? lastEvent.height : 0 - const start = parseInt(last.toString()) + 1 - - if (this.config.verbose) { - console.log(`crawling ${this.config.chain} from block ${start}`) - } - - const current = await this.service.getCurrentBlock() - - for (let i = start; i < current.number; i++) { - const {events, blockHash} = await this.service.getEvents(15000000 + i) - const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') + // const lastEvent = await this.getLastProcessedEvent() + // const last = lastEvent !== null ? lastEvent.height : 0 + // const start = parseInt(last.toString()) + 1 + // + // if (this.config.verbose) { + // console.log(`crawling ${this.config.chain} from block ${start}`) + // } + // + // const current = await this.service.getCurrentBlock() + + const { events, blockHash } = await this.service.getEvents(15697244) + const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') + fs.writeFileSync('./test-events.json', ndjson) + + // for (let i = start; i < current.number; i++) { + // const { events, blockHash } = await this.service.getEvents(15697244 + i) + // const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') // await uploadToS3({ // bucket: eventOutputBucket, // key: `${blockHash}-event.json`, @@ -76,7 +80,7 @@ class Crawler { // console.log(`uploaded events for block ${blockHash}`) // } // }) - } + // } return } diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index 418e6acb3..7aff865ae 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -2,8 +2,9 @@ import { ethers } from 'ethers' import { EventTableSchema } from '@casimir/data' import { Chain, Provider } from '../index' -const BeaconDepositContract = { - '0x00000000219ab540356cBB839Cbe05303d7705Fa': { +const ContractsOfInterest = { + BeaconDepositContract: { + hash: '0x00000000219ab540356cBB839Cbe05303d7705Fa', abi: ['event DepositEvent (bytes pubkey, bytes withdrawal_credentials, bytes amount, bytes signature, bytes index)'] } } @@ -27,7 +28,7 @@ export class EthereumService { } parseLog(log: ethers.providers.Log): Record { - const abi = BeaconDepositContract[log.address as keyof typeof BeaconDepositContract].abi + const abi = ContractsOfInterest[log.address as keyof typeof ContractsOfInterest].abi const contractInterface = new ethers.utils.Interface(abi) const parsedLog = contractInterface.parseLog(log) const args = parsedLog.args.slice(-1 * parsedLog.eventFragment.inputs.length) @@ -55,8 +56,14 @@ export class EthereumService { created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: block.miner, height: block.number, + gasFee: block.gasUsed.toString(), + gasLimit: block.gasLimit.toString(), + gasUsed: block.gasUsed.toNumber(), + } + console.log("blockEvent", blockEvent) + events.push(blockEvent) if (block.transactions.length === 0) { @@ -87,23 +94,24 @@ export class EthereumService { } for await (const log of receipts.logs) { - if (log.address in BeaconDepositContract) { + if (log.address in ContractsOfInterest) { const parsedLog = this.parseLog(log) + const value = Buffer.from(parsedLog.amount.slice(2), 'hex').readBigUInt64BE(0).toString() + console.log('Parsed Log', parsedLog) const logEvent = { chain: this.chain, network: this.network, provider: Provider.Casimir, type: 'deposit', block: block.hash, - transaction: tx.hash, + transaction: log.transactionHash, created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: log.address, height: block.number, to_address: tx.to || '', - amount: parsedLog.amount.toString(), + amount: value, } - events.push(logEvent) } } From b68e45f80de6874ba4f734c153d487622c74ccf7 Mon Sep 17 00:00:00 2001 From: hawyar Date: Mon, 10 Oct 2022 12:00:28 -0400 Subject: [PATCH 5/8] Quick fix --- common/data/src/index.ts | 6 ++-- common/data/src/schemas/event.schema.json | 8 ++--- services/crawler/src/index.ts | 4 +-- services/crawler/src/providers/Ethereum.ts | 42 +++++++++++++++------- services/crawler/src/providers/Iotex.ts | 14 ++++---- 5 files changed, 46 insertions(+), 28 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index aed5e4ff3..a09454fdc 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -56,11 +56,11 @@ export type EventTableSchema = { /** The total amount of gas used */ gasUsed: number /** The gas limit provided by transactions in the block */ - gasLimit: string + gasLimit: number /** Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block */ - baseFee: string + baseFee: number /** Post-London Upgrade, this represents the part of the tx fee that is burnt */ - burntFee: string + burntFee: number /** The validator's address */ validator: string /** The list of validators' addresses */ diff --git a/common/data/src/schemas/event.schema.json b/common/data/src/schemas/event.schema.json index 9ba6cad74..1261aaec6 100644 --- a/common/data/src/schemas/event.schema.json +++ b/common/data/src/schemas/event.schema.json @@ -49,19 +49,19 @@ "description": "The amount of currency associated with the event" }, "gas_used": { - "type": "string", + "type": "bigint", "description": "The total amount of gas used" }, "gas_limit": { - "type": "string", + "type": "bigint", "description": "The gas limit provided by transactions in the block" }, "base_fee": { - "type": "string", + "type": "bigint", "description": "Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block" }, "burnt_fee": { - "type": "string", + "type": "float", "description": "Post-London Upgrade, this represents the part of the tx fee that is burnt" }, "validator": { diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index fbf713725..dd3a35acf 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -66,8 +66,6 @@ class Crawler { const { events, blockHash } = await this.service.getEvents(15697244) const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') - fs.writeFileSync('./test-events.json', ndjson) - // for (let i = start; i < current.number; i++) { // const { events, blockHash } = await this.service.getEvents(15697244 + i) // const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') @@ -99,7 +97,7 @@ class Crawler { for (let i = start; i < currentHeight; i++) { const { hash, events } = await this.service.getEvents(i) - const ndjson = events.map((e: EventTableSchema) => JSON.stringify(e)).join('\n') + const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') await uploadToS3({ bucket: eventOutputBucket, diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index 7aff865ae..2a5d6673f 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -56,14 +56,15 @@ export class EthereumService { created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: block.miner, height: block.number, - gasFee: block.gasUsed.toString(), - gasLimit: block.gasLimit.toString(), gasUsed: block.gasUsed.toNumber(), - + gasLimit: block.gasLimit.toNumber(), + // @ts-ignore + baseFee: block.baseFeePerGas.toNumber(), + // @ts-ignore + burntFee: parseFloat(ethers.utils.formatEther(ethers.BigNumber.from(block.gasUsed).mul(block.baseFeePerGas))), } - console.log("blockEvent", blockEvent) - + console.log('Block Event', blockEvent) events.push(blockEvent) if (block.transactions.length === 0) { @@ -82,10 +83,10 @@ export class EthereumService { address: tx.from, to_address: tx.to, height: block.number, - amount: tx.value.toString(), + amount: ethers.utils.formatEther(tx.value.toString()) } - events.push(txEvent) + console.log('Tx Event', txEvent) const receipts = await this.provider.getTransactionReceipt(tx.hash) @@ -96,10 +97,9 @@ export class EthereumService { for await (const log of receipts.logs) { if (log.address in ContractsOfInterest) { const parsedLog = this.parseLog(log) - const value = Buffer.from(parsedLog.amount.slice(2), 'hex').readBigUInt64BE(0).toString() + // const value = Buffer.from(parsedLog.amount.slice(2), 'hex').readBigUInt64BE(0).toString() - console.log('Parsed Log', parsedLog) - const logEvent = { + const deposit = { chain: this.chain, network: this.network, provider: Provider.Casimir, @@ -110,10 +110,28 @@ export class EthereumService { address: log.address, height: block.number, to_address: tx.to || '', - amount: value, + // amount: value, } - events.push(logEvent) + + console.log('Deposit', deposit) + events.push(deposit) + continue + } + + const logEvent = { + chain: this.chain, + network: this.network, + provider: Provider.Casimir, + type: 'log', + block: block.hash, + transaction: log.transactionHash, + created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), + address: log.address, + height: block.number, + to_address: tx.to || '', + amount: tx.value.toString(), } + events.push(logEvent) } } return { diff --git a/services/crawler/src/providers/Iotex.ts b/services/crawler/src/providers/Iotex.ts index a42b0ea11..f085f69aa 100644 --- a/services/crawler/src/providers/Iotex.ts +++ b/services/crawler/src/providers/Iotex.ts @@ -115,14 +115,14 @@ export class IotexService { }) } - async getEvents(height: number): Promise<{ hash: string, events: EventTableSchema[]}> { - const events: EventTableSchema[] = [] + async getEvents(height: number): Promise<{ hash: string, events: Partial[]}> { + const events: Partial[] = [] const block = await this.provider.iotx.getBlockMetas({byIndex: {start: height, count: 1}}) const blockMeta = block.blkMetas[0] - events.push({ + const blockEvent = { block: blockMeta.hash, transaction: "", chain: this.chain, @@ -136,9 +136,11 @@ export class IotexService { validator: '', duration: 0, validator_list: [], - amount: '0', + amount: 0, auto_stake: false - }) + } + + const numOfActions = block.blkMetas[0].numActions @@ -169,7 +171,7 @@ export class IotexService { } if (actionType === IotexActionType.transfer && actionCore.transfer) { - actionEvent.amount = actionCore.transfer.amount + actionEvent.amount = parseInt(actionCore.transfer.amount).toString() actionEvent.to_address = actionCore.transfer.recipient events.push(actionEvent as EventTableSchema) } From 9bc0ed53012b58d95deb5b1835cd4c7f1f31e9e3 Mon Sep 17 00:00:00 2001 From: hawyar Date: Tue, 11 Oct 2022 13:58:25 -0400 Subject: [PATCH 6/8] Get contract of interest logs --- services/crawler/src/index.ts | 38 +++++++++--------- services/crawler/src/providers/Ethereum.ts | 45 ++++++++-------------- services/crawler/test/crawler.test.ts | 3 -- 3 files changed, 35 insertions(+), 51 deletions(-) diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index dd3a35acf..882f8f511 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -2,7 +2,7 @@ import { EventTableSchema } from '@casimir/data' import {IotexNetworkType, IotexService, IotexServiceOptions, newIotexService} from './providers/Iotex' import {EthereumService, EthereumServiceOptions, newEthereumService} from './providers/Ethereum' import { queryAthena, uploadToS3 } from '@casimir/helpers' -import fs from 'fs' +import * as fs from "fs"; export enum Chain { Ethereum = 'ethereum', @@ -32,7 +32,11 @@ class Crawler { async setup(): Promise { if (this.config.chain === Chain.Ethereum) { - this.service = await newEthereumService({ url: this.config?.options?.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' }) + try { + this.service = await newEthereumService({ url: this.config?.options?.url || process.env.PUBLIC_ETHEREUM_RPC_URL || 'http://localhost:8545' }) + } catch (err) { + throw new Error(`failed to setup ethereum service: ${err}`) + } return } @@ -54,21 +58,19 @@ class Crawler { async start(): Promise { if (this.service instanceof EthereumService) { - // const lastEvent = await this.getLastProcessedEvent() - // const last = lastEvent !== null ? lastEvent.height : 0 - // const start = parseInt(last.toString()) + 1 - // - // if (this.config.verbose) { - // console.log(`crawling ${this.config.chain} from block ${start}`) - // } - // - // const current = await this.service.getCurrentBlock() - - const { events, blockHash } = await this.service.getEvents(15697244) - const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') - // for (let i = start; i < current.number; i++) { - // const { events, blockHash } = await this.service.getEvents(15697244 + i) - // const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') + const lastEvent = await this.getLastProcessedEvent() + const last = lastEvent !== null ? lastEvent.height : 0 + const start = parseInt(last.toString()) + 1 + + if (this.config.verbose) { + console.log(`crawling ${this.config.chain} from block ${start}`) + } + + const current = await this.service.getCurrentBlock() + + for (let i = start; i < current.number; i++) { + const { events, blockHash } = await this.service.getEvents(i) + const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') // await uploadToS3({ // bucket: eventOutputBucket, // key: `${blockHash}-event.json`, @@ -78,7 +80,7 @@ class Crawler { // console.log(`uploaded events for block ${blockHash}`) // } // }) - // } + } return } diff --git a/services/crawler/src/providers/Ethereum.ts b/services/crawler/src/providers/Ethereum.ts index 2a5d6673f..fe6d4f135 100644 --- a/services/crawler/src/providers/Ethereum.ts +++ b/services/crawler/src/providers/Ethereum.ts @@ -36,7 +36,6 @@ export class EthereumService { const input: Record = {} parsedLog.eventFragment.inputs.forEach((key, index) => { - console.log('Key', key.name) input[key.name] = args[index] }) return input @@ -52,19 +51,16 @@ export class EthereumService { network: this.network, provider: Provider.Casimir, type: 'block', + height: block.number, block: block.hash, created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), address: block.miner, - height: block.number, gasUsed: block.gasUsed.toNumber(), gasLimit: block.gasLimit.toNumber(), - // @ts-ignore - baseFee: block.baseFeePerGas.toNumber(), - // @ts-ignore - burntFee: parseFloat(ethers.utils.formatEther(ethers.BigNumber.from(block.gasUsed).mul(block.baseFeePerGas))), + baseFee: block.baseFeePerGas?.toNumber(), + // burntFee: parseFloat(ethers.utils.formatEther(ethers.BigNumber.from(block.gasUsed).mul(block.baseFeePerGas as ethers.BigNumber))), } - console.log('Block Event', blockEvent) events.push(blockEvent) if (block.transactions.length === 0) { @@ -83,10 +79,14 @@ export class EthereumService { address: tx.from, to_address: tx.to, height: block.number, - amount: ethers.utils.formatEther(tx.value.toString()) + amount: ethers.utils.formatEther(tx.value.toString()), + gasUsed: block.gasUsed.toNumber(), + gasLimit: block.gasLimit.toNumber(), + baseFee: block.baseFeePerGas?.toNumber(), + // burntFee: parseFloat(ethers.utils.formatEther(ethers.BigNumber.from(block.gasUsed).mul(block.baseFeePerGas as ethers.BigNumber))), } - console.log('Tx Event', txEvent) + events.push(txEvent) const receipts = await this.provider.getTransactionReceipt(tx.hash) @@ -94,11 +94,9 @@ export class EthereumService { continue } - for await (const log of receipts.logs) { + for (const log of receipts.logs) { if (log.address in ContractsOfInterest) { const parsedLog = this.parseLog(log) - // const value = Buffer.from(parsedLog.amount.slice(2), 'hex').readBigUInt64BE(0).toString() - const deposit = { chain: this.chain, network: this.network, @@ -110,28 +108,15 @@ export class EthereumService { address: log.address, height: block.number, to_address: tx.to || '', - // amount: value, + amount: parsedLog.amount, + gasUsed: block.gasUsed.toNumber(), + gasLimit: block.gasLimit.toNumber(), + baseFee: block.baseFeePerGas?.toNumber(), + // burntFee: parseFloat(ethers.utils.formatEther(ethers.BigNumber.from(block.gasUsed).mul(block.baseFeePerGas as ethers.BigNumber))), } - - console.log('Deposit', deposit) events.push(deposit) continue } - - const logEvent = { - chain: this.chain, - network: this.network, - provider: Provider.Casimir, - type: 'log', - block: block.hash, - transaction: log.transactionHash, - created_at: new Date(block.timestamp * 1000).toISOString().replace('T', ' ').replace('Z', ''), - address: log.address, - height: block.number, - to_address: tx.to || '', - amount: tx.value.toString(), - } - events.push(logEvent) } } return { diff --git a/services/crawler/test/crawler.test.ts b/services/crawler/test/crawler.test.ts index 87b13436c..e2decfc17 100644 --- a/services/crawler/test/crawler.test.ts +++ b/services/crawler/test/crawler.test.ts @@ -6,9 +6,6 @@ jest.setTimeout(1000000) test('init crawler for ethereum', async () => { const eth = await crawler({ chain: Chain.Ethereum, - options: { - url: 'https://eth-mainnet.alchemyapi.io/v2/5zDQdGo96cD5RJSDuUcS9HktqFOBJH05' - }, verbose: true }) await eth.start() From 17c0417303f655e44bede51f578c70e7bbb875c7 Mon Sep 17 00:00:00 2001 From: hawyar Date: Tue, 11 Oct 2022 14:22:36 -0400 Subject: [PATCH 7/8] Clean --- services/crawler/src/index.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/services/crawler/src/index.ts b/services/crawler/src/index.ts index 882f8f511..047b73d55 100644 --- a/services/crawler/src/index.ts +++ b/services/crawler/src/index.ts @@ -71,15 +71,15 @@ class Crawler { for (let i = start; i < current.number; i++) { const { events, blockHash } = await this.service.getEvents(i) const ndjson = events.map((e: Partial) => JSON.stringify(e)).join('\n') - // await uploadToS3({ - // bucket: eventOutputBucket, - // key: `${blockHash}-event.json`, - // data: ndjson - // }).finally(() => { - // if (this.config.verbose) { - // console.log(`uploaded events for block ${blockHash}`) - // } - // }) + await uploadToS3({ + bucket: eventOutputBucket, + key: `${blockHash}-event.json`, + data: ndjson + }).finally(() => { + if (this.config.verbose) { + console.log(`uploaded events for block ${blockHash}`) + } + }) } return } From 20e5c3c205035b7e221cbeda85d874eab2687049 Mon Sep 17 00:00:00 2001 From: hawyar Date: Tue, 11 Oct 2022 16:40:05 -0400 Subject: [PATCH 8/8] Fix gas data types --- common/data/src/index.ts | 2 +- common/data/src/schemas/event.schema.json | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/data/src/index.ts b/common/data/src/index.ts index a09454fdc..2be4b32d5 100644 --- a/common/data/src/index.ts +++ b/common/data/src/index.ts @@ -14,7 +14,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] { return Object.keys(jsonSchema.properties).map((name: string) => { const property = jsonSchema.properties[name] - // 'STRING' | 'INTEGER' | 'BOOLEAN' | 'DOUBLE' | 'DECIMAL' | 'BIGINT' | 'TIMESTAMP' | 'JSON' | 'DATE' + // 'STRING' | 'INTEGER' | 'BOOLEAN' | 'DOUBLE' | 'DECIMAL' | 'BIG_INT' | 'TIMESTAMP' | 'JSON' | 'DATE' const typeKey = property.type.toUpperCase() as keyof glue.Schema let type: glue.Type = glue.Schema[typeKey] diff --git a/common/data/src/schemas/event.schema.json b/common/data/src/schemas/event.schema.json index 1261aaec6..081218a18 100644 --- a/common/data/src/schemas/event.schema.json +++ b/common/data/src/schemas/event.schema.json @@ -49,15 +49,15 @@ "description": "The amount of currency associated with the event" }, "gas_used": { - "type": "bigint", + "type": "big_int", "description": "The total amount of gas used" }, "gas_limit": { - "type": "bigint", + "type": "big_int", "description": "The gas limit provided by transactions in the block" }, "base_fee": { - "type": "bigint", + "type": "big_int", "description": "Post-London upgrade this represents the minimum gasUsed multiplier required for a transaction to be included in a block" }, "burnt_fee": {