Skip to content

Commit

Permalink
Merge pull request #132 from consensusnetworks/featrure/add-ethereum
Browse files Browse the repository at this point in the history
Add Ethereum and update Iotex actions
  • Loading branch information
shanejearley authored Sep 28, 2022
2 parents 5b71519 + d06c08a commit 0d022d5
Show file tree
Hide file tree
Showing 15 changed files with 1,925 additions and 562 deletions.
2 changes: 1 addition & 1 deletion common/data/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
},
"devDependencies": {
"@types/node": "^17.0.38",
"esbuild": "^0.14.42",
"esbuild": "^0.15.9",
"esno": "^0.16.3"
},
"dependencies": {
Expand Down
4 changes: 2 additions & 2 deletions common/data/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export function schemaToGlueColumns(jsonSchema: JsonSchema): glue.Column[] {

let type: glue.Type = glue.Schema[typeKey]

if (name.endsWith('at')) type = glue.Schema.DATE
if (name.endsWith('at')) type = glue.Schema.TIMESTAMP

if (name === 'candidate_list') type = glue.Schema.array(glue.Schema.STRING)

Expand All @@ -41,7 +41,7 @@ export type EventTableColumn = {
to_address: string
candidate: string
candidate_list: string[]
amount: number
amount: string
duration: number
auto_stake: boolean
// payload: Record<string, unknown>
Expand Down
2 changes: 1 addition & 1 deletion common/data/src/schemas/agg.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
},
"first_staked_at": {
"type": "string",
"description": "The first date (MM-DD-YYYY) that a wallet staked"
"description": "First staked at datestring"
},
"total_staked_amount": {
"type": "string",
Expand Down
12 changes: 6 additions & 6 deletions common/data/src/schemas/event.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
},
"height": {
"type": "integer",
"description": "The block height of the event"
"description": "The height of the event"
},
"created_at": {
"type": "string",
"description": "The date (MM-DD-YYYY) of the event"
"description": "The date and time of the event in ISO 8601 format e.g. 2015-03-04T22:44:30.652Z"
},
"address": {
"type": "string",
"description": "The address that initiated the event"
"description": "The address which initiated the event"
},
"to_address": {
"type": "string",
"description": "The address which received the action event"
"description": "The recipient's address"
},
"candidate": {
"type": "string",
Expand All @@ -46,11 +46,11 @@
},
"amount": {
"type": "string",
"description": "The amount of the currency in the event"
"description": "The amount of currency associated with the event"
},
"duration":{
"type": "string",
"description": "The duration of the action"
"description": "The duration of the event"
},
"auto_stake": {
"type": "boolean",
Expand Down
2 changes: 1 addition & 1 deletion common/helpers/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
},
"devDependencies": {
"@types/node": "^17.0.38",
"esbuild": "^0.14.42",
"esbuild": "^0.15.9",
"esno": "^0.16.3"
}
}
232 changes: 232 additions & 0 deletions common/helpers/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import { S3Client, S3ClientConfig, PutObjectCommand, GetObjectCommand } from '@aws-sdk/client-s3'
import { AthenaClient, AthenaClientConfig } from '@aws-sdk/client-athena'
import { defaultProvider } from '@aws-sdk/credential-provider-node'
import { StartQueryExecutionCommand, GetQueryExecutionCommand } from '@aws-sdk/client-athena'
import { EventTableColumn } from '@casimir/data'

/**
* Converts any string to PascalCase.
*
Expand All @@ -9,4 +15,230 @@ export function pascalCase(str: string): string {
return str.replace(/\w+/g, (word) => {
return word[0].toUpperCase() + word.slice(1).toLowerCase()
})
}

// Module-level Athena client; null until a client has been created and stored here.
let athena: AthenaClient | null = null
// Module-level S3 client; null until a client has been created and stored here.
let s3: S3Client | null = null

/**
 * Creates a new Athena client and stores it as the module-level client.
 *
 * Defaults are merged into the supplied config rather than replacing it:
 * `region` falls back to `us-east-2` and `credentials` fall back to the
 * AWS default credential provider chain. Every other option the caller
 * passed is preserved.
 *
 * @param opt - Athena client config
 * @returns Athena client
 *
 */
export async function newAthenaClient(opt?: AthenaClientConfig): Promise<AthenaClient> {
    // Spread first so the defaults below only fill gaps and never drop
    // options (previously the credentials default discarded the region).
    const config: AthenaClientConfig = {
        ...opt,
        region: opt?.region ?? 'us-east-2',
        credentials: opt?.credentials ?? defaultProvider()
    }

    const client = new AthenaClient(config)
    athena = client

    return client
}

/**
 * Returns the module-level S3 client, creating it on first use.
 *
 * If a client already exists it is returned as-is and `opt` is ignored.
 * Otherwise defaults are merged into the supplied config rather than
 * replacing it: `region` falls back to `us-east-2` and `credentials` fall
 * back to the AWS default credential provider chain.
 *
 * @param opt - S3 client config (only used when no client exists yet)
 * @returns S3 client
 *
 */
export async function newS3Client (opt?: S3ClientConfig): Promise<S3Client> {
    if (s3) {
        return s3
    }

    // Spread first so the defaults below only fill gaps and never drop
    // options (previously the credentials default discarded the region).
    const config: S3ClientConfig = {
        ...opt,
        region: opt?.region ?? 'us-east-2',
        credentials: opt?.credentials ?? defaultProvider()
    }

    const client = new S3Client(config)
    s3 = client

    return client
}

/**
 * Writes a string payload to an S3 object.
 *
 * The module-level S3 client is created on demand when none exists yet.
 *
 * @param input.bucket - Name of the target bucket
 * @param input.key - Object key to write
 * @param input.data - Payload to store as the object body
 * @throws Error when S3 answers with a status code other than 200
 *
 */
export async function uploadToS3( input: { bucket: string, key: string, data: string }): Promise<void> {
    if (s3 === null) {
        s3 = await newS3Client()
    }

    const response = await s3.send(
        new PutObjectCommand({ Bucket: input.bucket, Key: input.key, Body: input.data })
    )

    const status = response.$metadata.httpStatusCode
    if (status !== 200) {
        throw new Error('Error uploading to s3')
    }
}

/**
 * Reads an S3 object and returns its content as a UTF-8 string.
 *
 * The module-level S3 client is created on demand when none exists yet.
 *
 * @param bucket - Bucket to read from
 * @param key - Key of the object to read
 * @returns The full object body decoded as UTF-8
 * @throws Error when S3 answers with a status code other than 200 or the
 * response carries no body
 *
 */
export async function getFromS3(bucket: string, key: string): Promise<string> {
    if (!s3) {
        s3 = await newS3Client()
    }

    const { $metadata, Body } = await s3.send(new GetObjectCommand({
        Bucket: bucket,
        Key: key
    }))

    if ($metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable retrieve result from S3')
    if (Body === undefined) throw new Error('InvalidQueryResult: query result is undefined')

    // Collect the raw bytes and decode once at the end: decoding each chunk
    // separately corrupts multi-byte UTF-8 characters that happen to be
    // split across two chunks.
    const chunks: Buffer[] = []

    // The SDK's Body type is a union that is not statically iterable.
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    for await (const data of Body) {
        chunks.push(typeof data === 'string' ? Buffer.from(data, 'utf-8') : Buffer.from(data))
    }

    return Buffer.concat(chunks).toString('utf-8')
}

/** Delay before the first status re-check, in milliseconds. */
const ATHENA_POLL_INITIAL_BACKOFF_MS = 500
/** Amount added to the delay after every re-check, in milliseconds. */
const ATHENA_POLL_BACKOFF_STEP_MS = 500

/**
 * Waits until an Athena query reaches a terminal state.
 *
 * The query status is re-checked with a linearly growing delay while the
 * query is QUEUED or RUNNING. The returned promise only resolves once the
 * query has SUCCEEDED, so callers can safely `await` it; any failure is
 * reported by rejecting that same promise.
 *
 * @param queryId - Athena query execution id to wait for
 * @throws Error when the status request fails, the response is incomplete,
 * or the query ends in any state other than SUCCEEDED
 *
 */
async function pollAthenaQueryOutput(queryId: string): Promise<void> {
    if (!athena) {
        athena = await newAthenaClient()
    }
    const client = athena

    const getStateCmd = new GetQueryExecutionCommand({
        QueryExecutionId: queryId
    })

    // The backoff is local so every query starts from the initial delay.
    let backoff = ATHENA_POLL_INITIAL_BACKOFF_MS

    for (;;) {
        const { $metadata, QueryExecution } = await client.send(getStateCmd)

        if ($metadata.httpStatusCode !== 200) throw new Error('FailedQuery: unable to query Athena')
        if (QueryExecution === undefined) throw new Error('InvalidQueryExecution: query execution is undefined')
        if (QueryExecution.Status === undefined) throw new Error('InvalidQueryExecutionStatus: query execution status is undefined')

        const state = QueryExecution.Status.State

        switch (state) {
            case 'SUCCEEDED':
                return
            case 'QUEUED':
            case 'RUNNING':
                // Not finished yet: fall out of the switch and wait below.
                break
            case 'FAILED': {
                const reason = QueryExecution.Status.StateChangeReason
                if (reason && reason.includes('HIVE_BAD_DATA')) {
                    throw new Error('FailedQuery: Check the table for bad data')
                }
                throw new Error('QueryFailed: query failed')
            }
            default:
                // CANCELLED or an unexpected/missing state: no result will be
                // produced, so report it instead of pretending success.
                throw new Error(`QueryFailed: query ended in state ${String(state)}`)
        }

        // Awaiting the delay (instead of a detached setTimeout) keeps the
        // caller's promise pending until the query is really done.
        await new Promise(resolve => setTimeout(resolve, backoff))
        backoff += ATHENA_POLL_BACKOFF_STEP_MS
    }
}

/** Bucket that receives Athena query results. */
const ATHENA_OUTPUT_BUCKET = 'cms-lds-agg'
/** Key prefix (folder) inside the bucket for Athena query results. */
const ATHENA_OUTPUT_PREFIX = 'cms_hcf_aggregates'

/**
 * Parses CSV text into records of trimmed field values.
 *
 * Handles double-quoted fields, including commas, line breaks and escaped
 * quotes (`""`) inside them. Completely empty lines are skipped.
 *
 * @param raw - CSV text
 * @returns One string array per non-empty record, in input order
 */
function parseAthenaCsv(raw: string): string[][] {
    const records: string[][] = []
    let record: string[] = []
    let field = ''
    let inQuotes = false
    // True once the current line contained any character, so that a record
    // consisting of a single empty quoted field is not mistaken for a blank line.
    let touched = false

    const endField = (): void => {
        record.push(field.trim())
        field = ''
    }
    const endRecord = (): void => {
        endField()
        if (touched) records.push(record)
        record = []
        touched = false
    }

    for (let i = 0; i < raw.length; i++) {
        const char = raw[i]

        if (inQuotes) {
            if (char !== '"') {
                field += char
            } else if (raw[i + 1] === '"') {
                // An escaped quote inside a quoted field.
                field += '"'
                i++
            } else {
                inQuotes = false
            }
            continue
        }

        if (char === '\n') {
            endRecord()
            continue
        }
        if (char === '\r') continue

        touched = true
        if (char === '"') inQuotes = true
        else if (char === ',') endField()
        else field += char
    }

    // Flush the last record when the text does not end with a newline.
    if (touched) endRecord()

    return records
}

/**
 * Runs a SQL query on Athena table
 *
 * Starts the query in the `primary` workgroup, waits for it to finish, then
 * downloads the CSV result from S3 and maps every data row to an object keyed
 * by the CSV header names.
 *
 * Note: every value in the returned objects is the raw CSV string; no
 * conversion to the numeric/boolean field types of `EventTableColumn` is done.
 *
 * @param query - SQL query to run (make sure the correct permissions are set)
 * @returns The result rows, or null when the result has no data rows
 * @throws Error when the query cannot be started, fails, or its result cannot be read
 */
export async function queryAthena(query: string): Promise<EventTableColumn[] | null> {

    if (!athena) {
        athena = await newAthenaClient()
    }

    const execCmd = new StartQueryExecutionCommand({
        QueryString: query,
        WorkGroup: 'primary',
        ResultConfiguration: {
            OutputLocation: `s3://${ATHENA_OUTPUT_BUCKET}/${ATHENA_OUTPUT_PREFIX}/`
        }
    })

    const { $metadata, QueryExecutionId } = await athena.send(execCmd)

    if ($metadata.httpStatusCode !== 200) {
        throw new Error('FailedQuery: unable to query Athena')
    }

    if (QueryExecutionId === undefined) {
        throw new Error('InvalidQueryExecutionId: query execution id is undefined')
    }

    await pollAthenaQueryOutput(QueryExecutionId)

    // wait for athena to finish writing to s3
    await new Promise(resolve => setTimeout(resolve, 2000))

    const raw = await getFromS3(ATHENA_OUTPUT_BUCKET, `${ATHENA_OUTPUT_PREFIX}/${QueryExecutionId}.csv`)

    const [header, ...rows] = parseAthenaCsv(raw)

    // Nothing but (at most) the header line: no data to return.
    if (header === undefined || rows.length === 0) {
        return null
    }

    return rows.map((row) => {
        const event: Record<string, string> = {}
        row.forEach((value, i) => {
            const column = header[i]
            // Ignore surplus fields that have no matching header column.
            if (column !== undefined) event[column] = value
        })
        // Values are untyped CSV strings; the declared row type is kept for
        // backward compatibility with existing callers.
        return event as unknown as EventTableColumn
    })
}
Loading

0 comments on commit 0d022d5

Please sign in to comment.