Skip to content

Commit

Permalink
Merge pull request #119 from consensusnetworks/feature/iotex-etl
Browse files Browse the repository at this point in the history
Retrieve last block & schema change
  • Loading branch information
hawyar authored Sep 13, 2022
2 parents 9c5c5bd + 6917c1a commit 2566f37
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 48 deletions.
5 changes: 4 additions & 1 deletion common/data/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {

let type: glue.Type = glue.Schema[typeKey]

if (name.endsWith('_at')) type = glue.Schema.DATE
if (name.endsWith('at')) type = glue.Schema.DATE

if (name === 'candidate_list') type = glue.Schema.array(glue.Schema.STRING)

if (name.endsWith('amount')) type = glue.Schema.BIG_INT

const comment = property.description
return { name, type, comment }
})
Expand Down
2 changes: 1 addition & 1 deletion common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"description": "The list of candidates in a stake action"
},
"amount": {
"type": "integer",
"type": "string",
"description": "The amount of the currency in the event"
},
"duration":{
Expand Down
79 changes: 47 additions & 32 deletions services/crawler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const queryOutputLocation = 's3://cms-lds-agg/cms_hcf_aggregates'
const EE = new EventEmitter()

let s3: S3Client | null = null
const athena: AthenaClient | null = null
let athena: AthenaClient | null = null

export enum Chain {
Iotex = 'iotex',
Expand Down Expand Up @@ -61,7 +61,6 @@ class Crawler {
})

if ($metadata.httpStatusCode !== 200) throw new Error('FailedUploadBlock: unable to upload block')
console.log(`Uploaded ${key}`)
}

async prepare (): Promise<void> {
Expand All @@ -88,55 +87,63 @@ class Crawler {

if (s3 === null) s3 = await newS3Client()


if (this.service instanceof IotexService) {

const { chainMeta } = await this.service.getChainMetadata()
const height = parseInt(chainMeta.height)
const trips = Math.ceil(height / 1000)
const blocksPerRequest = 1000

for (let i = 0; i < trips; i++) {
console.log(`Starting trip ${i + 1} of ${trips}`)
const { blkMetas: blocks } = await this.service.getBlocks(12000000 , 1000)
if (blocks.length === 0) continue
const lastBlock = await this.retrieveLastBlock()

const start = lastBlock === 0 ? 0 : lastBlock + 1
const trips = Math.ceil(height / blocksPerRequest)

for (let i = start; i < trips; i++) {
const { blkMetas: blocks } = await this.service.getBlocks(i, blocksPerRequest)

if (blocks.length === 0) continue

for await (const block of blocks) {
let events: EventTableColumn[] = []
const actions = await this.service.getBlockActions(block.height, block.numActions)
const actions = await this.service.getBlockActions(block.height, block.numActions)

if (actions.length === 0 || actions[0].action.core === undefined) continue

for await (const action of actions) {
const core = action.action.core

if (core === undefined) continue

const type = Object.keys(core).filter(k => k !== undefined)[Object.keys(core).length - 2]

const event = this.service.convertToGlueSchema({ type, block, action})
events.push(event)
}
const ndjson = events.map(a => JSON.stringify(a)).join('\n')
const key = `${block.hash}-events.json`
await this.upload(key, ndjson)
events = []

const ndjson = events.map(a => JSON.stringify(a)).join('\n')
events.forEach(e => console.log(e.height + ' ' + e.address + ' ' + e.type))
const key = `${block.hash}-events.json`
await this.upload(key, ndjson)
events = []
}
}
return
}
throw new Error('not implemented yet')
}

async retrieveLastBlock(): Promise<void> {
async retrieveLastBlock(): Promise<number> {
if (this.athenaClient === null) this.athenaClient = await newAthenaClient()

const execCmd = new StartQueryExecutionCommand({
QueryString: 'SELECT * FROM "casimir_etl_database_dev"."casimir_etl_event_table_dev" LIMIT 1',
QueryString: 'SELECT height FROM "casimir_etl_database_dev"."casimir_etl_event_table_dev" ORDER BY height DESC LIMIT 1',
WorkGroup: 'primary',
ResultConfiguration: {
OutputLocation: queryOutputLocation,
}
})

const res = await this.athenaClient.send(execCmd)

if (res.$metadata.httpStatusCode !== 200) {
throw new Error('FailedQuery: unable to query Athena')
}
Expand Down Expand Up @@ -172,6 +179,7 @@ class Crawler {
if (this.athenaClient === null) throw new Error('NullAthenaClient: athena client is not initialized')

const getStateRes = await this.athenaClient.send(getStateCmd)

if (getStateRes.$metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable to query Athena')
if (getStateRes.QueryExecution === undefined) throw new Error('InvalidQueryExecution: query execution is undefined')
if (getStateRes.QueryExecution.Status === undefined) throw new Error('InvalidQueryExecutionStatus: query execution status is undefined')
Expand All @@ -189,9 +197,10 @@ class Crawler {

const getResultFromS3 = async (): Promise<string> => {
if (s3 === null) throw new Error('NullS3Client: s3 client is not initialized')

const {$metadata, Body} = await s3.send(new GetObjectCommand({
Bucket: 'cms-lds-agg',
Key: 'cms_hcf_aggregates/3d116aad-523e-4763-bdbc-8198dafd5b35.csv'
Key: `cms_hcf_aggregates/${res.QueryExecutionId}.csv`
}))

if ($metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable retrieve result from S3')
Expand All @@ -209,11 +218,14 @@ class Crawler {

await queryState()

// wait for athena to write to s3
await new Promise(resolve => setTimeout(resolve, 2000))

const raw = await getResultFromS3()
const columns = raw.split('\n')[0].split(',').map(c => c.trim().replace(/"/g, ''))

const rows = raw.split('\n').slice(1).map(r => r.split(',').map(c => c.trim().replace(/"/g, '')))
const last = rows[rows.length - 1][columns.indexOf('height')]
const height = raw.split('\n').filter(l => l !== '')[1].replace(/"/g, '')

return parseInt(height)
}

async stop(): Promise<void> {
Expand Down Expand Up @@ -257,11 +269,17 @@ async function newAthenaClient(opt?: AthenaClientConfig): Promise<AthenaClient>

if (opt.credentials === undefined) {
opt = {
credentials: await defaultProvider()
credentials: defaultProvider()
}
}

if (athena === null) {
athena = new AthenaClient(opt)
return athena
}
const client = new AthenaClient(opt)
athena = client

return client
}

Expand All @@ -274,24 +292,21 @@ async function newS3Client (opt?: S3ClientConfig): Promise<S3Client> {

if (opt.credentials === undefined) {
opt = {
credentials: await defaultProvider()
credentials: defaultProvider()
}
}

if (s3 === null) {
s3 = new S3Client(opt)
return s3
}

const client = new S3Client(opt)
return client
}
s3 = client

async function testme() {
const crawler = new Crawler({
chain: Chain.Iotex,
verbose: true
})
await crawler.retrieveLastBlock()
return client
}

testme()

export async function crawler (config: CrawlerConfig): Promise<Crawler> {
const c = new Crawler({
chain: config?.chain ?? Chain.Iotex,
Expand Down
30 changes: 16 additions & 14 deletions services/crawler/test/crawler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ import { crawler, Chain } from '../src/index'

jest.setTimeout(20000)

test('init crawler', async () => {
const supercrawler = await crawler({
chain: Chain.Iotex,
verbose: true
})
await supercrawler.retrieveLastBlock()
expect(supercrawler.service).not.toBe(null)
})

test('stream', async () => {
test('get last block', async () => {
const supercrawler = await crawler({
chain: Chain.Iotex,
verbose: true
})

expect(supercrawler).not.toBe(null)
// supercrawler.on('block', (block) => {
// console.log(block)
// })
expect(supercrawler.service).not.toBe(null)
const lastBlock = await supercrawler.retrieveLastBlock()
expect(typeof lastBlock).toBe('number')
})

// test('stream', async () => {
// const supercrawler = await crawler({
// chain: Chain.Iotex,
// verbose: true
// })
//
// expect(supercrawler).not.toBe(null)
// // supercrawler.on('block', (block) => {
// // console.log(block)
// // })
// })
//

0 comments on commit 2566f37

Please sign in to comment.